歡迎光臨
每天分享高質量文章

深入剖析 Redis5.0 全新資料結構 Streams(訊息佇列的新選擇)

點選上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

來源:阿飛的部落格

Redis 5.0 全新的資料型別:streams,官方把它定義為:以更抽象的方式建模日誌的資料結構。Redis的streams主要是一個append only的資料結構,至少在概念上它是一種在記憶體中表示的抽象資料型別,只不過它們實現了更強大的操作,以剋服日誌檔案本身的限制。

如果你瞭解MQ,那麼可以把streams當做MQ。如果你還瞭解kafka,那麼甚至可以把streams當做kafka。

另外,這個功能有點類似於redis以前的Pub/Sub,但是也有基本的不同:

  • streams支援多個客戶端(消費者)等待資料(Linux環境開多個視窗執行XREAD即可模擬),並且每個客戶端得到的是完全相同的資料。

  • Pub/Sub是傳送忘記的方式,並且不儲存任何資料;而streams樣式下,所有訊息被無限期追加在streams中,除非用於顯示執行刪除(XDEL)。

  • streams的Consumer Groups也是Pub/Sub無法實現的控制方式。

streams資料結構

streams資料結構本身非常簡單,但是streams依然是Redis到目前為止最複雜的型別,其原因是實現的一些額外的功能:一系列的阻塞操作允許消費者等待生產者加入到streams的新資料。另外還有一個稱為Consumer Groups的概念,這個概念最先由kafka提出,Redis有一個類似實現,和kafka的Consumer Groups的目的是一樣的:允許一組客戶端協調消費相同的資訊流!

redis原始碼中定義streams結構的原始碼如下,由原始碼可知,stream的核心資料結構是radix tree:

