摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-storage-module/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
本文主要基於 SkyWalking 3.2.6 正式版
- 1. 概述
- 2. apm-collector-core
- 3. collector-storage-define
- 4. collector-storage-h2-provider
- 5. collector-storage-es-provider
1. 概述
本文主要分享 SkyWalking Collector Storage 儲存元件。顧名思義,負責將呼叫鏈路、應用、應用實體等等資訊儲存到儲存器,例如,ES 、H2 。
友情提示:建議先閱讀 《SkyWalking 原始碼分析 —— Collector 初始化》 ,以瞭解 Collector 元件體系。
FROM https://github.com/apache/incubating-skywalking
下麵我們來看看整體的專案結構,如下圖所示 :
apm-collector-core
的data
和define
包 :資料的抽象。collector-storage-define
:定義儲存元件介面。collector-storage-h2-provider
:基於 H2 的 儲存元件實現。該實現是單機版,建議僅用於 SkyWalking 快速上手,生產環境不建議使用。collector-storage-es-provider
:基於 Elasticsearch 的叢集管理實現。生產環境推薦使用。
下麵,我們從介面到實現的順序進行分享。
2. apm-collector-core
apm-collector-core
的 data
和 define
包,如下圖所示:
我們對類進行梳理分類,如下圖:
- Table :Data 和 TableDefine 之間的橋梁,每個 Table 定義了該表的表名,欄位名們。
- TableDefine :Table 的詳細定義,包括表名,欄位定義( ColumnDefine )們。在下文中,StorageInstaller 會基於 TableDefine 初始化表的相關資訊。
- Data :資料,包括一條資料的資料值們和資料欄位( Column )們。在下文中,Dao 會儲存 Data 到儲存器中。另外,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》 中,我們也會看到對 Data 的流式處理通用封裝。
2.1 Table
org.skywalking.apm.collector.core.data.CommonTable
,通用表。
- `TABLE_TYPE` 靜態屬性,表型別。目前只有 ES 儲存元件使用到,下文詳細解析。
- `COLUMN_` 字首的靜態屬性,通用的欄位名。
在 collector-storage-define
的 table
包下,我們可以看到所有 Table 類,以 "Table"
結尾。每個 Table 的表名,在每個實現類裡,例如 ApplicationTable 。
2.2 TableDefine
org.skywalking.apm.collector.core.data.TableDefine
,表定義抽象類。
- `name` 屬性,表名。
- `columnDefines` 屬性,ColumnDefine陣列。
- `#initialize()` 抽象方法,初始化表定義。例如:ApplicationEsTableDefine 。
不同的儲存元件實現,有不同的 TableDefine 實現類,如下圖:
-
ElasticSearchTableDefine :基於 Elasticsearch 的表定義抽象類,在
collector-storage-es-provider
的define
包下,我們可以看到所有 ES 的 TableDefine 類。 -
H2TableDefine :基於 H2 的表定義抽象類,在
collector-storage-h2-provider
的 `define` 包下,我們可以看到所有 H2 的 TableDefine 類。
2.2.1 ColumnDefine
org.skywalking.apm.collector.core.data.ColumnDefine
,欄位定義抽象類。
- `name` 屬性,欄位名。
- `type` 屬性,欄位型別。
在 collector-storage-xxx-provider
模組中,H2ColumnDefine 、ElasticSearchColumnDefine 實現 ColumnDefine 。
2.2.2 Loader
涉及到的類如下圖所示:
org.skywalking.apm.collector.core.data.StorageDefineLoader
,呼叫 org.skywalking.apm.collector.core.define.DefinitionLoader
,從 org.skywalking.apm.collector.core.data.StorageDefinitionFile
中,載入 TableDefine 實現類陣列。
另外,在 collector-storage-es-provider
和 collector-storage-h2-provider
裡都有 storage.define
檔案,如下圖:
- StorageDefinitionFile 宣告了讀取該檔案。
- 註意,DefinitionLoader 在載入時,兩個檔案都會被讀取,最終在
StorageInstaller#defineFilter(List)
方法,進行過濾。
程式碼比較簡單,中文註釋已加,胖友自己閱讀理解下。
2.3 Data
org.skywalking.apm.collector.core.data.Data
,資料抽象類。
- [
dataXXX
]() 字首的屬性,欄位值們。- `dataStrings` 屬性的第一位,是 ID 屬性。參見 構造方法的【第 51 行】 或者 `#setId(id)` 方法。
- [
xxxColumns
]() 字尾的屬性,欄位( Column )們。 - 透過上述兩種屬性 + 自身類,可以確定一條資料記錄的表、欄位型別、欄位名、欄位值。
- 繼承 `org.skywalking.apm.collector.core.data.EndOfBatchQueueMessage` ,帶是否訊息批處理的最後一條標記的訊息抽象類,`endOfBatch` 屬性,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)》「3. AggregationWorker」 詳細解析。
- 繼承 `org.skywalking.apm.collector.core.data.AbstractHashMessage` ,帶雜湊碼的訊息抽象類,`hashCode` 屬性,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)》「3. AggregationWorker」 詳細解析。
- `#mergeData(Data)` 方法,合併傳入的資料到自身。該方法被 `AggregationWorker#aggregate(message)` 呼叫,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)》「3. AggregationWorker」 詳細解析。
在 collector-storage-define
的 table
包下,我們可以看到所有 Data 類,非 "Table"
結尾,例如 Application 。
2.3.1 Column
org.skywalking.apm.collector.core.data.Column
,欄位。
- `name` 屬性,欄位名。
- `operation` 屬性,操作( Operation )。
2.3.2 Operation
org.skywalking.apm.collector.core.data.Operation
,操作介面。用於兩個值之間的操作,例如,相加等等。目前實現類有:
- AddOperation :值相加操作。
- CoverOperation :值改寫操作,即以新值為傳回。
- NonOperation :空操作,即以老值為傳回。
3. collector-storage-define
collector-cluster-define
:定義儲存元件介面。專案結構如下 :
3.1 StorageModule
org.skywalking.apm.collector.storage.StorageModule
,實現 Module 抽象類,叢集管理 Module 。
#name()
實現方法,傳回模組名為 "storage"
。
#services()
實現方法,傳回 Service 類名:在 org.skywalking.apm.collector.storage.dao 包下的所有類 和 IBatchDAO。
3.2 table 包
在 org.skywalking.apm.collector.storage.table
包下,定義了儲存模組所有的 Table 和 Data 實現類。
3.3 StorageInstaller
org.skywalking.apm.collector.storage.StorageInstaller
,儲存安裝器抽象類,基於 TableDefine ,初始化儲存元件的表。
- `#defineFilter(List)` 抽象方法,過濾 TableDefine 陣列中,非自身需要的。例如說,ElasticSearchStorageInstaller 過濾後,只保留 ElasticSearchTableDefine 物件。
- `#isExists(Client, TableDefine)` 抽象方法,判斷表是否存在。
- `#deleteTable(Client, TableDefine)` 抽象方法,刪除表。
- `#createTable(Client, TableDefine)` 抽象方法,建立表。
- `#install(Client)` 方法,基於 TableDefine ,初始化儲存元件的表。
- 該方法會被 StorageModuleH2Provider 或 StorageModuleEsProvider 啟動時呼叫。
3.4 dao 包
在 collector-storage-define
專案結構圖,我們看到一共有兩個 bao
包:
org.skywalking.apm.collector.storage.base.dao
,系統的 DAO 介面。org.skywalking.apm.collector.storage.dao
,業務的 DAO 介面。- 繼承系統的 DAO 介面。
- 被 `collector-storage-xxx-provider` 的 `dao` 包實現。
3.4.1 系統 DAO
org.skywalking.apm.collector.storage.base.dao.DAO
,繼承 Service 介面,DAO 介面。
無任何方法。
3.4.1.1 AbstractDAO
org.skywalking.apm.collector.storage.base.dao.AbstractDAO
,實現 DAO 介面,DAO 抽象基類。
- `client` 屬性,資料操作客戶端。例如,H2Client 、ElasticSearchClient 。
在 collector-storage-xxx-provider
模組中,H2DAO 、EsDAO 實現 AbstractDAO 。
3.4.1.2 IPersistenceDAO
org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO
,實現 DAO 介面,持久化 DAO 介面,定義了 Data 的增刪改查操作。
- `#get(id)` 介面方法,根據 ID 查詢一條 Data 。
- `#deleteHistory(startTimestamp, endTimestamp)` 介面方法,刪除時間範圍內的 Data 們。
- `#prepareBatchInsert(data)` 介面方法,準備批次插入操作物件。例如:`CpuMetricEsPersistenceDAO#prepareBatchInsert(CpuMetric)` 方法,傳回的是
org.elasticsearch.action.index.IndexRequestBuilder
物件。註意:- 該方法不會發起具體的 DAO 操作,僅僅是建立插入操作物件,最終的執行在 `IBatchDAO#batchPersistence(List)`。
- 該方法建立的是批次插入操作物件們中的一個。
- `#prepareBatchUpdate(data)` 介面方法,準備批次更新操作物件。類似
#prepareBatchInsert(data)
方法。
3.4.1.3 IBatchDAO
org.skywalking.apm.collector.storage.base.dao.IBatchDAO
,實現 DAO 介面,批次操作 DAO 介面。
- `#batchPersistence(List batchCollection)` 介面方法,透過執行批次操作物件陣列,實現批次持久化資料。
- `batchCollection` 方法引數,透過 `IPersistenceDAO#prepareBatchInsert` 或 `IPersistenceDAO#prepareBatchUpdate` 方法,生成每個運算元組元素。
- 該方法會被 `PersistenceTimer#extractDataAndSave(…)` 或 `PersistenceWorker#onWork(…)` 方法呼叫,在 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(二)》「4. PersistenceWorker」 詳細解析。
在 collector-storage-xxx-provider
模組中,BatchH2DAO 、BatchEsDAO 實現 IBatchDAO 。
3.4.2 業務 DAO
在 StorageModule#services()
方法裡,我們可以看到,業務 DAO 按照用途可以拆分成四種:
- Cache :快取應用、應用實體、服務名
- Register :註冊應用、應用實體、服務名
- Persistence :持久化,實際可以理解成批次持久化
- UI :SkyWaling UI 查詢使用。
那麼整理如下:
Package | Data | Cache / Register | Persistence | UI | 關聯文章 |
---|---|---|---|---|---|
register | Application | √ | |||
register | Instance | √ | √ | √ | |
register | ServiceName | √ | |||
jvm | CpuMetric | √ | √ | ||
jvm | CMetric | √ | √ | ||
jvm | MemoryMetric | √ | √ | ||
jvm | MemoryPoolMetric | √ | √ | ||
global | GlobalTrace | √ | √ | ||
instance | InstPerformance | √ | √ | ||
node | NodeComponent | √ | √ | ||
node | NodeMapping | √ | √ | ||
noderef | NodeReference | √ | √ | ||
segment | SegmentCost | √ | √ | ||
segment | Segment | √ | √ | ||
service | ServiceEntry | √ | √ | ||
serviceref | ServiceReference | √ | √ |
4. collector-storage-h2-provider
collector-storage-h2-provider
,基於 H2 的儲存元件實現。專案結構如下 :
該實現是單機版,建議僅用於 SkyWalking 快速上手,生產環境不建議使用。
由於生產環境主要使用 ES 的儲存元件實現,所以本文暫不解析相關實現,感興趣的胖友自己嗨起來。
5. collector-storage-es-provider
collector-storage-es-provider
,基於 ES 的儲存元件實現。專案結構如下 :
實際使用時,透過 application.yml
配置如下:
JSON storage: elasticsearch: cluster_name: elasticsearch cluster_transport_sniffer: true cluster_nodes: 127.0.0.1:9300 index_shards_number: 2 index_replicas_number: 0 ttl: 7
- 生產環境下,推薦 Elasticsearch 配置成叢集。
cluster_name
、cluster_transport_sniffer
、cluster_nodes
、index_shards_number
、index_replicas_number
引數,Elasticsearch 相關引數。ttl
:保留 N 天內的資料。超過 N 天的資料,將被自動滾動刪除。- 該功能目前版本暫未釋出,需要等到 5.0 版本後。
- 《部署叢集collector》
5.1 StorageModuleEsProvider
org.skywalking.apm.collector.storage.es.StorageModuleEsProvider
,實現 ModuleProvider抽象類,基於 ES 的儲存元件服務提供者。
#name()
實現方法,傳回元件服務提供者名為 "elasticsearch"
。
module()
實現方法,傳回元件類為 StorageModule 。
#requiredModules()
實現方法,傳回依賴元件為 "cluster"
。
#prepare(Properties)
實現方法,執行準備階段邏輯。
- 第 71 至 75 行 :建立 `org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient` 物件。
- 第 77 至 82 行 :建立 DAO 物件們,並呼叫
#registerServiceImplementation()
父類方法,註冊到services
。
#start()
實現方法,執行啟動階段邏輯。
- 第 90 行 :呼叫
ElasticSearchClient#initialize()
方法,初始化 ZookeeperClient 。 - 第 93 至 94 行 :建立 ElasticSearchStorageInstaller 物件,初始化儲存元件的表。在 「5.2.4 ElasticSearchStorageInstaller」 詳細解析。
- 第 100 至 102 行 :建立 `org.skywalking.apm.collector.storage.es.StorageModuleEsRegistration` 物件,並註冊資訊到叢集管理。在 《SkyWalking 原始碼分析 —— Collector Cluster 叢集管理》 有詳細解析。
- 第 105 至 107 行 :建立 `org.skywalking.apm.collector.storage.es.StorageModuleEsNamingListener`物件,並註冊資訊到叢集管理。在 《SkyWalking 原始碼分析 —— Collector Cluster 叢集管理》 有詳細解析。
- 第 110 至 111 行 :建立 DataTTLKeeperTimer 物件。在 「5.4 DataTTLKeeperTimer」 詳細解析。
#notifyAfterCompleted()
實現方法,執行啟動完成邏輯。
- 第 115 行 :呼叫
DataTTLKeeperTimer#start()
方法,啟動 DataTTLKeeperTimer 。在本文 「5.4 DataTTLKeeperTimer」 詳細解析。
5.2 define 包
在 collector-storage-es-provider
專案結構圖,我們看到一共有兩個 define
包:
org.skywalking.apm.collector.storage.es.base.define
,系統的 TableDefine 抽象類。org.skywalking.apm.collector.storage.es.define
,業務的 TableDefine 實現類。- 繼承系統的 TableDefine 抽象類。
5.2.1 ElasticSearchTableDefine
org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine
,實現 TableDefine 介面,基於 Elasticsearch 的表定義抽象類。
- `#type()` 方法,檔案元資料
_type
欄位,參見 《Elasticsearch學習筆記》「_type」 。 - `#refreshInterval()` 抽象方法,檔案索引掃清頻率,參見 《Elasticsearch: 權威指南 » 基礎入門 » 分片內部原理 » 近實時搜尋》「refresh API」。
5.2.2 ElasticSearchColumnDefine
org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine
,實現 ColumnDefine 抽象類,基於 ES 的欄位定義。
- Type 列舉類:列舉 ES 欄位型別。
5.2.3 業務 TableDefine 實現類
在 org.apache.skywalking.apm.collector.storage.es.define
包裡,我們可以看到,所有基於 ES 的業務 TableDefine 實現類。例如:ApplicationEsTableDefine 。
整體 #refreshInterval()
方法傳回的結果如下:
- 1 s
- CpuMetricEsTableDefine
- GCMetricEsTableDefine
- MemoryMetricEsTableDefine
- MemoryPoolMetricEsTableDefine
- 2 s
- InstPerformanceEsTableDefine
- NodeComponentEsTableDefine
- NodeMappingEsTableDefine
- NodeReferenceEsTableDefine
- ServiceEntryEsTableDefine
- ServiceReferenceEsTableDefine
- 2 s && WriteRequest.RefreshPolicy.IMMEDIATE
- 【WriteRequest.RefreshPolicy.IMMEDIATE】參見 `ApplicationEsRegisterDAO#save(Application)` 方法
- ApplicationEsTableDefine
- InstanceEsTableDefine
- ServiceNameEsTableDefine
- 5 s
- GlobalTraceEsTableDefine
- SegmentCostEsTableDefine
- 10 s
- SegmentEsTableDefine
5.2.4 ElasticSearchStorageInstaller
友情提示:ElasticSearchStorageInstaller 主要是對 Elasticsearch Java API 的使用,所以不熟悉的胖友,可以 Google 下。
org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller
,實現 StorageInstaller 抽象類, 基於 ES 儲存安裝器實現類。
- `#defineFilter(List)` 實現方法,過濾陣列中,非 ElasticSearchTableDefine 的元素。
- `#createTable(Client, TableDefine)` 實現方法,建立 Elasticsearch 索引。
- SkyWalking 彭勇升 :`_index`和 `_type` 是 ES 特有的,考慮其他資料庫接入,所以沒有用他這個特性。
- SkyWalking QQ交流群( 392443393 ) ,小心 群友 :`_type` 本來就沒做物理隔離,Lucene 層面也不存在,ES 6.x 已經廢棄了。
- 《Elasticsearch 6.0 將移除 Type》
- `_id` :資料編號,String 型別。
- `_type` :`”type”` 。
- `_index` :TableDefine 定義的表名。
- `source` :Data 資料。
- 檔案資料結構如下:
- 瞭解 Elasticsearch 的胖友可能有和筆者一樣的疑惑,網路上很多文章把 `_index` 類比成關係資料庫的 DB ,`_type` 類比成關係資料庫的 Table ,和 SkyWalking 目前使用的方式不一致?
- `#deleteTable(Client, TableDefine)` 實現方法,刪除 Elasticsearch 索引。
- `#isExists(Client, TableDefine)` 實現方法,判斷 Elasticsearch 索引是否存在。
- 在方法裡,筆者添加了一些 API 的說明,不熟悉的胖友,可以仔細閱讀理解。
5.3 dao 包
在 collector-storage-es-provider
專案結構圖,我們看到一共有兩個 dao
包:
org.skywalking.apm.collector.storage.es.base.dao
,系統的 DAO 抽象類。org.skywalking.apm.collector.storage.es.dao
,業務的 DAO 實現類。- 繼承系統的 DAO 抽象類。
5.3.1 EsDAO
org.skywalking.apm.collector.storage.es.base.dao.EsDAO
,實現 AbstractDAO 抽象類,基於 ES 的 DAO 抽象類。
- `#getMaxId(indexName, columnName)` 方法,獲得索引名的指定欄位的最大值。
- `#getMinId(indexName, columnName)` 方法,獲得索引名的指定欄位的最小值。
5.3.2 BatchEsDAO
org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO
,實現 IBatchDAO 介面,繼承 EsDAO 抽象類,基於 ES 批次操作 DAO 實現類。
- `#batchPersistence(List)` 實現方法,將
org.elasticsearch.action.index.IndexRequestBuilder
和org.elasticsearch.action.index.UpdateRequestBuilder
陣列,建立成org.elasticsearch.action.bulk.BulkRequestBuilder
物件,批次持久化。- IndexRequestBuilder 和 UpdateRequestBuilder 的建立,在 「5.3.3 業務 DAO 實現類」 會看到。
5.3.3 業務 DAO 實現類
在 org.apache.skywalking.apm.collector.storage.es.dao
包裡,我們可以看到,所有基於 ES 的業務 DAO 實現類。
實現程式碼易懂,胖友可以自己閱讀。良心如我們,按照 DAO 的業務用途,推薦例子如下:
- Cache :ApplicationEsCacheDAO
- Register :ApplicationEsRegisterDAO
- Persistence :SegmentEsPersistenceDAO
- 此處可見 IndexRequestBuilder 和 UpdateRequestBuilder 的建立。
- UI :SegmentEsUIDAO
5.4 DataTTLKeeperTimer
org.skywalking.apm.collector.storage.es.DataTTLKeeperTimer
,過期資料刪除定時器。透過該定時器,只保留 N 天內的資料。
- `#start()` 方法,啟動定時任務。
- 第 49 行:建立延遲 1 小時,每 8 小時執行一次 `#delete()` 方法的定時任務。目前該行程式碼被註釋,胖友可以等待 SkyWallking 5.0 版本的釋出。
- `#delete()` 方法,刪除過期資料。
- 第 54 至 66 行:計算刪除的開始與結束時間,即指定時間的前一天。例如,2017-12-23 執行時,刪除 2017-12-16 那天的資料。
- 第 69 行:呼叫 `#deleteJVMRelatedData(startTimestamp, endTimestamp)` 方法,刪除 JVM 相關的資料。
- 第 70 行:呼叫 `#deleteTraceRelatedData(startTimestamp, endTimestamp)` 方法,刪除 Trace 相關的資料。
如下是不會刪除的資料的表:
- Application
- Instance
- ServiceName
- ServiceEntry
朋友會在“發現-看一看”看到你“在看”的內容