摘要: 原創出處 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
排版又崩了,請【閱讀原文】。
本文主要基於 Hystrix 1.5.X 版本
-
1. 概述
-
2. HystrixCircuitBreaker
-
3. HystrixCircuitBreaker.Factory
-
4. HystrixCircuitBreakerImpl
-
4.1 構造方法
-
4.2 #subscribeToStream()
-
4.3 #attemptExecution()
-
4.4 #markSuccess()
-
4.5 #markNonSuccess()
-
4.6 #allowRequest()
-
4.7 #isOpen()
-
666. 彩蛋
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
1. 概述
本文主要分享 斷路器 HystrixCircuitBreaker。
HystrixCircuitBreaker 有三種狀態 :
-
CLOSED
:關閉 -
OPEN
:開啟 -
HALF_OPEN
:半開
其中,斷路器處於 OPEN
狀態時,鏈路處於非健康狀態,命令執行時,直接呼叫回退邏輯,跳過正常邏輯。
HystrixCircuitBreaker 狀態變遷如下圖 :
-
紅線 :初始時,斷路器處於
CLOSED
狀態,鏈路處於健康狀態。當滿足如下條件,斷路器從CLOSED
變成OPEN
狀態: -
週期( 可配,
HystrixCommandProperties.default_metricsRollingStatisticalWindow=10000ms
)內,總請求數超過一定量( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20
) 。 -
錯誤請求佔總請求數超過一定比例( 可配,
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage=50%
) 。 -
綠線 :斷路器處於
OPEN
狀態,命令執行時,若當前時間超過斷路器開啟時間一定時間(HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds=5000ms
),斷路器變成HALF_OPEN
狀態,嘗試呼叫正常邏輯,根據執行是否成功,開啟或關閉熔斷器【藍線】。
推薦 Spring Cloud 書籍:
-
請支援正版。下載盜版,等於主動編寫低階 BUG 。
-
程式猿DD —— 《Spring Cloud微服務實戰》
-
周立 —— 《Spring Cloud與Docker微服務架構實戰》
-
兩書齊買,京東包郵。
2. HystrixCircuitBreaker
com.netflix.hystrix.HystrixCircuitBreaker
,Hystrix 斷路器介面。定義介面如下程式碼 :
public interface HystrixCircuitBreaker {
/**
* Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does
* not modify any internal state, and takes into account the half-open logic which allows some requests through
* after the circuit has been opened
*
* @return boolean whether a request should be permitted
*/
boolean allowRequest();
/**
* Whether the circuit is currently open (tripped).
*
* @return boolean state of circuit breaker
*/
boolean isOpen();
/**
* Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
void markSuccess();
/**
* Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
void markNonSuccess();
/**
* Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal
* state.
*/
boolean attemptExecution();
}
-
#allowRequest()
和#attemptExecution()
方法,方法目的基本類似,差別在於當斷路器滿足嘗試關閉條件時,前者不會將斷路器不會修改狀態(CLOSE=>HALF-OPEN
),而後者會。
HystrixCircuitBreaker 有兩個子類實現 :
-
NoOpCircuitBreaker :空的斷路器實現,用於不開啟斷路器功能的情況。
-
HystrixCircuitBreakerImpl :完整的斷路器實現。
在 AbstractCommand 建立時,初始化 HystrixCircuitBreaker ,程式碼如下 :
/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
/**
* 斷路器
*/
protected final HystrixCircuitBreaker circuitBreaker;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略無關程式碼
// 初始化 斷路器
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
// ... 省略無關程式碼
}
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// get the default implementation of HystrixCircuitBreaker
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
return new NoOpCircuitBreaker();
}
}
}
-
當
HystrixCommandProperties.circuitBreakerEnabled=true
時,即斷路器功能開啟,使用 Factory 獲得 HystrixCircuitBreakerImpl 物件。在 「3. HystrixCircuitBreaker.Factory」 詳細解析。 -
當
HystrixCommandProperties.circuitBreakerEnabled=false
時,即斷路器功能關閉,建立 NoOpCircuitBreaker 物件。另外,NoOpCircuitBreaker 程式碼簡單到腦殘,點選 連結 檢視實現。
3. HystrixCircuitBreaker.Factory
com.netflix.hystrix.HystrixCircuitBreaker.Factory
,HystrixCircuitBreaker 工廠,主要用於:
-
建立 HystrixCircuitBreaker 物件,目前只建立 HystrixCircuitBreakerImpl 。
-
HystrixCircuitBreaker 容器,基於 HystrixCommandKey 維護了 HystrixCircuitBreaker 單例物件 的對映。程式碼如下 :
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
整體程式碼灰常清晰,點選 連結 檢視程式碼。
4. HystrixCircuitBreakerImpl
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl
,完整的斷路器實現。
我們來逐個方法看看 HystrixCircuitBreakerImpl 的具體實現。
4.1 構造方法
構造方法,程式碼如下 :
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
enum Status {
CLOSED, OPEN, HALF_OPEN
}
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
}
-
Status 列舉類,斷路器的三種狀態。
-
status
屬性,斷路器的狀態。 -
circuitOpened
屬性,斷路器開啟,即狀態變成OPEN
的時間。 -
activeSubscription
屬性,基於 Hystrix Metrics 對請求量統計 Observable 的訂閱,在 「4.2 #subscribeToStream()」 詳細解析。
4.2 #subscribeToStream()
#subscribeToStream()
方法,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。程式碼如下 :
private Subscription subscribeToStream() {
1: private Subscription subscribeToStream() {
2: /*
3: * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
4: */
5: return metrics.getHealthCountsStream()
6: .observe()
7: .subscribe(new Subscriber<HealthCounts>() {
8: @Override
9: public void onCompleted() {
10:
11: }
12:
13: @Override
14: public void onError(Throwable e) {
15:
16: }
17:
18: @Override
19: public void onNext(HealthCounts hc) {
20: System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用於除錯
21: // check if we are past the statisticalWindowVolumeThreshold
22: if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
23: // we are not past the minimum volume threshold for the stat window,
24: // so no change to circuit status.
25: // if it was CLOSED, it stays CLOSED
26: // if it was half-open, we need to wait for a successful command execution
27: // if it was open, we need to wait for sleep window to elapse
28: } else {
29: if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
30: //we are not past the minimum error threshold for the stat window,
31: // so no change to circuit status.
32: // if it was CLOSED, it stays CLOSED
33: // if it was half-open, we need to wait for a successful command execution
34: // if it was open, we need to wait for sleep window to elapse
35: } else {
36: // our failure rate is too high, we need to set the state to OPEN
37: if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
38: circuitOpened.set(System.currentTimeMillis());
39: }
40: }
41: }
42: }
43: });
44: }
-
第 5 至 7 行 :向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。這裡的 Observable 基於 RxJava Window 運運算元。
FROM 《ReactiveX檔案中文翻譯》「Window」
定期將來自原始 Observable 的資料分解為一個 Observable 視窗,發射這些視窗,而不是每次發射一項資料 -
簡單來說,固定間隔,
#onNext()
方法將不斷被呼叫,每次計算斷路器的狀態。 -
第 22 行 :判斷週期( 可配,
HystrixCommandProperties.default_metricsRollingStatisticalWindow=10000ms
)內,總請求數超過一定量( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20
) 。 -
這裡要註意下,請求次數統計的是週期內,超過週期的不計算在內。例如說,
00:00
內發起了 N 個請求,00:11
不計算這 N 個請求。 -
第 29 行 :錯誤請求佔總請求數超過一定比例( 可配,
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage=50%
) 。 -
第 37 至 39 行 :滿足斷路器開啟條件,CAS 修改狀態(
CLOSED=>OPEN
),並設定開啟時間(circuitOpened
) 。 -
【補充】第 5 至 7 行 :? 怕寫在上面,大家有壓力。Hystrix Metrics 對請求量統計 Observable 使用了兩種 RxJava Window 運運算元 :
-
Observable#window(timespan, unit)
方法,固定週期( 可配,HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds=500ms
),發射 Observable 視窗。點選 BucketedCounterStream 構造方法 檢視呼叫處的程式碼。 -
Observable#window(count, skip)
方法,每發射一次(skip
) Observable 忽略count
( 可配,HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20
) 個資料項。為什麼?答案在第 22 行的程式碼,週期內達到一定請求量是斷路器開啟的一個條件。點選 BucketedRollingCounterStream 構造方法 檢視呼叫處的程式碼。
目前該方法有兩處呼叫 :
-
「4.1 構造方法」,在建立 HystrixCircuitBreakerImpl 時,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。固定間隔,計算斷路器是否要關閉(
CLOSE
)。 -
「4.4 #markSuccess()」,清空 Hystrix Metrics 對請求量統計 Observable 的統計資訊,取消原有訂閱,併發起新的訂閱。
4.3 #attemptExecution()
如下是 AbstractCommand#applyHystrixSemantics(_cmd)
方法,對 HystrixCircuitBreakerImpl#attemptExecution
方法的呼叫的程式碼 :
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// ... 省略無關程式碼
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) {
// 執行【正常邏輯】
} else {
// 執行【回退邏輯】
}
}
-
使用
HystrixCircuitBreakerImpl#attemptExecution
方法,判斷是否可以執行正常邏輯。
#attemptExecution
方法,程式碼如下 :
1: @Override
2: public boolean attemptExecution() {
3: // 強制 開啟
4: if (properties.circuitBreakerForceOpen().get()) {
5: return false;
6: }
7: // 強制 關閉
8: if (properties.circuitBreakerForceClosed().get()) {
9: return true;
10: }
11: // 開啟時間為空
12: if (circuitOpened.get() == -1) {
13: return true;
14: } else {
15: // 滿足間隔嘗試斷路器時間
16: if (isAfterSleepWindow()) {
17: //only the first request after sleep window should execute
18: //if the executing command succeeds, the status will transition to CLOSED
19: //if the executing command fails, the status will transition to OPEN
20: //if the executing command gets unsubscribed, the status will transition to OPEN
21: if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
22: return true;
23: } else {
24: return false;
25: }
26: } else {
27: return false;
28: }
29: }
30: }
-
第 4 至 6 行 :當
HystrixCommandProperties.circuitBreakerForceOpen=true
( 預設值 :false
) 時,即斷路器強制開啟,傳回false
。當該配置接入配置中心後,可以動態實現開啟熔斷。為什麼會有該配置?當 HystrixCircuitBreaker 建立完成後,無法動態切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,透過該配置以實現類似效果。 -
第 8 至 10 行 :當
HystrixCommandProperties.circuitBreakerForceClose=true
( 預設值 :false
) 時,即斷路器強制關閉,傳回true
。當該配置接入配置中心後,可以動態實現關閉熔斷。為什麼會有該配置?當 HystrixCircuitBreaker 建立完成後,無法動態切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,透過該配置以實現類似效果。 -
第 12 至 13 行 :斷路器開啟時間(
circuitOpened
) 為"空",傳回true
。 -
第 16 至 28 行 :呼叫
#isAfterSleepWindow()
方法,判斷是否滿足嘗試呼叫正常邏輯的間隔時間。當滿足,使用CAS 方式修改斷路器狀態(OPEN=>HALF_OPEN
),從而保證有且僅有一個執行緒能夠嘗試呼叫正常邏輯。
#isAfterSleepWindow()
方法,程式碼如下 :
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
-
在當前時間超過斷路器開啟時間
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds
( 預設值,5000ms
),傳回true
。
4.4 #markSuccess()
當嘗試呼叫正常邏輯成功時,呼叫 #markSuccess()
方法,關閉斷路器。程式碼如下 :
1: @Override
2: public void markSuccess() {
3: if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
4: // 清空 Hystrix Metrics 對請求量統計 Observable 的**統計資訊**
5: //This thread wins the race to close the circuit - it resets the stream to start it over from 0
6: metrics.resetStream();
7: // 取消原有訂閱
8: Subscription previousSubscription = activeSubscription.get();
9: if (previousSubscription != null) {
10: previousSubscription.unsubscribe();
11: }
12: // 發起新的訂閱
13: Subscription newSubscription = subscribeToStream();
14: activeSubscription.set(newSubscription);
15: // 設定斷路器開啟時間為空
16: circuitOpened.set(-1L);
17: }
18: }
-
第 3 行 :使用 CAS 方式,修改斷路器狀態(
HALF_OPEN=>CLOSED
)。 -
第 6 行 :清空 Hystrix Metrics 對請求量統計 Observable 的統計資訊。
-
第 8 至 14 行 :取消原有訂閱,發起新的訂閱。
-
第 16 行 :設定斷路器開啟時間為"空" 。
如下兩處呼叫了 #markNonSuccess()
方法 :
-
markEmits
-
markOnCompleted
4.5 #markNonSuccess()
當嘗試呼叫正常邏輯失敗時,呼叫 #markNonSuccess()
方法,重新開啟斷路器。程式碼如下 :
1: @Override
2: public void markNonSuccess() {
3: if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
4: //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
5: circuitOpened.set(System.currentTimeMillis());
6: }
7: }
-
第 3 行 :使用 CAS 方式,修改斷路器狀態(
HALF_OPEN=>OPEN
)。 -
第 5 行 :設定設定斷路器開啟時間為當前時間。這樣,
#attemptExecution()
過一段時間,可以再次嘗試執行正常邏輯。
如下兩處呼叫了 #markNonSuccess()
方法 :
-
handleFallback
-
unsubscribeCommandCleanup
4.6 #allowRequest()
#allowRequest()
和 #attemptExecution()
方法,方法目的基本類似,差別在於當斷路器滿足嘗試關閉條件時,前者不會將斷路器不會修改狀態( CLOSE=>HALF-OPEN
),而後者會。點選 連結 檢視程式碼實現。
4.7 #isOpen()
#isOpen()
方法,比較簡單,點選 連結 檢視程式碼實現。
666. 彩蛋
呼呼,相對比較乾凈的一篇文章,滿足。
胖友,分享一波朋友圈可好!