返回

Spring Cloud Alibaba——Nacos AP一致性策略 Distro

发布时间:2023-02-05 22:33:19 292
# java# spring# java# 数据# 信息

Eureka 一致性策略

Eureka是一个AP模式的服务发现框架,在Eureka集群模式下,Eureka采取的是Server之间互相广播各自的数据进行数据复制、更新操作;并且Eureka在客户端与注册中心出现网络故障时,依然能够获取服务注册信息——Eureka实现了客户端对于服务注册信息的缓存

@Singleton
public class DiscoveryClient implements EurekaClient {

private boolean fetchRegistryFromBackup() {
        try {
            @SuppressWarnings("deprecation")
            BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
            if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
                backupRegistryInstance = backupRegistryProvider.get();
            }

            if (null != backupRegistryInstance) {
                Applications apps = null;
                if (isFetchingRemoteRegionRegistries()) {
                    String remoteRegionsStr = remoteRegionsToFetch.get();
                    if (null != remoteRegionsStr) {
                        apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                    }
                } else {
                    apps = backupRegistryInstance.fetchRegistry();
                }
                if (apps != null) {
                    final Applications applications = this.filterAndShuffle(apps);
                    applications.setAppsHashCode(applications.getReconcileHashCode());
                    localRegionApps.set(applications);
                    logTotalInstances();
                    logger.info("Fetched registry successfully from the backup");
                    return true;
                }
            } else {
                logger.warn("No backup registry instance defined & unable to find any discovery servers.");
            }
        } catch (Throwable e) {
            logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
        }
        return false;
    }
}

正因为Eureka为了能够在Eureka集群无法工作时不影响消费者调用服务提供者而设置的客户端缓存,因此Eureka无法保证服务注册信息的强一致性(CP模式),只能满足数据的最终一致性(AP模式)

Nacos AP一致性策略——Distro

Nacos在AP模式下的一致性策略就类似于Eureka,采用Server之间互相的数据同步来实现数据在集群中的同步、复制操作。

触发数据广播

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {

    private final GlobalConfig globalConfig;
    
    private final DistroProtocol distroProtocol;

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                globalConfig.getTaskDispatchPeriod() / 2);
    }
}

当服务注册和注销实例,即InstanceController的register和deregister方法被调用时,ServiceManager类中addInstance或removeInstance方法调用ConsistencyService接口定义的put、remove方法时,涉及到了Server端数据的变更,此时会创建一个任务DistroDelayTask,将数据的key封装到DistroDelayTask中。

@Component
public class DistroProtocol {

    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            //遍历所有服务ip。进行数据同步
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                    each.getAddress());
            //创建延迟任务
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            //其实也是一个定时任务
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }
}

然后将DistroDelayTask提交到Nacos的延时任务执行引擎NacosDelayTaskExecuteEngine中的ConcurrentHashMap<Object, AbstractDelayTask> tasks中存储,并且NacosDelayTaskExecuteEngine在其构造方法中初始化ScheduledExecutorService线程池并提交一个ProcessRunnable任务去取出tasks中的AbstractDelayTask任务进行处理。

public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine {
    
    private final ScheduledExecutorService processingExecutor;
    
    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
    
    protected final ReentrantLock lock = new ReentrantLock();
	
	//NacosDelayTaskExecuteEngine实例化
    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        //初始化tasks
        tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
        //创建定时任务线程池
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        //每间隔一段时间定时执行任务ProcessRunnable
        processingExecutor
                .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }   

    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
			//存在相同任务则合并
            if (null != existTask) {
                newTask.merge(existTask);
            }
			//将AbstractDelayTask存入tasks中
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }

ProcessRunnable线程处理存入延时任务执行引擎NacosDelayTaskExecuteEngine中的ConcurrentHashMap<Object, AbstractDelayTask> tasks中的任务

public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
    
    private final ScheduledExecutorService processingExecutor;
    
    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
	
    /**
     * process tasks in execute engine.
     */
    protected void processTasks() {
        //获取tasks中的所有key
        Collection<Object> keys = getAllTaskKeys();
        //遍历任务key集合
        for (Object taskKey : keys) {
            //删除tasks中的AbstractDelayTask任务
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            //获取NacosTaskProcessor,这里实质获取的是DistroDelayTaskProcessor
            //在DistroHttpRegistry中的doRegister方法中提前设置进去了DistroHttpDelayTaskProcessor
            //但是taskKey最为key会取不到DistroHttpDelayTaskProcessor
            //而DistroTaskEngineHolder类的构造函数中设置了DistroDelayTaskExecuteEngine类
            //中的DefaultTaskProcessor为DistroDelayTaskProcessor
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                //DistroHttpDelayTaskProcessor执行AbstractDelayTask任务
                if (!processor.process(task)) {
                    //执行失败,更新最后执行时间,并将该任务重新设置进tasks中,便于后续继续执行
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                //执行异常,更新最后执行时间,并将该任务重新设置进tasks中,便于后续继续执行
                retryFailedTask(taskKey, task);
            }
        }
    }

    private void retryFailedTask(Object key, AbstractDelayTask task) {
        //更新最后执行时间
        task.setLastProcessTime(System.currentTimeMillis());
        //并将该任务重新设置进tasks中,便于后续继续执行
        addTask(key, task);
    }
	
    private class ProcessRunnable implements Runnable {
        
        @Override
        public void run() {
            try {
				//处理任务
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }
}
特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报
评论区(0)
按点赞数排序
用户头像
精选文章
thumb 中国研究员首次曝光美国国安局顶级后门—“方程式组织”
thumb 俄乌线上战争,网络攻击弥漫着数字硝烟
thumb 从网络安全角度了解俄罗斯入侵乌克兰的相关事件时间线