Architecture (from the official site)
Overall flow
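The official documentation does not go into how dynamic refresh works; starting from @RefreshScope and ContextRefresher leads us to HostReactor.
Why read the source code
1. Build technical depth: source code teaches good design ideas, such as approaches to hard problems and well-applied design patterns, and raises your overall skill.
2. Master frameworks deeply: after reading enough source code you pick up new technologies and frameworks much faster. A framework's demo is often enough to guess how it works underneath, so fast-moving frameworks stop being intimidating.
3. Locate production issues quickly: when a production problem comes from framework code (a bug, say), you can pin it down fast. This is your advantage over people who have not read the source.
4. Do better in interviews: top internet companies usually ask about framework internals down to the source-code level.
5. Know not just what but why: anyone serious about technology wants to know how a good framework is implemented underneath.
6. Embrace the open-source community: contribute to open-source projects, meet strong engineers and build a valuable network.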
I. Entry point: service registration and discovery with @EnableDiscoveryClient
```java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    boolean autoRegister() default true;
}
```
autoRegister() defaults to true. The annotation also imports an ImportSelector, @Import(EnableDiscoveryClientImportSelector.class). Its selectImports method looks like this:
```java
@Override
public String[] selectImports(AnnotationMetadata metadata) {
    String[] imports = super.selectImports(metadata);

    AnnotationAttributes attributes = AnnotationAttributes.fromMap(
            metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));

    // Read the autoRegister attribute of @EnableDiscoveryClient
    boolean autoRegister = attributes.getBoolean("autoRegister");

    if (autoRegister) {
        List<String> importsList = new ArrayList<>(Arrays.asList(imports));
        importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
        imports = importsList.toArray(new String[0]);
    }
    else {
        Environment env = getEnvironment();
        if (ConfigurableEnvironment.class.isInstance(env)) {
            ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
            LinkedHashMap<String, Object> map = new LinkedHashMap<>();
            // Disable auto-registration through a lowest-precedence property source (addLast)
            map.put("spring.cloud.service-registry.auto-registration.enabled", false);
            MapPropertySource propertySource = new MapPropertySource(
                    "springCloudDiscoveryClient", map);
            configEnv.getPropertySources().addLast(propertySource);
        }
    }

    return imports;
}
```
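The branch above can be modeled without Spring as a plain function. This is only a minimal sketch. The class and method names are hypothetical, and the Map stands in for the Environment's property sources. Only the imported class name and the property key come from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spring-free model of EnableDiscoveryClientImportSelector#selectImports.
class AutoRegisterSketch {
    static final String AUTO_REG_CONFIG =
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration";
    static final String AUTO_REG_PROPERTY =
            "spring.cloud.service-registry.auto-registration.enabled";

    // fallbackProperties stands in for the lowest-precedence property source added via addLast
    static String[] selectImports(String[] imports, boolean autoRegister,
                                  Map<String, Object> fallbackProperties) {
        if (autoRegister) {
            // autoRegister = true: also import the auto-registration configuration
            List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            importsList.add(AUTO_REG_CONFIG);
            return importsList.toArray(new String[0]);
        }
        // autoRegister = false: disable auto-registration unless a value is already set elsewhere
        fallbackProperties.putIfAbsent(AUTO_REG_PROPERTY, false);
        return imports;
    }
}
```

putIfAbsent mirrors the effect of addLast. A value the user already configured, for example in application.yml, still wins over the fallback.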
Nacos registers its own auto-configurations in the spring.factories file of the nacos-discovery starter:
```properties
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
  com.alibaba.cloud.nacos.loadbalancer.LoadBalancerNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
org.springframework.context.ApplicationListener=\
  com.alibaba.cloud.nacos.discovery.logging.NacosLoggingListener
```
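Spring Boot reads these entries with SpringFactoriesLoader. The sketch below only imitates the file format. It parses spring.factories-style text with java.util.Properties, which handles the trailing-backslash line continuations, and splits each value on commas. The helper name is hypothetical. The real loader also merges every spring.factories on the classpath and caches the result.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that mimics reading one spring.factories file.
class FactoriesSketch {
    static List<String> classesFor(String factoriesText, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins backslash-continued lines and drops their leading whitespace
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> result = new ArrayList<>();
        String value = props.getProperty(key);
        if (value == null) {
            return result;
        }
        for (String name : value.split(",")) {
            if (!name.trim().isEmpty()) {
                result.add(name.trim());
            }
        }
        return result;
    }
}
```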
NacosDiscoveryAutoConfiguration
```java
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public NacosDiscoveryProperties nacosProperties() {
        return new NacosDiscoveryProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosServiceDiscovery nacosServiceDiscovery(
            NacosDiscoveryProperties discoveryProperties,
            NacosServiceManager nacosServiceManager) {
        return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
    }
}
```
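II. Automatic service registration
Spring Cloud supports several registries, including Eureka, ZooKeeper and Nacos. Unifying them requires a common specification, and Spring Cloud Commons defines it. Its org.springframework.cloud.client.serviceregistry package contains the three core interfaces for service registration: AutoServiceRegistration, Registration and ServiceRegistry. Registration extends ServiceInstance, which defines the properties of a service instance.
1. AutoServiceRegistration handles automatic registration: once the service has started, its information is registered with the registry automatically. The interface exists to require every implementation to support auto-registration.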
2. Registration holds the service information and specifies what gets registered with the registry:
```java
public interface Registration extends ServiceInstance {
}

public interface ServiceInstance {

    default String getInstanceId() {
        return null;
    }

    String getServiceId();

    String getHost();

    int getPort();

    // Whether the instance uses HTTPS
    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();

    // URI scheme of the registered instance
    default String getScheme() {
        return null;
    }
}
```
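3. ServiceRegistry registers and deregisters services. Taking a service offline, heartbeat checks and similar operations can all be traced back to it.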
```java
public interface ServiceRegistry<R extends Registration> {

    void register(R registration);

    void deregister(R registration);

    void close();

    void setStatus(R registration, String status);

    <T> T getStatus(R registration);
}
```
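To see how these interfaces divide the work, here is a toy in-memory registry. It is only a sketch. SimpleRegistration, SimpleServiceRegistry, SimpleReg and InMemoryServiceRegistry are all hypothetical. The interfaces are trimmed copies of the Spring Cloud Commons ones, without getUri, getMetadata or close, so the example compiles without Spring.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Trimmed-down, hypothetical stand-ins for Registration and ServiceRegistry.
interface SimpleRegistration {
    String getServiceId();
    String getHost();
    int getPort();
}

interface SimpleServiceRegistry<R extends SimpleRegistration> {
    void register(R registration);
    void deregister(R registration);
    void setStatus(R registration, String status);
    String getStatus(R registration);
}

class SimpleReg implements SimpleRegistration {
    private final String serviceId;
    private final String host;
    private final int port;

    SimpleReg(String serviceId, String host, int port) {
        this.serviceId = serviceId;
        this.host = host;
        this.port = port;
    }

    public String getServiceId() { return serviceId; }
    public String getHost() { return host; }
    public int getPort() { return port; }
}

// Toy registry: serviceId -> "host:port" entries, plus one status per entry.
class InMemoryServiceRegistry implements SimpleServiceRegistry<SimpleRegistration> {
    private final Map<String, CopyOnWriteArrayList<String>> instances = new ConcurrentHashMap<>();
    private final Map<String, String> statuses = new ConcurrentHashMap<>();

    private static String address(SimpleRegistration r) {
        return r.getHost() + ":" + r.getPort();
    }

    private static String statusKey(SimpleRegistration r) {
        return r.getServiceId() + "@" + address(r);
    }

    @Override
    public void register(SimpleRegistration r) {
        instances.computeIfAbsent(r.getServiceId(), id -> new CopyOnWriteArrayList<>())
                .addIfAbsent(address(r)); // registering twice is harmless
        statuses.put(statusKey(r), "UP");
    }

    @Override
    public void deregister(SimpleRegistration r) {
        List<String> list = instances.get(r.getServiceId());
        if (list != null) {
            list.remove(address(r));
        }
        statuses.remove(statusKey(r));
    }

    @Override
    public void setStatus(SimpleRegistration r, String status) {
        statuses.put(statusKey(r), status);
    }

    @Override
    public String getStatus(SimpleRegistration r) {
        return statuses.get(statusKey(r));
    }

    List<String> instancesOf(String serviceId) {
        List<String> list = instances.get(serviceId);
        return list == null ? Collections.<String>emptyList() : list;
    }
}
```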
III. The Nacos implementation
NacosRegistration
```java
public class NacosRegistration implements Registration, ServiceInstance
```
NacosServiceRegistry
```java
public class NacosServiceRegistry implements ServiceRegistry<Registration>
```
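NacosServiceRegistry wraps the configured properties into an Instance and calls namingService.registerInstance(serviceId, instance) to register the service with the registry. In 2.1.1 the NamingService is managed through NacosServiceManager. For example, setStatus updates the instance like this: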
```java
try {
    Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
    nacosServiceManager.getNamingMaintainService(nacosProperties).updateInstance(
            serviceId, nacosDiscoveryProperties.getGroup(), instance);
}
catch (Exception e) {
    throw new RuntimeException("update nacos instance status fail", e);
}
```
NacosAutoServiceRegistration
```java
public class NacosAutoServiceRegistration
        extends AbstractAutoServiceRegistration<Registration>

// ->

public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        ApplicationListener<WebServerInitializedEvent> {
```
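Because it indirectly implements ApplicationListener, its onApplicationEvent method is called once the web server has started, and that registers the service with the registry: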
```java
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

    void onApplicationEvent(E event);

    static <T> ApplicationListener<PayloadApplicationEvent<T>> forPayload(Consumer<T> consumer) {
        return (event) -> {
            consumer.accept(event.getPayload());
        };
    }
}

// AbstractAutoServiceRegistration
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}

public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }

    if (!this.running.get()) {
        this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }
}
```
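bind and start rely on two atomics. port.compareAndSet(0, ...) keeps only the first port reported, and the running flag prevents a second registration if the event fires again. Below is a stripped-down sketch of that pattern. The class name, the registerCalls counter and the serverNamespace parameter are hypothetical, and plain method calls replace the Spring events.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of AbstractAutoServiceRegistration#bind/#start.
class AutoRegistrationSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final boolean enabled;
    int registerCalls = 0; // how often register() would have been called

    AutoRegistrationSketch(boolean enabled) {
        this.enabled = enabled;
    }

    // Stand-in for onApplicationEvent(WebServerInitializedEvent) -> bind(event)
    void onWebServerInitialized(String serverNamespace, int webServerPort) {
        if ("management".equals(serverNamespace)) {
            return; // the management server's own event is ignored
        }
        port.compareAndSet(0, webServerPort); // only replaces the initial 0
        start();
    }

    void start() {
        if (!enabled) {
            return; // "Discovery Lifecycle disabled. Not starting"
        }
        if (!running.get()) {
            registerCalls++; // register() in the real class
            running.compareAndSet(false, true);
        }
    }

    int getPort() {
        return port.get();
    }
}
```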
publishEvent itself is implemented in AbstractApplicationContext:
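```java
/**
 * Listeners are initialized after the MessageSource so that they can access it
 * within listener implementations. Therefore MessageSource implementations
 * cannot publish events.
 */
@Override
public void publishEvent(Object event) {
    publishEvent(event, null);
}
```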
How the client registers with the server: com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
```java
// Spring Cloud Alibaba: NacosServiceRegistry#register
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }

    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();

    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        if (nacosDiscoveryProperties.isFailFast()) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
            rethrowRuntimeException(e);
        }
        else {
            log.warn("Failfast is false. {} register failed...{},", serviceId,
                    registration.toString(), e);
        }
    }
}

// Nacos client: NacosNamingService#registerInstance
public void registerInstance(String serviceName, String groupName, Instance instance)
        throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        // Ephemeral instances also get a client-side heartbeat task
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

// Nacos client: NamingProxy#registerService
public void registerService(String serviceName, String groupName, Instance instance)
        throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

    // HTTP POST to the instance API
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}

// Nacos client: NamingProxy#reqApi
public String reqApi(String api, Map<String, String> params, Map<String, String> body,
        List<String> servers, String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    if (StringUtils.isNotBlank(nacosDomain)) {
        // A single domain: retry it up to maxRetry times
        for (int i = 0; i < maxRetry; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        // A server list: start at a random server, then try each one in turn
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers,
            exception.getErrCode(), exception.getErrMsg());

    throw new NacosException(exception.getErrCode(),
            "failed to req API:" + api + " after all servers(" + servers + ") tried: "
                    + exception.getMessage());
}

// Inside NamingProxy#callServer: the actual HTTP call
HttpRestResult<String> restResult = nacosRestTemplate
        .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);

// Nacos server: InstanceController#register
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral())
            .setRequest(request).build();

    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
            false, namespaceId, NamingUtils.getGroupName(serviceName),
            NamingUtils.getServiceName(serviceName), instance.getIp(), instance.getPort()));
    return "ok";
}

// Nacos server: InstanceOperatorClientImpl#registerInstance
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance)
        throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);

    boolean ephemeral = instance.isEphemeral();
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    createIpPortClientIfAbsent(clientId);
    Service service = getService(namespaceId, serviceName, ephemeral);
    clientOperationService.registerInstance(service, instance, clientId);
}
```
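The server-selection loop in reqApi is worth isolating. When no nacosDomain is set, it starts at a random index and walks the server list round-robin. It returns the first successful response, and if every server fails it throws. The sketch keeps that shape under hypothetical names. ServerCall stands in for callServer, and standard exceptions replace NacosException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for NamingProxy#callServer.
interface ServerCall {
    String call(String server) throws Exception;
}

class FailoverSketch {
    // Mirrors the "server list" branch of reqApi: random start, then round-robin, each server once.
    static String reqApi(List<String> servers, ServerCall callServer, Random random) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        Exception last = null;
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer.call(server);
            } catch (Exception e) {
                last = e; // remember the failure and try the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed after all servers(" + servers + ") tried", last);
    }
}
```

Starting at a random index spreads requests from many clients across the cluster, while the wrap-around guarantees each server is tried once.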
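When NacosServiceRegistry is instantiated, nacosServiceManager is instantiated as well. Its getNamingService method is then called to build the NamingService from the configured parameters. When a new service is registered, the server side (ServiceManager#putServiceAndInit in 1.x) then does the following: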
1. Put the service into serviceMap, the service registry.
2. Initialize the service, which creates a health-check task (the main path of the health-check code).
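3. Register a listener (a RecordListener) whose onChange method runs when certain events occur. Nacos uses the observer pattern extensively. Operations such as registering or removing an instance become tasks that are put into a blocking queue, and when a task arrives the listener handles it by executing onChange.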
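The observer-plus-queue pattern can be shown in miniature. Producers put change tasks into a BlockingQueue, and a worker takes them off and calls onChange on every listener registered for the key. All names here are hypothetical: ChangeListener, Notifier, Task and processPending. The sketch only loosely follows RecordListener and does not reproduce Nacos' own notifier.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical listener, loosely modeled on RecordListener#onChange.
interface ChangeListener {
    void onChange(String key, String value);
}

class Notifier implements Runnable {
    // A task records which key changed and its new value.
    static final class Task {
        final String key;
        final String value;

        Task(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
    private final Map<String, List<ChangeListener>> listeners = new ConcurrentHashMap<>();

    void listen(String key, ChangeListener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Producer side: a registration or removal is turned into a task.
    void addTask(String key, String value) {
        tasks.offer(new Task(key, value));
    }

    private void dispatch(Task task) {
        for (ChangeListener l : listeners.getOrDefault(task.key, Collections.emptyList())) {
            l.onChange(task.key, task.value);
        }
    }

    // Consumer side: a worker thread blocks on take() until a task arrives.
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                dispatch(tasks.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop quietly when the worker is interrupted
        }
    }

    // Non-blocking variant for tests: handles one pending task, false if the queue is empty.
    boolean processPending() {
        Task task = tasks.poll();
        if (task == null) {
            return false;
        }
        dispatch(task);
        return true;
    }
}
```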
ServiceManager's implementation of com.alibaba.nacos.naming.consistency.RecordListener#onChange:
```java
public void onChange(String key, Service service) throws Exception {
    try {
        if (service == null) {
            Loggers.SRV_LOG.warn("received empty push from raft, key: {}", key);
            return;
        }

        if (StringUtils.isBlank(service.getNamespaceId())) {
            service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
        }

        Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key: {}, value: {}", key, service);

        Service oldDom = getService(service.getNamespaceId(), service.getName());

        if (oldDom != null) {
            // Known service: update it and listen on its ephemeral and persistent instance lists
            oldDom.update(service);
            consistencyService
                    .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), oldDom);
            consistencyService
                    .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), oldDom);
        } else {
            // New service: put it into serviceMap and initialize it
            putServiceAndInit(service);
        }
    } catch (Throwable e) {
        Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update", e);
    }
}
```
addInstance
```java
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {

    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    Service service = getService(namespaceId, serviceName);

    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }
}
```
consistencyService comes in an AP flavor and a CP flavor. clientOperationService.registerInstance(service, instance, clientId) calls into ServiceManager.
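Service: mainly concerned with changes to instances
ServiceManager: mainly concerned with changes to services
SwitchManager: mainly concerned with switching between the AP and CP models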
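Which consistency service handles a write depends on the key. In Nacos 1.x, a delegating consistency service (DelegateConsistencyServiceImpl) sends keys of ephemeral instances to the Distro (AP) implementation and all other keys to the Raft (CP) implementation. The sketch reproduces only that routing idea. The key prefixes, the class names and the Store stand-ins are hypothetical and do not match Nacos' actual key format.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a consistency service.
interface Store {
    void put(String key, String value);
    String get(String key);
}

class MapStore implements Store {
    private final Map<String, String> data = new ConcurrentHashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public String get(String key) {
        return data.get(key);
    }
}

// Routes each key to the AP or the CP store, the way the delegate picks Distro or Raft.
class DelegateStore implements Store {
    static final String EPHEMERAL_PREFIX = "ephemeral.";   // hypothetical key marker
    static final String PERSISTENT_PREFIX = "persistent."; // hypothetical key marker

    private final Store ephemeralStore;  // AP side (Distro in Nacos)
    private final Store persistentStore; // CP side (Raft in Nacos)

    DelegateStore(Store ephemeralStore, Store persistentStore) {
        this.ephemeralStore = ephemeralStore;
        this.persistentStore = persistentStore;
    }

    // Hypothetical counterpart of KeyBuilder.buildInstanceListKey: the ephemeral flag lives in the key.
    static String instanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return (ephemeral ? EPHEMERAL_PREFIX : PERSISTENT_PREFIX) + namespaceId + "##" + serviceName;
    }

    private Store route(String key) {
        return key.startsWith(EPHEMERAL_PREFIX) ? ephemeralStore : persistentStore;
    }

    @Override
    public void put(String key, String value) {
        route(key).put(key, value);
    }

    @Override
    public String get(String key) {
        return route(key).get(key);
    }
}
```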
Important: the command for switching between AP and CP:
```shell
curl -X PUT "$NACOS_SERVER:8848/nacos/v1/ns/operator/switches?entry=serverMode&value=CP"
```
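The same switch can be sent from Java with the JDK 11+ java.net.http client. This sketch mostly builds the request; actually sending it needs a running Nacos server. The class name is hypothetical, and the host and port are placeholders for your own.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for the serverMode switch endpoint.
class ServerModeSwitch {
    // Builds PUT /nacos/v1/ns/operator/switches?entry=serverMode&value=<mode>
    static HttpRequest buildRequest(String hostAndPort, String mode) {
        if (!mode.equals("AP") && !mode.equals("CP")) {
            throw new IllegalArgumentException("mode must be AP or CP");
        }
        URI uri = URI.create("http://" + hostAndPort
                + "/nacos/v1/ns/operator/switches?entry=serverMode&value=" + mode);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Sends the request; requires a reachable Nacos server.
    static String send(String hostAndPort, String mode) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(hostAndPort, mode), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```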
Nacos 2.0 source analysis: the health-check mechanism
Nacos supports many health-check types, such as heartbeat, HTTP, TCP and MySQL, as defined in com.alibaba.nacos.api.naming.pojo.healthcheck.HealthCheckType:
```java
TCP(Tcp.class),
HTTP(Http.class),
MYSQL(Mysql.class),
NONE(AbstractHealthChecker.None.class);
```
Heartbeats are sent by the client-side BeatTask in BeatReactor, which calls sendBeat:
```java
class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                // The server no longer knows this instance: register it again
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
        } catch (Exception unknownEx) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                    JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
        } finally {
            // Always schedule the next beat
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<>(16);
    Map<String, String> bodyMap = new HashMap<>(2);
    // A light beat omits the "beat" body
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put(IP_PARAM, beatInfo.getIp());
    params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}
```
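Stripped of Nacos types, BeatTask is a self-rescheduling task. Each run sends one beat and adopts the interval the server returns when it is positive. If the server answers "resource not found", the task re-registers, and it always schedules the next run in finally. The sketch keeps that loop under hypothetical names. BeatSender and BeatResult are invented, and their codes are placeholders rather than Nacos' NamingResponseCode values.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical result of one heartbeat call; the codes are placeholders, not Nacos values.
class BeatResult {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    final int code;
    final long serverInterval; // <= 0 means "keep the default period"

    BeatResult(int code, long serverInterval) {
        this.code = code;
        this.serverInterval = serverInterval;
    }
}

interface BeatSender {
    BeatResult sendBeat() throws Exception;
    void reRegister() throws Exception;
}

class SelfReschedulingBeat implements Runnable {
    private final ScheduledExecutorService executor;
    private final BeatSender sender;
    private final long defaultPeriodMs;
    private volatile boolean stopped;

    SelfReschedulingBeat(ScheduledExecutorService executor, BeatSender sender, long defaultPeriodMs) {
        this.executor = executor;
        this.sender = sender;
        this.defaultPeriodMs = defaultPeriodMs;
    }

    // As in BeatTask: a positive interval from the server overrides the default period.
    static long nextDelay(long defaultPeriodMs, long serverInterval) {
        return serverInterval > 0 ? serverInterval : defaultPeriodMs;
    }

    @Override
    public void run() {
        if (stopped) {
            return; // stopped beats are not rescheduled
        }
        long next = defaultPeriodMs;
        try {
            BeatResult result = sender.sendBeat();
            next = nextDelay(defaultPeriodMs, result.serverInterval);
            if (result.code == BeatResult.NOT_FOUND) {
                try {
                    sender.reRegister(); // the server lost this instance: register again
                } catch (Exception ignore) {
                    // ignored, like BeatTask; the next beat will hit NOT_FOUND and retry
                }
            }
        } catch (Exception e) {
            System.err.println("failed to send beat: " + e.getMessage());
        } finally {
            executor.schedule(this, next, TimeUnit.MILLISECONDS); // keep beating even after failures
        }
    }

    void stop() {
        stopped = true;
    }
}
```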
Server-side heartbeat renewal: com.alibaba.nacos.naming.controllers.InstanceController#beat
```java
public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    // A full beat carries the "beat" parameter; a light beat does not
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}",
            clientBeat, serviceName, namespaceId);
    BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
    builder.setRequest(request);
    int resultCode = getInstanceOperator()
            .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
    result.put(CommonParams.CODE, resultCode);
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
            getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
```
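In 2.1.1, getInstanceOperator().handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder) encapsulates the handling of the Service, including lease renewal and updating the client's last heartbeat time. The instance operator has two implementations, a client-based one (InstanceOperatorClientImpl) and a service-based one (InstanceOperatorServiceImpl). For the service-based side, look at InstanceOperatorServiceImpl#handleBeat: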
```java
@Override
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
        RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {
    com.alibaba.nacos.naming.core.Instance instance = serviceManager
            .getInstance(namespaceId, serviceName, cluster, ip, port);

    if (instance == null) {
        if (clientBeat == null) {
            // Light beat for an unknown instance: tell the client to re-register
            return NamingResponseCode.RESOURCE_NOT_FOUND;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        instance = parseInstance(builder.setBeatInfo(clientBeat).setServiceName(serviceName).build());
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);
    serviceManager.checkServiceIsNull(service, namespaceId, serviceName);

    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(cluster);
    }
    service.processClientBeat(clientBeat);
    return NamingResponseCode.OK;
}

// HealthCheckReactor#scheduleNow
public static ScheduledFuture<?> scheduleNow(Runnable task) {
    return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}
```
processClientBeat hands the beat to HealthCheckReactor.scheduleNow, which runs the processing task immediately. That task sets the instance's last heartbeat time to the current time.
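The decision logic of handleBeat fits in a few lines:

- An unknown instance with no beat body gets "resource not found", which, as BeatTask shows, makes the client re-register.
- An unknown instance with a beat body is re-registered from that body.
- A known instance just has its last-beat time refreshed.

In this sketch the class name, the "ip:port" keys and the response codes are hypothetical placeholders.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reduction of the handleBeat decision logic.
class BeatHandlerSketch {
    static final int OK = 200;                 // placeholder for NamingResponseCode.OK
    static final int RESOURCE_NOT_FOUND = 404; // placeholder for NamingResponseCode.RESOURCE_NOT_FOUND

    // "ip:port" -> last beat timestamp in ms
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    // hasBeatBody: whether the request carried the full "beat" parameter (light beats omit it)
    int handleBeat(String ip, int port, boolean hasBeatBody, long nowMs) {
        String key = ip + ":" + port;
        if (!lastBeat.containsKey(key) && !hasBeatBody) {
            return RESOURCE_NOT_FOUND; // nothing to rebuild the instance from
        }
        // Known instance, or unknown instance re-registered from the beat body ("data compensation")
        lastBeat.put(key, nowMs); // last heartbeat time = now
        return OK;
    }

    Long lastBeatOf(String ip, int port) {
        return lastBeat.get(ip + ":" + port);
    }
}
```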
Health check: com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit
```java
private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

// Service#init
public void init() {
    // Schedule the periodic client-beat check for this service
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
```
Health-check task: ClientBeatCheckTask
```java
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut())

// getInstanceHeartBeatTimeOut(): 15 seconds by default
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);

public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);

public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
```
```java
@Override
public void run() {
    try {
        // Once the cluster uses the 2.x gRPC features, this 1.x task is skipped
        if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
            return;
        }
        // Only the node responsible for this service (per Distro) runs the check
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        List<Instance> instances = service.allIPs(true);

        // First pass: mark instances unhealthy after the heartbeat timeout (15 s by default)
        for (Instance instance : instances) {
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // Second pass: delete instances after the IP delete timeout (30 s by default)
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }

            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
```
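With the defaults listed above (15 s to unhealthy, 30 s to deletion), the two loops in ClientBeatCheckTask#run reduce to the state function below. It is a sketch, not Nacos code. The enum and method names are hypothetical. It ignores the marked flag, the Distro responsibility check and the expireInstance switch.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical reduction of the two timeout checks in ClientBeatCheckTask#run.
class BeatTimeoutSketch {
    static final long HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    static final long IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);

    enum Verdict { HEALTHY, UNHEALTHY, DELETE }

    static Verdict check(long nowMs, long lastBeatMs) {
        long silence = nowMs - lastBeatMs;
        if (silence > IP_DELETE_TIMEOUT) {
            return Verdict.DELETE;    // second loop: deleteIp(instance)
        }
        if (silence > HEART_BEAT_TIMEOUT) {
            return Verdict.UNHEALTHY; // first loop: setHealthy(false) and push the change
        }
        return Verdict.HEALTHY;
    }
}
```

Clients beat every 5 s by default (DEFAULT_HEART_BEAT_INTERVAL). So about three missed beats (15 / 5) mark an instance unhealthy, and six (30 / 5) get it deleted.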
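Summary: when a service registers with ZooKeeper, the client opens a long-lived connection (for example via NIO or Netty) that keeps the channel occupied. Nacos only sends an HTTP request and is done once it completes. Nacos 1.4.x uses typical short connections; 2.0 switched to long-lived gRPC connections. Because ZooKeeper keeps a long-lived channel per client, it becomes expensive when there are very many client servers, so Nacos is considerably lighter. To keep service changes consistent, ZooKeeper's watch callbacks notify clients immediately, so it responds promptly; this is why ZooKeeper is a CP system. Nacos keeps the client's view of services up to date in two ways: periodic pulling by the client and reverse UDP pushes from the server. Even if a UDP packet is lost, the scheduled pull acts as a fallback.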
TCP health check: com.alibaba.nacos.naming.healthcheck.v2.processor.TcpHealthCheckProcessor (the excerpt below is from its 1.x counterpart, TcpSuperSenseProcessor)
```java
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {

    public static final String TYPE = "TCP";

    @Autowired
    private HealthCheckCommon healthCheckCommon;

    @Autowired
    private SwitchDomain switchDomain;
}
```
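Besides the TYPE constant, processors share the fields healthCheckCommon and switchDomain. The check itself completes in the finishCheck method of the inner class Beat. Beat.isHealthy tells whether a beat is still within its 30-second window: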
```java
public boolean isHealthy() {
    return System.currentTimeMillis() - startTime < TimeUnit.SECONDS.toMillis(30L);
}
```
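On success, healthCheckCommon.checkOK is called. checkOK also uses the Distro protocol to decide whether the current node is responsible for this IP's service in the cluster. If it is, the IP is marked healthy, the service's last-modified time is set to the current time, and the change is pushed to the service's listeners: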
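At its core, a TCP health check asks whether a connection to ip:port can be opened within a timeout. TcpSuperSenseProcessor does this with non-blocking NIO channels and a selector, so one thread can probe many instances. The blocking sketch below only shows the idea with java.net.Socket, and its class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical, blocking version of a TCP liveness probe.
class TcpProbe {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out or unreachable: the check fails
        }
    }
}
```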
```java
NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),
        service.getNamespaceId(), service.getGroupName(), service.getName(), ip.getIp(), ip.getPort(),
        true, msg));
```
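Nacos consistency protocols
There are many distributed consistency protocols, such as Paxos, Zab and Raft. Nacos uses two of them: Raft for persistent (non-ephemeral) data and Distro for ephemeral data. Briefly, Distro is positioned as the consistency protocol for ephemeral data. Such data does not need to be stored on disk or in a database, because ephemeral data usually maintains a session with the server, and as long as that session exists the data is not lost.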
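Distro protocol
Distro is the consistency protocol Nacos developed for ephemeral instance data. The data is kept in memory, a full synchronization is performed at startup, and the data is verified periodically.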
Under the Distro design, every Distro node can accept both read and write requests. All Distro request scenarios fall into three cases:
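1. When a node receives a write request for an instance it is responsible for, it writes the data directly.
2. When a node receives a write request for an instance it is not responsible for, it routes the request inside the cluster to the responsible node, which completes the write.
3. When a node receives any read request, it queries its local data and returns the result, because every instance is synchronized to every machine.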
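The three cases can be captured in a small routing function. All names in this sketch are hypothetical. The responsible node is chosen by hashing the service name modulo the number of nodes, the same idea as distroMapper.responsible discussed later. Forwarding is represented by returning the owner node instead of making an HTTP call, and the background sync that gives every node a full copy is not modeled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of how one Distro node treats reads and writes.
class DistroNodeSketch {
    final String self;
    final List<String> members; // every node sees the same ordered member list
    private final Map<String, String> store = new ConcurrentHashMap<>(); // this node's copy of the data

    DistroNodeSketch(String self, List<String> members) {
        this.self = self;
        this.members = members;
    }

    // Owner = hash of the service name modulo the member count.
    String ownerOf(String serviceName) {
        int hash = Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
        return members.get(hash % members.size());
    }

    // Cases 1 and 2: returns the node that performs the write.
    String write(String serviceName, String value) {
        String owner = ownerOf(serviceName);
        if (owner.equals(self)) {
            store.put(serviceName, value); // case 1: our own data, write directly
        }
        return owner; // case 2 when owner != self: the request is forwarded there (HTTP in Nacos)
    }

    // Case 3: reads are answered from the local copy, whoever owns the data.
    String read(String serviceName) {
        return store.get(serviceName);
    }
}
```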
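As Nacos' built-in consistency protocol for ephemeral instances, Distro ensures that, in a distributed environment, the state of the service information on each node is promptly propagated to the other nodes. It can store and keep consistent hundreds of thousands of service instances.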
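Nacos has two modes, CP and AP, and they take completely different approaches to data consistency. CP mode uses Raft, a strongly consistent protocol; AP mode uses Distro, a protocol Alibaba developed in-house. In the registration flow described above, InstanceController#register is implemented as follows: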
```java
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral())
            .setRequest(request).build();

    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
            false, namespaceId, NamingUtils.getGroupName(serviceName),
            NamingUtils.getServiceName(serviceName), instance.getIp(), instance.getPort()));
    return "ok";
}

@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance)
        throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);

    boolean ephemeral = instance.isEphemeral();
    String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
    createIpPortClientIfAbsent(clientId);
    Service service = getService(namespaceId, serviceName, ephemeral);
    clientOperationService.registerInstance(service, instance, clientId);
}
```
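The @CanDistro annotation is what ties this endpoint to the Distro protocol. The following DistroFilter#doFilter code is from Nacos 2.1.1: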
```java
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
        throws IOException, ServletException {
    ReuseHttpServletRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
    HttpServletResponse resp = (HttpServletResponse) servletResponse;

    String urlString = req.getRequestURI();

    if (StringUtils.isNotBlank(req.getQueryString())) {
        urlString += "?" + req.getQueryString();
    }

    try {
        // Step 1: find the controller method for this request
        Method method = controllerMethodsCache.getMethod(req);

        String path = new URI(req.getRequestURI()).getPath();
        if (method == null) {
            throw new NoSuchMethodException(req.getMethod() + " " + path);
        }

        // Steps 2-3: not a @CanDistro method, so pass it through
        if (!method.isAnnotationPresent(CanDistro.class)) {
            filterChain.doFilter(req, resp);
            return;
        }
        String distroTag = distroTagGenerator.getResponsibleTag(req);

        // Steps 2-3: this node is responsible, so pass it through
        if (distroMapper.responsible(distroTag)) {
            filterChain.doFilter(req, resp);
            return;
        }

        // A request that already came from another Nacos server must not be forwarded again
        String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);

        if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
            Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
            resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                    "receive invalid redirect request from peer " + req.getRemoteAddr());
            return;
        }

        // Step 4: forward the request to the responsible node
        final String targetServer = distroMapper.mapSrv(distroTag);

        List<String> headerList = new ArrayList<>(16);
        Enumeration<String> headers = req.getHeaderNames();
        while (headers.hasMoreElements()) {
            String headerName = headers.nextElement();
            headerList.add(headerName);
            headerList.add(req.getHeader(headerName));
        }

        final String body = IoUtils.toString(req.getInputStream(), StandardCharsets.UTF_8.name());
        final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());

        RestResult<String> result = HttpClient
                .request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
                        PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, StandardCharsets.UTF_8.name(), req.getMethod());
        String data = result.ok() ? result.getData() : result.getMessage();
        try {
            WebUtils.response(resp, data, result.getCode());
        } catch (Exception ignore) {
            Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
        }
    } catch (AccessControlException e) {
        resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
    } catch (NoSuchMethodException e) {
        resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
                "no such api:" + req.getMethod() + ":" + req.getRequestURI());
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("[DISTRO-FILTER] Server failed: ", e);
        resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                "Server failed, " + ExceptionUtil.getAllExceptionMsg(e));
    }
}
```
DistroFilter implements the Filter interface, so it is a servlet filter: every incoming request passes through its doFilter method, which roughly performs these four steps:
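1. Use the request path to get the corresponding controller method from controllerMethodsCache.
2. Check whether that controller method carries @CanDistro. If it does, call distroMapper.responsible() to decide whether the current Nacos node should handle the request.
3. If the method has no @CanDistro annotation, or it has one and the current node is responsible, let the request through to the controller.
4. Otherwise, when the method has @CanDistro but the current node is not responsible, forward the request to the responsible node.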
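Reduced to its decisions, doFilter looks like the function below. It is a sketch with hypothetical names. The boolean inputs replace the method lookup, the @CanDistro check, distroMapper.responsible and the User-Agent test. The enum values stand for "pass to the controller", "proxy to another node" and the two error responses.

```java
// Hypothetical reduction of DistroFilter#doFilter to its routing decision.
class DistroFilterSketch {
    enum Action { PASS, FORWARD, BAD_REQUEST, NOT_IMPLEMENTED }

    static Action decide(boolean methodFound, boolean canDistro, boolean responsible,
                         boolean fromPeerServer) {
        if (!methodFound) {
            return Action.NOT_IMPLEMENTED; // NoSuchMethodException -> 501 "no such api"
        }
        if (!canDistro || responsible) {
            return Action.PASS;            // steps 2-3: let the controller handle it
        }
        if (fromPeerServer) {
            return Action.BAD_REQUEST;     // came from another Nacos server: refuse to bounce it again
        }
        return Action.FORWARD;             // step 4: proxy to distroMapper.mapSrv(distroTag)
    }
}
```

The peer check matters because a forwarded request should land on its owner. If that node also thinks it is not responsible, for example while membership views differ, forwarding again could loop, so it answers 400 instead.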
Step 1 gets the controller method from controllerMethodsCache. How does that lookup work? Let's look at the getMethod method of ControllerMethodsCache:
```java
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerMethodsCache.class);

private ConcurrentMap<RequestMappingInfo, Method> methods = new ConcurrentHashMap<>();

private final ConcurrentMap<String, List<RequestMappingInfo>> urlLookup = new ConcurrentHashMap<>();

public Method getMethod(HttpServletRequest request) {
    String path = getPath(request);
    String httpMethod = request.getMethod();
    // urlKey = HTTP method + separator + path without the context path
    String urlKey = httpMethod + REQUEST_PATH_SEPARATOR + path.replaceFirst(EnvUtil.getContextPath(), "");
    List<RequestMappingInfo> requestMappingInfos = urlLookup.get(urlKey);
    if (CollectionUtils.isEmpty(requestMappingInfos)) {
        return null;
    }
    // Keep only the mappings whose conditions (such as params) match this request
    List<RequestMappingInfo> matchedInfo = findMatchedInfo(requestMappingInfos, request);
    if (CollectionUtils.isEmpty(matchedInfo)) {
        return null;
    }
    RequestMappingInfo bestMatch = matchedInfo.get(0);
    if (matchedInfo.size() > 1) {
        RequestMappingInfoComparator comparator = new RequestMappingInfoComparator();
        matchedInfo.sort(comparator);
        bestMatch = matchedInfo.get(0);
        RequestMappingInfo secondBestMatch = matchedInfo.get(1);
        if (comparator.compare(bestMatch, secondBestMatch) == 0) {
            throw new IllegalStateException(
                    "Ambiguous methods mapped for '" + request.getRequestURI() + "': {" + bestMatch + ", "
                            + secondBestMatch + "}");
        }
    }
    return methods.get(bestMatch);
}
```
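Tracing the callers shows that these maps are populated from the init method of a class named ConsoleConfig. That method is annotated with @PostConstruct, so it runs while the Spring container starts up, right after the bean has been constructed and injected. init calls ControllerMethodsCache.initClassMethod four times, each time with a different package name. For a given package, initClassMethod finds the classes annotated with @RequestMapping, then the @RequestMapping methods in each of those classes, and builds a RequestMappingInfo object for each method. Each RequestMappingInfo carries two conditions: one on the request path and one on the request parameters. The urlKey and the RequestMappingInfo go into the urlLookup map, and the RequestMappingInfo and the controller method go into the methods map.

To summarize: at Spring startup, Nacos scans a few designated packages for all controller classes annotated with @RequestMapping. For every @RequestMapping method in those classes, it builds a RequestMappingInfo that is used to match request paths and request parameters. Path matching uses a urlKey built from the HTTP method and path declared in @RequestMapping. Parameter matching uses the params attribute of @RequestMapping. Each RequestMappingInfo is finally stored in the methods map together with its controller method.

So when a request arrives, DistroFilter intercepts it, builds the urlKey from the request, and fetches the RequestMappingInfo candidates for that urlKey. It then checks the request parameters against them. If no candidate matches, getMethod returns null. Otherwise the matching RequestMappingInfo is used to look up the corresponding controller method.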
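The two-stage index described above (urlKey first, then the params condition) can be modelled in a few lines. This is a simplified sketch, not ControllerMethodsCache itself. MappingIndexSketch, Mapping, register() and lookup() are hypothetical names, and the "-->" separator is an assumption. The params condition is reduced to "all required parameter names are present", and the first match wins instead of the original's sort-and-reject-ambiguity step.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified model of the urlKey -> mappings index (hypothetical names, assumed separator).
final class MappingIndexSketch {
    static final String SEPARATOR = "-->";

    static final class Mapping {
        final Set<String> requiredParams;
        final String handler;            // stands in for the controller Method

        Mapping(Set<String> requiredParams, String handler) {
            this.requiredParams = requiredParams;
            this.handler = handler;
        }
    }

    private final Map<String, List<Mapping>> urlLookup = new ConcurrentHashMap<>();

    // Startup: roughly what initClassMethod does for each @RequestMapping method.
    void register(String httpMethod, String path, Set<String> requiredParams, String handler) {
        String urlKey = httpMethod + SEPARATOR + path;
        urlLookup.computeIfAbsent(urlKey, k -> new CopyOnWriteArrayList<>())
                .add(new Mapping(requiredParams, handler));
    }

    // Request time: roughly what getMethod does; returns null when nothing matches.
    String lookup(String httpMethod, String path, Map<String, String> params) {
        List<Mapping> candidates = urlLookup.get(httpMethod + SEPARATOR + path);
        if (candidates == null) {
            return null;
        }
        for (Mapping m : candidates) {
            if (params.keySet().containsAll(m.requiredParams)) {
                return m.handler;
            }
        }
        return null;
    }
}
```

Keying on HTTP method plus path makes the common case a single hash lookup. The more expensive parameter matching only runs over the few mappings that share that key.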
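How the Distro weak-consistency protocol is implemented

```java
String distroTag = distroTagGenerator.getResponsibleTag(req);
```

From the previous section we know that DistroFilter finds the controller method for a request and then checks whether that method carries @CanDistro. If it does, the filter checks whether the current Nacos node should handle the request by calling distroMapper.responsible() with the tag computed above. This method is the core of the Distro weak-consistency protocol, so let's look at it: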
```java
public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;

    // Distro disabled or standalone mode: this node handles everything
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }

    // No healthy servers known
    if (CollectionUtils.isEmpty(servers)) {
        return false;
    }

    String localAddress = EnvUtil.getLocalAddress();
    int index = servers.indexOf(localAddress);
    int lastIndex = servers.lastIndexOf(localAddress);
    // Local node is not in the healthy list: handle the request locally
    if (lastIndex < 0 || index < 0) {
        return true;
    }

    // Hash the tag onto a slot; this node owns the slots from index to lastIndex
    int target = distroHash(responsibleTag) % servers.size();
    return target >= index && target <= lastIndex;
}
```
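This method decides whether the current Nacos node is responsible for the given tag (typically the service name). It returns true if it is and false otherwise. It first checks whether the Distro protocol is enabled and whether the node runs in standalone mode, and returns true if Distro is disabled or the node is standalone. In other words, Distro has no effect in standalone mode. That makes sense: Distro exists to keep data consistent across the nodes of a cluster, and a single node has nothing to synchronize. Next, the method finds the current node's position in the list of healthy cluster nodes. It hashes the tag with distroHash and takes the result modulo the number of healthy nodes to get a target slot. If the target falls within the current node's position (the range from indexOf to lastIndexOf, normally a single index), the method returns true. Otherwise it returns false.

This way every service is assigned to a specific Nacos node, and each node is responsible for a share of the services. Does that make the Nacos cluster a partitioned cluster in which each node holds only part of the data? No. Each node only handles the requests for its own share of services, but the nodes synchronize data with each other, so every node eventually holds all of the data. The nodes are eventually consistent, which is why Distro is called a weak-consistency protocol.

What happens when, by the Distro rule, a request is not the current node's responsibility? Then it has to be forwarded. The node applies the same rule to determine which node is responsible and forwards the request to that node. That completes the Distro flow.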
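Below is a standalone sketch of the ownership rule, useful for checking that exactly one node claims each tag. It is not the DistroMapper source. The hash is an assumption modelled on a non-negative variant of String.hashCode(). The Distro-disabled and standalone short-circuits are left out, and DistroOwnershipSketch and owner() are hypothetical names.

```java
import java.util.List;

// Sketch of the responsible() ownership rule (assumed hash, hypothetical class name).
final class DistroOwnershipSketch {
    private DistroOwnershipSketch() {
    }

    // hashCode() % Integer.MAX_VALUE lies strictly between -MAX and MAX, so abs() is never negative.
    static int distroHash(String tag) {
        return Math.abs(tag.hashCode() % Integer.MAX_VALUE);
    }

    static boolean responsible(List<String> healthyServers, String localAddress, String tag) {
        if (healthyServers.isEmpty()) {
            return false;
        }
        int index = healthyServers.indexOf(localAddress);
        int lastIndex = healthyServers.lastIndexOf(localAddress);
        if (index < 0) {
            return true;                       // unknown local node: handle locally, as the original does
        }
        int target = distroHash(tag) % healthyServers.size();
        return target >= index && target <= lastIndex;
    }

    // The node a request would be forwarded to: the same rule, seen from any node.
    static String owner(List<String> healthyServers, String tag) {
        return healthyServers.get(distroHash(tag) % healthyServers.size());
    }
}
```

Because every node evaluates the same deterministic function over the same healthy list, they all agree on the owner without any coordination. They only disagree briefly while their healthy lists differ.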
Nacos cluster leader election
Raft protocol visualization: http://thesecretlivesofdata.com/raft/
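Raft consistency protocol
Nacos obviously supports cluster mode, and any cluster raises the question of leader and follower roles. So what mechanism does Nacos use to run its cluster?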
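Like ZooKeeper, a Nacos cluster has a leader role and follower roles, and the names suggest that the cluster elects its leader. Without an election the roles would more likely be called master/slave, although that is only my guess based on how many components name their roles.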
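Raft is a strongly consistent, decentralized, highly available distributed protocol for solving distributed consistency problems. Compared with the well-known Paxos protocol, Raft is easier to understand, and it is no worse than Paxos in performance, reliability or availability. Many middleware products rely on Raft to guarantee distributed consistency. For example, the leader election in Redis Sentinel and the leader election of Nacos in CP mode are both based on Raft, since Nacos uses Raft as its CP consistency protocol.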
In Raft, a node plays one of three roles:
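- Leader: handles client requests
- Candidate: the role a node takes while an election for leader is under way
- Follower: responds to requests from the leader or from candidates

Elections happen at two points in time: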
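- when the service starts
- when the leader goes down

Every node starts in the follower state. If a follower receives no heartbeat from a leader within a certain period (either there is no leader yet, or the leader has crashed), it becomes a candidate and starts an election. Before the election it increments its term. The term serves the same purpose as the epoch in ZooKeeper.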
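The candidate votes for itself, sends vote requests to the other nodes, and waits for their replies.

At this point, one of the following happens:

- It receives votes from a majority and becomes the leader.
- It learns that another node has already become the leader, and switches back to follower.
- It does not collect a majority within a period of time, and starts a new election.

These cases in more detail: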
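First case: after winning the election, the leader sends messages to all nodes so that they do not start new elections.

Second case: suppose there are three nodes A, B and C, and A and B start elections at the same time. A's vote request reaches C first, and C votes for A. When B's request reaches C, C does not vote for B, because it has already voted in this term and a node grants at most one vote per term. A and B obviously do not vote for each other either. After A wins, it sends heartbeats to B and C. B sees that A's term is not lower than its own, concludes that a leader already exists, and becomes a follower.

Third case: no node receives votes from a majority (more than half), for example when the vote is tied. Suppose there are four nodes A, B, C and D, and C and D become candidates at the same time. A votes for D and B votes for C, which produces a split vote. Everyone then waits until the election times out and starts another election. A split vote lengthens the time during which the system is unavailable, so Raft uses randomized election timeouts to make ties unlikely.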
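The two rules that resolve these cases are a strict-majority threshold and a randomized election timeout. The sketch below shows them in isolation; it is not Nacos code. The 150 to 300 ms range is the example interval from the Raft paper and is used here as an assumption, and RaftElectionSketch is a hypothetical name.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of two Raft election rules (not Nacos code; timeout range assumed from the Raft paper).
final class RaftElectionSketch {
    static final long MIN_TIMEOUT_MS = 150;
    static final long MAX_TIMEOUT_MS = 300;

    private RaftElectionSketch() {
    }

    // Each follower draws its own timeout, so candidates rarely start elections at the same moment.
    static long randomElectionTimeoutMs() {
        // nextLong(origin, bound) returns a value in [origin, bound)
        return ThreadLocalRandom.current().nextLong(MIN_TIMEOUT_MS, MAX_TIMEOUT_MS + 1);
    }

    // Strict majority of the whole cluster, not just of the nodes that replied.
    static int majority(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    static boolean wins(int votesReceived, int clusterSize) {
        return votesReceived >= majority(clusterSize);
    }
}
```

With four nodes, majority(4) is 3, so the 2-2 split in the third case yields no leader. Because the next timeouts are drawn at random, one candidate usually times out first and wins the following round.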
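RaftCore initialization
A few core concepts and components:

1. peer: represents one Nacos server and records its voting-related metadata. This includes the server's IP, whom it voted for (voteFor), the term (an AtomicLong counting election rounds), its state (leader/follower), and its leader-election timeout.
2. peers: a RaftPeerSet that holds the peer information for every node in the cluster.
3. notifier: a thread used for event notification.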
```java
@DependsOn("ProtocolManager")
@Component
public class RaftCore {

    private final ScheduledExecutorService executor = ExecutorFactory.Managed
            .newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
                    new NameThreadFactory("com.alibaba.nacos.naming.raft.notifier"));

    @PostConstruct
    public void init() throws Exception {
        Loggers.RAFT.info("initializing Raft sub-system");
        // Start the notifier thread that dispatches datum change events
        executor.submit(notifier);

        final long start = System.currentTimeMillis();

        // Load persisted datums and the last known term from disk
        raftStore.loadDatums(notifier, datums);

        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));

        Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());

        // Wait until the notifier has drained its pending tasks
        while (true) {
            if (notifier.tasks.size() <= 0) {
                break;
            }
            Thread.sleep(1000L);
        }

        initialized = true;

        Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));

        // Schedule the periodic leader-election and heartbeat tasks
        GlobalExecutor.registerMasterElection(new MasterElection());
        GlobalExecutor.registerHeartbeat(new HeartBeat());

        Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
                GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
    }
}
```
RaftCore.MasterElection

```java
public class MasterElection implements Runnable {

    @Override
    public void run() {
        try {
            if (!peers.isReady()) {
                return;
            }

            RaftPeer local = peers.local();
            // Count the election timeout down by one tick
            local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

            if (local.leaderDueMs > 0) {
                return;
            }

            // Timeout expired: reset the timers and start an election
            local.resetLeaderDue();
            local.resetHeartbeatDue();

            sendVote();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while master election {}", e);
        }
    }

    private void sendVote() {
        RaftPeer local = peers.get(NetUtils.localServer());
        Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
                local.term);

        peers.reset();

        // Become a candidate: bump the term and vote for ourselves
        local.term.incrementAndGet();
        local.voteFor = local.ip;
        local.state = RaftPeer.State.CANDIDATE;

        Map<String, String> params = new HashMap<>(1);
        params.put("vote", JacksonUtils.toJson(local));
        // Ask every other peer for its vote asynchronously
        for (final String server : peers.allServersWithoutMySelf()) {
            final String url = buildUrl(server, API_VOTE);
            try {
                HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                            return 1;
                        }

                        RaftPeer peer = JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class);

                        Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));

                        // Record the reply and check whether some peer now has a majority
                        peers.decideLeader(peer);

                        return 0;
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.warn("error while sending vote to server: {}", server);
            }
        }
    }
}
```
RaftController.vote: receiving a vote request
```java
@PostMapping("/vote")
public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
    // Deserialize the candidate's RaftPeer from the "vote" parameter and let RaftCore decide
    RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));

    return JacksonUtils.transferToJsonNode(peer);
}
```
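RaftCore.receivedVote
This method decides how the local node votes when another server asks for its vote. It compares terms. If the local term is greater than or equal to the requesting server's term, the request is rejected, and if the local node has not voted yet, it votes for itself. Otherwise it votes for the requesting server, switches itself to follower, and returns its updated state.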
```java
public synchronized RaftPeer receivedVote(RaftPeer remote) {
    if (!peers.contains(remote)) {
        throw new IllegalStateException("can not find peer: " + remote.ip);
    }

    // The local node's own peer info
    RaftPeer local = peers.get(NetUtils.localServer());
    // The requester's term is not newer than ours: reject the vote request
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;

        Loggers.RAFT.info(msg);
        // If we have not voted for anyone yet, vote for ourselves
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }

        return local;
    }

    // The requester's term is newer: it started this election first, so back it
    local.resetLeaderDue();                   // reset the local election timeout
    local.state = RaftPeer.State.FOLLOWER;    // step down to follower
    local.voteFor = remote.ip;                // vote for the remote node
    local.term.set(remote.term.get());        // adopt the remote term

    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);

    return local;
}
```
decideLeader
decideLeader determines which node becomes the leader:
```java
public RaftPeer decideLeader(RaftPeer candidate) {
    // Record the latest state (including voteFor) reported by this peer
    peers.put(candidate.ip, candidate);

    SortedBag ips = new TreeBag();
    int maxApproveCount = 0;
    String maxApprovePeer = null;
    // Count the votes per IP and track the IP with the most votes
    for (RaftPeer peer : peers.values()) {
        if (StringUtils.isEmpty(peer.voteFor)) {
            continue;
        }

        ips.add(peer.voteFor);
        if (ips.getCount(peer.voteFor) > maxApproveCount) {
            maxApproveCount = ips.getCount(peer.voteFor);
            maxApprovePeer = peer.voteFor;
        }
    }

    // A majority of votes makes that peer the leader
    if (maxApproveCount >= majorityCount()) {
        RaftPeer peer = peers.get(maxApprovePeer);
        peer.state = RaftPeer.State.LEADER;

        if (!Objects.equals(leader, peer)) {
            leader = peer;
            ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
            Loggers.RAFT.info("{} has become the LEADER", leader.ip);
        }
    }

    return leader;
}
```
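The tally in decideLeader can be reproduced without commons-collections. In the plain-Java sketch below, a HashMap replaces the TreeBag, and peers are reduced to an ip-to-voteFor map. VoteTallySketch is a hypothetical name, and clusterSize / 2 + 1 is assumed to match majorityCount().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the vote tally in decideLeader() (hypothetical simplification).
final class VoteTallySketch {
    private VoteTallySketch() {
    }

    // voteForByPeer: peer ip -> ip it voted for (empty or null = has not voted)
    static Optional<String> decide(Map<String, String> voteForByPeer, int clusterSize) {
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String voteFor : voteForByPeer.values()) {
            if (voteFor == null || voteFor.isEmpty()) {
                continue;
            }
            int c = counts.merge(voteFor, 1, Integer::sum);   // returns the updated count
            if (c > bestCount) {
                bestCount = c;
                best = voteFor;
            }
        }
        int majority = clusterSize / 2 + 1;
        return bestCount >= majority ? Optional.of(best) : Optional.empty();
    }
}
```

For example, when all three nodes vote for A, A is elected. In the four-node split vote from the third case, no one reaches 3 votes, so no leader is chosen.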
Data synchronization
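addInstance
Take service registration as an example. Registering an instance eventually calls addInstance, which ends by calling consistencyService.put(key, instances) to synchronize the data across the cluster.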
InstanceController.register → registerInstance → addInstance → consistencyService.put(key, instances);
```java
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {

    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    Service service = getService(namespaceId, serviceName);

    // Serialize concurrent updates to the same service
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        // Hand the new instance list to the consistency layer
        consistencyService.put(key, instances);
    }
}
```
RaftConsistencyServiceImpl.put
consistencyService.put publishes the content, which is how the data is kept consistent across the cluster.
```java
@Override
public void put(String key, Record value) throws NacosException {
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}
```
RaftCore.signalPublish

```java
public static final Lock OPERATE_LOCK = new ReentrantLock();
```
```java
public void signalPublish(String key, Record value) throws Exception {

    // Only the leader writes: a follower forwards the request to the leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        final RaftPeer leader = getLeader();

        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }

    try {
        OPERATE_LOCK.lock();
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        // Version the datum: start at 1, otherwise increment the previous timestamp
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }

        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));

        // Apply the datum locally first
        onPublish(datum, peers.local());

        final String content = json.toString();

        // Wait for acknowledgements from a majority; the leader counts itself
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
                    new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.warn(
                                        "[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            latch.countDown();
                            return 0;
                        }

                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
        }

        // Fail if a majority did not acknowledge within the timeout
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
```
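The core of signalPublish is a majority-ack wait built on CountDownLatch. The sketch below isolates that pattern and is not Nacos code. The Predicate stands in for the HTTP call to /raft/datum/commit (true means HTTP 200), and a thread pool replaces the asynchronous HTTP client. MajorityAckSketch is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Sketch of the majority-ack wait used in signalPublish() (hypothetical names, simulated followers).
final class MajorityAckSketch {
    private MajorityAckSketch() {
    }

    static boolean publish(List<String> followers, Predicate<String> sendToFollower, long timeoutMs) {
        int clusterSize = followers.size() + 1;                 // followers plus the leader
        CountDownLatch latch = new CountDownLatch(clusterSize / 2 + 1);
        latch.countDown();                                      // the leader acks itself
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, followers.size()));
        try {
            for (String follower : followers) {
                pool.execute(() -> {
                    if (sendToFollower.test(follower)) {
                        latch.countDown();                      // only successful replies count
                    }
                });
            }
            // true once a majority has acked; false if the timeout expires first
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Sizing the latch to the majority, rather than to the whole cluster, lets the write finish as soon as enough followers answer. One slow or dead node therefore does not block it.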
onPublish

```java
private volatile ConcurrentMap<String, Datum> datums = new ConcurrentHashMap<>();

public void onPublish(Datum datum, RaftPeer source) throws Exception {
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }

    // Only the leader may publish data
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }

    // Reject publishes from a stale term
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }

    // Data from the leader proves it is alive: reset the election timeout
    local.resetLeaderDue();

    // Persistent keys are also written to disk
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }

    datums.put(datum.key, datum);

    // Advance the term; a follower aligns its term with the leader's
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());

    // Queue a change notification for listeners
    notifier.addTask(datum.key, ApplyAction.CHANGE);

    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
```
Now let's see how the other nodes handle a publish request from the leader by looking at the /v1/ns/raft/datum/commit endpoint:
```java
@PostMapping("/datum/commit")
public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {

    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");

    String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
    String value = URLDecoder.decode(entity, "UTF-8");
    JsonNode jsonObject = JacksonUtils.toObj(value);
    String key = "key";

    RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
    JsonNode datumJson = jsonObject.get("datum");

    Datum datum = null;
    if (KeyBuilder.matchInstanceListKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Instances>>() {
        });
    } else if (KeyBuilder.matchSwitchKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<SwitchDomain>>() {
        });
    } else if (KeyBuilder.matchServiceMetaKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Service>>() {
        });
    }

    raftConsistencyService.onPut(datum, source);
    return "ok";
}
```
The key call is raftConsistencyService.onPut(datum, source). Let's step into it:
```java
public void onPut(Datum datum, RaftPeer source) throws NacosException {
    try {
        raftCore.onPublish(datum, source);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft onPut failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR,
                "Raft onPut failed, datum:" + datum + ", source: " + source, e);
    }
}
```
Spring Cloud Config Server
Spring Cloud Config Server provides an HTTP resource-based API for external configuration (name-value pairs or equivalent YAML content). The server is embeddable in a Spring Boot application, by using the @EnableConfigServer annotation. Consequently, the following application is a config server:
ConfigServer.java

```java
@SpringBootApplication
@EnableConfigServer
public class ConfigServer {
    public static void main(String[] args) {
        SpringApplication.run(ConfigServer.class, args);
    }
}
```
Spring Cloud Config Client
A Spring Boot application can take immediate advantage of the Spring Config Server (or other external property sources provided by the application developer). It also picks up some additional useful features related to Environment change events.
@RefreshScope and ContextRefresher
When externalized configuration (.properties) is refreshed, the new values are hot-loaded without restarting the application.
Automatic configuration updates with Spring Cloud's native @RefreshScope annotation:

```java
@RestController
@RequestMapping("/config")
@RefreshScope
public class ConfigController {

    @Value("${useLocalCache:false}")
    private boolean useLocalCache;

    @RequestMapping("/get")
    public boolean get() {
        return useLocalCache;
    }
}
```
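Managing the bean lifecycle separately
When a bean in the refresh scope is created, it is cached in a dedicated scope map, so the lifecycle of refresh-scoped beans can be managed on its own. This is also why a @RefreshScope class actually results in two beans: the scoped proxy and the cached target.

Re-creating the bean
After the externalized configuration is refreshed, an action clears the beans out of that scope map. The next time one of them is used, the IoC container creates it again and injects the latest configuration values, which is what makes the new values take effect without a restart.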
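The cache-and-clear idea can be modelled without Spring. This plain-Java sketch is not Spring's RefreshScope or GenericScope implementation. ScopeCacheSketch, ConfigHolder and ConfigHolderProxy are hypothetical names, and "scopedTarget.configHolder" only mimics the naming convention. Callers keep a single proxy object, while refresh() empties the scope cache so that the next call re-creates the target from the current configuration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Plain-Java model of a refresh scope: cached targets, cleared on refresh, re-created lazily.
final class ScopeCacheSketch {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <T> T get(String name, Supplier<T> factory) {
        return (T) cache.computeIfAbsent(name, n -> factory.get());
    }

    void refresh() {
        cache.clear();                          // drop cached targets; next access re-creates them
    }
}

// The "target": captures a config value at creation time, like @Value injection.
final class ConfigHolder {
    final boolean useLocalCache;

    ConfigHolder(boolean useLocalCache) {
        this.useLocalCache = useLocalCache;
    }
}

// The "proxy": callers hold this one object; every call resolves the current target.
final class ConfigHolderProxy {
    private final ScopeCacheSketch scope;
    private final Supplier<ConfigHolder> factory;

    ConfigHolderProxy(ScopeCacheSketch scope, Supplier<ConfigHolder> factory) {
        this.scope = scope;
        this.factory = factory;
    }

    boolean useLocalCache() {
        return scope.get("scopedTarget.configHolder", factory).useLocalCache;
    }
}
```

Changing the configuration alone does not change what the proxy returns, because the old target is still cached. Only after refresh() does the next call build a new target with the new value. That is the same lazy re-creation described above.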
In Spring IoC, a bean's scope determines how the bean is managed.
RefreshScope is declared as a GenericScope subclass, and ScopedProxyMode lists the proxy modes a scoped bean can use:

```java
@ManagedResource
public class RefreshScope extends GenericScope implements ApplicationContextAware,
        ApplicationListener<ContextRefreshedEvent>, Ordered {
    // ...
}
```

```java
public enum ScopedProxyMode {
    DEFAULT,
    NO,
    INTERFACES,
    TARGET_CLASS;
}
```
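| Scope | Description |
|---|---|
| singleton | One instance per Spring IoC container (the default scope). |
| prototype | One bean definition, any number of object instances. |
| request | Each HTTP request gets its own bean instance (only available in a web-aware Spring ApplicationContext). |
| session | The bean lives for the lifetime of an HTTP session (only available in a web-aware Spring ApplicationContext). |
| global session | The bean lives for the lifetime of a global HTTP session, typically in portal applications (only available in a web-aware Spring ApplicationContext). |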
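How Nacos updates service addresses dynamically
The Nacos client contains a class called HostReactor that keeps service address information up to date. The basic mechanism is: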
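1. After the client subscribes to a service, an UpdateTask in HostReactor sends a pull request every 10 seconds to fetch the latest address list from the server.
2. The server keeps heartbeat checks with the service provider instances. When a provider becomes unhealthy, the server pushes a message to the Nacos client, that is, the service consumer.
3. On receiving the push, the consumer parses it with HostReactor's processServiceJSON and updates its local address list.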
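The pull-plus-push scheme above can be sketched with a scheduled executor and a shared cache. This is not the HostReactor source. AddressCacheSketch, subscribe(), refresh() and onPush() are hypothetical names, and the fetcher Function stands in for the HTTP pull. Both paths write to the same cache, so whichever update arrives last wins.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Sketch of client-side address caching with periodic pull and server push (hypothetical names).
final class AddressCacheSketch {
    private final Map<String, List<String>> addresses = new ConcurrentHashMap<>();
    private final Function<String, List<String>> fetcher;   // stand-in for the HTTP pull
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    AddressCacheSketch(Function<String, List<String>> fetcher) {
        this.fetcher = fetcher;
    }

    // Subscribe: pull now, then every periodSeconds (10 in the description above).
    void subscribe(String service, long periodSeconds) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                refresh(service);
            } catch (RuntimeException e) {
                // keep the last known list; an uncaught exception would cancel future runs
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }

    // Pull path: replace the cached list with the server's current list.
    void refresh(String service) {
        addresses.put(service, List.copyOf(fetcher.apply(service)));
    }

    // Push path: the server sends a fresh list, which overwrites the cache immediately.
    void onPush(String service, List<String> latest) {
        addresses.put(service, List.copyOf(latest));
    }

    List<String> get(String service) {
        return addresses.getOrDefault(service, List.of());
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The push path gives low latency when a provider fails. The periodic pull is a safety net in case a push is lost.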
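Caveats when using @RefreshScope
- A class annotated with @RefreshScope must not be final, otherwise startup fails with an error, because the scoped proxy is generated as a subclass of that class.
- @RefreshScope cannot be used on its own. It must be combined with another bean annotation such as @Controller, @Service, @Component, @Repository or @Configuration.
- Avoid putting @RefreshScope on classes with @Scheduled methods, listeners, Timer tasks and the like. A configuration refresh destroys the existing object, and a new one is only created the next time the object is used. Nothing uses such an object again once it is gone, so it is never re-created.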
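When @RefreshScope dynamic refresh does not take effect
- Check whether the bean you are using is the scopedTarget.beanName bean created by @RefreshScope.
- Some older Spring Boot versions reportedly do not honour @RefreshScope on a Controller class (reported online; not investigated in detail).
- Fix 1: set the proxy mode on the annotation: @RefreshScope(proxyMode = ScopedProxyMode.DEFAULT)
- Fix 2: move the configuration parameters into a separate class and use @RefreshScope + @Value there.
- Fix 3: use @ConfigurationProperties directly.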
Dynamic refresh without @RefreshScope
Beans bound with @ConfigurationProperties are updated dynamically without @RefreshScope.
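How @ConfigurationProperties refresh works: Spring Cloud registers a ConfigurationPropertiesRebinder listener that listens for EnvironmentChangeEvent. When such an event occurs, it rebinds the existing beans annotated with @ConfigurationProperties so that the new values are bound to them. This is Spring Cloud's default behaviour.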
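- A bean injected with @Autowired is the proxy object.
- Looking up beanName returns the proxy object.
- Looking up scopedTarget.beanName returns the instance of the original class created by @RefreshScope.
- The proxy object is not replaced when the configuration is refreshed.
- The instance created by @RefreshScope is replaced on refresh: the old one is cleared at refresh time and a new one is created on next use. Calls on the proxy are delegated to the current target, so code holding the proxy still sees the refreshed values.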