Nacos Source Code Analysis

Official site: https://nacos.io/zh-cn/docs/architecture.html

Official architecture e-book: https://developer.aliyun.com/ebook/36/10664?spm=a2c6h.26392459.ebook-detail.4.31b934e2j9LrWh

Official architecture

(architecture diagram)

Overall flow

(flow diagram)

The official documentation does not focus on how dynamic refresh works; later we combine @RefreshScope and ContextRefresher to lead into HostReactor.

Why read the source code

Improve your technical foundation: study the excellent design ideas in the source, including solutions to hard problems and proven design patterns, to raise your overall skill level.
Master frameworks in depth: the more source code you read, the faster you pick up a new technology or framework; a glance at a demo roughly tells you how the internals work, however fast frameworks evolve.
Locate production issues quickly: when something breaks online, especially inside the framework itself (e.g. a bug), you can pinpoint it fast, which is an edge over those who have never read the source.
A big plus in interviews: top-tier internet companies usually probe framework knowledge down to the source-code level.
Know the why, not just the how: a must for anyone serious about technology; when you use a good framework you naturally want to know how it is built underneath.
Embrace the open-source community: contribute to open-source projects, meet more experts, and build a high-quality network.

I. Entry Point Analysis

Service registration and discovery: @EnableDiscoveryClient

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class) // @Import registers the listed classes as beans in the IoC container
public @interface EnableDiscoveryClient {

    /**
     * If true, the ServiceRegistry will automatically register the local server (defaults to true).
     */
    boolean autoRegister() default true;
}

autoRegister() returns true by default. The annotation also pulls in an ImportSelector via @Import(EnableDiscoveryClientImportSelector.class), whose selectImports runs as follows:

@Override
public String[] selectImports(AnnotationMetadata metadata) {
    String[] imports = super.selectImports(metadata);

    AnnotationAttributes attributes = AnnotationAttributes.fromMap(
            metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
    // read the annotation's autoRegister attribute
    boolean autoRegister = attributes.getBoolean("autoRegister");

    if (autoRegister) {
        List<String> importsList = new ArrayList<>(Arrays.asList(imports));
        importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
        imports = importsList.toArray(new String[0]);
    } else {
        Environment env = getEnvironment();
        if (ConfigurableEnvironment.class.isInstance(env)) {
            ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
            LinkedHashMap<String, Object> map = new LinkedHashMap<>();
            // inject a property source that turns auto-registration off
            map.put("spring.cloud.service-registry.auto-registration.enabled", false);
            MapPropertySource propertySource = new MapPropertySource(
                    "springCloudDiscoveryClient", map);
            configEnv.getPropertySources().addLast(propertySource);
        }

    }

    return imports;
}
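The mechanism is easy to reproduce in isolation. Below is a minimal, hypothetical ImportSelector sketch (EnableMyFeature and MyFeatureConfiguration are made-up names, not part of Spring Cloud or Nacos): an annotation attribute decides which configuration class names are handed back to the container.

import java.util.Map;
import org.springframework.context.annotation.ImportSelector;
import org.springframework.core.type.AnnotationMetadata;

// Hypothetical selector: reads an "enabled" attribute from the importing
// annotation and decides which configuration classes to register as beans.
class MyFeatureSelector implements ImportSelector {

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        Map<String, Object> attrs =
                metadata.getAnnotationAttributes("com.example.EnableMyFeature");
        boolean enabled = attrs != null && Boolean.TRUE.equals(attrs.get("enabled"));
        // the returned fully-qualified names are instantiated as configuration beans,
        // exactly like AutoServiceRegistrationConfiguration above
        return enabled
                ? new String[] {"com.example.MyFeatureConfiguration"}
                : new String[0];
    }
}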

II. META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
com.alibaba.cloud.nacos.loadbalancer.LoadBalancerNacosAutoConfiguration,\
com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
org.springframework.context.ApplicationListener=\
com.alibaba.cloud.nacos.discovery.logging.NacosLoggingListener

(flow diagram)
NacosDiscoveryAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public NacosDiscoveryProperties nacosProperties() {
        return new NacosDiscoveryProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    public NacosServiceDiscovery nacosServiceDiscovery(
            NacosDiscoveryProperties discoveryProperties,
            NacosServiceManager nacosServiceManager) {
        return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
    }

}

III. Automatic Service Registration

(flow diagrams)

Spring Cloud has multiple registry implementations (Eureka, ZooKeeper, Nacos, and more). Unifying them requires a contract, and Spring Cloud Commons defines exactly that.
The org.springframework.cloud.client.serviceregistry package in Spring Cloud Commons contains three core service-registration interfaces: AutoServiceRegistration, Registration, and ServiceRegistry.
Registration extends ServiceInstance, which standardizes the attributes of a service instance.
1. AutoServiceRegistration stands for automatic registration: once the service starts, its information is registered with the registry automatically. Its existence obliges every implementation to provide auto-registration.

2. Registration stores the service information and standardizes what gets registered with the registry:

public interface Registration extends ServiceInstance {

}
public interface ServiceInstance {

/**
* @return The unique instance ID as registered.
*/
// the unique registered instance ID
default String getInstanceId() {
return null;
}

/**
* @return The service ID as registered.
*/
String getServiceId();

/**
* @return The hostname of the registered service instance.
*/
String getHost();

/**
* @return The port of the registered service instance.
*/
int getPort();

/**
* @return Whether the port of the registered service instance uses HTTPS.
*/
// whether the registered instance uses HTTPS
boolean isSecure();

/**
* @return The service URI address.
*/
URI getUri();

/**
* @return The key / value pair metadata associated with the service instance.
*/
Map<String, String> getMetadata();

/**
* @return The scheme of the service instance.
*/
// the scheme of the service instance
default String getScheme() {
return null;
}

}

3. ServiceRegistry registers and deregisters services; taking services offline, heartbeat checks and the like can all be traced from here.

public interface ServiceRegistry<R extends Registration> {

/**
* Registers the registration. A registration typically has information about an
* instance, such as its hostname and port.
* @param registration registration meta data
*/
void register(R registration);

/**
* Deregisters the registration.
* @param registration registration meta data
*/
void deregister(R registration);

/**
* Closes the ServiceRegistry. This is a lifecycle method.
*/
void close();

/**
* Sets the status of the registration. The status values are determined by the
* individual implementations.
* @param registration The registration to update.
* @param status The status to set.
* @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
*/
void setStatus(R registration, String status);

/**
* Gets the status of a particular registration.
* @param registration The registration to query.
* @param <T> The type of the status.
* @return The status of the registration.
* @see org.springframework.cloud.client.serviceregistry.endpoint.ServiceRegistryEndpoint
*/
<T> T getStatus(R registration);

}
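To make the contract concrete, here is a minimal, hypothetical in-memory ServiceRegistry sketch (the class is illustrative only; real implementations such as NacosServiceRegistry talk to a remote server):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.cloud.client.serviceregistry.ServiceRegistry;

// Hypothetical registry keeping instances in a local map instead of a remote server.
class InMemoryServiceRegistry implements ServiceRegistry<Registration> {

    private final Map<String, Registration> instances = new ConcurrentHashMap<>();
    private final Map<String, String> statuses = new ConcurrentHashMap<>();

    @Override
    public void register(Registration registration) {
        instances.put(registration.getInstanceId(), registration);
    }

    @Override
    public void deregister(Registration registration) {
        instances.remove(registration.getInstanceId());
        statuses.remove(registration.getInstanceId());
    }

    @Override
    public void close() {
        instances.clear();
    }

    @Override
    public void setStatus(Registration registration, String status) {
        statuses.put(registration.getInstanceId(), status);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getStatus(Registration registration) {
        return (T) statuses.get(registration.getInstanceId());
    }
}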

IV. The Nacos Implementation

NacosRegistration

public class NacosRegistration implements Registration, ServiceInstance 

NacosServiceRegistry

public class NacosServiceRegistry implements ServiceRegistry<Registration> 

The configuration is wrapped into an Instance, and namingService.registerInstance(serviceId, instance) registers the service with the registry; in 2.1.1 the NamingService is managed through NacosServiceManager:

try {
Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
nacosServiceManager.getNamingMaintainService(nacosProperties).updateInstance(
serviceId, nacosDiscoveryProperties.getGroup(), instance);
}
catch (Exception e) {
throw new RuntimeException("update nacos instance status fail", e);
}

NacosAutoServiceRegistration

public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration>
-> public abstract class AbstractAutoServiceRegistration<R extends Registration>
       implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {

By indirectly implementing ApplicationListener, onApplicationEvent runs when the server starts and registers the service with the registry:

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E event);

static <T> ApplicationListener<PayloadApplicationEvent<T>> forPayload(Consumer<T> consumer) {
return (event) -> {
consumer.accept(event.getPayload());
};
}
}

@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}

public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}


public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}

// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}

}

publishEvent is then invoked on AbstractApplicationContext:

// Listeners are initialized after the message source, to be able to access it within listener implementations; thus, message source implementations cannot publish events.
@Override
public void publishEvent(Object event) {
publishEvent(event, null);
}
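Since start() publishes InstancePreRegisteredEvent and InstanceRegisteredEvent (see above), application code can hook into registration. A minimal sketch, assuming the event class from Spring Cloud Commons (org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent):

import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

// Runs after the local instance has been registered with the registry.
@Component
class RegistrationLogger implements ApplicationListener<InstanceRegisteredEvent<?>> {

    @Override
    public void onApplicationEvent(InstanceRegisteredEvent<?> event) {
        // getConfig() carries the registration configuration published with the event
        System.out.println("instance registered, config = " + event.getConfig());
    }
}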

How the client registers with the server

com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register

public void register(Registration registration) {

if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}

NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();

Instance instance = getNacosInstanceFromRegistration(registration);

try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);

final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
String method) throws NacosException {

params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}

NacosException exception = new NacosException();

if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());

for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size();
}
}

NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
exception.getErrMsg());

throw new NacosException(exception.getErrCode(),
"failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());

}

// inside callServer, the actual HTTP call:
HttpRestResult<String> restResult = nacosRestTemplate
        .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);


@CanDistro // the Distro protocol decides whether this node should handle the service
@PostMapping
@Secured(action = ActionTypes.WRITE) // access control via com.alibaba.nacos.auth.annotation.Secured
public String register(HttpServletRequest request) throws Exception {

final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);

final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
instance.getIp(), instance.getPort()));
return "ok";
}

@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);

boolean ephemeral = instance.isEphemeral();
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
createIpPortClientIfAbsent(clientId);
Service service = getService(namespaceId, serviceName, ephemeral);
clientOperationService.registerInstance(service, instance, clientId);
}
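The whole chain boils down to an HTTP POST against the instance endpoint. For reference, a registration can be reproduced by hand with the Nacos v1 open API (the address and parameter values below are illustrative):

curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.test.service&ip=192.168.0.10&port=8080&ephemeral=true'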

When NacosServiceRegistry is instantiated it creates a NacosServiceManager, then calls its getNamingService method, which builds the NamingService from the assembled parameters.
(flow diagram)

Put the service into serviceMap, the service registry table.

Initialize the service and create a health-check task (the main health-check code path).

Add a listener (of type RecordListener) to a queue; when certain events are observed, its onChange method runs. Nacos makes heavy use of the observer pattern: instance registration, eviction and so on are abstracted into tasks pushed onto a blocking queue, and when the notifier thread sees a task arrive it invokes the listeners' onChange method, as the sketch below illustrates.
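A stripped-down sketch of that notifier pattern, with made-up types (the real Nacos code keys tasks by Datum key and dispatches on an ApplyAction enum):

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Tasks such as "service changed" are queued; a single notifier thread drains
// the queue and fans each task out to every registered listener.
class Notifier implements Runnable {

    interface Listener {
        void onChange(String key) throws Exception;
    }

    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void addTask(String key) {
        tasks.offer(key); // producers: registration, eviction, ...
    }

    void listen(Listener listener) {
        listeners.add(listener);
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String key = tasks.take(); // blocks until a task arrives
                for (Listener listener : listeners) {
                    listener.onChange(key); // same role as RecordListener#onChange
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception ignored) {
                // a failing listener must not kill the notifier thread
            }
        }
    }
}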

com.alibaba.nacos.naming.consistency.RecordListener#onChange

public void onChange(String key, Service service) throws Exception {
try {
if (service == null) {
Loggers.SRV_LOG.warn("received empty push from raft, key: {}", key);
return;
}

if (StringUtils.isBlank(service.getNamespaceId())) {
service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
}

Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key: {}, value: {}", key, service);

Service oldDom = getService(service.getNamespaceId(), service.getName());

if (oldDom != null) {
oldDom.update(service);
// re-listen to handle the situation when the underlying listener is removed:
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true),
oldDom);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false),
oldDom);
} else {
putServiceAndInit(service);
}
} catch (Throwable e) {
Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update", e);
}
}

addInstance

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {

String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

Service service = getService(namespaceId, serviceName);

synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

Instances instances = new Instances();
instances.setInstanceList(instanceList);

consistencyService.put(key, instances);
}
}

consistencyService

AP

(flow diagram)
clientOperationService.registerInstance(service, instance, clientId) delegates to ServiceManager.

CP

(flow diagram)

Service: mainly concerned with instance changes

ServiceManager: mainly concerned with service changes

SwitchManager: mainly concerned with switching between the AP and CP models

The command for switching between AP and CP:

curl -X PUT '$NACOS_SERVER:8848/nacos/v1/ns/operator/switches?entry=serverMode&value=CP'

Nacos 2.0 source analysis: the health-check mechanism

Nacos supports many health-check types: client heartbeat, HTTP, TCP, MySQL and so on.
They are enumerated in com.alibaba.nacos.api.naming.pojo.healthcheck.HealthCheckType:

/**
* TCP type.
*/
TCP(Tcp.class),
/**
* HTTP type.
*/
HTTP(Http.class),
/**
* MySQL type.
*/
MYSQL(Mysql.class),
/**
* No check.
*/
NONE(AbstractHealthChecker.None.class);

(flow diagram)

class BeatTask implements Runnable {

BeatInfo beatInfo;

public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}

@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
// send the heartbeat to renew the lease
// lightBeatEnabled defaults to false
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get(CLIENT_BEAT_INTERVAL_FIELD).asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map<String, String> params = new HashMap<>(16);
Map<String, String> bodyMap = new HashMap<>(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put(IP_PARAM, beatInfo.getIp());
params.put(PORT_PARAM, String.valueOf(beatInfo.getPort()));
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}

Server-side handling of heartbeat renewal

com.alibaba.nacos.naming.controllers.InstanceController#beat

public ObjectNode beat(HttpServletRequest request) throws Exception {

ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
String clusterName = WebUtils
.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
if (clientBeat != null) {
if (StringUtils.isNotBlank(clientBeat.getCluster())) {
clusterName = clientBeat.getCluster();
} else {
// fix #2533
clientBeat.setCluster(clusterName);
}
ip = clientBeat.getIp();
port = clientBeat.getPort();
}
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// validate the service-name format
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
serviceName, namespaceId);
BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
builder.setRequest(request);
// fetch the instance information and handle the beat
int resultCode = getInstanceOperator()
.handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
result.put(CommonParams.CODE, resultCode);
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}

In 2.1.1, getInstanceOperator().handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder) encapsulates the Service handling, including starting lease renewal and updating the client's heartbeat time.
It splits into a client side and a server side.
(flow diagram)
For the server side, look at service#handleBeat:

@Override
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,
RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {
com.alibaba.nacos.naming.core.Instance instance = serviceManager
.getInstance(namespaceId, serviceName, cluster, ip, port);

if (instance == null) {
if (clientBeat == null) {
return NamingResponseCode.RESOURCE_NOT_FOUND;
}

Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
instance = parseInstance(builder.setBeatInfo(clientBeat).setServiceName(serviceName).build());
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// fetch the service
Service service = serviceManager.getService(namespaceId, serviceName);
// fail if the service is null
serviceManager.checkServiceIsNull(service, namespaceId, serviceName);

if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(cluster);
}
// renew the lease
service.processClientBeat(clientBeat);
return NamingResponseCode.OK;
}

/**
* Schedule client beat check task without a delay.
*
* @param task health check task
* @return scheduled future
*/
public static ScheduledFuture<?> scheduleNow(Runnable task) {
return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}
processClientBeat sets the instance's heartbeat time to the current time.

Health checks

com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit

private void putServiceAndInit(Service service) throws NacosException {
putService(service);
service = getService(service.getNamespaceId(), service.getName());
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
/**
* Init service.
*/
public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}

The health-check task

ClientBeatCheckTask

The core check compares the time since the last beat against the heartbeat timeout; getInstanceHeartBeatTimeOut() defaults to 15 seconds:

if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut())

public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);

public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);

public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

So a client beats every 5 seconds; after 15 seconds without a beat the instance is marked unhealthy, and after 30 seconds it is deleted (see the task below).
@Override
public void run() {
try {
// If upgrade to 2.0.X stop health check with v1
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
if (!getDistroMapper().responsible(service.getName())) {
return;
}

if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}

List<Instance> instances = service.allIPs(true);

// first set health status of instances:
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
}
}
}
}

if (!getGlobalConfig().isExpireInstance()) {
return;
}

// then remove obsolete instances:
for (Instance instance : instances) {

if (instance.isMarked()) {
continue;
}
// delete the instance after more than 30 seconds without a beat
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}

} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}

}

Summary: when ZooKeeper registers a service it opens a long-lived connection (for example with NIO or Netty) that keeps a channel occupied, whereas Nacos simply issues an HTTP request that ends once the call returns. Nacos 1.4.x is a typical short-connection design (2.0 switched to long-lived gRPC connections), while ZK establishes a long-lived channel; with very many client servers that costs noticeably more, so Nacos is quite a bit lighter. To keep service changes consistent, ZK's watch-callback mechanism notifies clients immediately, so responses are very timely; that is how ZK guarantees CP. Nacos covers liveness in two ways: the client pulls periodically, and the server pushes over UDP; even if a UDP push is lost, the scheduled task acts as a fallback.

TcpHealthCheckProcessor

(HealthCheckProcessor hierarchy diagram)
com.alibaba.nacos.naming.healthcheck.v2.processor.TcpHealthCheckProcessor

public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable{

public static final String TYPE = "TCP";

@Autowired
private HealthCheckCommon healthCheckCommon;

@Autowired
private SwitchDomain switchDomain;

}

Besides declaring its TYPE, every processor shares the healthCheckCommon and switchDomain fields; the result handling is implemented in the inner class's Beat.finishCheck method.
Health is judged as follows:

public boolean isHealthy() {
return System.currentTimeMillis() - startTime < TimeUnit.SECONDS.toMillis(30L);
}

The checkOK path also consults the Distro protocol to decide whether this node owns the current IP's service in the cluster; when it does, the IP is marked healthy, the service's last-modified time is set to now, and the service listeners push the updated service:

NotifyCenter.publishEvent(new HealthStateChangeTraceEvent(System.currentTimeMillis(),
service.getNamespaceId(), service.getGroupName(), service.getName(), ip.getIp(), ip.getPort(),
true, msg));

Nacos consistency protocols

There are many distributed consistency protocols, such as Paxos, Zab and Raft; Nacos uses Distro and Raft. For non-ephemeral data Nacos uses Raft, for ephemeral data Distro. Briefly, Distro is positioned as a consistency protocol for ephemeral data: such data does not need to be stored on disk or in a database, because ephemeral data usually lives with a server session; as long as the session exists, the data is not lost.

The Distro protocol

Distro is the consistency protocol Nacos developed for ephemeral instance data. The data is stored in memory, fully synchronized at startup, and verified periodically.

Under Distro's design, every node can accept reads and writes. Requests fall into three cases:

1. A write for an instance this node is responsible for is written directly.

2. A write for an instance another node is responsible for is routed inside the cluster and forwarded to the owning node, completing the write.

3. Any read is answered directly from the local copy (every instance is synchronized to every machine).

As Nacos's built-in consistency protocol for ephemeral instances, Distro ensures that each node's view of service state reaches the other nodes promptly, and it sustains storage and consistency for service-instance counts in the hundreds of thousands.

Nacos has both CP and AP modes, and the two implement data consistency in completely different ways: CP uses the strongly consistent Raft protocol, while AP uses Alibaba's home-grown Distro protocol.
In the registration path above, the register method in InstanceController is implemented as follows:

@CanDistro // the Distro protocol decides whether this node should handle the service
@PostMapping
@Secured(action = ActionTypes.WRITE) // access control via com.alibaba.nacos.auth.annotation.Secured
public String register(HttpServletRequest request) throws Exception {

final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
// the service name must not be empty and must not contain @@
NamingUtils.checkServiceNameFormat(serviceName);

final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
instance.getIp(), instance.getPort()));
return "ok";
}

@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);

boolean ephemeral = instance.isEphemeral();
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
createIpPortClientIfAbsent(clientId);
Service service = getService(namespaceId, serviceName, ephemeral);
clientOperationService.registerInstance(service, instance, clientId);
}

@CanDistro is tied to the Distro protocol.
DistroFilter
(from Nacos 2.1.1)

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
ReuseHttpServletRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
HttpServletResponse resp = (HttpServletResponse) servletResponse;

String urlString = req.getRequestURI();

if (StringUtils.isNotBlank(req.getQueryString())) {
urlString += "?" + req.getQueryString();
}

try {
// resolve the controller method for this request
Method method = controllerMethodsCache.getMethod(req);

String path = new URI(req.getRequestURI()).getPath();
if (method == null) {
throw new NoSuchMethodException(req.getMethod() + " " + path);
}
// does the method carry @CanDistro?
if (!method.isAnnotationPresent(CanDistro.class)) {
filterChain.doFilter(req, resp);
return;
}
String distroTag = distroTagGenerator.getResponsibleTag(req);
// if this node is responsible for the request, let it straight through
if (distroMapper.responsible(distroTag)) {
filterChain.doFilter(req, resp);
return;
}

// proxy request to other server if necessary:
String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);

if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
// This request is sent from peer server, should not be redirected again:
Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"receive invalid redirect request from peer " + req.getRemoteAddr());
return;
}

final String targetServer = distroMapper.mapSrv(distroTag);

List<String> headerList = new ArrayList<>(16);
Enumeration<String> headers = req.getHeaderNames();
while (headers.hasMoreElements()) {
String headerName = headers.nextElement();
headerList.add(headerName);
headerList.add(req.getHeader(headerName));
}

final String body = IoUtils.toString(req.getInputStream(), StandardCharsets.UTF_8.name());
final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());

RestResult<String> result = HttpClient
.request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, StandardCharsets.UTF_8.name(), req.getMethod());
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
}
} catch (AccessControlException e) {
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
} catch (NoSuchMethodException e) {
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
"no such api:" + req.getMethod() + ":" + req.getRequestURI());
} catch (Exception e) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] Server failed: ", e);
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Server failed, " + ExceptionUtil.getAllExceptionMsg(e));
}

}

DistroFilter implements the Filter interface, so it is a servlet filter; every incoming request passes through doFilter, which roughly performs the four steps below:

1. Resolve the controller method for the request path from controllerMethodsCache.

2. If the method carries the @CanDistro annotation, call distroMapper.responsible() to decide whether the current Nacos node should handle the request.

3. If the method has no @CanDistro annotation, or it has one and the current node is responsible, the request passes straight through to the controller.

4. Otherwise (annotated with @CanDistro but this node is not responsible), the request is forwarded to the owning node.

How does step 1 fetch the controller method from controllerMethodsCache? Look at ControllerMethodsCache#getMethod:

private static final Logger LOGGER = LoggerFactory.getLogger(ControllerMethodsCache.class);

private ConcurrentMap<RequestMappingInfo, Method> methods = new ConcurrentHashMap<>();

private final ConcurrentMap<String, List<RequestMappingInfo>> urlLookup = new ConcurrentHashMap<>();


public Method getMethod(HttpServletRequest request) {
String path = getPath(request);
String httpMethod = request.getMethod();
String urlKey = httpMethod + REQUEST_PATH_SEPARATOR + path.replaceFirst(EnvUtil.getContextPath(), "");
List<RequestMappingInfo> requestMappingInfos = urlLookup.get(urlKey);
if (CollectionUtils.isEmpty(requestMappingInfos)) {
return null;
}
List<RequestMappingInfo> matchedInfo = findMatchedInfo(requestMappingInfos, request);
if (CollectionUtils.isEmpty(matchedInfo)) {
return null;
}
RequestMappingInfo bestMatch = matchedInfo.get(0);
if (matchedInfo.size() > 1) {
RequestMappingInfoComparator comparator = new RequestMappingInfoComparator();
matchedInfo.sort(comparator);
bestMatch = matchedInfo.get(0);
RequestMappingInfo secondBestMatch = matchedInfo.get(1);
if (comparator.compare(bestMatch, secondBestMatch) == 0) {
throw new IllegalStateException(
"Ambiguous methods mapped for '" + request.getRequestURI() + "': {" + bestMatch + ", "
+ secondBestMatch + "}");
}
}
return methods.get(bestMatch);
}

The call chain is as follows:
(ConsoleConfig call-chain diagram)
The origin is the init method of a class called ConsoleConfig, annotated with @PostConstruct so that it runs when the Spring container starts. init calls ControllerMethodsCache#initClassMethod four times with different package names. For each package, initClassMethod finds the classes annotated with @RequestMapping, then walks every @RequestMapping method in each class and builds a RequestMappingInfo object carrying two matchers, one for the request path and one for the request parameters; it puts urlKey -> RequestMappingInfo into the urlLookup map and RequestMappingInfo -> controller method into the methods map.

In short: at container startup, Nacos scans the given packages for @RequestMapping controller classes and methods and builds a RequestMappingInfo matcher for each method. The path match is keyed by an urlKey derived from the request method and path declared in @RequestMapping, and the parameter match comes from the annotation's params attribute; the RequestMappingInfo is then mapped to its controller method in methods. When a request arrives, DistroFilter intercepts it, builds the urlKey from the request path, looks up the matching RequestMappingInfo, and validates the request parameters against it: on failure it returns null, on success it resolves the RequestMappingInfo to the controller method.

How the Distro weak-consistency protocol is implemented

String distroTag = distroTagGenerator.getResponsibleTag(req);
We saw above that DistroFilter resolves the controller method for a request and, when @CanDistro is present, asks distroMapper.responsible() whether the current Nacos node must handle the request. That method is the core of the Distro weak-consistency implementation:

public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;
    // true when the Distro protocol is disabled, or Nacos runs in standalone mode
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        // returning true means this node should handle the service
        return true;
    }

    if (CollectionUtils.isEmpty(servers)) {
        // means distro config is not ready yet
        return false;
    }
    // find this node's position in the cluster member list
    // index and lastIndex are normally equal
    String localAddress = EnvUtil.getLocalAddress();
    int index = servers.indexOf(localAddress);
    int lastIndex = servers.lastIndexOf(localAddress);
    if (lastIndex < 0 || index < 0) {
        return true;
    }

    int target = distroHash(responsibleTag) % servers.size();
    return target >= index && target <= lastIndex;
}

This method decides whether the current Nacos node is responsible for a given service; it returns true when the node should handle it. It first checks whether the Distro protocol is enabled: if not, it returns true. It likewise returns true in standalone mode, meaning Distro is inert there, which is easy to understand: Distro exists to keep data consistent across a cluster, and a single node has nothing to synchronize. Otherwise it finds the current node's index position within the cluster, runs the service tag through distroHash, and takes the result modulo the number of cluster nodes to get a target value; if target matches this node's index, it returns true, otherwise false. Every service is therefore assigned to a specific Nacos node by this rule, and each node owns a share of the services. Does that make the Nacos cluster a sharded store? Clearly not: although each node handles only part of the service requests, the nodes synchronize data with each other, so the data on every node is eventually consistent, which is precisely why Distro is called a weakly consistent protocol. And if the rule says a request does not belong to the current node, the node recomputes the owner by the same rule and forwards the request to that node, which completes the Distro flow. A worked example of the ownership rule follows.
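A self-contained sketch of the ownership rule (distroHash here mirrors the idea of hashing the tag and taking it modulo the server count; the exact hash in a given Nacos version may differ):

import java.util.Arrays;
import java.util.List;

// Worked example of Distro ownership: hash the service tag, take it modulo
// the cluster size, and compare the result with the local node's index.
public class DistroOwnership {

    static int distroHash(String tag) {
        // illustrative hash, kept non-negative
        return Math.abs(tag.hashCode() % Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        String localAddress = "10.0.0.2:8848"; // this node
        String serviceTag = "DEFAULT_GROUP@@order-service";

        int index = servers.indexOf(localAddress);            // 1
        int target = distroHash(serviceTag) % servers.size(); // 0, 1 or 2

        // responsible() returns true only when the hash lands on this node;
        // otherwise the request is forwarded to servers.get(target)
        System.out.printf("target=%d, index=%d, responsible=%b%n",
                target, index, target == index);
    }
}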

Nacos cluster leader election

Raft protocol visualization: http://thesecretlivesofdata.com/raft/

The Raft consistency protocol

Nacos obviously supports cluster mode, and once a cluster is involved, so are leader and follower roles. What mechanism does Nacos use to run its cluster?

The Nacos cluster resembles ZooKeeper's: it has leader and follower roles, and those names alone suggest an election mechanism, because without one the roles would probably be named master/slave. (Admittedly, that is just a guess based on how such components tend to be named.)

Raft is a strongly consistent, decentralized, highly available distributed protocol for solving distributed consensus. Compared with the famous Paxos it is easier to understand, and it does not lose to Paxos in performance, reliability or availability. Much middleware relies on Raft for distributed consistency, for example Redis Sentinel, and leader election in CP-mode Nacos is likewise implemented with Raft.

In Raft, a node plays one of three roles:

Leader: handles client requests
Candidate: a transient role used to elect a Leader
Follower: responds to requests from the Leader or Candidates
Elections are triggered at two moments:

when the service starts
when the leader dies
All nodes start as followers. If a follower receives no leader heartbeat for some time (there may be no leader yet, or the leader may have died), it becomes a Candidate and starts an election; before doing so it increments its term, which plays the same role as ZooKeeper's epoch.

The candidate votes for itself and sends its ballot to the other nodes, then waits for their replies.

Several outcomes are possible during this process:

it receives a majority of the votes and becomes leader
it is told another node has already become leader, and switches itself to follower
it receives no majority within a period of time and starts a new election
Election scenarios:

1. After winning the election, the leader sends a message to all nodes so that no other node triggers a new election.
2. Suppose three nodes A, B, C. A and B start elections at the same time and A's ballot reaches C first, so C votes for A; when B's message arrives, C can no longer vote (the one-vote constraint above), and A and B obviously will not vote for each other. After A wins it heartbeats B and C; B sees that A's term is not lower than its own, knows a leader already exists, and becomes a follower.
3. No node wins a majority of the votes, for instance a split vote. Suppose four nodes A/B/C/D, where C and D become candidates at the same time, but A votes for D and B votes for C: that is a split vote. Everyone then waits until the timeout expires and the election restarts. Split votes prolong the time the system is unavailable, so Raft introduces randomized election timeouts to make them unlikely, as the sketch below shows.
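A minimal sketch of randomized election timeouts, using the intervals Nacos's RaftCore works with (a first deadline drawn from 0~15 s, a reset of 15 s plus a random 0~5 s on every leader heartbeat, decremented by a 500 ms tick; the values come from the MasterElection comments below):

import java.util.concurrent.ThreadLocalRandom;

// Each node draws its own election deadline; the node whose timer expires first
// becomes a candidate, so simultaneous candidacies (split votes) become rare.
class ElectionTimer {

    static final long LEADER_TIMEOUT_MS = 15_000;
    static final long RANDOM_MS = 5_000;
    static final long TICK_PERIOD_MS = 500;

    // first deadline: a random point inside the full timeout window
    long leaderDueMs = ThreadLocalRandom.current().nextLong(0, LEADER_TIMEOUT_MS);

    // called whenever a leader heartbeat arrives
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + ThreadLocalRandom.current().nextLong(0, RANDOM_MS);
    }

    // called every 500 ms by the scheduler; true means an election is due
    boolean tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        return leaderDueMs <= 0;
    }
}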

RaftCore initialization

A few core concepts and components here:

1. peer: represents one Nacos machine and records that server's election metadata: its IP, whom it voted for (voteFor), an AtomicLong term recording which round of voting the local server has initiated, its state (leader/follower), the leader-election interval, and so on.

2. peers: a RaftPeerSet that effectively records the peer information of the whole cluster.

3. notifier: a thread used for event notification.

@DependsOn("ProtocolManager")
@Component
public class RaftCore {

// build a single-threaded scheduled executor
private final ScheduledExecutorService executor = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.raft.notifier"));

@PostConstruct
public void init() throws Exception {

Loggers.RAFT.info("initializing Raft sub-system");
// start the notifier thread, which walks the listeners and runs the logic matching each ApplyAction

executor.submit(notifier);

final long start = System.currentTimeMillis();
// on startup, load the local log first
// walk /nacos/data/naming/data/, loading Datum entries from disk into memory for recovery
// (data sync uses 2PC: the leader writes the request to its disk log before replicating)
// Datum: a key-value pair
// datums: the in-memory store, a ConcurrentMap<String, Datum>
raftStore.loadDatums(notifier, datums);

// read term from /nacos/data/naming/meta.properties on local disk, defaulting to 0 when absent
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));

Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());

while (true) {
if (notifier.tasks.size() <= 0) {
break;
}
Thread.sleep(1000L);
}

initialized = true;

Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
// schedule a task every 500 ms that decides whether to start a leader election
GlobalExecutor.registerMasterElection(new MasterElection());
// send a heartbeat every 500 ms
GlobalExecutor.registerHeartbeat(new HeartBeat());

Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

}

RaftCore.MasterElection


public class MasterElection implements Runnable {

@Override
public void run() {
try {
// not fully initialized yet
if (!peers.isReady()) {
return;
}
// the peer record for the node running on this machine
RaftPeer local = peers.local();
// leader-election trigger interval: on the first pass leaderDueMs is a random value in (0~15000) ms, minus 500
// the task runs every 500 ms and subtracts TICK_PERIOD_MS (500 ms) each time; once it drops below 0, an election fires
// every received leader heartbeat resets leaderDueMs = 15 s + (random 0~5 s)
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
// still positive: return and wait for the next 500 ms tick
if (local.leaderDueMs > 0) {
return;
}

// reset timeout
// reset the election interval
local.resetLeaderDue();
// reset the heartbeat interval
local.resetHeartbeatDue();
// send the local ballot to the other servers over HTTP
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}

}

private void sendVote() {
// the local node's peer record
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
local.term);
// reset peers: set every peer's voteFor and the leader to null
peers.reset();
// every vote increments term, the election counter recording which round this node has initiated
local.term.incrementAndGet();
// vote for ourselves: one voteFor in peers now points at us
local.voteFor = local.ip;
// put the local server into the CANDIDATE state
local.state = RaftPeer.State.CANDIDATE;

Map<String, String> params = new HashMap<>(1);
params.put("vote", JacksonUtils.toJson(local));//设置本机请求参数
//遍历除了本机ip之外的其他节点,把自己的票据发送给所有节点,将选自己的投票发送给其他servers,获取其他机器的选票信息
for (final String server : peers.allServersWithoutMySelf()) {
//API_VOTE: /raft/vote
final String url = buildUrl(server, API_VOTE);
try {
// fire the vote request
HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT
.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
return 1;
}
// the other server's response
RaftPeer peer = JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class);

Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
// tally the votes and decide the leader
peers.decideLeader(peer);

return 0;
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
}

RaftController.vote

Receiving the ballot:

@PostMapping("/vote")
public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {

RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));

return JacksonUtils.transferToJsonNode(peer);

RaftCore.receivedVote

This method handles the local ballot. When a canvassing request arrives from another machine, the terms are compared: if the local term is greater than or equal to the remote machine's term and the local vote has not yet been cast, the node votes for itself.
Otherwise it casts its vote for the remote machine, sets its own state to follower, and returns the result.

public synchronized RaftPeer receivedVote(RaftPeer remote) {
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}
// the local node's peer record
RaftPeer local = peers.get(NetUtils.localServer());
// if the requester's term is no greater than our own, keep the vote for ourselves
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;

Loggers.RAFT.info(msg);
// if voteFor is empty, no ballot has been cast yet this term, so vote for ourselves
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}

return local;
}
// otherwise the requester's term is greater than the local term: the remote machine opened the vote first, so endorse it
local.resetLeaderDue(); // reset the local election interval

local.state = RaftPeer.State.FOLLOWER; // become a follower and vote for the requesting machine
local.voteFor = remote.ip; // the local machine votes for the remote machine
local.term.set(remote.term.get()); // sync the remote term


Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);

return local;
}

decideLeader

decideLeader decides which node becomes the leader:

public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);

SortedBag ips = new TreeBag();
// the highest vote count seen so far
int maxApproveCount = 0;
// the IP with the most votes
String maxApprovePeer = null;
/**
 * Suppose three nodes A, B, C; local is A, and A, B, C all start an election in round one.
 * Round-1 result:
 *   1st loop pass is A's own ballot (cast for itself): maxApproveCount = 1, maxApprovePeer = A
 *   2nd pass is the ballot returned by server B, cast for B:
 *     if (ips.getCount(peer.voteFor) > maxApproveCount) does not hold: maxApproveCount = 1, maxApprovePeer = A
 *   3rd pass is the ballot returned by server C, cast for C:
 *     the if does not hold: maxApproveCount = 1, maxApprovePeer = A
 * Round-2 result:
 *   1st pass is A's own ballot (cast for itself): maxApproveCount = 1, maxApprovePeer = A
 *   2nd pass is the ballot returned by server B, now cast for A:
 *     the if holds: maxApproveCount = 2, maxApprovePeer = A
 *   3rd pass is the ballot returned by server C, cast for C:
 *     the if does not hold: maxApproveCount = 2, maxApprovePeer = A
 */
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
// collect the ballot
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
// majorityCount(): the majority threshold, 2 in a 3-node cluster
// round 1: maxApproveCount = 1, the if fails and the (still null) leader is returned: no leader elected
// round 2: maxApproveCount = 2, the if holds and A is returned as leader: the election succeeds
if (maxApproveCount >= majorityCount()) {
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER; // becomes the Leader

if (!Objects.equals(leader, peer)) {
leader = peer;
// if the current leader differs from the elected one, replace it and publish a leader-election-finished event
ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local()));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
// return the Leader
return leader;
}

Data synchronization

addInstance

For example, when registering a service, addInstance finally calls consistencyService.put(key, instances) to perform the consistency sync.

InstanceController.register -> registerInstance -> addInstance -> consistencyService.put(key, instances)

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {

String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

Service service = getService(namespaceId, serviceName);

synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

Instances instances = new Instances();
instances.setInstanceList(instanceList);
// consistency sync
consistencyService.put(key, instances);
}
}

RaftConsistencyServiceImpl.put

consistencyService.put publishes the content, i.e. performs the data-consistency sync.

@Override
public void put(String key, Record value) throws NacosException {
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
e);
}
}

RaftCore.signalPublish

public static final Lock OPERATE_LOCK = new ReentrantLock();

public void signalPublish(String key, Record value) throws Exception {
// if the receiving node is not the leader
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
// look up the leader
final RaftPeer leader = getLeader();
// forward the request to the leader
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}

// if we are the leader, send an onPublish request to every node, including ourselves
try {
    // take the lock
OPERATE_LOCK.lock();
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}

ObjectNode json = JacksonUtils.createEmptyJsonNode();
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
// onPublish doubles as a heartbeat: it resets the election check time, and a key point is that term grows by 100.
// It also applies the update: write the file first, then refresh the in-memory cache (i.e. record the local log first).
onPublish(datum, peers.local()); // apply the data locally

final String content = json.toString();
// a CountDownLatch gates on the majority commit
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
// iterate all nodes and send the transaction-commit request, committing the data recorded in the local log
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
latch.countDown();
continue;
}
// API_ON_PUB: /raft/datum/commit, a two-phase commit
final String url = buildUrl(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT
.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
latch.countDown();
return 0;
}

@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});

}

if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
// only majority servers return success can we consider this update success
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}

long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}

onPublish

private volatile ConcurrentMap<String, Datum> datums = new ConcurrentHashMap<>();

public void onPublish(Datum datum, RaftPeer source) throws Exception {
RaftPeer local = peers.local();
if (datum.value == null) {
Loggers.RAFT.warn("received empty datum");
throw new IllegalStateException("received empty datum");
}

if (!peers.isLeader(source.ip)) {
Loggers.RAFT
.warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(getLeader()));
throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
}

if (source.term.get() < local.term.get()) {
Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
JacksonUtils.toJson(local));
throw new IllegalStateException(
"out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
}
// reset the election interval
local.resetLeaderDue();

// if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) {
// persist to local disk
raftStore.write(datum);
}
// and store it in memory
datums.put(datum.key, datum);
// if leader, increase term by 100
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
// update the term value in meta.properties on local disk
raftStore.updateTerm(local.term.get());

notifier.addTask(datum.key, ApplyAction.CHANGE);

Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

Now let's see how the other nodes handle the leader's request, via the /v1/ns/raft/datum/commit endpoint:

@PostMapping("/datum/commit")
public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {

response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Content-Encode", "gzip");

String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
String value = URLDecoder.decode(entity, "UTF-8");

JsonNode jsonObject = JacksonUtils.toObj(value);
String key = "key";

RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
JsonNode datumJson = jsonObject.get("datum");

Datum datum = null;
// deserialize according to the datum type
if (KeyBuilder.matchInstanceListKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Instances>>() {
});
} else if (KeyBuilder.matchSwitchKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<SwitchDomain>>() {
});
} else if (KeyBuilder.matchServiceMetaKey(datumJson.get(key).asText())) {
datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Service>>() {
});
}

raftConsistencyService.onPut(datum, source);
return "ok";
}

The core is raftConsistencyService.onPut(datum, source); stepping into that method:

public void onPut(Datum datum, RaftPeer source) throws NacosException {
try {
// write the data locally
raftCore.onPublish(datum, source);
} catch (Exception e) {
Loggers.RAFT.error("Raft onPut failed.", e);
throw new NacosException(NacosException.SERVER_ERROR,
"Raft onPut failed, datum:" + datum + ", source: " + source, e);
}
}

(raft data-sync flow diagram)

Spring Cloud Config Server

Spring Cloud Config Server provides an HTTP, resource-based API for external configuration (name-value pairs or equivalent YAML content); with the @EnableConfigServer annotation the server can be embedded in a Spring Boot application. From the official docs:

Spring Cloud Config Server provides an HTTP resource-based API for external configuration (name-value pairs or equivalent YAML content). The server is embeddable in a Spring Boot application, by using the @EnableConfigServer annotation. Consequently, the following application is a config server:
ConfigServer.java
@SpringBootApplication
@EnableConfigServer
public class ConfigServer {
    public static void main(String[] args) {
        SpringApplication.run(ConfigServer.class, args);
    }
}

Spring Cloud Config Client

A Spring Boot application can take immediate advantage of the Spring Config Server (or other external property sources provided by the application developer). It also picks up some additional useful features related to Environment change events.

@RefreshScope and ContextRefresher

By refreshing the externalized configuration (.properties), new values are hot-loaded without restarting the application.

Automatic configuration refresh via Spring Cloud's native @RefreshScope annotation:
@RestController
@RequestMapping("/config")
@RefreshScope
public class ConfigController {

@Value("${useLocalCache:false}")
private boolean useLocalCache;

@RequestMapping("/get")
public boolean get() {
return useLocalCache;
}
}

Managing the bean lifecycle separately
When a bean is created with the refresh scope it is cached in a dedicated scope map, which is how the lifecycle of refresh-scoped beans is managed (so a @RefreshScope bean actually results in two beans).
Recreating the beans
After the externalized configuration is refreshed, an action fires that clears the beans in that scope map; the IoC container then recreates them, injecting the latest configuration values into the class, which achieves the hot-reload effect. A sketch of triggering the refresh programmatically follows.
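A minimal sketch of driving that refresh by hand with ContextRefresher (in a Nacos setup the config client normally triggers it for you on a config-change notification; the endpoint path here is illustrative):

import java.util.Set;
import org.springframework.cloud.context.refresh.ContextRefresher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

// Re-reads the externalized configuration and clears the refresh scope;
// refresh-scoped beans are rebuilt lazily with the new values on next use.
@RestController
class ManualRefreshController {

    private final ContextRefresher refresher;

    ManualRefreshController(ContextRefresher refresher) {
        this.refresher = refresher;
    }

    @PostMapping("/manual-refresh")
    Set<String> refresh() {
        return refresher.refresh(); // returns the property keys that changed
    }
}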

In Spring IoC, the bean scope determines how a bean is managed:


@ManagedResource
public class RefreshScope extends GenericScope
implements ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, Ordered


public enum ScopedProxyMode {

/**
* Default typically equals {@link #NO}, unless a different default
* has been configured at the component-scan instruction level.
*/
DEFAULT,

/**
* Do not create a scoped proxy.
* <p>This proxy-mode is not typically useful when used with a
* non-singleton scoped instance, which should favor the use of the
* {@link #INTERFACES} or {@link #TARGET_CLASS} proxy-modes instead if it
* is to be used as a dependency.
*/
NO,

/**
* Create a JDK dynamic proxy implementing <i>all</i> interfaces exposed by
* the class of the target object.
*/
INTERFACES,

/**
* Create a class-based proxy (uses CGLIB).
*/
TARGET_CLASS;

}
Scope            Description
singleton        a single shared instance per Spring IoC container (the default scope)
prototype        one bean definition, any number of object instances
request          each HTTP request gets its own bean instance (only in a web-aware Spring ApplicationContext)
session          one bean per HTTP session lifecycle (web-aware Spring ApplicationContext only)
global session   one bean per global HTTP session, typically in portal scenarios (web-aware Spring ApplicationContext only)

How Nacos updates service addresses dynamically

The Nacos client has a HostReactor class whose job is dynamic service updates. The basic principle:

After the client subscribes to events, an UpdateTask thread inside HostReactor sends a pull request every 10 s to fetch the latest address list from the server (a stripped-down sketch follows this list).
The server maintains heartbeat checks against each provider instance; as soon as a provider misbehaves, it sends a push message to the Nacos client, i.e. the service consumer.
On receiving the push, the consumer parses the message with HostReactor's processServiceJSON and updates its local address list.
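A stripped-down sketch of the pull loop (the real UpdateTask derives its delay from the cacheMillis the server returns; the names here are simplified):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Periodically pulls the instance list of one service and refreshes the local cache.
class ServiceUpdateTask implements Runnable {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final String serviceName;

    ServiceUpdateTask(String serviceName) {
        this.serviceName = serviceName;
        executor.schedule(this, 10, TimeUnit.SECONDS); // first pull after 10 s
    }

    @Override
    public void run() {
        try {
            // e.g. GET /nacos/v1/ns/instance/list?serviceName=...
            String json = queryInstancesFromServer(serviceName);
            processServiceJson(json); // update the local service map
        } finally {
            executor.schedule(this, 10, TimeUnit.SECONDS); // reschedule, like BeatTask above
        }
    }

    private String queryInstancesFromServer(String name) {
        return "{}"; // placeholder for the real HTTP call
    }

    private void processServiceJson(String json) {
        // parse the JSON and swap it into the local cache
    }
}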

  1. Caveats when using @RefreshScope
    A class annotated with @RefreshScope must not be final, or startup fails with an error.
    @RefreshScope cannot stand alone; combine it with another bean annotation such as @Controller, @Service, @Component, @Repository or @Configuration.
    Avoid putting @RefreshScope on @Scheduled tasks, listeners, timers and the like: a refresh clears the old object, and only using the object again would create a new one, but since the object is gone it can never be used again (a dead loop).
  2. When @RefreshScope refresh does not take effect
    Check whether the bean you are using is the scopedTarget.beanName bean generated by @RefreshScope.
    Some older Spring Boot versions reportedly misbehave when the annotation sits on a Controller class (so it is said online; not investigated in detail here).
    Fix 1: add the attribute @RefreshScope(proxyMode = ScopedProxyMode.DEFAULT).
    Fix 2: wrap the configuration values in a separate class and use @RefreshScope + @Value.
    Fix 3: use @ConfigurationProperties directly.
  3. Dynamic refresh works even without @RefreshScope
    Using @ConfigurationProperties alone already gives dynamic updates, with no @RefreshScope needed.

How @ConfigurationProperties achieves dynamic refresh:
@ConfigurationProperties comes with the ConfigurationPropertiesRebinder listener, which watches for EnvironmentChangeEvent. When an EnvironmentChangeEvent fires, the beans annotated with @ConfigurationProperties are rebuilt. This is Spring Cloud's default behavior.
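A minimal sketch of that approach (the prefix and field are illustrative):

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Rebound automatically by ConfigurationPropertiesRebinder on EnvironmentChangeEvent;
// no @RefreshScope required.
@Component
@ConfigurationProperties(prefix = "demo.cache")
public class CacheProperties {

    private boolean useLocalCache = false; // maps demo.cache.use-local-cache

    public boolean isUseLocalCache() {
        return useLocalCache;
    }

    public void setUseLocalCache(boolean useLocalCache) {
        this.useLocalCache = useLocalCache;
    }
}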

@Autowired injects the proxy object.
Looking up beanName yields the proxy object.
Looking up scopedTarget.beanName yields the original object generated by @RefreshScope.
The proxy object does not change when the configuration refreshes.
The original object generated by @RefreshScope is replaced as configuration refreshes (the old one is cleared; a new one is created on next use).
