前言
在業務開發過程中,我們常常需要做一些定時任務,這些任務一般用來做監控或者清理任務,比如在訂單的業務場景中,使用者在建立訂單後一段時間內,沒有完成支付,系統將自動取消該訂單,並將庫存傳回到商品中,又比如在微信中,使用者發出紅包24小時後,需要對紅包進行檢查,是否已領取完成,如未領取完成,將剩餘金額退回到傳送者錢包中,同時銷毀該紅包。
在專案初始階段,或者是一些小型的專案中,常常採用定時輪詢的方法進行檢查,但是我們都知道,定時輪詢將給資料庫帶來不小的壓力,而且定時間隔無法進行動態調整,特別是一個系統中,同時存在好幾個定時器的時候,就顯得非常的麻煩,同時給資料庫造成巨大的訪問壓力。
下麵,本文將演示如何使用一個 RabbitMQ 的死信佇列同時監控多種業務(複合業務),達到模組解耦,釋放壓力的目的。
註意:名詞“複合死信”是為了敘述方便臨時創造的,如有不妥,歡迎指正
1. 什麼是 RabbitMQ 死信佇列
DLX(Dead Letter Exchanges)死信交換,死信佇列本身也是一個普通的訊息佇列,在建立佇列的時候,透過設定一些關鍵引數,可以將一個普通的訊息佇列設定為死信佇列,與其它訊息佇列不同的是,其入棧的訊息根據入棧時指定的過期時間/被拒絕/超出佇列長度被移除,依次被轉發到指定的訊息佇列中進行二次處理。這樣說法比較拗口,其原理就是死信佇列內位於頂部的訊息過期時,該訊息將被馬上傳送到另外一個訂閱者(訊息佇列)中。
其原理入下圖
由上圖可以看到,目前有三種型別的業務需要使用 DLX 進行處理,因為每個業務的超時時間不一致的問題,如果將他們都放入一個 DLX 中進行處理,將會出現一個時序的問題,即訊息佇列總數處理頂部的訊息,如果頂部的訊息未過期,而底部的訊息過期,這就麻煩了,因為過期的訊息無法得到消費,將會造成延遲;所以正常情況下,最好的辦法是每個業務都獨立一個佇列,這樣就可以保證,即將過期的訊息總是處於佇列的頂部,從而被第一時間處理。
但是多個 DLX 又帶來了管理上面的問題,隨著業務的增加,越來越多的業務需要進入不同的 DLX ,這個時候我們發現,由於人手不足的原因,維護這麼多 DLX 實在是太吃力了,如果能將這些訊息都接入一個 DLX 中該多好呀,在一個 DLX 中進行訊息訂閱,然後進行分發或者處理,這就非常有趣了。
下麵就按照這個思路,我們進行集中處理,也就是複合死信交換 CDLX(Composite Dead Letter Exchanges)
2. 如何建立死信佇列
建立 DLX 佇列的方式非常簡單,我們使用 RabbitMQ Web 控制面板進行建立 Exhcange(交換機)/Consumer(死信消費佇列)/cdlx(複合死信佇列)
2.1 建立佇列
建立交換機 cdlx-Exchange
死信消費佇列 cdlx-Consumer
複合死信佇列 cdlx-Master
- 註意,這裡新增死信佇列必須同時設定死信轉發交換機和路由,後續透過路由系結實現消費佇列
路由系結
上面的路由系結共有兩個,分別是 Master 和 Consumer 用於訊息路由到佇列,為下麵的業務訊息做準備,建好後的佇列如下
3.複合業務進入死信佇列
當建立好佇列以後,我們就可以專心的處理業務了,下麵就來模擬3種業務將訊息傳送到死信佇列的過程
3.1 傳送死信訊息到佇列
傳送訊息使用了 Asp.NetCore輕鬆學-實現一個輕量級高可復用的RabbitMQ客戶端 中的輕量客戶端,封裝後的傳送訊息程式碼如下
public class CdlxMasterService
{
private IConfiguration cfg = null;
private ILogger logger = null;
private string vhost = "test_mq";
private string exchange = "cdlx-Exchange";
private string routekey = "master";
private static MQConnection connection = null;
private MQConnection Connection
{
get
{
if (connection == null || !connection.Connection.IsOpen)
{
connection = new MQConnection(
cfg["rabbitmq:username"],
cfg["rabbitmq:password"],
cfg["rabbitmq:host"],
Convert.ToInt32(cfg["rabbitmq:port"]),
vhost,
logger);
}
return connection;
}
}
private static IModel channel = null;
private IModel Channel
{
get
{
if (channel == null || channel.IsClosed)
channel = Connection.Connection.CreateModel();
return channel;
}
}
public void SendMessage(object data)
{
string message = JsonConvert.SerializeObject(data);
this.Connection.Publish(this.Channel, exchange, routekey, message);
}
}
3.2 將 CdlxMasterService 註入到服務
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<CdlxMasterService>();
...
}
3.3 模擬3種業務生產死信訊息
public class HomeController : Controller
{
private CdlxMasterService masterService;
public HomeController(CdlxMasterService masterService)
{
this.masterService = masterService;
}
[HttpGet("publish")]
public int Publish()
{
Contract contract = new Contract(this.masterService);
for (int i = 0; i < 10; i++)
{
contract.Publish(MessageType.RedPackage, "紅包資訊,超時時間1024s");
contract.Publish(MessageType.Order, "訂單資訊,超時時間2048s");
contract.Publish(MessageType.Vote, "投票資訊,超時時間4096s");
}
return 0;
}
}
上面的介面 puhlish 模擬了業務訊息,由於我們依次釋出了 紅包/訂單/投票 訊息,所以迭代釋出 10 次後,正好形成了一個時序錯亂的資訊佇列,按照自動過期時序計算,當第一個紅包超時到達時,第四條訊息(紅包)也會接著超時,可是由於此時訂單和投票訊息位於紅包訊息上面,該紅包訊息在達到超時時間後並不會被投遞到 Consumer 消費佇列,這是正確的,我們確實也是希望是這個結果
如果有一個辦法把超時的訊息自動將其提升到佇列頂部就好了!
4. 處理複合死信
在 RabbitMQ 提供的 API 介面中,沒有什麼直接可用的能將死信佇列中超時訊息提升到頂部的好辦法;但是,我們可以利用部分 API 介面的特性來完成這件事情。
4.1 定時消費客戶端
下麵,我們將使用一個定時消費客戶端來完成對死信佇列的輪詢,充分利用 RabbitMQ 的消費特性來完成超時訊息的位置提升。
過程如下圖:
如上圖所示,我們增加一個 dlx-timer 定時器,定時的發起對死信佇列的消費,該消費者僅僅是消費,不確認訊息,也就是不做 ack,然後將訊息重新置入佇列中;這個過程,就是將訊息不斷提升位置的過程。
4.2 定時消費客戶端實現程式碼
public class CdlxTimerService : MQServiceBase
{
public override string vHost { get { return "test_mq"; } }
public override string Exchange { get { return "cdlx-Exchange"; } }
public override List<BindInfo> Binds => new List<BindInfo>();
private string queue = "cdlx-Master";
public CdlxTimerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
{
}
///
/// 檢查死信佇列
///
///
public List<CdlxMessage> CheckMessage()
{
long total = 0;
List<CdlxMessage> list = new List<CdlxMessage>();
var connection = base.CreateConnection();
using (IModel channel = connection.Connection.CreateModel())
{
bool latest = true;
while (latest)
{
BasicGetResult result = channel.BasicGet(this.queue, false);
total++;
latest = result != null;
if (latest)
{
var json = Encoding.UTF8.GetString(result.Body);
list.Add(JsonConvert.DeserializeObject<CdlxMessage>(json));
}
}
channel.Close();
connection.Close();
}
return list;
}
}
上面的程式碼首先在定時呼叫到來的時候,建立了一個 Connection,然後利用此 Connection 建立了了一個 Channel,緊接著,使用該 Channel 呼叫 BasicGet 方法,獲得佇列頂部的資訊,且設定 autoAck=false,表示僅檢查訊息,不確認,然後進入一個 while 迭代過程,一直讀取到佇列底部,獲得所有佇列中的資訊,最後,關閉了通道釋放連線。
這樣,就完成了一次訊息檢查的過程,在呼叫 BasicGet 後,下一條資訊將會出現在佇列的頂部,同步,佇列將自動對該訊息進行超時檢查,由於我們在呼叫 BasicGet 的時候,傳入 autoAck=false,不確認該訊息,在 RabbitMQ 控制臺中,將顯示為 unacted,所以在釋放連線後,所有訊息將會被重新置入佇列中,這是一個自動的過程,無需我們做額外的工作。
4.3 Consumer(死信消費佇列)最終處理業務
配置佇列管理隨程式啟動停止
private MQServcieManager serviceManager;
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime lifeTime)
{
serviceManager = new MQServcieManager(this.Configuration, factory.CreateLogger<MQServcieManager>());
lifeTime.ApplicationStarted.Register(() => { serviceManager.Start(); });
lifeTime.ApplicationStopping.Register(() => { serviceManager.Stop(); });
...
}
實現消費佇列
public class CdlxConsumerService : MQServiceBase
{
public override string vHost { get { return "test_mq"; } }
public override string Exchange { get { return "cdlx-Exchange"; } }
private string queue = "cdlx-Consumer";
private string routeKey = "all";
private List<BindInfo> bs = new List<BindInfo>();
public override List<BindInfo> Binds { get { return bs; } }
public CdlxConsumerService(IConfiguration cfg, ILogger logger) : base(cfg, logger)
{
this.bs.Add(new BindInfo
{
ExchangeType = ExchangeType.Direct,
Queue = this.queue,
RouterKey = this.routeKey,
OnReceived = this.OnReceived
});
}
private void OnReceived(MessageBody body)
{
var message = JsonConvert.DeserializeObject<CdlxMessage>(body.Content);
Console.WriteLine("型別:{0}\t 內容:{1}\t進入時間:{2}\t過期時間:{3}", message.Type, message.Data, message.CreateTime, message.CreateTime.AddSeconds(message.Expire));
body.Consumer.Model.BasicAck(body.BasicDeliver.DeliveryTag, true);
}
}
上面的程式碼,模擬了最終業務處理的過程,這裡僅僅是簡單演示,所以只是將訊息列印到螢幕上;在實際的業務場景中,我們可以根據不同的 MessageType 進行訊息的分發處理。
5. 消費過程演示
為了比較直觀的觀看死信消費過程,我們編寫一個簡單的串列頁面,自動掃清後去消費死信佇列,然後將訊息輸出到頁面上,透過觀察此頁面,我們可以實時瞭解到死信佇列的消費過程,實際的業務場景中,大家可以利用第三方定時器定時呼叫介面實現,或者使用內建的輕量主機做後臺任務實現定時輪詢,具體參考 Asp.Net Core 輕鬆學-基於微服務的後臺任務排程管理器
5.1 釋出訊息
瀏覽器訪問本機地址:http://localhost:5000/home/publish
下麵將釋出 30 條資訊到 DLX 中,每個業務各 10 條資訊。
通常情況下,紅包的過期時間最短且超時時間一致,應該最快超時,意味著當第一條紅包訊息超時的時候,其餘 9 條紅包訊息也會一併超時,但是由於紅包訊息混合的釋出在佇列中,且只有第一條紅包訊息位移佇列頂部;所以,當第一條紅包訊息超時被消費後,其餘 9 條紅包由於不是位於佇列頂部,雖然此時他們已經超時,但是 DLX 將無法處理;當我們使用 cdlx-timer(定時器)模擬呼叫 CdlxTimerService 的時候(也就是掃清首頁), CdlxTimerService 服務將會對 DLX 進行檢查。
檢視消費狀態
透過上圖的觀察得知,紅色部分首先位於訊息頂部被消費,然後就無法進行超時判斷,接下來,由於使用了定時輪詢,使得綠色部分訊息得以浮動到訊息頂部,然後被 DLX 進行處理後消費。
5.2 定時器檢查死信佇列
瀏覽器訪問本機地址:http://localhost:5000/home
上圖的每一次掃清,都是對 DLX 的一次輪詢檢查,隨著輪詢的深入,所有處於佇列中不同位置的超時訊息都有機會浮動到佇列頂部進行消費處理。
結束語
業務的發展促進了架構的演進,每一個需求升級的背後,是程式員深深的思考;本文從 CDLX 的需求出發,充分利用了 RabbitMQ DLX 對訊息檢查的特性,實現了對複合業務的集中處理。
演示程式碼下載
https://github.com/lianggx/Examples/tree/master/RabbitMQ.CDLX