點選上方“芋道原始碼”,選擇“置頂公眾號”
技術文章第一時間送達!
原始碼精品專欄
本文主要基於 Eureka 1.8.X 版本
-
1. 概述
-
2. Eureka-Client 發起續租
-
2.1 初始化定時任務
-
2.2 HeartbeatThread
-
2.3 TimedSupervisorTask
-
3. Eureka-Server 接收續租
-
3.1 接收續租請求
-
3.2 續租應用實體資訊
-
666. 彩蛋
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 續租應用實體的過程。
FROM 《深度剖析服務發現元件Netflix Eureka》 二次編輯
-
藍框部分,為本文重點。
-
非藍框部分,Eureka-Server 叢集間複製註冊的應用實體資訊,不在本文內容範疇。
推薦 Spring Cloud 書籍:
-
請支援正版。下載盜版,等於主動編寫低階 BUG 。
-
程式猿DD —— 《Spring Cloud微服務實戰》
-
周立 —— 《Spring Cloud與Docker微服務架構實戰》
推薦 Spring Cloud 影片:
-
Java 微服務實踐 – Spring Boot
-
Java 微服務實踐 – Spring Cloud
-
Java 微服務實踐 – Spring Boot / Spring Cloud
2. Eureka-Client 發起續租
Eureka-Client 向 Eureka-Server 發起註冊應用實體成功後獲得租約 ( Lease )。
Eureka-Client 固定間隔向 Eureka-Server 發起續租( renew ),避免租約過期。
預設情況下,租約有效期為 90 秒,續租頻率為 30 秒。兩者比例為 1 : 3 ,保證在網路異常等情況下,有三次重試的機會。
2.1 初始化定時任務
Eureka-Client 在初始化過程中,建立心跳執行緒,固定間隔向 Eureka-Server 發起續租( renew )。實現程式碼如下:
// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider backupRegistryProvider) {
// ... 省略無關程式碼
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// ... 省略無關程式碼
// 【3.2.14】初始化定時任務
initScheduledTasks();
}
private void initScheduledTasks() {
// 向 Eureka-Server 心跳(續租)執行器
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 續租頻率
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// ... 省略無關程式碼
}
// ... 省略無關程式碼
}
-
scheduler
,定時任務服務,用於定時觸發心跳( 續租 )。細心如你,會發現任務提交的方式是ScheduledExecutorService#schedule(...)
方法,只延遲執行一次心跳,說好的固定頻率執行心跳呢!!!答案在 「2.3 TimedSupervisorTask」 揭曉。 -
heartbeatExecutor
,心跳任務執行執行緒池。為什麼有scheduler
的情況下,還有heartbeatExecutor
???答案也在 「2.3 TimedSupervisorTask」 揭曉。 -
HeartbeatThread,心跳執行緒,在「2.2 TimedSupervisorTask」 詳細解析。
2.2 HeartbeatThread
com.netflix.discovery.DiscoveryClient.HeartbeatThread
,心跳執行緒,實現執行 Eureka-Client 向 Eureka-Server 發起續租( renew )請求。實現程式碼如下:
// DiscoveryClient.java
/**
* 最後成功向 Eureka-Server 心跳時間戳
*/
private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
-
呼叫
#renew
方法,執行續租邏輯。實現程式碼如下:// DiscoveryClient.java
boolean renew() {
EurekaHttpResponsehttpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
// 發起註冊
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
} -
PUT 請求 Eureka-Server 的
apps/${APP_NAME}/${INSTANCE_INFO_ID}
介面,引數為status
、lastDirtyTimestamp
、overriddenstatus
,實現續租。 -
呼叫
AbstractJerseyEurekaHttpClient#sendHeartBeat(...)
方法,發起續租請求,實現程式碼如下:// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponsesendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.put(ClientResponse.class);
EurekaHttpResponseBuildereurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).essay-headers(essay-headersOf(response));
if (response.hasEntity()) {
eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
}
return eurekaResponseBuilder.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
} -
呼叫
AbstractJerseyEurekaHttpClient#register(...)
方法,當 Eureka-Server 不存在租約時,重新發起註冊,在《Eureka 原始碼解析 —— 應用實體註冊發現 (一)之註冊》有詳細解析。
2.3 TimedSupervisorTask
com.netflix.discovery.TimedSupervisorTask
,監管定時任務的任務。
A supervisor task that schedules subtasks while enforce a timeout.
建立 TimedSupervisorTask 程式碼如下:
public class TimedSupervisorTask extends TimerTask {
private final Counter timeoutCounter;
private final Counter rejectedCounter;
private final Counter throwableCounter;
private final LongGauge threadPoolLevelGauge;
/**
* 定時任務服務
*/
private final ScheduledExecutorService scheduler;
/**
* 執行子任務執行緒池
*/
private final ThreadPoolExecutor executor;
/**
* 子任務執行超時時間
*/
private final long timeoutMillis;
/**
* 子任務
*/
private final Runnable task;
/**
* 當前任子務執行頻率
*/
private final AtomicLong delay;
/**
* 最大子任務執行頻率
*
* 子任務執行超時情況下使用
*/
private final long maxDelay;
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
// Initialize the counters and register.
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
}
}
-
scheduler
,定時任務服務,用於定時【發起】子任務。 -
executor
,執行子任務執行緒池,用於【提交】子任務執行。 -
task
,子任務。 -
timeoutMillis
,子任務執行超時時間,單位:毫秒。 -
delay
,當前子任務執行頻率,單位:毫秒。值等於timeout
引數。 -
maxDelay
,最大子任務執行頻率,單位:毫秒。值等於timeout * expBackOffBound
引數。
-
scheduler
初始化延遲執行 TimedSupervisorTask 。 -
TimedSupervisorTask 執行時,提交
task
到executor
執行任務。 -
當
task
執行正常,TimedSupervisorTask 再次提交自己到scheduler
延遲timeoutMillis
執行。 -
當
task
執行超時,重新計算延遲時間( 不允許超過maxDelay
),再次提交自己到scheduler
延遲執行。
實現程式碼如下:
// TimedSupervisorTask.java
1: @Override
2: public void run() {
3: Future> future = null;
4: try {
5: // 提交 任務
6: future = executor.submit(task);
7: //
8: threadPoolLevelGauge.set((long) executor.getActiveCount());
9: // 等待任務 執行完成 或 超時
10: future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
11: // 設定 下一次任務執行頻率
12: delay.set(timeoutMillis);
13: //
14: threadPoolLevelGauge.set((long) executor.getActiveCount());
15: } catch (TimeoutException e) {
16: logger.error("task supervisor timed out", e);
17: timeoutCounter.increment(); //
18:
19: // 設定 下一次任務執行頻率
20: long currentDelay = delay.get();
21: long newDelay = Math.min(maxDelay, currentDelay * 2);
22: delay.compareAndSet(currentDelay, newDelay);
23:
24: } catch (RejectedExecutionException e) {
25: if (executor.isShutdown() || scheduler.isShutdown()) {
26: logger.warn("task supervisor shutting down, reject the task", e);
27: } else {
28: logger.error("task supervisor rejected the task", e);
29: }
30:
31: rejectedCounter.increment(); //
32: } catch (Throwable e) {
33: if (executor.isShutdown() || scheduler.isShutdown()) {
34: logger.warn("task supervisor shutting down, can't accept the task");
35: } else {
36: logger.error("task supervisor threw an exception", e);
37: }
38:
39: throwableCounter.increment(); //
40: } finally {
41: // 取消 未完成的任務
42: if (future != null) {
43: future.cancel(true);
44: }
45:
46: // 排程 下次任務
47: if (!scheduler.isShutdown()) {
48: scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
49: }
50: }
51: }
-
第 5 至 6 行 :提交子任務
task
到執行子任務執行緒池executor
。 -
第 9 至 10 行 :等待子任務
task
執行完成或執行超時。 -
第 11 至 12 行 :子任務
task
執行完成,設定下一次執行延遲delay
。 -
第 19 至 22 行 :子任務
task
執行超時,重新計算下一次執行延遲delay
。計算公式為Math.min(maxDelay, currentDelay * 2)
。如果多次超時,超時時間不斷乘以 2 ,不允許超過最大延遲時間(maxDelay
)。 -
第 41 至 44 行 :強制取消未完成的子任務。
-
第 46 至 49 行 :排程下一次 TimedSupervisorTask 。
3. Eureka-Server 接收續租
3.1 接收續租請求
com.netflix.eureka.resources.InstanceResource
,處理單個應用實體資訊的請求操作的 Resource ( Controller )。
續租應用實體資訊的請求,對映 InstanceResource#renewLease()
方法,實現程式碼如下:
1: @PUT
2: public Response renewLease(
3: @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
4: @QueryParam("overriddenstatus") String overriddenStatus,
5: @QueryParam("status") String status,
6: @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
7: boolean isFromReplicaNode = "true".equals(isReplication);
8: // 續租
9: boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
10:
11: // 續租失敗
12: // Not found in the registry, immediately ask for a register
13: if (!isSuccess) {
14: logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
15: return Response.status(Status.NOT_FOUND).build();
16: }
17:
18: // 比較 InstanceInfo 的 lastDirtyTimestamp 屬性
19: // Check if we need to sync based on dirty time stamp, the client
20: // instance might have changed some value
21: Response response = null;
22: if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
23: response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
24: // Store the overridden status since the validation found out the node that replicates wins
25: if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
26: && (overriddenStatus != null)
27: && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
28: && isFromReplicaNode) {
29: registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
30: }
31: } else { // 成功
32: response = Response.ok().build();
33: }
34: logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
35: return response;
36: }
-
第 8 至 9 行 :呼叫
PeerAwareInstanceRegistryImpl#renew(...)
方法,續租。實現程式碼如下:// PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) { // 續租
// Eureka-Server 複製
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
} -
呼叫父類
AbstractInstanceRegistry#renew(...)
方法,註冊應用實體資訊。 -
第 11 至 16 行 :續租失敗,傳回 404 響應。當 Eureka-Client 收到 404 響應後,會重新發起 InstanceInfo 的註冊。
-
第 18 至 30 行 :比較請求的
lastDirtyTimestamp
和 Server 的 InstanceInfo 的lastDirtyTimestamp
屬性差異,需要配置eureka.syncWhenTimestampDiffers = true
( 預設開啟 )。 -
第 7 至 11 行 :請求的
lastDirtyTimestamp
較大,意味著請求方( 可能是 Eureka-Client ,也可能是 Eureka-Server 叢集內的其他 Server )存在 InstanceInfo 和 Eureka-Server 的 InstanceInfo 的資料不一致,傳回 404 響應。請求方收到 404 響應後重新發起註冊。 -
第 16 至 21 行 :《Eureka 原始碼解析 —— Eureka-Server 叢集同步》 有詳細解析。
-
第 22 至 24 行 :Server 的
lastDirtyTimestamp
較大,並且請求方為 Eureka-Client,續租成功,傳回 200 成功響應。 -
第 29 行 :
lastDirtyTimestamp
一致,傳回 200 成功響應。 -
第 23 行 :呼叫
#validateDirtyTimestamp(...)
方法,比較lastDirtyTimestamp
的差異。實現程式碼如下:// InstanceResource.java
1: private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
2: // 獲取 InstanceInfo
3: InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
4: if (appInfo != null) {
5: if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
6: Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
7: // 請求 的 較大
8: if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
9: logger.debug("Time to sync, since the last dirty timestamp differs -"
10: + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
11: return Response.status(Status.NOT_FOUND).build();
12: // Server 的 較大
13: } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
14: // In the case of replication, send the current instance info in the registry for the
15: // replicating node to sync itself with this one.
16: if (isReplication) {
17: logger.debug(
18: "Time to sync, since the last dirty timestamp differs -"
19: + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
20: args);
21: return Response.status(Status.CONFLICT).entity(appInfo).build();
22: } else {
23: return Response.ok().build();
24: }
25: }
26: }
27:
28: }
29: return Response.ok().build();
30: } -
第 24 至 30 行 :《Eureka 原始碼解析 —— Eureka-Server 叢集同步》 有詳細解析。
-
第 31 至 33 行 :續租成功,傳回 200 成功響應。
3.2 續租應用實體資訊
呼叫 AbstractInstanceRegistry#renew(...)
方法,續租應用實體資訊,實現程式碼如下:
1: public boolean renew(String appName, String id, boolean isReplication) {
2: // 增加 續租次數 到 監控
3: RENEW.increment(isReplication);
4: // 獲得 租約
5: Map> gMap = registry.get(appName);
6: Lease leaseToRenew = null;
7: if (gMap != null) {
8: leaseToRenew = gMap.get(id);
9: }
10: // 租約不存在
11: if (leaseToRenew == null) {
12: RENEW_NOT_FOUND.increment(isReplication);
13: logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
14: return false;
15: } else {
16: InstanceInfo instanceInfo = leaseToRenew.getHolder();
17: if (instanceInfo != null) {
18: // touchASGCache(instanceInfo.getASGName());
19: // override status
20: InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
21: instanceInfo, leaseToRenew, isReplication);
22: if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
23: logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
24: + "; re-register required", instanceInfo.getId());
25: RENEW_NOT_FOUND.increment(isReplication);
26: return false;
27: }
28: if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
29: Object[] args = {
30: instanceInfo.getStatus().name(),
31: instanceInfo.getOverriddenStatus().name(),
32: instanceInfo.getId()
33: };
34: logger.info(
35: "The instance status {} is different from overridden instance status {} for instance {}. "
36: + "Hence setting the status to overridden status", args);
37: instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
38: }
39: }
40: // 新增 續租每分鐘次數
41: renewsLastMin.increment();
42: // 設定 租約最後更新時間(續租)
43: leaseToRenew.renew();
44: return true;
45: }
46: }
-
第 2 至 3 行 :增加續租次數到監控。配合 Netflix Servo 實現監控資訊採集。
-
第 4 至 9 行 :獲得租約( Lease )。
-
第 10 至 14 行 :租約不存在,傳回續租失敗(
false
)。 -
第 19 至 21 行 :獲得應用實體的最終狀態。在《應用實體註冊發現 (八)之改寫狀態》詳細解析。
-
第 22 至 27 行 :應用實體的最終狀態為
UNKNOWN
,無法續約,傳回false
。在《應用實體註冊發現 (八)之改寫狀態》詳細解析。 -
第 28 至 37 行 :應用實體的狀態與最終狀態不相等,使用最終狀態改寫應用實體的狀態。在《應用實體註冊發現 (八)之改寫狀態》詳細解析。
-
第 40 至 41 行 :新增續租每分鐘次數(
renewsLastMin
)。com.netflix.eureka.util.MeasuredRate
,速度測量類,實現程式碼如下:// AbstractInstanceRegistry.java
/**
* 續租每分鐘次數
*/
private final MeasuredRate renewsLastMin;
// MeasuredRate.java
public class MeasuredRate {
/**
* 上一個間隔次數
*/
private final AtomicLong lastBucket = new AtomicLong(0);
/**
* 當前間隔次數
*/
private final AtomicLong currentBucket = new AtomicLong(0);
/**
* 間隔
*/
private final long sampleInterval;
/**
* 定時器
*/
private final Timer timer;private volatile boolean isActive;
public MeasuredRate(long sampleInterval) {
this.sampleInterval = sampleInterval;
this.timer = new Timer("Eureka-MeasureRateTimer", true);
this.isActive = false;
}
public synchronized void start() {
if (!isActive) {
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket.
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
public synchronized void stop() {
if (isActive) {
timer.cancel();
isActive = false;
}
}
/**
* Returns the count in the last sample interval.
*/
public long getCount() {
return lastBucket.get();
}
/**
* Increments the count in the current sample interval.
*/
public void increment() {
currentBucket.incrementAndGet();
}}
-
配合 Netflix Servo 實現監控資訊採集續租每分鐘次數。
-
Eureka-Server 運維介面的顯示續租每分鐘次數。
-
自我保護機制,在 《Eureka 原始碼解析 —— 應用實體註冊發現 (四)之自我保護機制》 詳細解析。
-
timer
,定時器,負責每個sampleInterval
間隔重置當前次數(currentBucket
),並將原當前次數設定到上一個次數(lastBucket
)。 -
#increment()
方法,傳回當前次數(currentBucket
)。 -
#getCount()
方法,傳回上一個次數(lastBucket
)。 -
renewsLastMin
有如下用途: -
第 42 至 43 行 :呼叫
Lease#renew()
方法,設定租約最後更新時間( 續租 ),實現程式碼如下:public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
} -
x
-
第 44 行 :傳回續租成功(
true
)。 -
整個過程修改的租約的過期時間,即使併發請求,也不會對資料的一致性產生不一致的影響,因此像註冊操作一樣加鎖。
666. 彩蛋
效率比想象的低一些,加油繼續更新下一篇。
胖友,分享我的公眾號( 芋道原始碼 ) 給你的胖友可好?