摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-streaming-second/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝! 本文主要基於 SkyWalking 3.2.6 正式版 本文接 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》 ,主要分享 Collector Streaming 流式處理的第二部分。主要包含如下部分: AggregationWorker 和 PersistenceWorker ,都先聚合處理資料,在進行各自的後續處理。那麼聚合處理的資料結果,需要有容器進行快取暫存: 類圖如下: 構造方法 ,程式碼如下: 切換 Collection 相關,方法如下: 寫 Collection 相關,方法如下: 寫 Collection 相關,方法如下: 構造方法 ,程式碼如下: 考慮到需要保證儲存的時效性,PersistenceWorker 使用 PersistenceTimer ,定時儲存 Data ,在 「4.2 PersistenceWorker」 詳細解析。 構造方法 ,程式碼如下: 上述兩個抽象方法,用於 Worker 在建立時,會呼叫 記錄下來有什麼用呢?在 AgentStreamBootStartup 啟動時,建立 PersistenceTimer 物件,並將 WorkerCreateListener 記錄的 PersistenceWorker 物件集合傳遞給 PersistenceTimer 物件。這樣,PersistenceTimer 能夠”訪問“到 PersistenceWorker 物件們的 DataCache ,定時儲存資料。 第 55 至 68 行:獲得所有 PersistenceWorker 讀 Collection 快取的資料。 第 71 行:呼叫
1. 概述
2. Data
org.skywalking.apm.collector.core.cache
:介面org.skywalking.apm.collector.stream.worker.impl.data
:實現
2.1 Collection
org.skywalking.apm.collector.core.cache.Collection
,資料採集介面。
#collection()
/ #size()
/ #clear()
#reading()
/ #isReading()
/ #finishReading()
#writing()
/ #isWriting()
/ #finishWriting()
2.2 DataCollection
org.skywalking.apm.collector.stream.worker.impl.data.DataCollection
,實現 Collection 介面,資料採集實現類,使用 Map
作為資料容器。2.3 Window
org.skywalking.apm.collector.core.cache.Window
,視窗抽象類。
windowDataA
屬性,視窗資料A 。windowDataB
屬性,視窗資料B 。#collectionInstance()
抽象方法,建立視窗資料( Collection )物件。pointer
屬性,資料指向 windowDataA
或 windowDataA
。
#getCurrent()
方法,獲得現資料指向,即 pointer
。#getLast()
方法,獲得原資料指向,即非 pointer
。windowSwitch
屬性,視窗切換計數。
#trySwitchPointer()
方法,傳回是否可以切換 Collection 。可以切換需要滿足如下條件:
windowSwitch
屬性進行計數。#trySwitchPointerFinally()
方法,釋放 windowSwitch
的計數。#switchPointer()
方法,切換資料指向,並標記原資料指向的 Collection 正在讀取中。#finishReadingLast()
方法,清空原資料指向的 Collection 資料,並標記原資料指向的 Collection 完成讀取( 不在正在讀取中 )。
#getCurrentAndWriting()
方法,獲得現資料指向,並標記正在寫入中。透過正在寫入標記,切換 Collection 完成後,可以判斷該 Collection 正在寫入中,若是,等待不在寫入中,開始資料讀取並處理。2.4 DataCache
org.skywalking.apm.collector.stream.worker.impl.data.DataCache
,實現 Window 抽象類,資料快取。
#collectionInstance()
實現方法,建立 DataCollection 物件。#currentCollectionSize()
方法,獲得當前資料指向( 寫入 Collection )的資料數量。
#writing()
方法,呼叫 #getCurrentAndWriting()
方法,開始寫入。即,獲得現資料指向,並標記正在寫入中。
lockedDataCollection
屬性,寫入的視窗資料。#put(id, data)
方法,向 lockedDataCollection
屬性,寫入 Data 。#get(id)
方法,向 lockedDataCollection
屬性,根據 ID 獲得 Data 。#containsKey(id)
方法,向 lockedDataCollection
屬性,根據 ID 判斷 Data 是否存在 。#finishWriting()
方法,完成寫入。即,標記 lockedDataCollection
不在正在寫入中。3. AggregationWorker
org.skywalking.apm.collector.stream.worker.impl.AggregationWorker
,實現 AbstractLocalAsyncWorker 抽象類,非同步聚合 Worker,負責聚合處理資料,後提交 Data 到 Next 節點們處理。
dataCache
屬性,資料快取。messageNum
屬性,訊息計數。當超過一定數量( 目前是 100 ),重置計數歸零。
#onWork(message)
實現方法,聚合處理資料,當滿足條件時,提交 Data 到 Next 節點們處理。
messageNum
計數增加。#aggregate(message)
方法,聚合訊息到資料。messageNum >= 100
時,呼叫 #sendToNext()
,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。messageNum.endOfBatch == true
時,當訊息是批處理的最後一條時,呼叫 #sendToNext()
,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。
#sendToNext()
方法,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。
Window#switchPointer()
方法,切換資料指標,並標記原指向正在讀取中。這裡並未先呼叫 Window#trySwitchPointer()
方法,是否會有併發問題?目前這裡是非同步單執行緒,所以不會有問題,參見 《SkyWalking 原始碼分析 —— Collector Queue 佇列元件》 。另外,在 「4. PersistenceWorker」 會看到併發的情況處理。4. PersistenceWorker
org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker
,實現 AbstractLocalAsyncWorker 抽象類,非同步批次儲存 Worker,負責聚合處理資料,後儲存 Data 。
dataCache
屬性,資料快取。batchDAO
屬性,批次操作 DAO ,在 《SkyWalking 原始碼分析 —— Collector Storage 儲存元件》 有詳細解析。
#needMergeDBData()
抽象方法,儲存時,是否需要合併資料。一些 Data 只有新增操作,沒有更新操作。#persistenceDAO()
抽象方法,獲得 Data 對應的持久化 DAO 介面的實現類物件。#prepareBatch(dataMap)
方法,生成批次操作物件陣列,最終呼叫 IBatchDAO#batchPersistence(List >)
方法,透過執行批次操作物件陣列,實現批次持久化資料,在 《SkyWalking 原始碼分析 —— Collector Storage 儲存元件》 有詳細解析。
#onWork(message)
實現方法,當滿足條件時儲存 Data ,而後聚合資料。這點和 AggregationWorker 相反的,因為要考慮併發問題。程式碼如下:
DataCache#currentCollectionSize()
方法,獲得當前寫入 Collection 的資料數量,判斷是否超過 5000 。
DataCache#trySwitchPointer()
方法,判斷是否可以切換 Collection 。透過該判斷,保證和 PersistenceTimer 一起時,不會出現併發問題。Window#switchPointer()
方法,切換資料指標,並標記原指向正在讀取中。#buildBatchCollection()
方法,建立批次操作物件陣列。該方法和 AggregationWorker#sendToNext()
方法基本類似。IBatchDAO#batchPersistence(List >)
方法,透過執行批次操作物件陣列,實現批次持久化資料。DataCache#trySwitchPointerFinally()
方法,釋放 DataCache.windowSwitch
的計數。#aggregate(message)
方法,聚合資料。該方法和 AggregationWorker#aggregate(message)
方法基本相似。4.1 WorkerCreateListener
org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener
,Worker 建立監聽器。WorkerCreateListener#addWorker
方法,記錄所有的 PersistenceWorker 物件。4.2 PersistenceTimer
org.skywalking.apm.collector.stream.timer.PersistenceTimer
,持久化定時任務,負責定時、批次儲存 PersistenceWorker 快取的資料。#start(IBatchDAO, List)
方法,建立延遲 1 秒,每 1 秒執行一次 #extractDataAndSave()
方法的定時任務,用於定時、批次儲存 PersistenceWorker 快取的資料。#extractDataAndSave(IBatchDAO, List)
方法,程式碼如下:
PersistenceWorker#flushAndSwitch()
切換資料指標,即切換讀寫 Collection 。PersistenceWorker#buildBatchCollection()
方法,建立批次操作物件陣列。Window#trySwitchPointer()
方法,保證讀 Collection 正在被讀取中時,PersistenceWorker 和 PersistenceTimer 有且僅有一個切換佇列,讀取資料。當讀取完成後,呼叫 Window#finishReadingLast()
方法,清空原資料指向,並標記原資料指向完成正在讀取中。IBatchDAO#batchPersistence(List >)
方法,執行批次操作,進行儲存。
朋友會在“發現-看一看”看到你“在看”的內容