1.背景
瞭解過RabbitMQ的Fanout樣式,應該知道它原本的Fanout樣式就是用來做廣播的。但是它的廣播有一點區別,來回顧下它的含義:Fanout型別沒有路由鍵的概念,只要佇列系結到了改exchange上面,就會接收到所有的訊息。
使用過程一般就是先new 出一個Fanout型別的交換機,然後往這個交換機上系結多個佇列queue,不同的消費者各自監聽不同的佇列,這就實現了廣播效果,因為同一個訊息,會分發到所有佇列中。
舉個例子:
應用A監聽了佇列A,應用B監聽了佇列B,Fanout型別交換機同時系結了佇列A和B.假設生產者端發送了一條訊息到Fanout型別交換機,交換機就會把訊息分發到所有佇列,這時應用A和應用B會收到同一條訊息,這就是廣播。
說了上面一大堆,只是為了強調,對於RabbitMQ的原本Fanout樣式,它的設計就是多個消費者必須監聽不同的佇列,多個消費者之間才會形成廣播關係。
那麼問題來了,假如在Fanout工作樣式下,多個消費者同時監聽的是同一個佇列,會怎樣?實踐過的同學應該都知道,這種情況下,這些消費者會形成競爭關係,現象是同一個訊息只會被其中一個消費者接收,達不到廣播的效果。。
2.需求
假如現在有一個需求,要做到對同一個應用的多個節點進行廣播,怎麼實現?
註意,這裡所說的同一個應用多個節點,通俗點理解就是一個war包,布在多個伺服器節點上。
在實際部署叢集時,為了高可用,同一個應用可能會部署多個節點,那假如工程裡已經透過配置定義某個佇列,那多個節點它們定義的佇列就會是相同的,那按照上面的背景,那這些節點間肯定就會存在競爭關係,即便是Fanout樣式的交換機,一條訊息也只能被其中一個節點接收,其他節點收不到,達不到廣播的效果。那該如何做?
相信看到這裡,有人會問,為何會有 對同一個應用的多個節點進行廣播的需求場景?為什麼要有這個需求。生產中的業務系統很多,自然而然場景就很多。
舉兩個經典的例子:
1.想要同時掃清所有節點的快取
業務系統離不開快取,有時會用記憶體快取,假如我要掃清所有節點的記憶體快取,多個節點前可能有負載均衡例如nginx之類的,我只需要訪問其中一個節點,然後讓這個節點做廣播通知所有其他節點刷快取。(廣播刷快取)
2.websocket會話尋找
websocket是比較受歡迎的實時訊息推送方案。用過websocket應該知道,websocket只能與多個節點中的其中一個節點做長連線會話保持,也就是說使用者的會話只會存在於一個節點上,假設服務端要主動向用戶推一條訊息,必須要知道使用者的會話在哪個節點上,怎麼得知?可以透過廣播,透過訊息廣播,把訊息發到多個節點上,然後節點收到訊息只需要判斷使用者會話是否就在本節點上,假如在則主動推訊息,不在,則丟棄這條訊息。
類似上面這兩種需求,就需要用到廣播,並且是對同一個應用的多個節點進行廣播。當然不用廣播肯定也有其他通知方案,本文我們只討論用MQ怎麼做到。
3.思路
假如繼續用RabbitMQ的Fanout樣式,怎麼做到對同一個應用的多個節點進行廣播?
要起到廣播效果,關鍵就是讓多個應用節點間不要存在競爭關係或者存在競爭關係時它們的訊息怎麼共享?可以從這兩個方向解決這個問題。
方法可能很多種,在這裡,我只描述兩種比較容易實現的方案。
方案1
過程大致如下
- 應用啟動,多個節點監聽同一個佇列(此時多個節點是競爭關係,一條訊息只會發到其中一個節點上)
- 訊息生產者傳送訊息,同一條訊息只被其中一個節點收到
- 收到訊息的節點透過redis的釋出訂閱樣式來通知其他兄弟節點
這種方案是最容易想到的,思路就是依賴其他元件來做訊息共享,例如redis這種可以替換成其他方案,只要能做到訊息共享就行,那麼最終的效果就肯定是廣播效果了。
方案2
過程大致如下
- 應用啟動,利用監聽器生成唯一ID
- 生成的唯一ID,透過檔案寫入的方式寫到配置檔案中
- spring啟動,把這個唯一ID載入為全域性屬性(為何要用唯一ID,就是為了用這個ID作為該節點的監聽佇列名,當然字首可以用相同的,字尾用唯一ID區分即可,舉個例子就是:節點1監聽佇列 kunghsu-123 節點2監聽佇列 kunghsu-456.必須保證它們的唯一ID是唯一的,不然還是會存在競爭關係)
- 多個節點監聽了多個佇列(讓每個佇列名都不同,目的就是讓他們不存在競爭關係,沒有競爭關係就不用做訊息共享,只管由MQ分發即可,這時同一條訊息就會發到多個節點上)
- 到MQ控制檯,將所有節點生成的佇列手動系結到指定的Fanout交換機上(這一步是手動的,當然也可以透過API做到,下麵會說到)
- 生產者傳送訊息指定的Fanout交換機,交換機將同一條訊息被分發到多個節點上
- 廣播效果達成!
這種方案,也比較容易。這樣做,就是為了讓多個節點間是廣播關係。總的來說不麻煩,其中第五步手動操作其實有點挫,這種手動操作步驟其實是應該轉成自動化,讓應用程式來完成,方便以後自動化建設。
這種方案的spring配置也比較簡單,參考Fanout樣式的配置即可。本文重點在這個思路的實現過程。
只列舉部分程式碼如下:
訊息生產者
"exchangeFour"
durable=“true” auto–delete=“false” >
template id=“amqpTemplate4” connection-factory=“connectionFactory2”
exchange=“exchangeFour” />
訊息消費者
queue name="${queue-name-fanout}" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin2" />
"fanoutTwoConsumer"
class=“com.lunch.foo.rabbitmq.FanoutTwoConsumer”>
connection-factory=“connectionFactory2”>
“${queue-name-fanout}” ref=“fanoutOneConsumer” />
另外,RabbitMQ的客戶端API支援讓我們 將佇列系結到指定的交換機上。具體可參考我的工具類程式碼。
程式碼如下:
package com.lunch.foo.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by xuyaokun On 2019/3/10 2:26
* @desc:
*/
public class RabbitMQUtil {
private static final String HOST = "192.168.3.128";
private static final int PORT = AMQP.PROTOCOL.PORT;
private static final String USERNAME = "kunghsu";
private static final String PASSWORD = "123456";
private static final String VIRTUALHOST = "/";
public static void main(String[] args) {
String QUEUE_NAME = "queueOneX";
String EXCHANGE_NAME = "exchangeFour";
try {
queueBind(EXCHANGE_NAME, QUEUE_NAME);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
/**
* 獲取會話連結
*
* @return
* @throws IOException
* @throws TimeoutException
*/
private static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUALHOST);
return factory.newConnection();
}
/**
* 系結佇列到指定交換機
*
* @param exchangeName
* @param queueName
* @throws IOException
* @throws TimeoutException
*/
public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException {
Channel channel = null;
try{
channel = getConnection().createChannel();
} catch(Exception e){
System.out.println("獲取RabbitMQ會話連線失敗!取消做佇列系結。");
return ;
}
//預設持久化
channel.queueDeclare(queueName, true, false, false, null);
// 宣告交換機:指定交換機的名稱和型別(廣播:fanout)
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true);
// 在消費者端佇列系結
channel.queueBind(queueName, exchangeName, "");
channel.close();
}
}
總結
RabbitMQ的Fanout樣式相關的文章,網上一抓一大把,但是幾乎沒有人講到 如何實現 對同一個應用的多個節點進行廣播。。希望透過這篇文章,能幫助到有需要的同學。另外,假如大家有更好的方案,歡迎交流。感謝閱讀!
【本文作者】
作者網名:xyk_1021