摘要: 原創出處 http://www.iocoder.cn/SkyWalking/collector-streaming-second/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝! 本文主要基於 SkyWalking 3.2.6 正式版 本文接 《SkyWalking 原始碼分析 —— Collector Streaming Computing 流式處理(一)》 ,主要分享 Collector Streaming 流式處理的第二部分。主要包含如下部分: AggregationWorker 和 PersistenceWorker ,都先聚合處理資料,在進行各自的後續處理。那麼聚合處理的資料結果,需要有容器進行快取暫存: 類圖如下: 構造方法 ,程式碼如下: 切換 Collection 相關,方法如下: 寫 Collection 相關,方法如下: 寫 Collection 相關,方法如下: 構造方法 ,程式碼如下: 考慮到需要保證儲存的時效性,PersistenceWorker 使用 PersistenceTimer ,定時儲存 Data ,在 「4.2 PersistenceWorker」 詳細解析。 構造方法 ,程式碼如下: 上述兩個抽象方法,用於 Worker 在建立時,會呼叫 記錄下來有什麼用呢?在 AgentStreamBootStartup 啟動時,建立 PersistenceTimer 物件,並將 WorkerCreateListener 記錄的 PersistenceWorker 物件集合傳遞給 PersistenceTimer 物件。這樣,PersistenceTimer 能夠”訪問“到 PersistenceWorker 物件們的 DataCache ,定時儲存資料。 第 55 至 68 行:獲得所有 PersistenceWorker 讀 Collection 快取的資料。 第 71 行:呼叫
1. 概述