typedef struct stream {
    rax *rax;               /* The radix tree holding the stream. */
    uint64_t length;        /* Number of elements inside this stream. */
    streamID last_id;       /* Zero if there are yet no items. */
    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

原始碼參考:https://github.com/antirez/redis/blob/5.0.0/src/stream.h;

至於redis對radix tree的實現,參考原始碼:https://github.com/antirez/redis/blob/5.0.0/src/rax.c 和 https://github.com/antirez/redis/blob/5.0.0/src/rax.h 。網上也有很多radix tree的文章,本篇文章就不做過多的介紹了。下麵給出一張從官方原始碼中的部分截圖:

radix tree

radix tree

streams基礎

為了理解streams的目的,以及如何使用它,我們先忽略掉所有高階特性,只把註意力放在資料結構本身,以及那些操作和訪問streams的命令。這基本上也是大多數其他Redis資料型別共有的部分,例如Lists,Sets,Sorted Sets等。然而需要註意的是,Lists也有一個更複雜的阻塞式的API,例如BLPOP,BRPOP等。streams這方便的API也沒什麼不同,只是更複雜,更強大(更牛逼,哈)!

streams命令

廢話不多說,先上手玩玩這個全新的資料型別。streams這個資料型別對應有如下13個操作命令,所有命令都以”X“開頭:

XADD

用法:XADD key ID field string [field string …]
正如其名,這個命令就是用來新增的,給streams追加(append,前面提到過:streams主要是一個append only的資料結構)一個新的entry(和Java裡的Map類似,Redis裡的streams中的資料也稱為entry)。

key:的含義就是同一型別streams的名稱;
ID: streams中entry的唯一識別符號,如果執行XADD命令時,傳入星號(*),那麼,ID會自動生成,且自動生成的ID會在執行XADD後傳回,預設生成的ID格式為millisecondsTime+sequenceNumber,即當前毫秒級別的時間戳加上一個自增序號值,例如"1540013735401-0"。並且執行XADD時,不接受少於或等於上一次執行XADD的ID,否則會報錯:ERR The ID specified in XADD is equal or smaller than the target stream top item
field&string;:接下來就是若干組field string。可以把它理解為表示屬性的json中的key-value。例如,某一streams的key命名為userInfo,且某個使用者資訊為{“username”:”afei”, “password”:”123456″},那麼執行XADD命令如下:

127.0.0.1:6379xadd userInfo * username afei password 123456
"1540014082060-0"

由於命令中ID欄位的值是星號,所以自定生成ID,1540014082060-0就是自動生成的ID。 XADD命令也支援顯示指定ID,例如:XADD streamname 0-2 foo bar

  • 時鐘回撥

需要註意的是,ID的時間戳部分是部署Redis伺服器的本地時間,如果發生時鐘回撥會怎麼樣?如果發生始終回撥,生成的ID的時間戳部分就是回撥後的時間,然後加上這個時間的遞增序列號。例如當前時間戳1540014082060,然後這時候發生了時鐘回撥,且回撥5ms,那麼時間戳就是1540014082055。假設以前已經生成了1540014082055-0,1540014082055-1,那麼這次由於時鐘回撥,生成的ID就是1540014082055-2。所以允許自動生成的ID在發生時鐘回撥時少於上次的ID,但是不允許顯示指定一個少於上次的ID。

XDEL

用法:XDEL key ID [ID …]
和XADD相反,這是命令用來從streams中刪除若干個entry,並且會傳回實際刪除數,這個刪除數可能和引數ID個數不等,因為某些ID表示的訊息可能不存在。執行命令如下,第二個引數ID是不存在的,所以XDEL的傳回結果是1:

127.0.0.1:6379XDEL userInfo "1540014379642-0" "1540014379642-1"
(integer) 1

XLEN

用法:XLEN key
很好理解,這個命令就是用來傳回streams中有多少個entry。執行如下:

127.0.0.1:6379XLEN userInfo
(integer) 2

streams三種查詢樣式

redis提供了三種查詢streams資料的樣式:

  1. 範圍查詢:因為streams的每個entry,其預設生成的ID是基於時間且遞增的;

  2. 監聽樣式:類比linux中的tailf命令,實時接收新增加到streams中的entry(也有點像一個訊息系統,事實上筆者認為它就是借鑒了kafka);

  3. 消費者組:即Consumer Groups,特殊的監聽樣式。從一個消費者的角度來看streams,一個streams能被分割槽到多個處理訊息的消費者,對於任意一條訊息,同一個消費者組中只有一個消費者可以處理(和kafka的消費者組完全一樣)。這樣還能夠橫向擴容消費者,從而提升處理訊息的能力,而不需要只讓把讓一個消費者處理所有訊息。

接下里分別介紹這三種樣式。

XRANGE

用法:XRANGE key start end [COUNT count]
這個命令屬於第1種樣式,即基於範圍查詢。這個命令用來傳回streams某個順序範圍下的元素,start引數是更小的ID,end引數是更大的ID。有兩個特殊的ID用符號”-“和”+”表示,符號”-“表示最小的ID,符號”+”表示最大的ID:

127.0.0.1:6379> XRANGE userInfo "1540014096298-0" "1540014477236-0"
11"1540014096298-0"
   21"username"
      2"root"
      3"password"
      4"666666"
21"1540014477236-0"
   21"username"
      2"test"
      3"password"
      4"111111"
127.0.0.1:6379> 
127.0.0.1:6379> XRANGE userInfo - +
11"1540014082060-0"
   21"username"
      2"afei"
      3"password"
      4"123456"
21"1540014096298-0"
   21"username"
      2"root"
      3"password"
      4"666666"
31"1540014477236-0"
   21"username"
      2"test"
      3"password"
      4"111111"
41"1540014493402-0"
   21"username"
      2"u1"
      3"password"
      4"111111"

XRANGE還能實現遍歷某個範圍區間的功能,例如我想遍歷2018-10-20號新增的使用者資訊。首先得到2018-10-20 00:00:00對應的時間戳為1539964800000,再得到2018-10-20 23:59:59對應的時間戳為1540051199000,然後執行如下命令:

127.0.0.1:6379> XRANGE userInfo 1539964800000-0  1540051199000-0 COUNT 5
11"1540014082060-0"
   21"username"
      2"afei"
      3"password"
      4"123456"
... ...
51"1540014496505-0"
   21"username"
      2"u2"
      3"password"
      4"111111"
127.0.0.1:6379> 
# 需要註意的是,接下來再遍歷的start引數是上一次遍歷結果最大的ID加1,即"1540014496505-0"加1就是"1540014496505-1"。
127.0.0.1:6379> XRANGE userInfo 1540014496505-1  1540051199000-0 COUNT 5
11"1540014499863-0"
   21"username"
      2"u3"
      3"password"
      4"111111"

XREVRANGE

用法:XREVRANGE key end start [COUNT count]
這個命令也屬於第1種樣式,且和XRANGE相反,傳回一個逆序範圍。end引數是更大的ID,start引數是更小的ID。執行示例如下:

XREVRANGE userInfo "1540014477236-0" "1540014096298-0"

XREAD

用法:XREAD [COUNT count][BLOCK milliseconds] STREAMS key [key …] ID [ID …]
很明顯,這個命令就是用來實現第2個樣式,即監聽樣式。其作用是傳回streams中從來沒有讀取的,且比引數ID更大的元素。

這個命令的使用方式如下:

127.0.0.1:6379> XREAD COUNT 10 BLOCK 60000 STREAMS userInfo "1540041139268-0"
11"userInfo"
   211"1540041264182-0"
         21"u2"
            2"p2"
(9.26s)
# "1540041264182-0"這條訊息時透過XADD新增的然後被XREAD監聽到的訊息。
127.0.0.1:6379> XREAD COUNT 2 STREAMS userInfo 0
11"userInfo"
   211"1540014082060-0"
         21"username"
            2"afei"
            3"password"
            4"123456"
      21"1540014096298-0"
         21"username"
            2"root"
            3"password"
            4"666666"
# 這條命令實現類似XRANGE的功能。

127.0.0.1:6379> XREAD BLOCK 0 STREAMS userInfo $
11"userInfo"
   211"1540042613437-0"
         21"u7"
            2"p7"
# 說明BLOCK為0表示一致等待知道有新的資料,否則永遠不會超時。並且ID的值我們用特殊字元`$`表示,這個特殊字元表示我們只獲取最新新增的訊息。

此外,XREAD還支援同時監聽多個streams,用法如下所示:

127.0.0.1:6379> XREAD BLOCK 0 STREAMS userInfo_01 userInfo_02 userInfo_03 userInfo_04  $ $ $ $
11"userInfo_03"
   211"1540043348287-0"
         21"u1"
            2"p1"
(3.49s)
# 監聽userInfo_01~userInfo_04這4個streams的新的訊息。

XREAD除了COUNT和BLOCK,沒有其他選項了。所有XREAD是一個非常基本的命令。更多高階特性可以往下看接下來要介紹的XREADGROUP。

XREADGROUP

用法:XREADGROUP GROUP group consumer [COUNT count][BLOCK milliseconds] STREAMS key [key …] ID [ID …]
很明顯,這就是第三種樣式:消費者組樣式。

如果你瞭解kafka的消費者組,那麼你就也瞭解了streams的消費者組。如果不瞭解也沒關係,筆者簡單解釋一下,假設有三個消費者C1,C2,C3。在streams中總計有7條訊息:1, 2, 3, 4, 5, 6, 7,那麼消費關係如下所示:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

消費者組具備如下幾個特點:

  1. 同一個訊息不會被投遞到一個消費者組下的多個消費者,只可能是一個消費者。

  2. 同一個消費者組下,每個消費者都是唯一的,透過大小寫敏感的名字區分。

  3. 消費者組中的消費者請求的訊息,一定是新的,從來沒有投遞過的訊息。

  4. 消費一個訊息後,需要用命令(XACK)確認,意思是說:這條訊息已經給成功處理。正因為如此,當訪問streams的歷史訊息時,每個消費者只能看到投遞給它自己的訊息。

消費者組抽象的想象成如下這個樣子:

+----------------------------------------+
| consumer_group_name: afeigroup         |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

XACK

用法:XACK key group ID [ID …]
這是消費者組相關的另一個重要的命令。標記一個處理中的訊息為已被正確處理,如此一來,這條訊息就會被從消費者組的pending訊息集合中刪除,類似MQ中的ack。

XGROUP

用法:XGROUP [CREATE key groupname id-or-$][SETID key id-or-$] [DESTROY key groupname][DELCONSUMER key groupname consumername]

這也是消費者組的一個重要命令,這個命令用來管理消費者組,例如建立,刪除等。

XREADGROUP,XACK,XGROUP三種命令構成了消費者組相關的操作命令,下麵是消費者組一些操作示例:

# 建立一個消費者組
127.0.0.1:6379> XGROUP CREATE userInfo GRP-AFEI $
OK
# 需要註意的是,目前XGROUP CREATE的streams必須是一個存在的streams,否則會報錯:
127.0.0.1:6379> XGROUP CREATE userinfo GRP-AFEI $
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.

# 名為zhangsan的消費者,需要註意的是streams名稱userInfo後面的特殊符號`>`表示這個消費者只接收從來沒有被投遞給其他消費者的訊息,即新的訊息。當然我們也可以指定具體的ID,例如指定0表示訪問所有投遞給該消費者的歷史訊息,指定1540081890919-1表示投遞給該消費者且大於這個ID的歷史訊息:
127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 1 BLOCK 0 STREAMS userInfo >
# 名為lisi的消費者:
127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 1 BLOCK 0 STREAMS userInfo >

# 接下來分別新增兩條資訊,一條就會被zhangsan消費,另一條被lisi消費:
127.0.0.1:6379> XADD userInfo * username u102102 password p102102
"1540081873370-0"
127.0.0.1:6379> XADD userInfo * username u102103 password p102103
"1540081890919-0"

#現在消費者lisi有一條訊息:
127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 5 BLOCK 0 STREAMS userInfo 0
11"userInfo"
   211"1540081890919-0"
         21"username"
            2"u102103"
            3"password"
            4"p102103"
#然後透過命令ack這條訊息:
127.0.0.1:6379> XACK userInfo mygroup 1540081890919-0
(integer) 1
# 再看消費者lisi的pending佇列,已經為空:
127.0.0.1:6379> XREADGROUP GROUP mygroup lisi COUNT 5 BLOCK 0 STREAMS userInfo 0
11"userInfo"
   2) (empty list or set)

