上篇文章主要梳理了NameServer的啟動器和配置資訊,並複習了JVM中的關閉鉤子這個知識點。這篇文章看下NameServer的其他模組。建議帶著如下三個問題閱讀:
-
NameServer管理哪些資訊?如何管理的?
-
NameServer中對Netty的使用案例?
-
NameServer中對Java併發程式設計使用案例?
一、NamesrvController
-
作用:NameServer模組的控制器
-
主要屬性:
-
namesrvConfig:name server的配置資訊
-
nettyServerConfig:name server中作為netty服務端的配置
-
scheduledExecutorService:排程執行緒池,用於:(1)週期性檢查broker資訊;(2)週期性列印路由資訊;這兩個檢查每隔5秒交替進行。
-
kvConfigManager:name server配置的操作介面
-
routeInfoManager:name server路由資訊的操作介面
-
remotingServer:netty伺服器
-
brokerHousekeepingService:監聽連線的broker的通道的關閉或異常事件,用於清理broker資訊;
-
remotingExecutor:服務端處理請求的執行緒池
程式碼如下:
public class NamesrvController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//name server的配置
private final NamesrvConfig namesrvConfig;
//netty server的配置定義
private final NettyServerConfig nettyServerConfig;
//建立一個具備排程功能的執行緒池,該執行緒池裡只有一個執行緒,用於:(1)週期性檢查broker資訊;(2)週期性列印路由資訊
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
//name server配置的操作介面
private final KVConfigManager kvConfigManager;
//name server路由資訊的操作介面
private final RouteInfoManager routeInfoManager;
//伺服器
private RemotingServer remotingServer;
//broker資訊清理器,監聽通道事件
private BrokerHousekeepingService brokerHousekeepingService;
//服務端處理請求的執行緒池
private ExecutorService remotingExecutor;
private Configuration configuration;
//other code....
}
-
主要方法
-
initialize:初始化
public boolean initialize() { this.kvConfigManager.load(); this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); //伺服器啟動後5秒,開始每隔10秒檢查broker的執行狀態 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //伺服器啟動後1秒,開始每隔10秒檢查 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); return true; }
-
registerProcessor:註冊處理器
//在name server伺服器上註冊請求處理器,預設是DefaultRequestProcessor private void registerProcessor() { if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor); } else { this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } }
-
其他還有:構造方法、start方法、shutdown方法
Java併發:
-
Executors.newFixedThreadPool(),用於建立固定數量的執行緒池,根據執行緒池的執行原理:執行緒池啟動時候沒有執行緒,當新任務到來時就建立執行緒處理;由於coreSize和maxSize設定為相同大小,如果任務來的時候執行緒已經達到coreSize,就直接放入等待佇列;keepAlive設定為0,目的是讓執行緒數不會超過coreSize;blockqueue設定為LinkedBlockingQueue,表示是無界佇列,最多可以放Integer.MAX_VALUE個任務。
public static ExecutorService newFixedThreadPool(int nThreads,ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
(), threadFactory); } -
週期執行緒池
NameServerController中使用了排程執行緒池,我們看下建立一個排程執行緒池的方法,即Executors.newSingleThreadScheduledExecutor(),該方法的定義如下所示:
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }
這種執行緒池的建立又委託給了DelegatedScheduledExecutorService類,這裡為什麼這麼設計,不是太理解。不過可以看下真正建立排程執行緒池的程式碼:
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }
上面這個方法,關鍵在於兩點:(1)maxSize選了Integer.MAX_VALUE;(2)任務佇列使用了延遲佇列;再回頭去看那個委託類的程式碼,就可以明白,委託類包裝了ScheduledExecutorService執行器,提供了延遲或週期執行的介面。
/** * A wrapper class that exposes only the ScheduledExecutorService * methods of a ScheduledExecutorService implementation. */ static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); e = executor; } public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) { return e.schedule(command, delay, unit); } public
ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { return e.schedule(callable, delay, unit); } public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return e.scheduleAtFixedRate(command, initialDelay, period, unit); } public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return e.scheduleWithFixedDelay(command, initialDelay, delay, unit); } } 找到上面幾個主要類和介面的類圖,再綜合上面的程式碼,可以這麼理解:Executors是一個工具類,提供了生成不同的執行緒池的工廠方法,其中包括newSingleThreadScheduledExecutor方法,由於ScheduledExecutorService擴充套件了ExecutorService介面,同時又想重用AbstractExecutorService中的一些方法,因此需要一個委託類,將ExecutorService和ScheduledExecutorService的功能整合在一個類中。
Netty
RemotingServer是name server中的通訊服務端,在name controller初始化name server模組的時候,會將name server的請求處理器註冊到netty伺服器上。
二、DefaultRequestProcessor
在NameServerController中會註冊請求處理器,那麼name server的請求處理器實現了哪些介面呢,請看程式碼:
public class DefaultRequestProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
protected final NamesrvController namesrvController;
public DefaultRequestProcessor(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
//其他具體的實現方法
}
從這個程式碼中可以看出兩個方面的內容:
-
如何使用Netty處理網路請求。關鍵資料結構:(1)RemotingCommand:自定義的協議,攜帶請求引數和響應(2)ChannelHandlerContext:netty的資料結構,攜帶channel相關的資訊。設計模型:processRequest:透過請求碼進行請求轉發;
-
請求處理方法(跟協議相關,具體參見remote模組)(1)processRequest:請求分發;(2)putKVConfig:將配置資訊放在記憶體中;(3)getKVConfig:傳回配置資訊(4)deleteKVConfig:刪除配置資訊;(5)註冊broker,支援兩個註冊方式:帶過濾服務的(MQ版本在V3011之後的)、不帶過濾服務的,等其他處理方法。
三、BrokerHousekeepingService
該模組實現了ChannelEventListener介面,每個broker都會跟name server建立一個連線通道,當這個通道發生異常事件時,需要及時在name server這邊清理掉對應的broker資訊。異常事件的型別有:(1)通道關閉時;(2)通道丟擲異常時;(3)通道空閑時。
public class BrokerHousekeepingService implements ChannelEventListener {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
四、RouteInfoManager
這個模組是name server的核心模組,真正管理broker、訊息佇列等相關資訊的地方。程式碼如下:
public class RouteInfoManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap> topicQueueTable;
private final HashMap brokerAddrTable;
private final HashMap > clusterAddrTable;
private final HashMap brokerLiveTable;
private final HashMap/* Filter Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap>(1024);
this.brokerAddrTable = new HashMap(128);
this.clusterAddrTable = new HashMap>(32);
this.brokerLiveTable = new HashMap(256);
this.filterServerTable = new HashMap>(256);
}
//對外暴露的方法
}
主要屬性的含義如下:
-
BROKERCHANNELEXPIRED_TIME,表示一個broker距離上次發心跳包的最長時間,即120秒;
-
使用可重入讀寫鎖實現併發安全、使用輕量級的非執行緒安全容器實現高效併發;【這點非常重要】
-
topicQueueTable:用於管理topic和屬於這個topic的佇列的對映關係;
-
brokerAddrTable:用於管理某個broker和它對應的資訊
-
clusterAddrTable:用於管理broker叢集和叢集中對應的broker的對映關係
-
brokerLiveTable:用於管理broker的存活資訊
-
filterServerTable:用於管理broker和過濾服務串列【暫不理解】
關於ReentrantReadWriteLock:
-
這裡使用的鎖是非公平鎖
-
ReentrantReadWriteLock基於Sync、ReadLock、WriteLock三個模組實現,Sync負責處理公平與否的問題。ReadLock和WriteLock透過鎖外部物件ReentrantReadWriteLock來處理併發。在RoutInfoManager中的使用案例如下:
public void deleteTopic(final String topic) { try { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.remove(topic); } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("deleteTopic Exception", e); } }
五、KVConfigManager
這個模組用於管理name server自己的配置資訊,配置資訊以json資訊存放在檔案中,以二維陣列形式存在於記憶體中,請看程式碼:
/**
* 管理NameServer的配置屬性
*/
public class KVConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
//可重入讀寫鎖
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//配置表
private final HashMap > configTable =
new HashMap>();
public KVConfigManager(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
//這個類對外暴露的方法,省了
}
這個類對外暴露的方法有:
-
load方法:將配置資訊載入到記憶體中
-
putKVConfig方法:將配置資訊持久化
-
deleteKVConfig方法:刪除指定的配置項
-
getKVListByNamespace和getKVConfig用於查詢配置資訊
參考資料
-
訊息佇列技術點梳理
-
netty的執行緒模型
-
《Java併發程式設計的藝術》