摘要: 原創出處 http://www.iocoder.cn/RocketMQ/filtersrv/ 「芋道原始碼」歡迎轉載,保留摘要,謝謝!
本文主要基於 RocketMQ 4.0.x 正式版
-
1. 概述
-
2. Filtersrv 註冊到 Broker
-
3. 過濾類
-
3.1 Consumer 訂閱時設定 過濾類程式碼
-
3.2 Consumer 上傳 過濾類程式碼
-
3.3 Filter 編譯 過濾類程式碼
-
4. 過濾訊息
-
4.1 Consumer 從 Filtersrv 拉取訊息
-
4.2 Filtersrv 從 Broker 拉取訊息
-
5. Filtersrv 高可用
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
-
友情提示:歡迎關註公眾號【芋道原始碼】。?關註後,拉你進【原始碼圈】微信群和【芋艿】搞基嗨皮。
1. 概述
Filtersrv
,負責自定義規則過濾 Consumer
從 Broker
拉取的訊息。
為什麼 Broker
不提供過濾訊息的功能呢?我們來看看官方的說法:
Broker 端訊息過濾
在 Broker 中,按照 Consumer 的要求做過濾,優點是減少了對於 Consumer 無用訊息的網路傳輸。 缺點是增加了 Broker 的負擔,實現相對複雜。
(1). 淘寶 Notify 支援多種過濾方式,包含直接按照訊息型別過濾,靈活的語法運算式過濾,幾乎可以滿足最苛刻的過濾需求。
(2). 淘寶 RocketMQ 支援按照簡單的 Message Tag 過濾,也支援按照 Message Header、body 進行過濾。
(3). CORBA Notification 規範中也支援靈活的語法運算式過濾。Consumer 端訊息過濾
這種過濾方式可由應用完全自定義實現,但是缺點是很多無用的訊息要傳輸到 Consumer 端。
就是在這種考慮下,Filtersrv
出現了。減少了 Broker
的負擔,又減少了 Consumer
接收無用的訊息。當然缺點也是有的,多了一層 Filtersrv
網路開銷。
2. Filtersrv 註冊到 Broker
-
? 一個
Filtersrv
只對應一個Broker
。 -
? 一個
Broker
可以對應多個Filtersrv
。Filtersrv
的高可用透過啟動多個Filtersrv
實現。 -
?
Filtersrv
註冊失敗時,主動退出關閉。
核心程式碼如下:
1: // ⬇️⬇️⬇️【FiltersrvController.java】
2: public boolean initialize() {
3: // ....(省略程式碼)
4:
5: // 固定間隔註冊到Broker
6: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
7:
8: @Override
9: public void run() {
10: FiltersrvController.this.registerFilterServerToBroker();
11: }
12: }, 15, 10, TimeUnit.SECONDS); // TODO edit by 芋艿:initialDelay時間太短,可能導致初始化失敗。從3=》15
13:
14: // ....(省略程式碼)
15: }
16:
17: /**
18: * 註冊Filtersrv 到 Broker
19: * !!!如果註冊失敗,關閉Filtersrv
20: */
21: public void registerFilterServerToBroker() {
22: try {
23: RegisterFilterServerResponseHeader responseHeader =
24: this.filterServerOuterAPI.registerFilterServerToBroker(
25: this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
26: this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
27: .setDefaultBrokerId(responseHeader.getBrokerId());
28:
29: if (null == this.brokerName) {
30: this.brokerName = responseHeader.getBrokerName();
31: }
32:
33: log.info("register filter server to broker OK, Return: {} {}",
34: this.localAddr(),
35: this.filtersrvConfig.getConnectWhichBroker(),
36: responseHeader.getBrokerName(),
37: responseHeader.getBrokerId());
38: } catch (Exception e) {
39: log.warn("register filter server Exception", e);
40:
41: log.warn("access broker failed, kill oneself");
42: System.exit(-1); // 異常退出
43: }
44: }
3. 過濾類
3.1 Consumer 訂閱時設定 過濾類程式碼
-
?
Consumer
針對每個Topic
可以訂閱不同的過濾類程式碼
。
1: // ⬇️⬇️⬇️【DefaultMQPushConsumer.java】
2: @Override
3: public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
4: this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
5: }
3.2 Consumer 上傳 過濾類程式碼
-
?
Consumer
心跳註冊到Broker
的同時,上傳過濾類程式碼
到Broker
對應的所有Filtersrv
。
1: // ⬇️⬇️⬇️【MQClientInstance.java】
2: /**
3: * 傳送心跳到Broker,上傳過濾類原始碼到Filtersrv
4: */
5: public void sendHeartbeatToAllBrokerWithLock() {
6: if (this.lockHeartbeat.tryLock()) {
7: try {
8: this.sendHeartbeatToAllBroker();
9: this.uploadFilterClassSource();
10: } catch (final Exception e) {
11: log.error("sendHeartbeatToAllBroker exception", e);
12: } finally {
13: this.lockHeartbeat.unlock();
14: }
15: } else {
16: log.warn("lock heartBeat, but failed.");
17: }
18: }
19:
20: /**
21: * 上傳過濾類到Filtersrv
22: */
23: private void uploadFilterClassSource() {
24: Iterator> it = this.consumerTable.entrySet().iterator();
25: while (it.hasNext()) {
26: Entry next = it.next();
27: MQConsumerInner consumer = next.getValue();
28: if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
29: Set subscriptions = consumer.subscriptions();
30: for (SubscriptionData sub : subscriptions) {
31: if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
32: final String consumerGroup = consumer.groupName();
33: final String className = sub.getSubString();
34: final String topic = sub.getTopic();
35: final String filterClassSource = sub.getFilterClassSource();
36: try {
37: this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
38: } catch (Exception e) {
39: log.error("uploadFilterClassToAllFilterServer Exception", e);
40: }
41: }
42: }
43: }
44: }
45: }
3.3 Filter 編譯 過濾類程式碼
-
?
Filtersrv
處理Consumer
上傳的過濾類程式碼
,併進行編譯使用。
核心程式碼如下:
1: // ⬇️⬇️⬇️【FilterClassManager.java】
2: /**
3: * 註冊過濾類
4: *
5: * @param consumerGroup 消費分組
6: * @param topic Topic
7: * @param className 過濾類名
8: * @param classCRC 過濾類原始碼CRC
9: * @param filterSourceBinary 過濾類原始碼
10: * @return 是否註冊成功
11: */
12: public boolean registerFilterClass(final String consumerGroup, final String topic,
13: final String className, final int classCRC, final byte[] filterSourceBinary) {
14: final String key = buildKey(consumerGroup, topic);
15: // 判斷是否要註冊新的過濾類
16: boolean registerNew = false;
17: FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
18: if (null == filterClassInfoPrev) {
19: registerNew = true;
20: } else {
21: if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
22: if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // 類有變化
23: registerNew = true;
24: }
25: }
26: }
27: // 註冊新的過濾類
28: if (registerNew) {
29: synchronized (this.compileLock) {
30: filterClassInfoPrev = this.filterClassTable.get(key);
31: if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
32: return true;
33: }
34: try {
35: FilterClassInfo filterClassInfoNew = new FilterClassInfo();
36: filterClassInfoNew.setClassName(className);
37: filterClassInfoNew.setClassCRC(0);
38: filterClassInfoNew.setMessageFilter(null);
39:
40: if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
41: String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
42: // 編譯新的過濾類
43: Class> newClass = DynaCode.compileAndLoadClass(className, javaSource);
44: // 建立新的過濾類物件
45: Object newInstance = newClass.newInstance();
46: filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
47: filterClassInfoNew.setClassCRC(classCRC);
48: }
49:
50: this.filterClassTable.put(key, filterClassInfoNew);
51: } catch (Throwable e) {
52: String info = String.format("FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
53: consumerGroup, topic, className);
54: log.error(info, e);
55: return false;
56: }
57: }
58: }
59:
60: return true;
61: }
4. 過濾訊息
4.1 Consumer 從 Filtersrv 拉取訊息
-
?
Consumer
拉取 使用過濾類方式訂閱 的消費訊息時,從Broker
對應的Filtersrv
串列隨機選擇一個拉取訊息。如果選擇不到Filtersrv
,則無法拉取訊息。因此,Filtersrv
一定要做高可用。
1: // ⬇️⬇️⬇️【PullAPIWrapper.java】
2: /**
3: * 拉取訊息核心方法
4: *
5: * @param mq 訊息嘟列
6: * @param subExpression 訂閱運算式
7: * @param subVersion 訂閱版本號
8: * @param offset 拉取佇列開始位置
9: * @param maxNums 批次拉 取訊息數量
10: * @param sysFlag 拉取系統標識
11: * @param commitOffset 提交消費進度
12: * @param brokerSuspendMaxTimeMillis broker掛起請求最大時間
13: * @param timeoutMillis 請求broker超時時間
14: * @param communicationMode 通訊樣式
15: * @param pullCallback 拉取回呼
16: * @return 拉取訊息結果。只有通訊樣式為同步時,才傳回結果,否則傳回null。
17: * @throws MQClientException 當尋找不到 broker 時,或發生其他client異常
18: * @throws RemotingException 當遠端呼叫發生異常時
19: * @throws MQBrokerException 當 broker 發生異常時。只有通訊樣式為同步時才會發生該異常。
20: * @throws InterruptedException 當發生中斷異常時
21: */
22: protected PullResult pullKernelImpl(
23: final MessageQueue mq,
24: final String subExpression,
25: final long subVersion,
26: final long offset,
27: final int maxNums,
28: final int sysFlag,
29: final long commitOffset,
30: final long brokerSuspendMaxTimeMillis,
31: final long timeoutMillis,
32: final CommunicationMode communicationMode,
33: final PullCallback pullCallback
34: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
35: // // ....(省略程式碼)
36: // 請求拉取訊息
37: if (findBrokerResult != null) {
38: // ....(省略程式碼)
39: // 若訂閱topic使用過濾類,使用filtersrv獲取訊息
40: String brokerAddr = findBrokerResult.getBrokerAddr();
41: if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
42: brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
43: }
44:
45: PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
46: brokerAddr,
47: requestHeader,
48: timeoutMillis,
49: communicationMode,
50: pullCallback);
51:
52: return pullResult;
53: }
54:
55: // Broker資訊不存在,則丟擲異常
56: throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
57: }
58:
59: /**
60: * 計算filtersrv地址。如果有多個filtersrv,隨機選擇一個。
61: *
62: * @param topic Topic
63: * @param brokerAddr broker地址
64: * @return filtersrv地址
65: * @throws MQClientException 當filtersrv不存在時
66: */
67: private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
68: throws MQClientException {
69: ConcurrentHashMap topicRouteTable = this.mQClientFactory.getTopicRouteTable();
70: if (topicRouteTable != null) {
71: TopicRouteData topicRouteData = topicRouteTable.get(topic);
72: List list = topicRouteData.getFilterServerTable().get(brokerAddr);
73: if (list != null && !list.isEmpty()) {
74: return list.get(randomNum() % list.size());
75: }
76: }
77: throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
78: + topic, null);
79: }
4.2 Filtersrv 從 Broker 拉取訊息
-
?
Filtersrv
拉取訊息後,會建議Consumer
向Broker主節點
拉取訊息。 -
?
Filtersrv
可以理解成一個Consumer
,向Broker
拉取訊息時,實際使用的DefaultMQPullConsumer.java
的方法和邏輯。
1: // ⬇️⬇️⬇️【DefaultRequestProcessor.java】
2: /**
3: * 拉取訊息
4: *
5: * @param ctx 拉取訊息context
6: * @param request 拉取訊息請求
7: * @return 響應
8: * @throws Exception 當發生異常時
9: */
10: private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
11: final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
12: final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
13: final PullMessageRequestHeader requestHeader =
14: (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
15:
16: final FilterContext filterContext = new FilterContext();
17: filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
18:
19: response.setOpaque(request.getOpaque());
20:
21: DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
22:
23: // 校驗Topic過濾類是否完整
24: final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
25: if (null == findFilterClass) {
26: response.setCode(ResponseCode.SYSTEM_ERROR);
27: response.setRemark("Find Filter class failed, not registered");
28: return response;
29: }
30: if (null == findFilterClass.getMessageFilter()) {
31: response.setCode(ResponseCode.SYSTEM_ERROR);
32: response.setRemark("Find Filter class failed, registered but no class");
33: return response;
34: }
35:
36: // 設定下次請求從 Broker主節點。
37: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
38:
39: MessageQueue mq = new MessageQueue();
40: mq.setTopic(requestHeader.getTopic());
41: mq.setQueueId(requestHeader.getQueueId());
42: mq.setBrokerName(this.filtersrvController.getBrokerName());
43: long offset = requestHeader.getQueueOffset();
44: int maxNums = requestHeader.getMaxMsgNums();
45:
46: final PullCallback pullCallback = new PullCallback() {
47:
48: @Override
49: public void onSuccess(PullResult pullResult) {
50: responseHeader.setMaxOffset(pullResult.getMaxOffset());
51: responseHeader.setMinOffset(pullResult.getMinOffset());
52: responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
53: response.setRemark(null);
54:
55: switch (pullResult.getPullStatus()) {
56: case FOUND:
57: response.setCode(ResponseCode.SUCCESS);
58:
59: List msgListOK = new ArrayList();
60: try {
61: for (MessageExt msg : pullResult.getMsgFoundList()) {
62: // 使用過濾類過濾訊息
63: boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
64: if (match) {
65: msgListOK.add(msg);
66: }
67: }
68:
69: if (!msgListOK.isEmpty()) {
70: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
71: return;
72: } else {
73: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
74: }
75: } catch (Throwable e) {
76: final String error =
77: String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
78: requestHeader.getConsumerGroup(), requestHeader.getTopic());
79: log.error(error, e);
80:
81: response.setCode(ResponseCode.SYSTEM_ERROR);
82: response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
83: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
84: return;
85: }
86:
87: break;
88: case NO_MATCHED_MSG:
89: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
90: break;
91: case NO_NEW_MSG:
92: response.setCode(ResponseCode.PULL_NOT_FOUND);
93: break;
94: case OFFSET_ILLEGAL:
95: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
96: break;
97: default:
98: break;
99: }
100:
101: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
102: }
103:
104: @Override
105: public void onException(Throwable e) {
106: response.setCode(ResponseCode.SYSTEM_ERROR);
107: response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
108: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
109: return;
110: }
111: };
112:
113: // 拉取訊息
114: pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
115: return null;
116: }