Reposted from: 崔世寧 (Cui Shining)
https://www.cnblogs.com/kingszelda/p/10312242.html
1. What is Hystrix
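Hystrix is an open-source framework from Netflix, available at https://github.com/Netflix/Hystrix
The name means "porcupine" (豪豬 in Chinese). A porcupine is normally docile. When it senses danger it protects itself with its quills, and once the danger has passed it goes back to being a gentle ball of fur.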
So the whole framework revolves around two questions:
- When is protection needed?
- How do we protect?
2. When is protection needed
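A system usually plays two roles at once: service provider and service consumer. For a service consumer, the hardest part is keeping itself out of harm's way, and anyone who has worked on a gateway project will know the feeling.
The figure above shows a typical set of system dependencies. There are usually many downstream dependencies, and they communicate over socket, HTTP, Dubbo, WebService and other protocols. When the network layer jitters, or a system we depend on starts responding abnormally, the service we provide is directly affected.
If this effect propagates, it can easily trigger an avalanche (a cascading failure): the whole business call chain misbehaves, for example through end-to-end timeouts or inconsistent order data.
That brings us to the core question: how do we detect that the business is in an abnormal state?
By its success rate. The success rate directly reflects how data is flowing through the business, and it is the most direct business signal.
You can also decide based on timeouts, as Sentinel's implementation does. Conceptually, though, this is the same thing: use time for timeout control and treat a timeout as a failure, and you are still measuring a success rate.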
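To make "timeout = failure" concrete, here is a minimal sketch of a success-rate tracker. `SuccessRateWindow` is a hypothetical helper, not a Hystrix class. For simplicity it keeps a count-based window of the last N calls, whereas Hystrix itself aggregates health counts over a time-based rolling window split into buckets. A call counts as a success only if it neither threw nor exceeded the timeout.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical helper: remembers the outcome of the last `capacity` calls and
 * reports the success rate. A call slower than the timeout is recorded as a
 * failure, so timeouts fold into the same success-rate metric.
 */
class SuccessRateWindow {
    private final int capacity;          // number of recent calls to remember
    private final long timeoutMillis;    // latency above this counts as a failure
    private final Deque<Boolean> outcomes = new ArrayDeque<>();
    private int successes = 0;

    SuccessRateWindow(int capacity, long timeoutMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
    }

    /** Records one call: success only if it did not throw AND finished in time. */
    synchronized void record(boolean threw, long latencyMillis) {
        boolean ok = !threw && latencyMillis <= timeoutMillis;
        outcomes.addLast(ok);
        if (ok) successes++;
        if (outcomes.size() > capacity) {
            if (outcomes.removeFirst()) successes--;   // evict the oldest outcome
        }
    }

    /** Success rate in [0, 1]; an empty window is treated as healthy. */
    synchronized double successRate() {
        return outcomes.isEmpty() ? 1.0 : (double) successes / outcomes.size();
    }
}
```

A breaker built on top of this would compare `successRate()` against a threshold. That is the "success rate too low" condition used in the next section.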
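3. How to protect
For a porcupine, the quills are its means of defense, and every attack is mercilessly pushed back.
In Hystrix this becomes the "circuit breaker": a flag that says whether the system is currently in a state that needs protection.
When the circuit breaker is open, no request goes through the normal business logic. Each one immediately gets an agreed-upon response, the fallback. This fail-fast principle protects our system.
However, the system should not keep its quills out forever. Once the danger has passed, it needs to return to normal.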
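Here is a minimal sketch of the fail-fast idea. It assumes a plain boolean "open" flag and a caller-supplied fallback. `FastFailGuard` is a hypothetical name and is not part of Hystrix. While the flag is set, the business logic is never invoked.

```java
import java.util.function.Supplier;

/**
 * Hypothetical minimal guard: while the breaker is open, the real call is
 * skipped entirely and the agreed fallback value is returned at once.
 */
class FastFailGuard<T> {
    private volatile boolean open = false;   // true = "quills out"
    private final Supplier<T> fallback;      // the agreed fallback result

    FastFailGuard(Supplier<T> fallback) {
        this.fallback = fallback;
    }

    void setOpen(boolean open) {
        this.open = open;
    }

    T call(Supplier<T> businessLogic) {
        if (open) {
            return fallback.get();           // fail fast: business logic never runs
        }
        try {
            return businessLogic.get();
        } catch (RuntimeException e) {
            return fallback.get();           // a failed call also degrades to the fallback
        }
    }
}
```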
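So the core operations on a circuit breaker are:
- If the success rate is too low, open the breaker and block normal business traffic
- After some time has passed, move the breaker to the half-open state and let a single trial request through
- If the trial request succeeds, close the breaker; if it fails, open it again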
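These three operations form a small state machine. The sketch below is a hypothetical `SimpleBreaker`, not Hystrix's `HystrixCircuitBreaker`. It takes the clock as a parameter so the transitions are easy to follow, and it uses `compareAndSet` so that only one caller can claim the half-open trial.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical three-state breaker: CLOSED -> OPEN when the success rate is too
 * low, OPEN -> HALF_OPEN for one trial after the sleep window, then
 * HALF_OPEN -> CLOSED or back to OPEN depending on the trial's result.
 */
class SimpleBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final double minSuccessRate;    // open when the success rate falls below this
    private final long sleepWindowMillis;   // how long to stay OPEN before allowing a trial
    private volatile long openedAt = -1;    // when the breaker last opened

    SimpleBreaker(double minSuccessRate, long sleepWindowMillis) {
        this.minSuccessRate = minSuccessRate;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    /** Step 1: open the breaker when the observed success rate is too low. */
    void onHealth(double successRate, long now) {
        if (state.get() == State.CLOSED && successRate < minSuccessRate) {
            openedAt = now;
            state.compareAndSet(State.CLOSED, State.OPEN);
        }
    }

    /** Step 2: after the sleep window, let exactly one trial request through. */
    boolean allowRequest(long now) {
        State s = state.get();
        if (s == State.CLOSED) return true;
        if (s == State.OPEN && now > openedAt + sleepWindowMillis) {
            return state.compareAndSet(State.OPEN, State.HALF_OPEN);  // only one caller wins
        }
        return false;   // still inside the window, or a trial is already in flight
    }

    /** Step 3: the trial's outcome decides whether to close or re-open. */
    void onTrialResult(boolean success, long now) {
        if (state.get() != State.HALF_OPEN) return;
        if (success) {
            state.compareAndSet(State.HALF_OPEN, State.CLOSED);
        } else {
            openedAt = now;                                    // restart the sleep window first,
            state.compareAndSet(State.HALF_OPEN, State.OPEN);  // then re-open the breaker
        }
    }

    State state() {
        return state.get();
    }
}
```

For example, with `new SimpleBreaker(0.5, 1000)`, a breaker that opens at t=100 rejects requests until t=1100, and the first request after that becomes the trial.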
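The core API of the circuit breaker is shown in the figure below:
4. Rate limiting, circuit breaking, degradation, isolation
These four concepts come up constantly whenever microservices are discussed. Here we look at how Hystrix implements them.
Rate limiting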
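- Hystrix's rate limiting is quite different from Guava's RateLimiter. Hystrix limits in order to "protect itself", while RateLimiter limits in order to "protect downstream"
- When Hystrix limits a service, the excess traffic goes straight to the fallback, which amounts to breaking the circuit for those requests. RateLimiter, by contrast, is about "traffic shaping": it smooths irregular traffic into a steady rate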
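Here are two hypothetical sketches side by side to make the difference concrete. `RejectingLimiter` caps concurrency with a `java.util.concurrent.Semaphore` and sends excess calls straight to the fallback, which is the behavior described above. `Pacer` only illustrates the shaping idea behind Guava's `RateLimiter`. Every call is admitted, but each one is told how long to wait so that calls are spaced at a fixed rate. This is not Guava's implementation.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hystrix-style limiting (sketch): calls over the limit go to the fallback immediately. */
class RejectingLimiter {
    private final Semaphore permits;

    RejectingLimiter(int maxConcurrent) {
        permits = new Semaphore(maxConcurrent);
    }

    <T> T call(Supplier<T> work, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get();       // no waiting: over the limit means fallback
        }
        try {
            return work.get();
        } finally {
            permits.release();
        }
    }
}

/** RateLimiter-style shaping (sketch): every call is admitted, but spaced out in time. */
class Pacer {
    private final long intervalNanos;    // minimum gap between admitted calls
    private long nextFreeNanos = 0;      // earliest time the next call may start

    Pacer(double permitsPerSecond) {
        intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    /** Reserves a slot and returns how long (in nanos) the caller should wait from `nowNanos`. */
    synchronized long reserve(long nowNanos) {
        long start = Math.max(nowNanos, nextFreeNanos);
        nextFreeNanos = start + intervalNanos;
        return start - nowNanos;
    }
}
```

The limiter never makes a caller wait, while the pacer never turns a caller away. That is the "protect myself" versus "shape traffic for downstream" distinction.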
Circuit breaking
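- When my application cannot serve requests, I break the circuit on upstream requests so that upstream does not crush me
- When the success rate of a downstream dependency is too low, I break the circuit on downstream calls so that downstream does not drag me down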
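Degradation
- Degradation is closely tied to circuit breaking. Deciding how the business behaves once the circuit is broken, that is, agreeing on a fast-failing fallback, is what we call service degradation
Isolation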
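- Businesses must not affect one another; each needs its own independent execution space
- The most thorough option is physical isolation: deploy on different machines
- Next comes process isolation: several Tomcat instances on one machine
- Next comes request isolation
- Because Hystrix works at the code level, it implements request isolation, using either a thread pool or a semaphore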
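Thread-pool isolation can be sketched with the JDK alone. The hypothetical `ThreadPoolBulkhead` below gives one dependency a fixed-size pool with a `SynchronousQueue` (no waiting room) and `AbortPolicy`. Once every thread is busy, the next submission fails with `RejectedExecutionException`, which is where a fallback would kick in. Semaphore isolation would instead call `tryAcquire()` on a `Semaphore` and run the work in the caller's own thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical bulkhead: one small, dedicated pool per dependency. */
class ThreadPoolBulkhead {
    private final ThreadPoolExecutor pool;

    ThreadPoolBulkhead(int threads) {
        // SynchronousQueue = no queueing; AbortPolicy throws RejectedExecutionException
        // as soon as all `threads` workers are busy.
        pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Runs the work on a pool thread, never on the caller's thread. */
    <T> Future<T> submit(Callable<T> work) {
        return pool.submit(work);
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

Because each dependency has its own pool, a slow dependency can exhaust only its own threads, and callers of other dependencies are unaffected.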
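5. Source code analysis
First, here is a flow chart of how Hystrix handles a request:
As the chart shows, every Hystrix request is wrapped in a HystrixCommand, and the core logic lives in the AbstractCommand.java class.
The source code below is built on RxJava. It is worth getting familiar with RxJava's common usage and logic first, or the code can be quite confusing.
Put simply, RxJava is callback-based functional programming. In everyday terms, it amounts to implementing the strategy pattern with anonymous inner classes.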
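To illustrate "callbacks as strategy objects", here is a hypothetical sketch that does not use RxJava. `OnTerminate` and `OnError` are made-up stand-ins for RxJava's `Action0` and `Action1<Throwable>`. `Hooks.run` only roughly imitates what `doOnError` and `doOnTerminate` add to a chain, and RxJava's exact callback ordering differs.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for RxJava's Action0: a callback with no arguments. */
interface OnTerminate { void call(); }

/** Hypothetical stand-in for RxJava's Action1<Throwable>: a callback that receives the error. */
interface OnError { void call(Throwable t); }

final class Hooks {
    /** Runs the work, then fires the callbacks it was given, roughly like doOnError/doOnTerminate. */
    static <T> T run(Supplier<T> work, OnError onError, OnTerminate onTerminate) {
        try {
            return work.get();
        } catch (RuntimeException e) {
            onError.call(e);        // like doOnError(markExceptionThrown)
            throw e;                // the error still propagates to the caller
        } finally {
            onTerminate.call();     // like doOnTerminate(singleSemaphoreRelease)
        }
    }

    /** The same call written in Hystrix's style, with anonymous inner classes as the strategies. */
    static String demo(final List<String> log) {
        return run(() -> "ok",
                new OnError() {
                    @Override
                    public void call(Throwable t) { log.add("error"); }
                },
                new OnTerminate() {
                    @Override
                    public void call() { log.add("terminate"); }
                });
    }
}
```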
5.1 Circuit breaker
First, let's see how the circuit breaker and the semaphore gate our requests:
```java
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // custom extension hook
    executionHook.onStart(_cmd);
    // ask the circuit breaker whether this request may pass
    if (circuitBreaker.attemptExecution()) {
        // get the group's semaphore; without semaphore isolation, a default always-permit implementation is returned
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // callback run when the call ends: release the semaphore exactly once
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        // callback run when the call throws
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        // try to acquire the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                // acquired: record the invocation start time and execute
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // not acquired: go to the fallback
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // the circuit breaker is open: go to the fallback
        return handleShortCircuitViaFallback();
    }
}
```
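The `semaphoreHasBeenReleased` flag matters because both `doOnTerminate` and `doOnUnsubscribe` may fire for the same call. The stand-alone sketch below shows the idiom. The class name `ReleaseOnce` is hypothetical, and a JDK `Semaphore` stands in for Hystrix's `TryableSemaphore`. Without the `compareAndSet` guard, the second callback would return a permit that was never taken and silently raise the concurrency limit.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical "release exactly once" callback, mirroring singleSemaphoreRelease above. */
final class ReleaseOnce implements Runnable {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        if (released.compareAndSet(false, true)) {  // only the first invocation wins
            semaphore.release();
        }
    }
}
```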
When does the circuit breaker let a request through?
```java
@Override
public boolean attemptExecution() {
    // dynamic property: if the breaker is forced open, reject every request
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // if it is forced closed, allow every request
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    // -1 means the breaker is currently closed: allow the request
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        // the breaker is open: allow a request only once the sleep window has elapsed
        if (isAfterSleepWindow()) {
            //only the first request after sleep window should execute
            //if the executing command succeeds, the status will transition to CLOSED
            //if the executing command fails, the status will transition to OPEN
            //if the executing command gets unsubscribed, the status will transition to OPEN
            // CAS lets exactly one request through: this is the "half-open" state
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}
```
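The `status.compareAndSet(Status.OPEN, Status.HALF_OPEN)` line is what guarantees a single trial request. The sketch below uses hypothetical class and enum names and only the JDK. It releases many threads at once against an `OPEN` status and counts how many win the CAS; only one can.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo: many threads race to move OPEN -> HALF_OPEN, and CAS lets exactly one win. */
final class HalfOpenRace {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    static int countWinners(int threads) {
        AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await();                      // line everyone up at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    winners.incrementAndGet();          // this request becomes the single trial
                }
            });
        }
        start.countDown();                              // release all threads at once
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }
}
```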
An important concept here is the "sleep window":
```java
private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    // has more than the configured sleep window passed since the breaker opened?
    return currentTime > circuitOpenTime + sleepWindowTime;
}
```
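As a worked example, the same comparison is pulled out below into a pure function so the clock can be passed in; `SleepWindow` is a hypothetical name. Suppose the breaker opened at 10 000 ms and the sleep window is 5 000 ms, which is Hystrix's default for `circuitBreaker.sleepWindowInMilliseconds`. A request at 15 000 ms is still rejected because the comparison is strict, while a request at 15 001 ms may become the trial.

```java
/** Hypothetical pure version of isAfterSleepWindow, with the clock passed in. */
final class SleepWindow {
    static boolean isAfterSleepWindow(long circuitOpenTime, long currentTime, long sleepWindowTime) {
        return currentTime > circuitOpenTime + sleepWindowTime;   // strictly after the window
    }
}
```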
5.2 Isolation
How are business requests isolated?
```java
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // which isolation strategy: thread pool or semaphore?
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        // thread-pool isolation logic
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                // record metrics as configured
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // the command timed out in the wrapping thread so we will return immediately
                    // and not increment any of the counters below or other such logic
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /**
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        // run the extension-point hooks
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.empty();
                }
            }
        // register callbacks for the termination and unsubscription cases
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        // run everything above on the thread pool's scheduler, i.e. inside the thread pool
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        // semaphore isolation: runs in the current thread, with no scheduler
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}
```
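In the thread branch, the real work runs on the pool's scheduler. The `Func0<Boolean>` passed to `getScheduler(...)` tells that scheduler whether to interrupt the worker once the command has timed out. Below is a rough JDK-only sketch of the same idea, with hypothetical names. It submits the work to a pool and waits with a timeout. On timeout it cancels the `Future`, interrupting the worker only when the flag (a stand-in for `executionIsolationThreadInterruptOnTimeout`) is true. As a simplification, every failure path here returns the same fallback.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical thread-isolated call with a timeout and an optional interrupt on timeout. */
final class IsolatedCall {
    static <T> T call(ExecutorService pool, Callable<T> work, long timeoutMillis,
                      boolean interruptOnTimeout, T fallback) {
        Future<T> future;
        try {
            future = pool.submit(work);              // the work runs on a pool thread
        } catch (RejectedExecutionException e) {
            return fallback;                         // pool full: thread-pool rejection
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(interruptOnTimeout);       // give up; interrupt the worker only if asked
            return fallback;
        } catch (ExecutionException e) {
            return fallback;                         // the work itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // preserve the caller's interrupt status
            return fallback;
        }
    }
}
```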
5.3 Core execution flow
```java
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // callback for each emitted value
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                circuitBreaker.markSuccess();
            }
        }
    };
    // callback on successful completion: record the outcome, which the circuit breaker uses to maintain its state
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                circuitBreaker.markSuccess();
            }
        }
    };
    // callback on failure
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            // choose the fallback path according to the exception type
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    // register the callbacks
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
```
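`handleFallback` routes failures by exception type. The hypothetical `FallbackRouter` below mirrors only that routing shape, using JDK exceptions as stand-ins: `TimeoutException` for `HystrixTimeoutException` and `IllegalArgumentException` for `HystrixBadRequestException`. As in Hystrix, a bad request is propagated to the caller instead of being sent to a fallback.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical dispatcher with the same shape as handleFallback, using JDK stand-in exceptions. */
final class FallbackRouter {
    enum Route { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST, FAILURE }

    static Route route(Throwable t) {
        if (t instanceof RejectedExecutionException) return Route.THREAD_POOL_REJECTED; // pool full
        if (t instanceof TimeoutException) return Route.TIMEOUT;                        // ran too long
        if (t instanceof IllegalArgumentException) return Route.BAD_REQUEST;            // caller's fault: rethrow, no fallback
        return Route.FAILURE;                                                           // anything else: failure fallback
    }
}
```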
6. Summary
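- Hystrix is a circuit-breaking and rate-limiting framework that works within a single application instance
- The circuit breaker's sleep window decides whether the current request may execute
- Threads compete via CAS for the "half-open" state, and a single request is used to test whether the breaker can close
- Thread-pool isolation hands requests to a thread pool, and rate limiting relies on the pool's rejection policy
- Semaphore isolation executes in the current thread, and rate limiting relies on the number of concurrent requests
- When the semaphore cannot be acquired or the thread pool queue is full, the request is rate-limited and the fallback is executed
- When the circuit breaker is open, requests are short-circuited and the fallback is executed
- The whole framework is written in RxJava's programming style, with callbacks everywhere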