2. Data
org.skywalking.apm.collector.core.cache :介面org.skywalking.apm.collector.stream.worker.impl.data :實現
2.1 Collection
org.skywalking.apm.collector.core.cache.Collection ,資料採集介面。
#collection() / #size() / #clear()#reading() / #isReading() / #finishReading()#writing() / #isWriting() / #finishWriting()2.2 DataCollection
org.skywalking.apm.collector.stream.worker.impl.data.DataCollection ,實現 Collection 介面,資料採集實現類,使用 Map 作為資料容器。2.3 Window
org.skywalking.apm.collector.core.cache.Window ,視窗抽象類。
windowDataA 屬性,視窗資料A 。windowDataB 屬性,視窗資料B 。#collectionInstance() 抽象方法,建立視窗資料( Collection )物件。pointer 屬性,資料指向 windowDataA 或 windowDataA。
#getCurrent() 方法,獲得現資料指向,即 pointer 。#getLast() 方法,獲得原資料指向,即非 pointer 。windowSwitch 屬性,視窗切換計數。
#trySwitchPointer() 方法,傳回是否可以切換 Collection 。可以切換需要滿足如下條件:
windowSwitch 屬性進行計數。#trySwitchPointerFinally() 方法,釋放 windowSwitch 的計數。#switchPointer() 方法,切換資料指向,並標記原資料指向的 Collection 正在讀取中。#finishReadingLast() 方法,清空原資料指向的 Collection 資料,並標記原資料指向的 Collection 完成讀取( 不在正在讀取中 )。
#getCurrentAndWriting() 方法,獲得現資料指向,並標記正在寫入中。透過正在寫入標記,切換 Collection 完成後,可以判斷該 Collection 正在寫入中,若是,等待不在寫入中,開始資料讀取並處理。2.4 DataCache
org.skywalking.apm.collector.stream.worker.impl.data.DataCache ,實現 Window 抽象類,資料快取。
#collectionInstance() 實現方法,建立 DataCollection 物件。#currentCollectionSize() 方法,獲得當前資料指向( 寫入 Collection )的資料數量。
#writing() 方法,呼叫 #getCurrentAndWriting() 方法,開始寫入。即,獲得現資料指向,並標記正在寫入中。
lockedDataCollection 屬性,寫入的視窗資料。#put(id, data) 方法,向 lockedDataCollection 屬性,寫入 Data 。#get(id) 方法,向 lockedDataCollection 屬性,根據 ID 獲得 Data 。#containsKey(id) 方法,向 lockedDataCollection 屬性,根據 ID 判斷 Data 是否存在 。#finishWriting() 方法,完成寫入。即,標記 lockedDataCollection 不在正在寫入中。3. AggregationWorker
org.skywalking.apm.collector.stream.worker.impl.AggregationWorker ,實現 AbstractLocalAsyncWorker 抽象類,非同步聚合 Worker,負責聚合處理資料,後提交 Data 到 Next 節點們處理。
dataCache 屬性,資料快取。messageNum 屬性,訊息計數。當超過一定數量( 目前是 100 ),重置計數歸零。
#onWork(message) 實現方法,聚合處理資料,當滿足條件時,提交 Data 到 Next 節點們處理。
messageNum 計數增加。#aggregate(message) 方法,聚合訊息到資料。messageNum >= 100 時,呼叫 #sendToNext() ,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。messageNum.endOfBatch == true 時,當訊息是批處理的最後一條時,呼叫 #sendToNext() ,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。
#sendToNext() 方法,提交快取資料的讀 Collection 的資料給 Next 節點們繼續處理。
Window#switchPointer() 方法,切換資料指標,並標記原指向正在讀取中。這裡並未先呼叫 Window#trySwitchPointer() 方法,是否會有併發問題?目前這裡是非同步單執行緒,所以不會有問題,參見 《SkyWalking 原始碼分析 —— Collector Queue 佇列元件》 。另外,在 「4. PersistenceWorker」 會看到併發的情況處理。4. PersistenceWorker
org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker ,實現 AbstractLocalAsyncWorker 抽象類,非同步批次儲存 Worker,負責聚合處理資料,後儲存 Data 。
dataCache 屬性,資料快取。batchDAO 屬性,批次操作 DAO ,在 《SkyWalking 原始碼分析 —— Collector Storage 儲存元件》 有詳細解析。
#needMergeDBData() 抽象方法,儲存時,是否需要合併資料。一些 Data 只有新增操作,沒有更新操作。#persistenceDAO() 抽象方法,獲得 Data 對應的持久化 DAO 介面的實現類物件。#prepareBatch(dataMap) 方法,生成批次操作物件陣列,最終呼叫 IBatchDAO#batchPersistence(List >) 方法,透過執行批次操作物件陣列,實現批次持久化資料,在 《SkyWalking 原始碼分析 —— Collector Storage 儲存元件》 有詳細解析。
#onWork(message) 實現方法,當滿足條件時儲存 Data ,而後聚合資料。這點和 AggregationWorker 相反的,因為要考慮併發問題。程式碼如下:
DataCache#currentCollectionSize() 方法,獲得當前寫入 Collection 的資料數量,判斷是否超過 5000 。
DataCache#trySwitchPointer() 方法,判斷是否可以切換 Collection 。透過該判斷,保證和 PersistenceTimer 一起時,不會出現併發問題。Window#switchPointer() 方法,切換資料指標,並標記原指向正在讀取中。#buildBatchCollection() 方法,建立批次操作物件陣列。該方法和 AggregationWorker#sendToNext() 方法基本類似。IBatchDAO#batchPersistence(List >) 方法,透過執行批次操作物件陣列,實現批次持久化資料。DataCache#trySwitchPointerFinally() 方法,釋放 DataCache.windowSwitch 的計數。#aggregate(message) 方法,聚合資料。該方法和 AggregationWorker#aggregate(message) 方法基本相似。4.1 WorkerCreateListener
org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener ,Worker 建立監聽器。WorkerCreateListener#addWorker 方法,記錄所有的 PersistenceWorker 物件。4.2 PersistenceTimer
org.skywalking.apm.collector.stream.timer.PersistenceTimer ,持久化定時任務,負責定時、批次儲存 PersistenceWorker 快取的資料。#start(IBatchDAO, List) 方法,建立延遲 1 秒,每 1 秒執行一次 #extractDataAndSave() 方法的定時任務,用於定時、批次儲存 PersistenceWorker 快取的資料。#extractDataAndSave(IBatchDAO, List) 方法,程式碼如下:
PersistenceWorker#flushAndSwitch() 切換資料指標,即切換讀寫 Collection 。PersistenceWorker#buildBatchCollection() 方法,建立批次操作物件陣列。Window#trySwitchPointer() 方法,保證讀 Collection 正在被讀取中時,PersistenceWorker 和 PersistenceTimer 有且僅有一個切換佇列,讀取資料。當讀取完成後,呼叫 Window#finishReadingLast() 方法,清空原資料指向,並標記原資料指向完成正在讀取中。IBatchDAO#batchPersistence(List >) 方法,執行批次操作,進行儲存。
知識星球
朋友會在“發現-看一看”看到你“在看”的內容