XPENDING

用法:XPENDING key group [start end count][consumer]
傳回streams中消費者組的pending訊息,即消費者接收到但是還沒有ack的訊息,用法參考:

# 檢視消費者組下總計最多10條pending訊息
127.0.0.1:6379> XPENDING userInfo mygroup - + 10
1) 1) "1540083260408-0"
   2) "zhangsan"
   3) (integer) 183551
   4) (integer) 1
2) 1) "1540083266293-0"
   2) "lisi"
   3) (integer) 177666
   4) (integer) 1
# 檢視消費者組下zhangsan這個消費者總計最多10條pending訊息
127.0.0.1:6379> XPENDING userInfo mygroup - + 10 zhangsan
1) 1) "1540083260408-0"
   2) "zhangsan"
   3) (integer) 187006
   4) (integer) 1

XCLAIM

用法:XCLAIM key group consumer min-idle-time ID [ID …][IDLE ms] [TIME ms-unix-time][RETRYCOUNT count] [FORCE][JUSTID]
作用是改變消費者組中訊息的所有權,用法參考:

127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 5 BLOCK 0 STREAMS userInfo 0
11"userInfo"
   211"1540083260408-0"
         21"username"
            2"u102106"
            3"password"
            4"p102106"
# zhangsan本來有1條訊息,現在將另一條本來屬於lisi的訊息的所有權轉給它:
127.0.0.1:6379> XCLAIM userInfo mygroup zhangsan 360 1540083266293-0
11"1540083266293-0"
   21"username"
      2"u102107"
      3"password"
      4"p102107"
