您好,登錄后才能下訂單哦!
這篇文章主要介紹“RocketMQ broker啟動流程是什么”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“RocketMQ broker啟動流程是什么”文章能幫助大家解決問題。
本系列RocketMQ4.8注釋github地址,希望對大家有所幫助,要是覺得可以的話麻煩給點一下Star哈
前面我們已經分析完了NameServer
和producer
,從本文開始,我們將分析Broker
。
broker
的啟動類為org.apache.rocketmq.broker.BrokerStartup
,代碼如下:
public class BrokerStartup { ... public static void main(String[] args) { start(createBrokerController(args)); } ... }
在main()
方法中,僅有一行代碼,這行代碼包含了兩個操作:
createBrokerController(...)
:創建BrokerController
start(...)
:啟動Broker
接下來我們就來分析這兩個操作。
創建BrokerController
的方法為BrokerStartup#createBrokerController
,代碼如下:
/** * 創建 broker 的配置參數 */ public static BrokerController createBrokerController(String[] args) { ... try { //解析命令行參數 Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); } // 處理配置 final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); // tls安全相關 nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); // 配置端口 nettyServerConfig.setListenPort(10911); // 消息存儲的配置 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); ... // 將命令行中的配置設置到brokerConfig對象中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); // 檢查環境變量:ROCKETMQ_HOME if (null == brokerConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //省略一些配置 ... // 創建 brokerController final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); controller.getConfiguration().registerConfig(properties); // 初始化 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 關閉鉤子,在關閉前處理一些操作 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { if (!this.hasShutdown) { ... // 這里會發一條注銷消息給nameServer controller.shutdown(); ... } } } }, "ShutdownHook")); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
這個方法的代碼有點長,但功能并不多,總的來說就三個功能:
處理配置:主要是處理nettyServerConfig
與nettyClientConfig
的配置,這塊就是一些配置解析的操作,處理方式與NameServer
很類似,這里就不多說了。
創建及初始化controller
:調用方法controller.initialize()
,這塊內容我們后面分析。
注冊關閉鉤子:調用Runtime.getRuntime().addShutdownHook(...)
,可以在jvm進程關閉前進行一些操作。
BrokerController
的創建及初始化是在BrokerStartup#createBrokerController
方法中進行,我們先來看看它的構造方法:
public BrokerController( final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig ) { // 4個核心配置信息 this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; this.nettyClientConfig = nettyClientConfig; this.messageStoreConfig = messageStoreConfig; // 管理consumer消費消息的offset this.consumerOffsetManager = new ConsumerOffsetManager(this); // 管理topic配置 this.topicConfigManager = new TopicConfigManager(this); // 處理 consumer 拉消息請求的 this.pullMessageProcessor = new PullMessageProcessor(this); this.pullRequestHoldService = new PullRequestHoldService(this); // 消息送達的監聽器 this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); ... // 往外發消息的組件 this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); ... }
BrokerController
的構造方法很長,基本都是一些賦值操作,代碼中已列出關鍵項,這些包括:
核心配置賦值:主要是brokerConfig
/nettyServerConfig
/nettyClientConfig
/messageStoreConfig
四個配置
ConsumerOffsetManager
:管理consumer
消費消息位置的偏移量,偏移量表示消費者組消費該topic
消息的位置,后面再消費時,就從該位置后消費,避免重復消費消息,也避免了漏消費消息。
topicConfigManager
:topic
配置管理器,就是用來管理topic
配置的,如topic
名稱,topic
隊列數量
pullMessageProcessor
:消息處理器,用來處理消費者拉消息
messageArrivingListener
:消息送達的監聽器,當生產者的消息送達時,由該監聽器監聽
brokerOuterAPI
:往外發消息的組件,如向NameServer
發送注冊/注銷消息
以上這些組件的用處,這里先混個臉熟,我們后面再分析。
我們再來看看初始化操作,方法為BrokerController#initialize
:
public boolean initialize() throws CloneNotSupportedException { // 加載配置文件中的配置 boolean result = this.topicConfigManager.load(); result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); result = result && this.consumerFilterManager.load(); if (result) { try { // 消息存儲管理組件,管理磁盤上的消息 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); // 啟用了DLeger,就創建DLeger相關組件 if (messageStoreConfig.isEnableDLegerCommitLog()) { ... } // broker統計組件 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst( new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } // 加載磁盤上的記錄,如commitLog寫入的位置、消費者主題/隊列的信息 result = result && this.messageStore.load(); if (result) { // 處理 nettyServer this.remotingServer = new NettyRemotingServer( this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer( fastConfig, this.clientHousekeepingService); // 創建線程池start... 這里會創建多種類型的線程池 ... // 處理consumer pull操作的線程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); ... // 創建線程池end... // 注冊處理器 this.registerProcessor(); // 啟動定時任務start... 這里會啟動好多的定時任務 ... // 定時將consumer消費到的offset進行持久化操作,即將數據保存到磁盤上 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); ... // 啟動定時任務end... ... // 開啟 DLeger 的一些操作 if (!messageStoreConfig.isEnableDLegerCommitLog()) { ... } // 處理tls配置 if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { ... } // 初始化一些操作 initialTransaction(); initialAcl(); initialRpcHooks(); } return result; }
這個還是很長,關鍵部分都做了注釋,該方法所做的工作如下:
加載配置文件中的配置
賦值與初始化操作
創建線程池
注冊處理器
啟動定時任務
這里我們來看下注冊處理器的操作this.registerProcessor()
:
this.registerProcessor()
實際調用的方法是BrokerController#registerProcessor
,代碼如下:
public void registerProcessor() { /** * SendMessageProcessor */ SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); ... /** * PullMessageProcessor */ this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); /** * ReplyMessageProcessor */ ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this); replyMessageProcessor.registerSendMessageHook(sendMessageHookList); ... }
這個方法里注冊了許許多多的處理器,這里僅列出了與消息相關的內容,如發送消息、回復消息、拉取消息等,后面在處理producer
/consumer
的消息時,就會用到這些處理器,這里先不展開分析。
我們來看下remotingServer
注冊處理器的操作,方法為NettyRemotingServer#registerProcessor
:
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { ... @Override public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) { ExecutorService executorThis = executor; if (null == executor) { executorThis = this.publicExecutor; } Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis); // 注冊到processorTable 中 this.processorTable.put(requestCode, pair); } ... }
最終,這些處理器注冊到了processorTable
中,它是NettyRemotingAbstract
的成員變量,定義如下:
HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>
這是一個hashMap
的結構,key
為code
,value
為Pair
,該類中有兩個成員變量:NettyRequestProcessor
、ExecutorService
,code
與NettyRequestProcessor
的映射關系就是在hashMap
里存儲的。
Runtime.getRuntime().addShutdownHook(...)
接著我們來看看注冊關閉鉤子的操作:
// 關閉鉤子,在關閉前處理一些操作 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { if (!this.hasShutdown) { ... // 這里會發一條注銷消息給nameServer controller.shutdown(); ... } } } }, "ShutdownHook"));
跟進BrokerController#shutdown
方法:
public void shutdown() { // 調用各組件的shutdown方法 ... // 發送注銷消息到NameServer this.unregisterBrokerAll(); ... // 持久化consumer的消費偏移量 this.consumerOffsetManager.persist(); // 又是調用各組件的shutdown方法 ...
這個方法里會調用各組件的shutdown()
方法、發送注銷消息給NameServer
、持久化consumer的消費偏移量,這里我們主要看發送注銷消息的方法BrokerController#unregisterBrokerAll
:
private void unregisterBrokerAll() { // 發送一條注銷消息給nameServer this.brokerOuterAPI.unregisterBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId()); }
繼續進入BrokerOuterAPI#unregisterBrokerAll
:
public void unregisterBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId ) { // 獲取所有的 nameServer,遍歷發送注銷消息 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null) { for (String namesrvAddr : nameServerAddressList) { try { this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId); log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr); } catch (Exception e) { log.warn("unregisterBroker Exception, {}", namesrvAddr, e); } } } }
這個方法里,會獲取到所有的nameServer
,然后逐個發送注銷消息,繼續進入BrokerOuterAPI#unregisterBroker
方法:
public void unregisterBroker( final String namesrvAddr, final String clusterName, final String brokerAddr, final String brokerName, final long brokerId ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); // 發送的注銷消息:RequestCode.UNREGISTER_BROKER RemotingCommand request = RemotingCommand.createRequestCommand( c, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr); }
最終調用的是RemotingClient#invokeSync
進行消息發送,請求code
是RequestCode.UNREGISTER_BROKER
,這就與NameServer
接收broker
的注銷消息對應上了。
我們再來看看Broker
的啟動流程,處理方法為BrokerController#start
:
public void start() throws Exception { // 啟動各組件 // 啟動消息存儲相關組件 if (this.messageStore != null) { this.messageStore.start(); } // 啟動 remotingServer,其實就是啟動一個netty服務,用來接收producer傳來的消息 if (this.remotingServer != null) { this.remotingServer.start(); } ... // broker對外發放消息的組件,向nameServer上報存活消息時使用了它,也是一個netty服務 if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); } ... // broker 核心的心跳注冊任務 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } // brokerConfig.getRegisterNameServerPeriod() 值為 1000 * 30,最終計算得到默認30秒執行一次 }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); ... }
這個方法主要就是啟動各組件了,這里列出了幾大重要組件的啟動:
messageStore
:消息存儲組件,在這個組件里,會啟動消息存儲相關的線程,如消息的投遞操作、commitLog
文件的flush
操作、comsumeQueue
文件的flush
操作等
remotingServer
:netty
服務,用來接收請求消息,如producer
發送過來的消息
brokerOuterAPI
:也是一個netty
服務,用來對外發送消息,如向nameServer
上報心跳消息
啟動定時任務:broker
向nameServer
發送注冊消息
這里我們重點來看定時任務是如何發送心跳發送的。
處理注冊消息發送的時間間隔如下:
Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)
這行代碼看著長,但意思就一句話:時間間隔可以自行配置,但不能小于10s,不能大于60s,默認是30s.
處理消息注冊的方法為BrokerController#registerBrokerAll(...)
,代碼如下:
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 處理topic相關配置 if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ... } // 這里會判斷是否需要進行注冊 if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { // 進行注冊操作 doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } }
這個方法就是用來處理注冊操作的,不過注冊前會先驗證下是否需要注冊,驗證是否需要注冊的方法為BrokerController#needRegister
, 代碼如下:
private boolean needRegister(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final int timeoutMills) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 判斷是否需要進行注冊 List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills); // 有一個發生了變化,就表示需要注冊了 boolean needRegister = false; for (Boolean changed : changeList) { if (changed) { needRegister = true; break; } } return needRegister; }
這個方法調用了brokerOuterAPI.needRegister(...)
來判斷broker
是否發生了變化,只要一個NameServer
上發生了變化,就說明需要進行注冊操作。
brokerOuterAPI.needRegister(...)
是如何判斷broker
是否發生了變化的呢?繼續跟進BrokerOuterAPI#needRegister
:
public List<Boolean> needRegister( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final TopicConfigSerializeWrapper topicConfigWrapper, final int timeoutMills) { final List<Boolean> changedList = new CopyOnWriteArrayList<>(); // 獲取所有的 nameServer List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 遍歷所有的nameServer,逐一發送請求 for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader(); ... // 向nameServer發送消息,命令是 RequestCode.QUERY_DATA_VERSION RemotingCommand request = RemotingCommand .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader); // 把當前的 DataVersion 發到 nameServer request.setBody(topicConfigWrapper.getDataVersion().encode()); // 發請求到nameServer RemotingCommand response = remotingClient .invokeSync(namesrvAddr, request, timeoutMills); DataVersion nameServerDataVersion = null; Boolean changed = false; switch (response.getCode()) { case ResponseCode.SUCCESS: { QueryDataVersionResponseHeader queryDataVersionResponseHeader = (QueryDataVersionResponseHeader) response .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class); changed = queryDataVersionResponseHeader.getChanged(); byte[] body = response.getBody(); if (body != null) { // 拿到 DataVersion nameServerDataVersion = DataVersion.decode(body, D ataVersion.class); // 這里是判斷的關鍵 if (!topicConfigWrapper.getDataVersion() .equals(nameServerDataVersion)) { changed = true; } } if (changed == null || changed) { changedList.add(Boolean.TRUE); } } default: break; } ... } catch (Exception e) { ... } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("query dataversion from nameserver countDownLatch await Exception", e); } } return changedList; }
這個方法里,先是遍歷所有的nameServer
,向每個nameServer
都發送一條code
為RequestCode.QUERY_DATA_VERSION
的參數,參數為當前broker
的DataVersion
,當nameServer
收到消息后,就返回nameServer
中保存的、與當前broker
對應的DataVersion
,當兩者版本不相等時,就表明當前broker
發生了變化,需要重新注冊。
DataVersion
是個啥呢?它的部分代碼如下:
public class DataVersion extends RemotingSerializable { // 時間戳 private long timestamp = System.currentTimeMillis(); // 計數器,可以理解為最近的版本號 private AtomicLong counter = new AtomicLong(0); public void nextVersion() { this.timestamp = System.currentTimeMillis(); this.counter.incrementAndGet(); } /** * equals 方法,當 timestamp 與 counter 都相等時,則兩者相等 */ @Override public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final DataVersion that = (DataVersion) o; if (timestamp != that.timestamp) { return false; } if (counter != null && that.counter != null) { return counter.longValue() == that.counter.longValue(); } return (null == counter) && (null == that.counter); } ... }
從DataVersion
的equals()
方法來看,只有當timestamp
與counter
都相等時,兩個DataVersion
對象才相等。那這兩個值會在哪里被修改呢?從DataVersion#nextVersion
方法的調用情況來看,引起這兩個值的變化主要有兩種:
broker
上新創建了一個 topic
topic
的發了的變化
在這兩種情況下,DataVersion#nextVersion
方法被調用,從而引起DataVersion
的改變。DataVersion
改變了,就表明當前broker
需要向nameServer
注冊了。
讓我們再回到BrokerController#registerBrokerAll(...)
方法:
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { ... // 這里會判斷是否需要進行注冊 if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { // 進行注冊操作 doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } }
處理注冊的方法為BrokerController#doRegisterBrokerAll
,稍微看下它的流程:
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) { // 注冊 List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), // 這個對象里就包含了當前broker的版本信息 topicConfigWrapper, this.filterServerManager.buildNewFilterServerList(), oneway, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister()); ... }
繼續跟下去,最終調用的是BrokerOuterAPI#registerBroker
方法:
private RegisterBrokerResult registerBroker( final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // 構建請求 RemotingCommand request = RemotingCommand .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body); // 處理發送操作:sendOneWay if (oneway) { try { // 注冊操作 this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; ... } .... }
所以,所謂的注冊操作,就是當nameServer
發送一條code
為RequestCode.REGISTER_BROKER
的消息,消息里會帶上當前broker
的topic
信息、版本號等。
關于“RocketMQ broker啟動流程是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。