Last updated: 2021-09-18 00:00:10
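To keep the explanation simple, the Nacos console and the Nacos registry are together called the Nacos server here (the one with the web UI), and the business services we write are called Nacos clients.
A Nacos client registers itself with the Nacos server. Section 1, "How a service registers with the Nacos registry", explains from the client's side how it sends its information to the Nacos server. Section 2, "Service registration on the Nacos server", explains the registration principle from the server's side.
Section 3, "Client queries for all service instances", explains from the point of view of service consumers and providers how a consumer obtains all instances of a provider. Both consumers and providers are Nacos clients.
Section 4, "Client listens to the Nacos server to get service instances dynamically", shows how a consumer client listens to the Nacos server to learn about changes in providers as they happen.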
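Spring Cloud defines the contract for service registration in the ServiceRegistry interface, which Nacos implements as NacosServiceRegistry:
public interface ServiceRegistry<R extends Registration> {
void register(R registration);
void deregister(R registration);
void close();
void setStatus(R registration, String status);
<T> T getStatus(R registration);
}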
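To make the contract concrete, here is a minimal in-memory sketch. It re-declares the same five methods as a local interface so that it compiles without Spring; SimpleServiceRegistry, SimpleRegistration, InMemoryServiceRegistry and the "UP"/"DOWN" status strings are hypothetical names chosen for illustration. A real implementation such as NacosServiceRegistry forwards these calls to the Nacos server instead of a local map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Spring Cloud's Registration: service ID, host and port only.
final class SimpleRegistration {
    final String serviceId;
    final String host;
    final int port;

    SimpleRegistration(String serviceId, String host, int port) {
        this.serviceId = serviceId;
        this.host = host;
        this.port = port;
    }

    // Identifies one instance of a service.
    String key() {
        return serviceId + "#" + host + ":" + port;
    }
}

// Same method set as ServiceRegistry<R>, re-declared so the sketch compiles without Spring.
interface SimpleServiceRegistry<R> {
    void register(R registration);
    void deregister(R registration);
    void close();
    void setStatus(R registration, String status);
    <T> T getStatus(R registration);
}

// Local, in-memory registry: maps each registered instance to a status string.
final class InMemoryServiceRegistry implements SimpleServiceRegistry<SimpleRegistration> {
    private final Map<String, String> statusByInstance = new ConcurrentHashMap<>();

    @Override
    public void register(SimpleRegistration registration) {
        statusByInstance.put(registration.key(), "UP");
    }

    @Override
    public void deregister(SimpleRegistration registration) {
        statusByInstance.remove(registration.key());
    }

    @Override
    public void close() {
        statusByInstance.clear();
    }

    @Override
    public void setStatus(SimpleRegistration registration, String status) {
        // Only instances that are currently registered can change status.
        statusByInstance.computeIfPresent(registration.key(), (key, old) -> status);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getStatus(SimpleRegistration registration) {
        return (T) statusByInstance.get(registration.key());
    }
}

final class InMemoryRegistryDemo {
    public static void main(String[] args) {
        InMemoryServiceRegistry registry = new InMemoryServiceRegistry();
        SimpleRegistration provider = new SimpleRegistration("service-provider", "10.200.9.143", 18082);
        registry.register(provider);
        registry.setStatus(provider, "DOWN");
        String status = registry.getStatus(provider);
        System.out.println(status); // DOWN
    }
}
```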
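META-INF/spring.factories holds the auto-configuration entries: by convention, the classes listed there are loaded into the container when Spring Cloud starts. One of them is AutoServiceRegistrationAutoConfiguration:
@Configuration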
@Import({AutoServiceRegistrationConfiguration.class})
@ConditionalOnProperty(
value = {"spring.cloud.service-registry.auto-registration.enabled"},
matchIfMissing = true
)
public class AutoServiceRegistrationAutoConfiguration {
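// auto-registration bean
@Autowired(required = false)
private AutoServiceRegistration autoServiceRegistration;
// auto-registration properties
@Autowired
private AutoServiceRegistrationProperties properties;
public AutoServiceRegistrationAutoConfiguration() {
}
// initialization: fail fast if auto-registration is enabled but no AutoServiceRegistration bean exists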
@PostConstruct
protected void init() {
if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
}
}
}
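AbstractAutoServiceRegistration, the abstract implementation of AutoServiceRegistration, also implements Spring's ApplicationListener interface:
@FunctionalInterface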
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E var1);
}
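AbstractAutoServiceRegistration implements this interface to listen for WebServerInitializedEvent (the event published once the web server is initialized); its onApplicationEvent() simply delegates to bind():
@Override
@SuppressWarnings("deprecation")
// [step into]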
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
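The idea is "register as soon as the web server is ready". The following plain-Java sketch reproduces the onApplicationEvent() -> bind() -> register() chain without Spring. WebServerReadyEvent, ReadyListener, AutoRegistrar and EventBus are hypothetical stand-ins for WebServerInitializedEvent, ApplicationListener, AbstractAutoServiceRegistration and the application context; the register-only-once guard is this sketch's assumption about the desired behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for WebServerInitializedEvent: carries the port the server bound to.
final class WebServerReadyEvent {
    final int port;

    WebServerReadyEvent(int port) {
        this.port = port;
    }
}

// Minimal listener contract, shaped like Spring's ApplicationListener<E>.
interface ReadyListener {
    void onEvent(WebServerReadyEvent event);
}

// Mirrors onApplicationEvent() -> bind() -> register(): registers on the first event only.
final class AutoRegistrar implements ReadyListener {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Consumer<Integer> registry; // receives the port to register

    AutoRegistrar(Consumer<Integer> registry) {
        this.registry = registry;
    }

    @Override
    public void onEvent(WebServerReadyEvent event) {
        bind(event);
    }

    private void bind(WebServerReadyEvent event) {
        // compareAndSet keeps registration idempotent if the event is published twice
        if (running.compareAndSet(false, true)) {
            registry.accept(event.port);
        }
    }
}

// Tiny publisher so the sketch runs without a Spring context.
final class EventBus {
    private final List<ReadyListener> listeners = new ArrayList<>();

    void subscribe(ReadyListener listener) {
        listeners.add(listener);
    }

    void publish(WebServerReadyEvent event) {
        listeners.forEach(l -> l.onEvent(event));
    }
}

final class AutoRegistrarDemo {
    public static void main(String[] args) {
        List<Integer> registeredPorts = new ArrayList<>();
        EventBus bus = new EventBus();
        bus.subscribe(new AutoRegistrar(registeredPorts::add));
        bus.publish(new WebServerReadyEvent(18082));
        bus.publish(new WebServerReadyEvent(18082)); // ignored: already registered
        System.out.println(registeredPorts); // [18082]
    }
}
```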
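Set a breakpoint on AbstractAutoServiceRegistration.bind() and start the service provider. Stepping through shows that:
AbstractAutoServiceRegistration.bind() is where the service registers itself with the Nacos registry, which tells us when a service sends its own information to the Nacos server;
following bind(), the service instance appears on the Nacos server right after AbstractAutoServiceRegistration (the abstract service registration class) calls NacosServiceRegistry.register().
Stepping into NacosServiceRegistry.register(), its logic is as follows:
@Override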
public void register(Registration registration) {
// return early if there is no service ID
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId(); // service ID (service-provider)
Instance instance = getNacosInstanceFromRegistration(registration); // service instance (ip, port, etc.)
try {
// [step into] the actual registration call
namingService.registerInstance(serviceId, instance);
log.info("nacos registry, {} {}:{} register finished", serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}
Stepping into namingService.registerInstance() (implemented in NacosNamingService.registerInstance()) shows the concrete registration logic:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
// wrap the instance information in a BeatInfo heartbeat
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
// [step into 1.4.1] hand the beatInfo to the beatReactor heartbeat sender (after the heartbeat is sent, the instance still does not appear on the Nacos server)
this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// [step into 1.4.2] send the instance information to the server as a POST request through the naming proxy (serverProxy)
this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
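So registering a service instance takes two steps:
beatReactor.addBeatInfo(): for ephemeral instances, creates the heartbeat information used for health checking;
serverProxy.registerService(): sends the instance information to the Nacos server.
Stepping into BeatReactor.addBeatInfo() to look at the heartbeat mechanism, the source is:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {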
LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
this.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
// [core] schedule a BeatTask that sends the beatInfo heartbeat to the server
this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
// [core] record the number of maintained heartbeats in a metrics gauge
MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
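addBeatInfo() only schedules the first run, with a 0 ms delay. The periodic behavior comes from the task rescheduling itself after each beat (in the Nacos 1.x client, BeatTask.run() reschedules itself using the beat period; verify this against your client version). Below is a minimal sketch of that self-rescheduling pattern; HeartbeatSender and the Runnable send hook are hypothetical, and sending is simulated locally instead of calling the server.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical client-side heartbeat sender modeled on the addBeatInfo()/BeatTask pattern.
final class HeartbeatSender {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // Like dom2Beat: period of each active heartbeat, keyed by "service#ip#port".
    private final Map<String, Long> periods = new ConcurrentHashMap<>();

    void addBeat(String service, String ip, int port, long periodMillis, Runnable send) {
        String key = service + "#" + ip + "#" + port;
        periods.put(key, periodMillis);
        // The first beat runs immediately (delay 0), as in addBeatInfo().
        executor.schedule(() -> beat(key, send), 0L, TimeUnit.MILLISECONDS);
    }

    void removeBeat(String service, String ip, int port) {
        // The next run of the task sees the missing key and stops rescheduling.
        periods.remove(service + "#" + ip + "#" + port);
    }

    private void beat(String key, Runnable send) {
        Long period = periods.get(key);
        if (period == null) {
            return;
        }
        try {
            send.run(); // in Nacos this would be an HTTP heartbeat request to the server
        } finally {
            // Rescheduling after each beat is what makes the heartbeat periodic.
            if (!executor.isShutdown()) {
                executor.schedule(() -> beat(key, send), period, TimeUnit.MILLISECONDS);
            }
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}

final class HeartbeatDemo {
    public static void main(String[] args) throws InterruptedException {
        HeartbeatSender sender = new HeartbeatSender();
        CountDownLatch threeBeats = new CountDownLatch(3);
        sender.addBeat("DEFAULT_GROUP@@service-provider", "10.200.9.143", 18082, 50L, () -> {
            System.out.println("beat at " + System.currentTimeMillis());
            threeBeats.countDown();
        });
        boolean done = threeBeats.await(2, TimeUnit.SECONDS);
        sender.shutdown();
        System.out.println("three beats sent: " + done);
    }
}
```

Using schedule() plus self-rescheduling, rather than scheduleAtFixedRate(), lets each run pick a different delay for the next beat.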
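BeatReactor.addBeatInfo() mainly does two things:
it submits a BeatTask through ScheduledExecutorService.schedule() (backed by a ScheduledThreadPoolExecutor). The first run has a 0 ms delay; each run sends one beatInfo heartbeat packet to the server and schedules the next one, so heartbeats are sent once per period;
it calls MetricsMonitor.getDom2BeatSizeMonitor(), which returns a Prometheus gauge, and sets it to dom2Beat.size(), i.e. the number of heartbeats this client currently maintains. This is a monitoring metric only; it does not itself detect server failures.
Stepping into NamingProxy.registerService(), the source is:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {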
LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
Map<String, String> params = new HashMap(9);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
params.put("clusterName", instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
// [step into] the service instance only appears on the Nacos server after this call completes
this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
}
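registerService() flattens the instance into a string map and passes it to reqAPI(). The sketch below shows how such a map becomes the key=value query string seen in the POST URL later in this section. RegisterRequestBuilder is a hypothetical helper, only a subset of the parameters is included, and encoding values with URLEncoder is this sketch's own choice rather than a description of the Nacos HTTP layer.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper mirroring part of NamingProxy.registerService()'s parameter map.
final class RegisterRequestBuilder {
    static Map<String, String> params(String namespaceId, String serviceName, String groupName,
                                      String ip, int port, double weight, boolean ephemeral) {
        Map<String, String> p = new LinkedHashMap<>(); // insertion order keeps the URL readable
        p.put("namespaceId", namespaceId);
        p.put("serviceName", serviceName);
        p.put("groupName", groupName);
        p.put("ip", ip);
        p.put("port", String.valueOf(port));
        p.put("weight", String.valueOf(weight));
        p.put("ephemeral", String.valueOf(ephemeral));
        return p;
    }

    // Joins URL-encoded key=value pairs with '&'.
    static String toQuery(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8); // Java 10+ overload
    }
}

final class RegisterRequestDemo {
    public static void main(String[] args) {
        Map<String, String> params = RegisterRequestBuilder.params(
                "public", "DEFAULT_GROUP@@service-provider", "DEFAULT_GROUP",
                "10.200.9.143", 18082, 1.0, true);
        System.out.println("POST http://127.0.0.1:8848/nacos/v1/ns/instance?"
                + RegisterRequestBuilder.toQuery(params));
    }
}
```

Note that the @ characters in DEFAULT_GROUP@@service-provider come out percent-encoded as %40.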
NamingProxy.reqAPI() picks a server and sends the registration API request to it. Its source:
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put("namespaceId", this.getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
throw new IllegalArgumentException("no server available");
} else {
Exception exception = new Exception();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for(int i = 0; i < servers.size(); ++i) {
String server = (String)servers.get(index);
try {
return this.callServer(api, params, server, method);
} catch (NacosException var11) {
exception = var11;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
} catch (Exception var12) {
exception = var12;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
}
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
} else {
int i = 0;
while(i < 3) {
try {
return this.callServer(api, params, this.nacosDomain);
} catch (Exception var13) {
exception = var13;
LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
++i;
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
}
}
}
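The failover strategy in reqAPI() is: start at a random server, try each server once in round-robin order, and fail only after every server has failed. Here is a generic sketch of that loop; the Caller interface is a hypothetical stand-in for callServer(), and the demo simulates server failures with exceptions.

```java
import java.util.List;
import java.util.Random;

// Sketch of reqAPI()'s failover loop: random start, then round-robin over all servers.
final class FailoverClient {
    // Hypothetical stand-in for NamingProxy.callServer(api, params, server, method).
    interface Caller {
        String call(String server) throws Exception;
    }

    private final Random random;

    FailoverClient(Random random) {
        this.random = random;
    }

    String request(List<String> servers, Caller caller) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        Exception last = null;
        // A random starting point spreads requests from many clients across the cluster.
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return caller.call(server);
            } catch (Exception e) {
                last = e; // remember the failure, then try the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("failed after all servers(" + servers + ") tried: "
                + (last == null ? "unknown error" : last.getMessage()));
    }
}

final class FailoverDemo {
    public static void main(String[] args) {
        FailoverClient client = new FailoverClient(new Random());
        List<String> servers = List.of("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        // Only the third server is "up"; whichever server is tried first, the loop reaches it.
        String result = client.request(servers, server -> {
            if (!server.equals("10.0.0.3:8848")) {
                throw new Exception("connection refused: " + server);
            }
            return "ok from " + server;
        });
        System.out.println(result); // ok from 10.0.0.3:8848
    }
}
```

Starting at a random index keeps many clients from all hitting the first server in the list.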
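The request that is finally sent looks like this:
POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=10.200.9.143&port=18082'
To sum up the registration logic of NacosServiceRegistry.register(), it works in two steps:
BeatReactor.addBeatInfo(): for ephemeral instances, schedules heartbeats so that the server can check the instance's health;
NamingProxy.registerService(): sends the registration request to the server as an Open API POST call.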
On the server side, this POST request is handled by the register() method of InstanceController, which processes client registrations. Source:
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
// other code omitted
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// parse the namespaceId from the request (public in this example)
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// parse the service name (DEFAULT_GROUP@@service-provider in this example, i.e. service-provider in the default group)
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
// [step into] register the service instance on the server
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
// other code omitted
}
The source of ServiceManager.registerInstance() is:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// [step into 2.3] create an empty service
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// add the service instance
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
This code does three things:
createEmptyService(): creates an empty service (what appears in the "Service List" of the Nacos console); in effect it initializes an entry in serviceMap, a ConcurrentHashMap;
getService(): gets the Service object from serviceMap by namespaceId and serviceName;
addInstance(): saves the instance being registered into that Service.
createEmptyService() delegates to ServiceManager.createServiceIfAbsent(), whose source is:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
// look up the service in the cache by namespace ID and service name; on first registration it does not exist yet, so the if branch creates it
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
// validate the service; an exception is thrown if it is invalid
service.validate();
// [step into] add and initialize the service
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
Stepping into ServiceManager.putServiceAndInit():
private void putServiceAndInit(Service service) throws NacosException {
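// [step into 2.3.1] add the service to the cache
putService(service);
service = getService(service.getNamespaceId(), service.getName());
// [step into 2.3.2] set up the heartbeat check
service.init();
// [step into 2.3.3] listen for consistency events so that service data is synchronized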
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
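ServiceManager.putService() stores the service in the two-level serviceMap cache (namespaceId, then service name), creating the per-namespace map under a lock the first time:
public void putService(Service service) {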
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
// save the Service into serviceMap
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
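putService() guards the creation of the per-namespace map with double-checked locking. On Java 8 and later the same two-level structure (namespaceId, then serviceName, then Service) can also be written with computeIfAbsent. This is an alternative formulation for illustration, not the Nacos code; ServiceCache and its minimal Service class are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical two-level cache: namespaceId -> (serviceName -> Service), like ServiceManager.serviceMap.
final class ServiceCache {
    // Minimal hypothetical Service holding only its identity.
    static final class Service {
        final String namespaceId;
        final String name;

        Service(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    Service getService(String namespaceId, String serviceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Combines the "create if absent" check and putService() without an explicit lock.
    Service createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                // ConcurrentHashMap.computeIfAbsent is atomic: one inner map per namespace
                .computeIfAbsent(namespaceId, ns -> new ConcurrentSkipListMap<>())
                // ConcurrentSkipListMap may run the function more than once under contention,
                // but only one Service is stored and returned to every caller
                .computeIfAbsent(serviceName, name -> new Service(namespaceId, name));
    }
}

final class ServiceCacheDemo {
    public static void main(String[] args) {
        ServiceCache cache = new ServiceCache();
        ServiceCache.Service first = cache.createServiceIfAbsent("public", "DEFAULT_GROUP@@service-provider");
        ServiceCache.Service second = cache.createServiceIfAbsent("public", "DEFAULT_GROUP@@service-provider");
        System.out.println(first == second); // true
    }
}
```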
Service.init() then sets up the heartbeat check:
public void init() {
// [step into] schedule the periodic heartbeat check
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
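Stepping into HealthCheckReactor.scheduleCheck(): the ClientBeatCheckTask is scheduled with an initial delay of 5000 ms and then runs every 5000 ms, and computeIfAbsent ensures that each task key is scheduled only once:
public static void scheduleCheck(ClientBeatCheckTask task) {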
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
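Every 5 s the scheduled ClientBeatCheckTask compares each ephemeral instance's last heartbeat time with two timeouts. The values used below, 15 s before an instance is marked unhealthy and 30 s before it is removed, are the commonly cited Nacos 1.x defaults; treat them as an assumption and check your version. BeatChecker is a hypothetical single-threaded sketch that takes the current time as a parameter, so it can be exercised without waiting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Single-threaded sketch of a server-side heartbeat check (hypothetical names).
final class BeatChecker {
    static final long UNHEALTHY_AFTER_MS = 15_000; // assumed default: mark unhealthy
    static final long DELETE_AFTER_MS = 30_000;    // assumed default: remove instance

    static final class Instance {
        final String ip;
        final int port;
        long lastBeatMillis;
        boolean healthy = true;

        Instance(String ip, int port, long lastBeatMillis) {
            this.ip = ip;
            this.port = port;
            this.lastBeatMillis = lastBeatMillis;
        }
    }

    private final List<Instance> instances = new ArrayList<>();

    void register(Instance instance) {
        instances.add(instance);
    }

    // Called when a client heartbeat arrives: refresh the timestamp and restore health.
    void onBeat(String ip, int port, long nowMillis) {
        for (Instance i : instances) {
            if (i.ip.equals(ip) && i.port == port) {
                i.lastBeatMillis = nowMillis;
                i.healthy = true;
            }
        }
    }

    // One run of the periodic check (Nacos schedules its check task every 5 s).
    void check(long nowMillis) {
        Iterator<Instance> it = instances.iterator();
        while (it.hasNext()) {
            Instance i = it.next();
            long silence = nowMillis - i.lastBeatMillis;
            if (silence > DELETE_AFTER_MS) {
                it.remove();       // silent for too long: drop the instance
            } else if (silence > UNHEALTHY_AFTER_MS) {
                i.healthy = false; // heartbeat overdue: keep it, but mark it unhealthy
            }
        }
    }

    List<Instance> instances() {
        return instances;
    }
}

final class BeatCheckerDemo {
    public static void main(String[] args) {
        BeatChecker checker = new BeatChecker();
        checker.register(new BeatChecker.Instance("10.200.9.143", 18082, 0L));
        checker.check(16_000);
        System.out.println(checker.instances().get(0).healthy); // false
        checker.check(31_000);
        System.out.println(checker.instances().size());         // 0
    }
}
```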
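The consistency service's listen() method then registers the Service as a listener on these keys so that service data stays in sync:
@Override
public void listen(String key, RecordListener listener) throws NacosException {
// this key is listened to by both consistency services
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
// persistent consistency service
persistentConsistencyService.listen(key, listener);
// ephemeral consistency service
ephemeralConsistencyService.listen(key, listener);
return;
}
// any other key is routed to either the ephemeral or the persistent service
mapConsistencyService(key).listen(key, listener);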
}
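A toy model of the routing above: keys for ephemeral instance lists go to the ephemeral (AP) consistency service, which is Distro in Nacos 1.x, while other keys go to the persistent (CP) one, which is Raft-based; the meta key is watched by both. The key format (the "iplist.ephemeral." prefix) and all class names are hypothetical and do not reproduce Nacos's KeyBuilder.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of key-based routing between two consistency services (all names hypothetical).
final class ConsistencyRouter {
    static final String META_KEY = "service.meta";              // hypothetical meta key
    static final String EPHEMERAL_PREFIX = "iplist.ephemeral."; // hypothetical key prefixes
    static final String PERSISTENT_PREFIX = "iplist.";

    // Records which keys a consistency service has been asked to watch.
    static final class Store {
        final List<String> watchedKeys = new ArrayList<>();

        void listen(String key) {
            watchedKeys.add(key);
        }
    }

    final Store ephemeral = new Store();  // AP side (Distro in Nacos 1.x)
    final Store persistent = new Store(); // CP side (Raft-based in Nacos 1.x)

    // Ephemeral and persistent instance lists of the same service get different keys.
    static String instanceListKey(String namespaceId, String serviceName, boolean isEphemeral) {
        return (isEphemeral ? EPHEMERAL_PREFIX : PERSISTENT_PREFIX) + namespaceId + "##" + serviceName;
    }

    void listen(String key) {
        if (META_KEY.equals(key)) {
            // Meta key: watched by both services.
            persistent.listen(key);
            ephemeral.listen(key);
            return;
        }
        // Any other key goes to exactly one service, decided by its prefix.
        (key.startsWith(EPHEMERAL_PREFIX) ? ephemeral : persistent).listen(key);
    }
}

final class ConsistencyRouterDemo {
    public static void main(String[] args) {
        ConsistencyRouter router = new ConsistencyRouter();
        // Like putServiceAndInit(): listen on both the ephemeral and the persistent key.
        router.listen(ConsistencyRouter.instanceListKey("public", "DEFAULT_GROUP@@service-provider", true));
        router.listen(ConsistencyRouter.instanceListKey("public", "DEFAULT_GROUP@@service-provider", false));
        System.out.println(router.ephemeral.watchedKeys);  // [iplist.ephemeral.public##DEFAULT_GROUP@@service-provider]
        System.out.println(router.persistent.watchedKeys); // [iplist.public##DEFAULT_GROUP@@service-provider]
    }
}
```

Because putServiceAndInit() calls listen() with both an ephemeral and a persistent key, every service ends up watched by both consistency services, each on its own key.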
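In summary, the Nacos client sends the registration request through the Open API, and after receiving it the server does three things:
builds a Service object and stores it in the serviceMap cache, a ConcurrentHashMap (putService());
sets up a scheduled heartbeat check for all instances of the service (Service.init() and HealthCheckReactor.scheduleCheck());
registers listeners with the consistency service so that the service data is synchronized (consistencyService.listen()).
Querying the Nacos server likewise comes in two forms, Open API and SDK, and the SDK ultimately sends Open API requests as well:
Open API: GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider
SDK: List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException;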
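To see the Open API form in action without the SDK, the request can be issued with the JDK's own HTTP client (Java 11+). InstanceListQuery is a hypothetical helper; healthyOnly is one of the optional parameters parsed by InstanceController.list(), and main() assumes a Nacos server is listening on 127.0.0.1:8848.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that issues the instance-list Open API call with the JDK HTTP client.
final class InstanceListQuery {
    static HttpRequest build(String serverAddr, String serviceName, boolean healthyOnly) {
        String uri = "http://" + serverAddr + "/nacos/v1/ns/instance/list"
                + "?serviceName=" + URLEncoder.encode(serviceName, StandardCharsets.UTF_8)
                + "&healthyOnly=" + healthyOnly;
        return HttpRequest.newBuilder(URI.create(uri)).GET().build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = build("127.0.0.1:8848", "service-provider", true);
        // Needs a running Nacos server; the body is the JSON assembled by doSrvIpxt().
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```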
Here we send the GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider request to simulate a consumer client fetching the provider's instances. The InstanceController.list() endpoint handles it:
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
// parse the namespace ID
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// parse the service name
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// parse the remaining optional parameters
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
// [step into 3.2] collect all instance information of the service
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
InstanceController.doSrvIpxt() is very long, so some non-essential code has been removed here to make it easier to follow:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
// some non-core code omitted
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// get the Service by namespaceId and serviceName
Service service = serviceManager.getService(namespaceId, serviceName);
List<Instance> srvedIPs;
// get all instances of the service in the requested clusters via srvIPs()
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// let the service's selector filter the instances for this client IP
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
// group instances by health status (healthy -> TRUE, unhealthy -> FALSE)
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
// iterate over both groups and build the JSON host list
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
if (healthyOnly && !entry.getKey()) {
continue;
}
for (Instance instance : ips) {
// skip disabled instances
if (!instance.isEnabled()) {
continue;
}
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
}
}
// attach the instance list to the response
result.replace("hosts", hosts);
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
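Stripped of the JSON building, the instance-selection part of doSrvIpxt() is: group instances by health, drop the unhealthy group when healthyOnly is set, and skip disabled instances. Below is a sketch of just that logic over a hypothetical Instance class; unlike the original HashMap, a LinkedHashMap is used so that healthy instances always come first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the instance-selection logic in doSrvIpxt().
final class InstanceFilter {
    static final class Instance {
        final String ip;
        final int port;
        final boolean healthy;
        final boolean enabled;

        Instance(String ip, int port, boolean healthy, boolean enabled) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
            this.enabled = enabled;
        }

        @Override
        public String toString() {
            return ip + ":" + port;
        }
    }

    static List<Instance> select(List<Instance> all, boolean healthyOnly) {
        // Like ipMap: TRUE -> healthy instances, FALSE -> unhealthy instances.
        Map<Boolean, List<Instance>> byHealth = new LinkedHashMap<>();
        byHealth.put(Boolean.TRUE, new ArrayList<>());
        byHealth.put(Boolean.FALSE, new ArrayList<>());
        for (Instance instance : all) {
            byHealth.get(instance.healthy).add(instance);
        }
        List<Instance> hosts = new ArrayList<>();
        for (Map.Entry<Boolean, List<Instance>> entry : byHealth.entrySet()) {
            if (healthyOnly && !entry.getKey()) {
                continue; // caller asked for healthy instances only
            }
            for (Instance instance : entry.getValue()) {
                if (instance.enabled) { // disabled instances are never returned
                    hosts.add(instance);
                }
            }
        }
        return hosts;
    }
}

final class InstanceFilterDemo {
    public static void main(String[] args) {
        List<InstanceFilter.Instance> all = List.of(
                new InstanceFilter.Instance("10.0.0.1", 8080, true, true),
                new InstanceFilter.Instance("10.0.0.2", 8080, false, true),
                new InstanceFilter.Instance("10.0.0.3", 8080, true, false));
        System.out.println(InstanceFilter.select(all, false)); // [10.0.0.1:8080, 10.0.0.2:8080]
        System.out.println(InstanceFilter.select(all, true));  // [10.0.0.1:8080]
    }
}
```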
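The main logic of this method comes down to three steps:
getService(): looks up the Service in serviceMap by namespaceId and serviceName;
srvIPs(): returns all instances of the requested clusters, which the service's selector may filter further;
iterating over the instances: groups them by health, skips disabled ones (and unhealthy ones when healthyOnly is set), and wraps the rest into the JSON response.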
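To listen to the Nacos server, a client service can call either of two methods:
void subscribe(String serviceName, EventListener listener) throws NacosException;
public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe), with subscribe set to true.
For Dubbo services, a META-INF/spring.factories file auto-configures a registration-related configuration class, DubboServiceRegistrationNonWebApplicationAutoConfiguration (the Dubbo registration auto-configuration for non-web applications):
@Configuration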
@ConditionalOnNotWebApplication
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter(DubboServiceRegistrationAutoConfiguration.class)
@Aspect
public class DubboServiceRegistrationNonWebApplicationAutoConfiguration {
@Autowired
private ServiceRegistry serviceRegistry; // implemented by NacosServiceRegistry
...
// listens for ApplicationStartedEvent, which is published after the context has been refreshed but before any application and command-line runners are called
@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() {
setServerPort();
// ultimately calls NacosServiceRegistry.register() to register the service
register();
}
private void register() {
if (registered) {
return;
}
// i.e. NacosServiceRegistry.register()
serviceRegistry.register(registration);
registered = true;
}
...
}
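For how NacosServiceRegistry.register() works, see section 1.4 of this article, "Logic for registering a service instance: NacosServiceRegistry.register()". From this we can conclude:
Dubbo service registration is also ultimately implemented by NacosServiceRegistry.register().
The structure of the source code roughly follows this article's outline: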
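AbstractAutoServiceRegistration.bind(): handles the web server initialized event;
NacosServiceRegistry.register(): registers the service instance;
NacosNamingService.registerInstance(): registers the instance through the Nacos naming service;
BeatReactor.addBeatInfo(): heartbeat mechanism;
NamingProxy.registerService(): sends the registration request through the Open API;
NamingProxy.reqAPI(): assembles and sends the registration API request;
InstanceController.register(): the server receives the registration request;
ServiceManager.registerInstance(): registers the service instance on the server;
ServiceManager.createEmptyService(): creates an empty service;
ServiceManager.createServiceIfAbsent(): creates the service if it does not exist yet;
ServiceManager.putServiceAndInit(): caches and initializes the service;
ServiceManager.addInstance(): adds the service instance;
InstanceController.list(): the server receives the instance-list request;
InstanceController.doSrvIpxt(): collects all instance information of the service;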