摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-remote-module/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
本文主要基於 SkyWalking 3.2.6 正式版
- 1. 概述
- 2. collector-remote-define
- 2.1 RemoteModule
- 2.2 RemoteSenderService
- 2.3 RemoteClientService
- 2.4 RemoteClient
- 2.5 CommonRemoteDataRegisterService
- 2.6 RemoteSerializeService
- 2.7 RemoteSerializeService
- 3. collector-remote-grpc-provider
- 3.1 RemoteModuleGRPCProvider
- 3.2 GRPCRemoteSenderService
- 3.3 GRPCRemoteClientService
- 3.4 GRPCRemoteClient
- 3.5 RemoteCommonServiceHandler
- 3.6 GRPCRemoteSerializeService
- 3.7 GRPCRemoteDeserializeService
- 4. collector-remote-grpc-provider
1. 概述
本文主要分享 SkyWalking Collector Remote 遠端通訊服務。該服務用於 Collector 叢集內部通訊。
目前叢集內部通訊的目的,跨節點的流式處理。Remote Module 應用在 SkyWalking 架構圖如下位置( 紅框 ) :
FROM https://github.com/apache/incubating-skywalking
下麵我們來看看整體的專案結構,如下圖所示 :
collector-remote-define
:定義遠端通訊介面。collector-remote-kafka-provider
:基於 Kafka 的遠端通訊實現。目前暫未完成。collector-remote-grpc-provider
:基於 Google gRPC 的遠端通訊實現。生產環境目前使用
下麵,我們從介面到實現的順序進行分享。
2. collector-remote-define
collector-remote-define
:定義遠端通訊介面。專案結構如下 :
整體流程如下圖:
我們按照整個流程的處理順序,逐個解析涉及到的類與介面。
2.1 RemoteModule
org.skywalking.apm.collector.remote.RemoteModule
,實現 Module 抽象類,遠端通訊 Module 。
#name()
實現方法,傳回模組名為 "remote"
。
#services()
實現方法,傳回 Service 類名:RemoteSenderService 、RemoteDataRegisterService 。
2.2 RemoteSenderService
org.skywalking.apm.collector.remote.service.RemoteSenderService
,繼承 Service 介面,遠端傳送服務介面,定義了 #send(graphId, nodeId, data, selector)
介面方法,呼叫 RemoteClient ,傳送資料。
graphId
方法引數,Graph 編號。透過graphId
,可以查詢到對應的 Graph 物件。- Graph 在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》「2. apm-collector-core/graph」 有詳細解析。
nodeId
方法引數,Worker 編號。透過workerId
,可以查詢在 Graph 物件中的 Worker 物件,從而 Graph 中的流式處理。- Worker 在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》「3. apm-collector-stream」 有詳細解析。
data
方法引數,Data 資料物件。例如,流式處理的具體資料物件。- Data 在 《SkyWalking 原始碼分析 —— Collector Storage 儲存元件》「2. apm-collector-core」 有詳細解析。
selector
方法引數,org.skywalking.apm.collector.remote.service.Selector
選擇器物件。根據 Selector 物件,使用對應的負載均衡策略,選擇叢集內的 Collector 節點,傳送資料。- RemoteSenderService.Mode 傳回值,傳送樣式分成
Remote
和Local
兩種方式。前者,傳送資料到遠端的 Collector 節點;後者,傳送資料到本地,即本地處理,參見RemoteWorkerRef#in(message)
方法。
2.3 RemoteClientService
org.skywalking.apm.collector.remote.service.RemoteClientService
,繼承 Service 介面,遠端客戶端服務介面,定義了 #create(host, port, channelSize, bufferSize)
介面方法,建立 RemoteClient 物件。
2.4 RemoteClient
org.skywalking.apm.collector.remote.service.RemoteClient
,繼承 java.lang.Comparable
介面,遠端客戶端介面。定義瞭如下介面方法:
#push(graphId, nodeId, data, selector)
介面方法,傳送資料。#getAddress()
介面方法,傳回客戶端連線的遠端 Collector 地址。#equals(address)
介面方法,判斷 RemoteClient 是否連線了指定的地址。
2.5 CommonRemoteDataRegisterService
在說 CommonRemoteDataRegisterService 之前,首先來說下 CommonRemoteDataRegisterService 的意圖。
在上文中,我們可以看到傳送給 Collector 是 Data 物件,而 Data 是資料的抽象類,在具體反序列化 Data 物件之前,程式是無法得知它是 Data 的哪個實現物件。這個時候,我們可以給 Data 物件的每個實現類,生成一個對應的資料協議編號。
- 在傳送資料之前,序列化 Data 物件時,增加該 Data 對應的協議編號,一起傳送。
- 在接收資料之後,反序列化資料時,根據協議編號,建立 Data 對應的實現類物件。
org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService
,通用遠端資料註冊服務。
id
屬性,資料協議自增編號。dataClassMapping
屬性,資料型別( Class extends Data> )與資料協議編號的對映。dataInstanceCreatorMapping
屬性,資料協議編號與資料物件建立器( RemoteDataInstanceCreator )的對映。
2.5.1 RemoteDataRegisterService
org.skywalking.apm.collector.remote.service.RemoteDataRegisterService
,繼承 Service 介面,遠端客戶端服務介面,定義了 #register(Class extends Data>, RemoteDataInstanceCreator)
介面方法,註冊資料型別對應的遠端資料建立器( RemoteDataRegisterService.RemoteDataInstanceCreator
)物件。
CommonRemoteDataRegisterService 實現了 RemoteDataRegisterService 介面,#register(Class extends Data>, RemoteDataInstanceCreator)
實現方法。
另外,AgentStreamRemoteDataRegister 會呼叫 RemoteDataRegisterService#register(Class extends Data>, RemoteDataInstanceCreator)
方法,註冊每個資料型別的 RemoteDataInstanceCreator 物件。註意,例如 Application::new
是 RemoteDataInstanceCreator 的匿名實現類。
2.5.2 RemoteDataIDGetter
org.skywalking.apm.collector.remote.service.RemoteDataIDGetter
,繼承 Service 介面,遠端資料協議編號獲取器介面,定義了 #getRemoteDataId(Class extends Data>)
介面方法,根據資料型別獲取資料協議編號。
CommonRemoteDataRegisterService 實現了 RemoteDataIDGetter 介面,#getRemoteDataId(Class extends Data>)
實現方法。
2.5.3 RemoteDataInstanceCreatorGetter
org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter
,繼承 Service 介面,遠端資料建立器的獲取器介面,定義了 #getInstanceCreator(remoteDataId
介面方法,根據資料協議編號獲得遠端資料建立器( RemoteDataInstanceCreator )。
CommonRemoteDataRegisterService 實現了 RemoteDataInstanceCreatorGetter 介面,#getInstanceCreator(remoteDataId)
實現方法。
2.6 RemoteSerializeService
org.skywalking.apm.collector.remote.service.RemoteSerializeService
,遠端通訊序列化服務介面,定義了 #serialize(Data)
介面方法,序列化資料,生成 Builder 物件。
2.7 RemoteSerializeService
org.skywalking.apm.collector.remote.service.RemoteDeserializeService
,遠端通訊序反列化服務介面,定義了 #deserialize(RemoteData, Data)
介面方法,反序列化傳輸資料。
3. collector-remote-grpc-provider
collector-remote-grpc-provider
,基於 Google gRPC 的遠端通訊實現。
專案結構如下 :
預設配置,在 application-default.yml
已經配置如下:
remote:
gRPC:
host: localhost
port: 11800
3.1 RemoteModuleGRPCProvider
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider
,實現 ModuleProvider 抽象類,基於 gRPC 的元件服務提供者實現類。
#name()
實現方法,傳回元件服務提供者名為 "gRPC"
。
module()
實現方法,傳回元件類為 RemoteModule 。
#requiredModules()
實現方法,傳回依賴元件為 cluster
、gRPC_manager
。
#prepare(Properties)
實現方法,執行準備階段邏輯。
- 第 53 至 56 行 :建立 CommonRemoteDataRegisterService 、GRPCRemoteSenderService 物件,並呼叫
#registerServiceImplementation()
父類方法,註冊到services
。
#start()
實現方法,執行啟動階段邏輯。
- Server 相關
- 第 65 行:建立 gRPC Server 物件。
- 第 67 行:註冊 RemoteCommonServiceHandler 物件到 gRPC Server 上,用於接收 gRPC 請求後的處理。
- 《SkyWalking 原始碼分析 —— Collector Server Component 伺服器元件》「3. gRPC 實現」
- 《SkyWalking 原始碼分析 —— Collector gRPC Server Manager》
- 註冊發現相關
- 第 70 至 71 行:建立
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration
物件,將自己註冊到叢集管理。這樣,自己可以被 Collector 叢集節點發現,從而被呼叫。 - 第 73 至 74 行:註冊 GRPCRemoteSenderService 物件到叢集管理。這樣,自己可以監聽到 Collector 叢集節點的加入或離開,從而呼叫。
- 《SkyWalking 原始碼分析 —— Collector Cluster 叢集管理》
- 第 70 至 71 行:建立
#notifyAfterCompleted()
實現方法,方法為空。
3.2 GRPCRemoteSenderService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService
,繼承 ClusterModuleListener 抽象類,實現 RemoteSenderService 介面,基於 gPRC 的遠端傳送服務實現類。
3.2.1 註冊發現
透過繼承 ClusterModuleListener 抽象類,實現了監聽 Collector 叢集節點的加入或離開。
remoteClients
屬性,連線 Collector 叢集節點的客戶端陣列。每個 Collector 叢集節點,對應一個客戶端。#path()
實現方法,傳回監聽的目錄"/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME
。Collector 叢集中,每個節點的 Remote Server 都會註冊到該目錄下。#serverJoinNotify(serverAddress)
實現方法,當新的節點加入,建立新的客戶端連線。#serverQuitNotify(serverAddress)
實現方法,當老的節點離開,移除對應的客戶端連線。
3.2.2 負載均衡
RemoteModuleGRPCProvider 基於不同的選擇器 ( Selector ) ,提供不同的客戶端選擇( org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector
)實現 :
hashCodeSelector
屬性,HashCodeSelector ,基於資料的雜湊碼。foreverFirstSelector
屬性,ForeverFirstSelector ,基於客戶端陣列的順序,選擇第一個。rollingSelector
屬性,RollingSelector ,基於客戶端陣列的順序,順序向下選擇。#send(graphId, nodeId, data, selector)
方法,程式碼如下:- 第 76 至 77 行:當選擇的客戶端連線的是本地時,不傳送資料,交給本地處理,參見
RemoteWorkerRef#in(message)
方法。 - 第 78 至 81 行:當選擇的客戶端連線的是遠端時,呼叫
RemoteClient#push(graphId, nodeId, data)
方法,傳送資料。 - 第 63 、66 、69 行:根據選擇器,呼叫
RemoteClientSelector#select(clients, data)
方法,選擇客戶端。 - 第 64 、67 、70 行:呼叫
#sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data)
方法,傳送請求資料。
- 第 76 至 77 行:當選擇的客戶端連線的是本地時,不傳送資料,交給本地處理,參見
3.3 GRPCRemoteClientService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService
,實現 RemoteClientService 介面,基於 gRPC 的遠端客戶端服務實現類。
#create(host, port, channelSize, bufferSize)
實現方法,建立 GRPCRemoteClient 物件。
3.4 GRPCRemoteClient
友情提示:本小節會涉及較多 gRPC 相關的知識,建議不熟悉的胖友自己 Google ,補充下姿勢。
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient
,實現 RemoteClient 介面,基於 gRPC 的遠端客戶端實現類。
client
屬性,GRPCClient 物件。相比來說,GRPCRemoteClient 偏業務的封裝,內部呼叫 GRPCClient 物件。carrier
屬性,DataCarrier 物件,本地訊息佇列。GRPCRemoteClient 在被呼叫傳送資料時,先提交到本地佇列,非同步消費進行傳送到遠端 Collector 節點。DataCarrier 在 《SkyWalking 原始碼分析 —— DataCarrier 非同步處理庫》 詳細解析。- 第 63 行:呼叫
DataCarrier#consume(IConsumer, num)
方法,設定消費者為 RemoteMessageConsumer 物件。
- 第 63 行:呼叫
#push(graphId, nodeId, data)
實現方法,非同步傳送訊息到遠端 Collector 。
- 第 73 行:呼叫
RemoteDataIDGetter#getRemoteDataId(Class extends Data>)
方法,獲得資料協議編號。 - 第 76 至 80 行:建立傳輸資料( RemoteMessage.Builder ) 物件。RemoteMessage 透過 Protobuf 建立定義,如下圖所示:
- 第 83 行:呼叫
DataCarrier#produce(data)
方法,傳送資料到本地佇列。
RemoteMessageConsumer ,批次消費本地佇列的資料,逐條傳送資料到遠端 Collector 節點。
#consume(List)
實現方法,程式碼如下:- 第 100 行:建立 StreamObserver 物件。StreamObserver 主要是 gPRC 相關的 API 的呼叫。
- 第 101 至 103 行:呼叫
io.grpc.stub.StreamObserver#onNext(RemoteMessage)
方法,逐條傳送資料。 - 第 106 行:呼叫
io.grpc.stub.StreamObserver#onCompleted()
方法,全部請求資料傳送完成。
3.5 RemoteCommonServiceHandler
org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler
,實現 org.skywalking.apm.collector.server.grpc.GRPCHandler
介面,繼承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象類,遠端通訊通用邏輯處理器。
其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在 RemoteCommonService.proto
檔案的定義如下圖:
#call(StreamObserver)
實現方法,程式碼如下:
#onNext(RemoteMessage)
方法,處理每一條訊息,程式碼如下:- 第 65 行:呼叫
RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId)
方法,獲得資料協議編號對應的 RemoteDataInstanceCreator 物件。然後,呼叫RemoteDataInstanceCreator#createInstance(id)
方法,建立資料協議編號對應的 Data 實現類對應的物件。 - 第 70 行:呼叫
GraphManager#findGraph(graphId)
方法,獲得graphId
對應的 Graph 物件。然後,調動GraphNodeFinder#findNext(nodeId)
方法,獲得 Next 物件。 - 第 71 行:呼叫
Next#execute(Data)
方法,繼續流式處理。
- 第 65 行:呼叫
3.6 GRPCRemoteSerializeService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService
,實現 RemoteSerializeService 介面,基於 gRPC 的遠端通訊序列化服務實現類。
3.7 GRPCRemoteDeserializeService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService
,實現 GRPCRemoteDeserializeService 介面,基於 gRPC 的遠端通訊反序列化服務實現類。
4. collector-remote-grpc-provider
collector-remote-kafka-provider
:基於 Kafka 的遠端通訊實現。
目前暫未完成。
TODO 【4005】collector-remote-grpc-provider
朋友會在“發現-看一看”看到你“在看”的內容