# 現在zhangsan有兩條訊息了
127.0.0.1:6379> XREADGROUP GROUP mygroup zhangsan COUNT 5 BLOCK 0 STREAMS userInfo 0
11"userInfo"
   211"1540083260408-0"
         21"username"
            2"u102106"
            3"password"
            4"p102106"
      21"1540083266293-0"
         21"username"
            2"u102107"
            3"password"
            4"p102107"

XINFO

用法:XINFO [CONSUMERS key groupname][GROUPS key] [STREAM key][HELP]
其作用是得到streams和消費者組的一些資訊,使用參考:

127.0.0.1:6379> XINFO CONSUMERS userInfo mygroup 
1) 1) "name"
   2) "lisi"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 201086
2) 1) "name"
   2) "zhangsan"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 701954
127.0.0.1:6379> XINFO STREAM userInfo
 1) "length"
 2) (integer) 22
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 2
 9) "last-generated-id"
10) "1540082298051-0"
11) "first-entry"
12) 1) "1540014082060-0"
    2) 1) "username"
       2) "afei"
       3) "password"
       4) "123456"
13) "last-entry"
14) 1) "1540082298051-0"
    2) 1) "username"
       2) "u102105"
       3) "password"
       4) "p102105"

XTRIM

用法:XTRIM key MAXLEN [~] count
修剪streams到一個確定的size。Trims the stream to (approximately if ‘~’ is passed) a certain size,用法參考:

