📆 Last updated: May 25, 2023
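A distributed system is made up of multiple computer nodes that communicate and cooperate over a network. Because the network links between nodes are unreliable, a node may lose contact with the others because of a network failure or some other cause. To detect this, distributed systems use a heartbeat mechanism.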

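In a heartbeat mechanism, each node periodically sends a "heartbeat" message to the other nodes to signal that it exists and is running normally. If a node receives no heartbeat from another node within a given period, it concludes that it has lost contact with that node and takes action, for example electing a new leader or starting a standby node.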
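The detection rule just described fits in a few lines of code. The sketch below is a minimal, single-process illustration built on a few assumptions of my own:

- The names HeartbeatMonitor, recordHeartbeat, isAlive and suspectedNodes are hypothetical.
- The current time is passed in explicitly, so the logic can be tested without sleeping.
- A node that has never sent a heartbeat is treated as not alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical timeout-based failure detector: a node counts as alive if its most
// recent heartbeat arrived no more than timeoutMillis ago.
class HeartbeatMonitor {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    HeartbeatMonitor(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a heartbeat from nodeId is received.
    void recordHeartbeat(String nodeId, long nowMillis) {
        lastSeen.put(nodeId, nowMillis);
    }

    // A node we have never heard from is treated as not alive.
    boolean isAlive(String nodeId, long nowMillis) {
        Long last = lastSeen.get(nodeId);
        return last != null && nowMillis - last <= timeoutMillis;
    }

    // Nodes whose heartbeats have stopped; this is where a real system would
    // trigger leader re-election or fail over to a standby node.
    List<String> suspectedNodes(long nowMillis) {
        List<String> suspects = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                suspects.add(entry.getKey());
            }
        }
        return suspects;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(3_000);
        monitor.recordHeartbeat("node-1", 0);
        System.out.println(monitor.isAlive("node-1", 2_000)); // true: within the timeout
        System.out.println(monitor.isAlive("node-1", 5_000)); // false: silent for 5 s
    }
}
```

A single fixed timeout is the simplest policy. Real systems usually wait for several missed intervals before declaring a node dead, because one heartbeat can easily be delayed. Eureka, for example, sends a heartbeat every 30 seconds but by default sets the lease expiration to 90 seconds.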
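A heartbeat mechanism can be built on UDP, on TCP, or on HTTP.
UDP-based heartbeats are simple to implement and cheap on the network. However, UDP does not guarantee delivery, so a lost packet can make a healthy node look dead (a false positive).
TCP-based heartbeats are reliable and produce fewer false positives. However, when packets are lost, TCP retransmission can delay a heartbeat.
HTTP-based heartbeats are easy to implement and extend (status and other fields can travel as request parameters). However, HTTP runs on top of TCP and adds request/response overhead, so it inherits the same latency issues.
In practice, choose the transport that fits the situation.
For scenarios with strict real-time requirements, UDP-based heartbeats are a good fit; for scenarios that demand high reliability, TCP-based heartbeats are the better choice.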
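To make the UDP trade-off concrete, here is a small loopback sketch that uses only java.net. It is my own example, not taken from any framework:

- The sender fires a datagram with no connection and no acknowledgement.
- The receiver can only wait with a timeout, so a lost packet looks exactly like a dead node.
- The "HEARTBEAT <nodeId>" message format is an assumption made for illustration.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical UDP heartbeat: fire-and-forget sending, timeout-based receiving.
class UdpHeartbeat {
    static void send(DatagramSocket socket, InetAddress host, int port, String nodeId) throws IOException {
        byte[] payload = ("HEARTBEAT " + nodeId).getBytes(StandardCharsets.UTF_8);
        socket.send(new DatagramPacket(payload, payload.length, host, port));
    }

    // Returns the node id carried by the heartbeat, or null if nothing arrived in time.
    static String receive(DatagramSocket socket, int timeoutMillis) throws IOException {
        byte[] buf = new byte[256];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        socket.setSoTimeout(timeoutMillis);
        try {
            socket.receive(packet);
        } catch (SocketTimeoutException e) {
            return null; // lost or late heartbeat: indistinguishable from a dead sender
        }
        String msg = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
        return msg.startsWith("HEARTBEAT ") ? msg.substring("HEARTBEAT ".length()) : null;
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            send(sender, loopback, receiver.getLocalPort(), "node-1");
            System.out.println(receive(receiver, 1_000)); // node-1
            System.out.println(receive(receiver, 200));   // null: nothing else was sent
        }
    }
}
```

On a real network, the receiver would react to several consecutive timeouts rather than to a single one. A single missed datagram is exactly the false-positive risk described above.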
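Spring Cloud also relies on heartbeats to find out whether a service is available. Heartbeat detection has the following four characteristics:
Service renewal has two steps:
the synchronization time recorded for the instance in the registry. The service node sends a renewal request to the registry:
DiscoveryClient is the entry point for all client-side operations, so the walkthrough of service renewal starts from its renew method. lastDirtyTimestamp: re-registering means the service node and the registry are out of sync, so the current system time is written to lastDirtyTimestamp. lastDirtyTimestamp is not cleared afterwards, because later renewals send it to the registry as a parameter so that the registry can judge whether the node is in sync. This chapter covers:
Before reading on, try to answer the 3 questions above; reading with questions in mind makes the article more rewarding. Now, on to the main topic.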
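Open DiscoveryClient; the entry point is its constructor:

Here we only care about how the heartbeat is sent.
As the name initScheduledTasks suggests, this method sets up tasks that run periodically in the background: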
```java
private void initScheduledTasks() {
    int renewalIntervalInSecs;
    int expBackOffBound;
    if (this.clientConfig.shouldFetchRegistry()) {
        renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
        expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor,
                renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()),
                (long) renewalIntervalInSecs, TimeUnit.SECONDS);
    }
    // Start reading here
    if (this.clientConfig.shouldRegisterWithEureka()) {
        renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);
        this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor,
                renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()),
                (long) renewalIntervalInSecs, TimeUnit.SECONDS);
        this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo,
                this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
        this.statusChangeListener = new StatusChangeListener() {
            public String getId() {
                return "statusChangeListener";
            }

            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN != statusChangeEvent.getStatus()
                        && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
                    DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
                } else {
                    DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
                }
                DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
            }
        };
        if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {
            this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
        }
        this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
```

You can skip straight to the block after the "Start reading here" comment.
This call is what schedules the background heartbeat task:

```java
this.scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                this.scheduler,
                this.heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new DiscoveryClient.HeartbeatThread()),
        (long) renewalIntervalInSecs,
        TimeUnit.SECONDS);
```
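The three arguments to note:

- renewalIntervalInSecs is the interval, in seconds, between runs of the task.
- expBackOffBound is used to compute the maximum delay: this.maxDelay = this.timeoutMillis * (long) expBackOffBound. Here timeoutMillis is the same interval converted to milliseconds.
- new DiscoveryClient.HeartbeatThread() contains the actual heartbeat logic.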
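The role of expBackOffBound is easiest to see with numbers. The sketch below models only the delay bookkeeping. It rests on an assumption I took from reading TimedSupervisorTask, not from this article: each timeout doubles the delay up to maxDelay, and each success resets it to the base interval. As far as I can tell, TimedSupervisorTask also does not run at a fixed rate; after each run it schedules itself again with the current delay.

BackoffDelay and its methods are hypothetical names. With a 30-second interval and a bound of 10, the delay can grow to at most 300 seconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the backoff bookkeeping described above; not Eureka code.
class BackoffDelay {
    private final long baseMillis;     // timeoutMillis: the renewal interval in ms
    private final long maxDelayMillis; // maxDelay = timeoutMillis * expBackOffBound
    private long currentMillis;

    BackoffDelay(long intervalSeconds, int expBackOffBound) {
        this.baseMillis = TimeUnit.SECONDS.toMillis(intervalSeconds);
        this.maxDelayMillis = baseMillis * expBackOffBound;
        this.currentMillis = baseMillis;
    }

    // The heartbeat finished in time: go back to the normal interval.
    long onSuccess() {
        currentMillis = baseMillis;
        return currentMillis;
    }

    // The heartbeat timed out: double the delay, but never beyond maxDelay.
    long onTimeout() {
        currentMillis = Math.min(maxDelayMillis, currentMillis * 2);
        return currentMillis;
    }

    long current() {
        return currentMillis;
    }

    public static void main(String[] args) {
        BackoffDelay delay = new BackoffDelay(30, 10);
        for (int i = 0; i < 5; i++) {
            System.out.println(delay.onTimeout()); // 60000, 120000, 240000, 300000, 300000
        }
        System.out.println(delay.onSuccess()); // 30000
    }
}
```

The cap keeps a struggling registry from being hit at full rate, while a single successful heartbeat restores the normal 30-second rhythm.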
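HeartbeatThread looks like this:

```java
private class HeartbeatThread implements Runnable {
    private HeartbeatThread() {
    }

    public void run() {
        if (DiscoveryClient.this.renew()) {
            DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
```

renew() is the renewal logic. Heartbeat and renewal are two halves of one mechanism: on the client, renew() sends a heartbeat, and when the server receives it, it renews the service's lease. After a successful renewal, the client records lastSuccessfulHeartbeatTimestamp.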
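The client side of this exchange can be modeled without any Eureka classes. The sketch below is a simplified stand-in written for illustration:

- HeartbeatTransport, LocalInstance, ScriptedTransport and RenewSketch are hypothetical names.
- Plain HTTP status codes replace EurekaHttpResponse.

It follows the branches of the renew() method shown next:

- 200 means the lease was renewed.
- 404 means the registry no longer knows the instance, so the client marks itself dirty and registers again.
- Anything else counts as a failed heartbeat.

The dirty bookkeeping is modeled on InstanceInfo.setIsDirtyWithTime/unsetIsDirty as I understand them. A successful re-registration clears the dirty flag but keeps lastDirtyTimestamp, which matches the rule stated earlier.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for eurekaTransport.registrationClient; not a Eureka interface.
interface HeartbeatTransport {
    int sendHeartbeat(String appName, String instanceId, long lastDirtyTimestamp); // HTTP status
    boolean register(String appName, String instanceId);
}

// Simplified model of the dirty bookkeeping on the client's InstanceInfo.
class LocalInstance {
    final String appName;
    final String id;
    private boolean dirty;
    private long lastDirtyTimestamp;

    LocalInstance(String appName, String id, long createdAt) {
        this.appName = appName;
        this.id = id;
        this.lastDirtyTimestamp = createdAt;
    }

    // Mark the local copy as out of sync with the registry and remember when.
    synchronized long setIsDirtyWithTime(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
        return lastDirtyTimestamp;
    }

    // Clear the flag only if nothing dirtied the instance after `timestamp`;
    // lastDirtyTimestamp itself is kept and sent with later heartbeats.
    synchronized void unsetIsDirty(long timestamp) {
        if (lastDirtyTimestamp <= timestamp) {
            dirty = false;
        }
    }

    synchronized boolean isDirty() { return dirty; }

    synchronized long lastDirtyTimestamp() { return lastDirtyTimestamp; }
}

class RenewSketch {
    // Same branches as DiscoveryClient.renew(): 200 -> renewed, 404 -> re-register, else -> failed.
    static boolean renew(HeartbeatTransport transport, LocalInstance instance, long now) {
        try {
            int status = transport.sendHeartbeat(instance.appName, instance.id, instance.lastDirtyTimestamp());
            if (status == 404) {
                long timestamp = instance.setIsDirtyWithTime(now);
                boolean success = transport.register(instance.appName, instance.id);
                if (success) {
                    instance.unsetIsDirty(timestamp);
                }
                return success;
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false; // the real client catches Throwable, logs it and returns false
        }
    }

    public static void main(String[] args) {
        LocalInstance instance = new LocalInstance("ORDER-SERVICE", "order-1", 1_000L);
        System.out.println(renew(new ScriptedTransport(404), instance, 5_000L)); // true: re-registered
        System.out.println(instance.lastDirtyTimestamp()); // 5000: kept for later heartbeats
    }
}

// Test double: always answers the heartbeat with a fixed status and counts registrations.
class ScriptedTransport implements HeartbeatTransport {
    private final int heartbeatStatus;
    final AtomicInteger registrations = new AtomicInteger();

    ScriptedTransport(int heartbeatStatus) {
        this.heartbeatStatus = heartbeatStatus;
    }

    public int sendHeartbeat(String appName, String instanceId, long lastDirtyTimestamp) {
        return heartbeatStatus;
    }

    public boolean register(String appName, String instanceId) {
        registrations.incrementAndGet();
        return true;
    }
}
```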
```java
boolean renew() {
    try {
        EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(
                this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus) null);
        logger.debug("DiscoveryClient_{} - Heartbeat status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            // The registry has no lease for this instance: mark it dirty and register again
            this.REREGISTER_COUNTER.increment();
            logger.info("DiscoveryClient_{} - Re-registering apps/{}", this.appPathIdentifier, this.instanceInfo.getAppName());
            long timestamp = this.instanceInfo.setIsDirtyWithTime();
            boolean success = this.register();
            if (success) {
                this.instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        } else {
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        }
    } catch (Throwable var5) {
        logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, var5);
        return false;
    }
}
```

The call this.eurekaTransport.registrationClient.sendHeartBeat(...) is wrapped in several nested client layers, just as in service registration. The first layer is SessionedEurekaHttpClient.
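Below it come RetryableEurekaHttpClient, RedirectingEurekaHttpClient, MetricsCollectingEurekaHttpClient, and so on, exactly as in service registration.
Skip straight to the last layer, AbstractJerseyEurekaHttpClient: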
```java
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    EurekaHttpResponse<InstanceInfo> var10;
    try {
        WebResource webResource = this.jerseyClient.resource(this.serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        this.addExtraHeaders(requestBuilder);
        response = (ClientResponse) requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = EurekaHttpResponse
                .anEurekaHttpResponse(response.getStatus(), InstanceInfo.class)
                .headers(headersOf(response));
        if (response.hasEntity()) {
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        var10 = eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}",
                    new Object[]{this.serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()});
        }
        if (response != null) {
            response.close();
        }
    }
    return var10;
}
```

It first builds the request path, apps/{appName}/{id}:

Then it builds the WebResource object:
```java
WebResource webResource = this.jerseyClient.resource(this.serviceUrl)
        .path(urlPath)
        .queryParam("status", info.getStatus().toString())
        .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
```

Here serviceUrl is the registry's URL, while the urlPath built just before (apps/{appName}/{id}) identifies the current instance. The lastDirtyTimestamp query parameter is a key attribute.
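After that, the remaining parameters and headers are assembled and the request is sent as an HTTP PUT. This completes the client-side heartbeat logic.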
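For readers more familiar with the JDK's own HTTP client than with Jersey, the request assembled above can be reproduced with java.net.http (Java 11+). This is a sketch, not Eureka's code:

- The service URL, application name and instance id in the example are hypothetical.
- Ids are assumed to be URL-safe, so no encoding is applied.
- The request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Rebuilds the heartbeat request from AbstractJerseyEurekaHttpClient.sendHeartBeat
// with the JDK HTTP client API. Assumes appName and id need no URL encoding.
class HeartbeatRequestSketch {
    static URI heartbeatUri(String serviceUrl, String appName, String id,
                            String status, long lastDirtyTimestamp, String overriddenStatus) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        StringBuilder url = new StringBuilder(base)
                .append("apps/").append(appName).append('/').append(id) // urlPath
                .append("?status=").append(status)
                .append("&lastDirtyTimestamp=").append(lastDirtyTimestamp);
        if (overriddenStatus != null) { // only sent when an override is supplied
            url.append("&overriddenstatus=").append(overriddenStatus);
        }
        return URI.create(url.toString());
    }

    // The heartbeat is an HTTP PUT with an empty body.
    static HttpRequest heartbeatRequest(URI uri) {
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        URI uri = heartbeatUri("http://localhost:8761/eureka/", "ORDER-SERVICE",
                "host-1:order-service:8080", "UP", 1684972800000L, null);
        HttpRequest request = heartbeatRequest(uri);
        System.out.println(request.method() + " " + uri);
        // PUT http://localhost:8761/eureka/apps/ORDER-SERVICE/host-1:order-service:8080?status=UP&lastDirtyTimestamp=1684972800000
    }
}
```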
On the server side, the heartbeat is received by the renewLease method of InstanceResource:
```java
public Response renewLease(@HeaderParam("x-netflix-discovery-replication") String isReplication,
                           @QueryParam("overriddenstatus") String overriddenStatus,
                           @QueryParam("status") String status,
                           @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean isSuccess = this.registry.renew(this.app.getName(), this.id, isFromReplicaNode);
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", this.app.getName(), this.id);
        return Response.status(Status.NOT_FOUND).build();
    } else {
        Response response;
        if (lastDirtyTimestamp != null && this.serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            if (response.getStatus() == Status.NOT_FOUND.getStatusCode() && overriddenStatus != null
                    && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus) && isFromReplicaNode) {
                this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(), this.id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", new Object[]{this.app.getName(), this.id, response.getStatus()});
        return response;
    }
}
```

This heartbeat comes from the service provider itself, not from a peer registry replicating it, so isFromReplicaNode is false. The renewal itself is performed by this line:

```java
boolean isSuccess = this.registry.renew(this.app.getName(), this.id, isFromReplicaNode);
```
```java
public boolean renew(final String appName, final String serverId, boolean isReplication) {
    this.log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
    List<Application> applications = this.getSortedApplications();
    Iterator var5 = applications.iterator();
    while (var5.hasNext()) {
        Application input = (Application) var5.next();
        if (input.getName().equals(appName)) {
            InstanceInfo instance = null;
            Iterator var8 = input.getInstances().iterator();
            while (var8.hasNext()) {
                InstanceInfo info = (InstanceInfo) var8.next();
                if (info.getId().equals(serverId)) {
                    instance = info;
                    break;
                }
            }
            this.publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
            break;
        }
    }
    return super.renew(appName, serverId, isReplication);
}
```

The serverId parameter identifies exactly one instance, so it is always unique.
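In this method,

```java
List<Application> applications = this.getSortedApplications();
```

fetches every application. The instance to renew is found by iteration: when an application's name equals the given appName, all instances of that application are scanned, and the one whose id equals serverId is the instance being renewed.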
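```java
this.publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
```

This publishes a renewal event. The event is published before super.renew(...) runs, so by itself it does not mean the lease was actually renewed.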
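The same two-level search can be written with streams. In this sketch, AppStub and InstanceStub are hypothetical, stripped-down stand-ins for Eureka's Application and InstanceInfo. The result is an Optional because the matching instance may be missing; in that case the original loop still publishes the event, with instance set to null. This linear scan only serves the event: the lease itself is found with map lookups in the parent class's renew method.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for Eureka's Application and InstanceInfo.
class InstanceStub {
    final String id;

    InstanceStub(String id) {
        this.id = id;
    }
}

class AppStub {
    final String name;
    final List<InstanceStub> instances;

    AppStub(String name, List<InstanceStub> instances) {
        this.name = name;
        this.instances = instances;
    }
}

class RenewLookupSketch {
    // First find the application by name, then the instance whose id equals serverId.
    // Like the original loop, only the first application with a matching name is searched.
    static Optional<InstanceStub> find(List<AppStub> apps, String appName, String serverId) {
        return apps.stream()
                .filter(app -> app.name.equals(appName))
                .findFirst()
                .flatMap(app -> app.instances.stream()
                        .filter(instance -> instance.id.equals(serverId))
                        .findFirst());
    }

    public static void main(String[] args) {
        List<AppStub> apps = List.of(
                new AppStub("ORDER-SERVICE", List.of(new InstanceStub("order-1"), new InstanceStub("order-2"))),
                new AppStub("USER-SERVICE", List.of(new InstanceStub("user-1"))));
        System.out.println(find(apps, "ORDER-SERVICE", "order-2").isPresent()); // true
        System.out.println(find(apps, "USER-SERVICE", "order-2").isPresent());  // false
    }
}
```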
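Finally, the call in the return statement, super.renew(appName, serverId, isReplication), leads into PeerAwareInstanceRegistryImpl.renew: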
```java
public boolean renew(String appName, String id, boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Heartbeat, appName, id,
                (InstanceInfo) null, (InstanceStatus) null, isReplication);
        return true;
    } else {
        return false;
    }
}
```

replicateToPeers is there for high availability: when the registry runs as several nodes, a successful renewal has to be replicated to the peers. Next, step into the parent class's renew method (AbstractInstanceRegistry):
```java
public boolean renew(String appName, String id, boolean isReplication) {
    EurekaMonitors.RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = (Map) this.registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = (Lease) gMap.get(id);
    }
    if (leaseToRenew == null) {
        EurekaMonitors.RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = (InstanceInfo) leaseToRenew.getHolder();
        if (instanceInfo != null) {
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}; re-register required", instanceInfo.getId());
                EurekaMonitors.RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info("The instance status {} is different from overridden instance status {} for instance {}. Hence setting the status to overridden status",
                        new Object[]{instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()});
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        this.renewsLastMin.increment();
        leaseToRenew.renew();
        return true;
    }
}
```

The line Map<String, Lease<InstanceInfo>> gMap = (Map) this.registry.get(appName); fetches all leases of the application, keyed by instance id. Since there is only one instance in this example, gMap holds a single lease. If gMap is not null, the lease is looked up by instance id:
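```java
leaseToRenew = (Lease) gMap.get(id);
```

If no lease is found, RENEW_NOT_FOUND is incremented and false is returned. Otherwise, the instance information is fetched first: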
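```java
InstanceInfo instanceInfo = (InstanceInfo) leaseToRenew.getHolder();
```

If the overridden status computed for the instance is UNKNOWN, EurekaMonitors.RENEW_NOT_FOUND is incremented and false is returned, meaning the instance must register again. If the overridden status differs from the instance's current status (for example, DOWN before and UP now), instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus) is called:

```java
public synchronized void setStatusWithoutDirty(InstanceInfo.InstanceStatus status) {
    if (this.status != status) {
        this.status = status;
    }
}
```

This only writes the new status into instanceInfo, without marking the instance dirty.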
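```java
this.renewsLastMin.increment();
leaseToRenew.renew();
```

renewsLastMin counts the renewals received in the last minute, and leaseToRenew.renew() renews the lease itself:

```java
public void renew() {
    this.lastUpdateTimestamp = System.currentTimeMillis() + this.duration;
}
```

All it does is update lastUpdateTimestamp, setting it to the current time plus the lease duration.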
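Putting the last few steps together, here is a minimal model of how a renewal flows through the registry layers described above. It is built on my own assumptions: LeaseRegistrySketch, SimpleLease and the in-process peer list are hypothetical, and time is injected for testability. It keeps three behaviors from the code:

- Leases live in a two-level map (appName to instance id to lease).
- A missing lease makes renew return false, which becomes NOT_FOUND.
- Only renewals that did not arrive as replications are forwarded to peers, which keeps a heartbeat from bouncing between registry nodes forever.

The expiry check is modeled on Eureka's Lease.isExpired. As far as I can tell from its source, isExpired compares the current time with lastUpdateTimestamp + duration. Since renew() has already added duration, a renewed lease survives roughly twice the configured duration without heartbeats.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified lease; renew() uses the same "now + duration" arithmetic as Lease.renew().
class SimpleLease {
    private final long durationMillis;
    private volatile long lastUpdateTimestamp;

    SimpleLease(long durationMillis, long now) {
        this.durationMillis = durationMillis;
        this.lastUpdateTimestamp = now; // registration time
    }

    void renew(long now) {
        lastUpdateTimestamp = now + durationMillis;
    }

    // Modeled on Lease.isExpired: expired once now passes lastUpdateTimestamp + duration.
    boolean isExpired(long now) {
        return now > lastUpdateTimestamp + durationMillis;
    }
}

// Hypothetical registry node; peers are plain in-process objects here, not remote nodes.
class LeaseRegistrySketch {
    // appName -> (instance id -> lease), like the gMap lookup in renew()
    private final Map<String, Map<String, SimpleLease>> registry = new ConcurrentHashMap<>();
    private final List<LeaseRegistrySketch> peers = new ArrayList<>();

    void addPeer(LeaseRegistrySketch peer) {
        peers.add(peer);
    }

    void register(String appName, String id, long durationMillis, long now) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(id, new SimpleLease(durationMillis, now));
    }

    // false means "lease not found"; the HTTP layer turns that into NOT_FOUND.
    boolean renew(String appName, String id, boolean isReplication, long now) {
        Map<String, SimpleLease> gMap = registry.get(appName);
        SimpleLease lease = gMap == null ? null : gMap.get(id);
        if (lease == null) {
            return false;
        }
        lease.renew(now);
        if (!isReplication) { // replicated heartbeats are not forwarded again
            for (LeaseRegistrySketch peer : peers) {
                peer.renew(appName, id, true, now);
            }
        }
        return true;
    }

    boolean isExpired(String appName, String id, long now) {
        Map<String, SimpleLease> gMap = registry.get(appName);
        SimpleLease lease = gMap == null ? null : gMap.get(id);
        return lease == null || lease.isExpired(now);
    }

    public static void main(String[] args) {
        LeaseRegistrySketch a = new LeaseRegistrySketch();
        LeaseRegistrySketch b = new LeaseRegistrySketch();
        a.addPeer(b);
        b.addPeer(a);
        a.register("ORDER-SERVICE", "order-1", 90_000, 0);
        b.register("ORDER-SERVICE", "order-1", 90_000, 0);
        a.renew("ORDER-SERVICE", "order-1", false, 30_000); // also renews on b, which does not forward it back
        System.out.println(b.isExpired("ORDER-SERVICE", "order-1", 200_000)); // false: 200000 <= 120000 + 90000
        System.out.println(b.isExpired("ORDER-SERVICE", "order-1", 210_001)); // true
    }
}
```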
Back in InstanceResource's renewLease method:
If renew fails, NOT_FOUND is returned to the client; if it succeeds, the flow continues:
```java
Response response;
if (lastDirtyTimestamp != null && this.serverConfig.shouldSyncWhenTimestampDiffers()) {
    response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
    if (response.getStatus() == Status.NOT_FOUND.getStatusCode() && overriddenStatus != null
            && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus) && isFromReplicaNode) {
        this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(), this.id, InstanceStatus.valueOf(overriddenStatus));
    }
} else {
    response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", new Object[]{this.app.getName(), this.id, response.getStatus()});
return response;
```

lastDirtyTimestamp is sent by the client. It records the last time the client's instance data went out of sync with the server, that is, the last time the instance was marked dirty.
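If lastDirtyTimestamp is not null and the server is configured to sync when the timestamps differ (shouldSyncWhenTimestampDiffers), the if branch runs and first validates the timestamp: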
```java
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
```

```java
private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
    InstanceInfo appInfo = this.registry.getInstanceByAppAndId(this.app.getName(), this.id, false);
    if (appInfo != null && lastDirtyTimestamp != null && !lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp())) {
        Object[] args = new Object[]{this.id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
        if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
            logger.debug("Time to sync, since the last dirty timestamp differs - ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
            return Response.status(Status.NOT_FOUND).build();
        }
        if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
            if (isReplication) {
                logger.debug("Time to sync, since the last dirty timestamp differs - ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
                return Response.status(Status.CONFLICT).entity(appInfo).build();
            }
            return Response.ok().build();
        }
    }
    return Response.ok().build();
}
```

It first looks up the InstanceInfo by application name and instance id. If the incoming lastDirtyTimestamp differs from the one in the registry, the two sides have been out of sync for a while, and the outcome depends on which side is newer:

- The incoming timestamp is newer: NOT_FOUND is returned, so the sender registers again.
- The registry's copy is newer: a request from a replica node gets CONFLICT (with the registry's InstanceInfo in the body), while a request sent by the client itself simply gets OK.

Equal timestamps also yield OK. Back in InstanceResource's renewLease method:
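```java
if (response.getStatus() == Status.NOT_FOUND.getStatusCode() && overriddenStatus != null
        && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus) && isFromReplicaNode) {
    this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(), this.id, InstanceStatus.valueOf(overriddenStatus));
}
```

This heartbeat came from the client rather than a replica node (isFromReplicaNode is false), so this if is never entered. At this point the service renewal flow is complete.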
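To summarize the server side, here is the decision logic of renewLease() and validateDirtyTimestamp() reduced to plain values and HTTP status codes. It is a hypothetical sketch for study, not Eureka code:

- RenewLeaseSketch and its parameters are my own names.
- registryTimestamp stands for the lastDirtyTimestamp stored in the registry (null when the instance is unknown).
- The CONFLICT body carrying the registry's InstanceInfo is omitted.

```java
// Hypothetical condensation of InstanceResource.renewLease() and validateDirtyTimestamp().
class RenewLeaseSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    static final int CONFLICT = 409;

    static int validateDirtyTimestamp(Long registryTimestamp, Long incoming, boolean isReplication) {
        if (registryTimestamp != null && incoming != null && !incoming.equals(registryTimestamp)) {
            if (incoming > registryTimestamp) {
                return NOT_FOUND; // sender's data is newer: make it register again
            }
            // The registry's data is newer: a peer is told about the conflict,
            // a client is simply answered OK.
            return isReplication ? CONFLICT : OK;
        }
        return OK;
    }

    static int renewLease(boolean renewed, Long registryTimestamp, Long incoming,
                          boolean syncWhenTimestampDiffers, boolean isReplication) {
        if (!renewed) {
            return NOT_FOUND; // no lease: the client re-registers
        }
        if (incoming != null && syncWhenTimestampDiffers) {
            return validateDirtyTimestamp(registryTimestamp, incoming, isReplication);
        }
        return OK;
    }

    // Condition of the final if in renewLease(): only a replica's NOT_FOUND with a
    // meaningful overridden status leads to storing that status.
    static boolean shouldStoreOverriddenStatus(int response, String overriddenStatus, boolean isReplication) {
        return response == NOT_FOUND && overriddenStatus != null
                && !"UNKNOWN".equals(overriddenStatus) && isReplication;
    }

    public static void main(String[] args) {
        System.out.println(renewLease(true, 1_000L, 1_000L, true, false)); // 200: in sync
        System.out.println(renewLease(true, 1_000L, 2_000L, true, false)); // 404: client data is newer
        System.out.println(renewLease(true, 2_000L, 1_000L, true, true));  // 409: peer is behind
        System.out.println(shouldStoreOverriddenStatus(404, "OUT_OF_SERVICE", false)); // false: not a replica
    }
}
```

The last line corresponds to the case traced in this article: the heartbeat came from the client, so the branch that calls storeOverriddenStatusIfRequired is never taken.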