來自:程式設計玩家
連結:http://www.cnblogs.com/Erik_Xu/p/9515208.html
前言
最近需要使用到訊息佇列相關技術,於是重新接觸RabbitMQ。其中遇到了不少可靠性方面的問題,歸納了一下,大概有以下幾種:
1、臨時異常,如資料庫網路閃斷、http請求臨時失效等;
2、時序異常,如A任務依賴於B任務,但可能由於排程或消費者分配的原因,導致A任務先於B任務執行;
3、業務異常,由於系統測試不充分,上線後發現某幾個或某幾種訊息無法正常處理;
4、系統異常,業務中介軟體無法正常操作,如網路中斷、資料庫宕機等;
5、非法異常,一些偽造、攻擊型別的訊息。
針對這些異常,我採用了一種基於訊息審計、訊息重試、訊息檢索、訊息重發的方案。
方案
1、訊息均使用Exchange進行通訊,方式可以是direct或topic,不建議fanout。
2、根據業務在Exchange下分配一個或多個Queue,同時設定一個審計執行緒(Audit)監聽所有Queue,用於記錄訊息到MongoDB,同時又不阻塞正常業務處理。
3、生產者(Publisher)在釋出訊息時,基於AMQP協議,生成訊息標識MessageId和時間戳Timestamp,根據訊息業務新增頭資訊Headers便於跟蹤。
4、消費者(Comsumer)訊息處理失敗時,則把訊息傳送到重試交換機(Retry Exchange),並設定過期(重試)時間及更新重試次數;如果超過重試次數則刪除訊息。
5、重試交換機Exchange設定死信交換機(Dead Letter Exchange),訊息過期後自動轉發到業務交換機(Exchange)。
6、WebApi可以根據訊息標識MessageId、時間戳Timestamp以及頭資訊Headers在MongoDB中對訊息進行檢索或重試。
註:選擇MongoDB作為儲存介質的主要原因是其對頭資訊(essay-headers)的動態查詢支援較好,同等的替代產品還可以是Elastic Search這些。
生產者(Publisher)
1、設定斷線自動恢復
var factory = new ConnectionFactory
{
Uri = new Uri(“amqp://guest:guest@192.168.132.137:5672”),
AutomaticRecoveryEnabled = true
};
2、定義Exchange,樣式為direct
channel.ExchangeDeclare(“Exchange”, “direct”);
3、根據業務定義QueueA和QueueB
channel.QueueDeclare(“QueueA”, true, false, false);
channel.QueueBind(“QueueA”, “Exchange”, “RouteA”);
channel.QueueDeclare(“QueueB”, true, false, false);
channel.QueueBind(“QueueB”, “Exchange”, “RouteB”);
4、啟動訊息傳送確認機制,即需要收到RabbitMQ服務端的確認訊息
channel.ConfirmSelect();
5、設定訊息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
6、生成訊息標識MessageId、時間戳Timestamp以及頭資訊Headers
properties.MessageId = Guid.NewGuid().ToString(“N”);
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
properties.Headers = new Dictionary
{
{ “key”, “value” + i}
};
7、傳送訊息,偶數序列傳送到QueueA(RouteA),奇數序列傳送到QueueB(RouteB)
channel.BasicPublish(“Exchange”, i % 2 == 0 ? “RouteA” : “RouteB”, properties, body);
8、確定收到RabbitMQ服務端的確認訊息
var isOk = channel.WaitForConfirms();
if (!isOk)
{
throw new Exception(“The message is not reached to the server!”);
}
完整程式碼
var factory = new ConnectionFactory
{
Uri = new Uri(“amqp://guest:guest@localhost:5672”),
AutomaticRecoveryEnabled = true
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(“Exchange”, “direct”);
channel.QueueDeclare(“QueueA”, true, false, false);
channel.QueueBind(“QueueA”, “Exchange”, “RouteA”);
channel.QueueDeclare(“QueueB”, true, false, false);
channel.QueueBind(“QueueB”, “Exchange”, “RouteB”);
channel.ConfirmSelect();
for (var i = 0; i < 2; i++)
{
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.MessageId = Guid.NewGuid().ToString(“N”);
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
properties.Headers = new Dictionary
{
{ “key”, “value” + i}
};
var message = “Hello ” + i;
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(“Exchange”, i % 2 == 0 ? “RouteA” : “RouteB”, properties, body);
var isOk = channel.WaitForConfirms();
if (!isOk)
{
throw new Exception(“The message is not reached to the server!”);
}
}
}
}
效果:QueueA和QueueB各一條訊息,QueueAudit兩條訊息
註:Exchange下必須先宣告Queue才能接收到訊息,上述程式碼並沒有QueueAudit的宣告;需要手動宣告,或者先執行下麵的消費者程式進行宣告。
正常消費者(ComsumerA)
1、設定預取訊息,避免公平輪訓問題,可以根據需要設定預取訊息數,這裡是1
_channel.BasicQos(0, 1, false);
2、宣告Exchange和Queue
_channel.ExchangeDeclare(“Exchange”, “direct”);
_channel.QueueDeclare(“QueueA”, true, false, false);
_channel.QueueBind(“QueueA”, “Exchange”, “RouteA”);
3、編寫回呼函式
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
//The QueueA is always successful.
try
{
_channel.BasicAck(ea.DeliveryTag, false);
}
catch (AlreadyClosedException ex)
{
_logger.LogCritical(ex, “RabbitMQ is closed!”);
}
};
_channel.BasicConsume(“QueueA”, false, consumer);
註:設定了RabbitMQ的斷線恢復機制,當RabbitMQ連線不可用時,與MQ通訊的操作會丟擲AlreadyClosedException的異常,導致主執行緒退出,哪怕連線恢復了,程式也無法恢復,因此,需要捕獲處理該異常。
異常消費者(ComsumerB)
1、設定預取訊息
_channel.BasicQos(0, 1, false);
2、宣告Exchange和Queue
_channel.ExchangeDeclare(“Exchange”, “direct”);
_channel.QueueDeclare(“QueueB”, true, false, false);
_channel.QueueBind(“QueueB”, “Exchange”, “RouteB”);
3、設定死信交換機(Dead Letter Exchange)
var retryDic = new Dictionary
{
{“x-dead-letter-exchange”, “Exchange”},
{“x-dead-letter-routing-key”, “RouteB”}
};
_channel.ExchangeDeclare(“Exchange_Retry”, “direct”);
_channel.QueueDeclare(“QueueB_Retry”, true, false, false, retryDic);
_channel.QueueBind(“QueueB_Retry”, “Exchange_Retry”, “RouteB_Retry”);
4、重試設定,3次重試;第一次1秒,第二次10秒,第三次30秒
_retryTime = new List
{
1 * 1000,
10 * 1000,
30 * 1000
};
5、獲取當前重試次數
var retryCount = 0;
if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey(“retryCount”))
{
retryCount = (int)ea.BasicProperties.Headers[“retryCount”];
_logger.LogWarning($”[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started…”);
}
6、發生異常,判斷是否可以重試
private bool CanRetry(int retryCount)
{
return retryCount <= _retryTime.Count - 1;
}
7、可以重試,則啟動重試機制
private void SetupRetry(int retryCount, string retryExchange, string retryRoute, BasicDeliverEventArgs ea)
{
var body = ea.Body;
var properties = ea.BasicProperties;
properties.Headers = properties.Headers ?? new Dictionary
(); properties.Headers[“retryCount”] = retryCount;
properties.Expiration = _retryTime[retryCount].ToString();
try
{
_channel.BasicPublish(retryExchange, retryRoute, properties, body);
}
catch (AlreadyClosedException ex)
{
_logger.LogCritical(ex, “RabbitMQ is closed!”);
}
}
完整程式碼
_channel.BasicQos(0, 1, false);
_channel.ExchangeDeclare(“Exchange”, “direct”);
_channel.QueueDeclare(“QueueB”, true, false, false);
_channel.QueueBind(“QueueB”, “Exchange”, “RouteB”);
var retryDic = new Dictionary
{
{“x-dead-letter-exchange”, “Exchange”},
{“x-dead-letter-routing-key”, “RouteB”}
};
_channel.ExchangeDeclare(“Exchange_Retry”, “direct”);
_channel.QueueDeclare(“QueueB_Retry”, true, false, false, retryDic);
_channel.QueueBind(“QueueB_Retry”, “Exchange_Retry”, “RouteB_Retry”);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
//The QueueB is always failed.
bool canAck;
var retryCount = 0;
if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey(“retryCount”))
{
retryCount = (int)ea.BasicProperties.Headers[“retryCount”];
_logger.LogWarning($”[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started…”);
}
try
{
Handle();
canAck = true;
}
catch (Exception ex)
{
_logger.LogCritical(ex, “Error!”);
if (CanRetry(retryCount))
{
SetupRetry(retryCount, “Exchange_Retry”, “RouteB_Retry”, ea);
canAck = true;
}
else
{
canAck = false;
}
}
try
{
if (canAck)
{
_channel.BasicAck(ea.DeliveryTag, false);
}
else
{
_channel.BasicNack(ea.DeliveryTag, false, false);
}
}
catch (AlreadyClosedException ex)
{
_logger.LogCritical(ex, “RabbitMQ is closed!”);
}
};
_channel.BasicConsume(“QueueB”, false, consumer);
審計消費者(Audit Comsumer)
1、宣告Exchange和Queue
_channel.ExchangeDeclare(“Exchange”, “direct”);
_channel.QueueDeclare(“QueueAudit”, true, false, false);
_channel.QueueBind(“QueueAudit”, “Exchange”, “RouteA”);
_channel.QueueBind(“QueueAudit”, “Exchange”, “RouteB”);
2、排除死信Exchange轉發過來的重覆訊息
if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey(“x-death”))
{
…
}
3、生成訊息物體
var message = new Message
{
MessageId = ea.BasicProperties.MessageId,
Body = ea.Body,
Exchange = ea.Exchange,
Route = ea.RoutingKey
};
4、RabbitMQ會用bytes來儲存字串,因此,要把頭中bytes轉回字串
if (ea.BasicProperties.Headers != null)
{
var essay-headers = new Dictionary
(); foreach (var essay-header in ea.BasicProperties.Headers)
{
if (essay-header.Value is byte[] bytes)
{
essay-headers[essay-header.Key] = Encoding.UTF8.GetString(bytes);
}
else
{
essay-headers[essay-header.Key] = essay-header.Value;
}
}
message.Headers = essay-headers;
}
5、把Unix格式的Timestamp轉成UTC時間
if (ea.BasicProperties.Timestamp.UnixTime > 0)
{
message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;
var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);
message.Timestamp = offset.UtcDateTime;
}
6、訊息存入MongoDB
_mongoDbContext.Collection
().InsertOne(message, cancellationToken: cancellationToken);
MongoDB記錄:
重試記錄:
訊息檢索及重發(WebApi)
1、透過訊息Id檢索訊息
2、透過頭訊息檢索訊息
3、訊息重發,會重新生成MessageId
Ack,Nack,Reject的關係
1、訊息處理成功,執行Ack,RabbitMQ會把訊息從佇列中刪除。
2、訊息處理失敗,執行Nack或者Reject:
a) 當requeue=true時,訊息會重新回到佇列,然後當前消費者會馬上再取回這條訊息;
b) 當requeue=false時,如果Exchange有設定Dead Letter Exchange,則訊息會去到Dead Letter Exchange;
c) 當requeue=false時,如果Exchange沒設定Dead Letter Exchange,則訊息從佇列中刪除,效果與Ack相同。
3、Nack與Reject的區別在於:Nack可以批次操作,Reject只能單條操作。
RabbitMQ自動恢復
連線(Connection)恢復
1、重連(Reconnect)
2、恢復連線監聽(Listeners)
3、重新開啟通道(Channels)
4、恢復通道監聽(Listeners)
5、恢復basic.qos,publisher confirms以及transaction設定
拓撲(Topology)恢復
1、重新宣告交換機(Exchanges)
2、重新宣告佇列(Queues)
3、恢復所有系結(Bindings)
4、恢復所有消費者(Consumers)
異常處理機制
1、臨時異常,如資料庫網路閃斷、http請求臨時失效等透過短時間重試(如1秒後)的方式處理,也可以考慮Nack/Reject來實現重試(時效性更高)。
2、時序異常,如A任務依賴於B任務,但可能由於排程或消費者分配的原因,導致A任務先於B任務執行透過長時間重試(如1分鐘、30分鐘、1小時、1天等),等待B任務先執行完的方式處理。
3、業務異常,由於系統測試不充分,上線後發現某幾個或某幾種訊息無法正常處理
等系統修正後,透過訊息重發的方式處理。
4、系統異常,業務中介軟體無法正常操作,如網路中斷、資料庫宕機等等系統恢復後,透過訊息重發的方式處理。
5、非法異常,一些偽造、攻擊型別的訊息多次重試失敗後,訊息從佇列中被刪除,也可以針對此業務做進一步處理。
原始碼地址:https://github.com/ErikXu/RabbitMesage
看完本文有收穫?請轉發分享給更多人
●編號137,輸入編號直達本文
●輸入m獲取文章目錄
Web開發
更多推薦《18個技術類公眾微信》
涵蓋:程式人生、演演算法與資料結構、駭客技術與網路安全、大資料技術、前端開發、Java、Python、Web開發、安卓開發、iOS開發、C/C++、.NET、Linux、資料庫、運維等。