streams只保留10條訊息,其傳回結果表示被剪去多少條訊息:
127.0.0.1:6379XTRIM userInfo MAXLEN 10
(integer) 14

說明:streams目前的修剪策略比較簡單,比如連根據ID範圍修剪都沒有實現。根據具體某一個ID刪除,可以透過XDEL實現。

持久化,複製以及訊息安全性

和其他資料型別一樣,streams也會非同步複製到slave,並也會持久化到AOF和RDB檔案中。然而,消費者組的全部狀態是被傳播(propagated )到AOF,RDB和slave中。

需要註意的是,Redis的streams和消費者組使用Redis預設複製進行持久化和複製,因此:如果訊息的永續性在您的應用程式中很重要,則必須將AOF與強fsync策略一起使用
預設情況下,非同步複製不保證能複製每一個資料新增或使用者組狀態更改:在故障轉移之後,可能會丟失某些內容,具體取決於slave從master接收資料的能力。

  • 長度為0的streams

這是streams和其他redis資料型別的不同,其他資料型別,例如Lists,Sets等,如果所有元素都被刪除,那麼key也不存在。而streams允許所有entry都被刪除。

存在這種不對稱性的原因是因為streams可能具有關聯的消費者組,並且我們不希望由於streams中不再有任何entry而丟失消費者組定義的狀態。 目前,即使沒有關聯的消費者群體,也不會刪除該streams。



如果你對 Dubbo / Netty 等等原始碼與原理感興趣,歡迎加入我的知識星球一起交流。長按下方二維碼噢

目前在知識星球更新了《Dubbo 原始碼解析》目錄如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 伺服器

16. P2P 伺服器

17. HTTP 伺服器

18. 序列化 Serialization

19. 叢集容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing

… 一共 69+ 篇

目前在知識星球更新了《Netty 原始碼解析》目錄如下:

01. 除錯環境搭建
02. NIO 基礎
03. Netty 簡介
04. 啟動 Bootstrap

05. 事件輪詢 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 位元組緩衝區 ByteBuf

09. 通道處理器 ChannelHandler

10. 編解碼 Codec

11. 工具類 Util

… 一共 61+ 篇

目前在知識星球更新了《資料庫物體設計》目錄如下:


01. 商品模組
02. 交易模組
03. 營銷模組
04. 公用模組

… 一共 17+ 篇


目前在知識星球更新了《Spring 原始碼解析》目錄如下:


01. 除錯環境搭建
02. IoC Resource 定位
03. IoC BeanDefinition 載入

04. IoC BeanDefinition 註冊

05. IoC Bean 獲取

06. IoC Bean 生命週期

… 一共 35+ 篇

贊(0)

分享創造快樂