您好,登錄后才能下訂單哦!
本篇文章會盡力全面的介紹RocketMQ和Kafka各個關鍵點的比較,希望大家讀完能有所收獲。
RocketMQ前身叫做MetaQ, 在MeataQ發布3.0版本的時候改名為RocketMQ,其本質上的設計思路和Kafka類似,但是和Kafka不同的是其使用Java進行開發,由于在國內的Java受眾群體遠遠多于Scala,所以RocketMQ是很多以Java語言為主的公司的首選。同樣的RocketMQ和Kafka都是Apache基金會中的頂級項目,他們社區的活躍度都非常高,項目更新迭代也非常快。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
直接定義好一個producer,創建好Message,調用send方法即可。
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
對于RocketMQ先拋出幾個問題:
RocketMQ的topic和隊列是什么樣的,和Kafka的分區有什么不同?
RocketMQ網絡模型是什么樣的,和Kafka對比如何?
對于RocketMQ的架構圖,在大體上來看和Kafka并沒有太多的差別,但是在很多細節上是有很多差別的,接下來會一一進行講述。
在3.1的架構中我們有多個Producer,多個主Broker,多個從Broker,每個Producer可以對應多個Topic,每個Consumer也可以消費多個Topic。
Broker信息會上報至NameServer,Consumer會從NameServer中拉取Broker和Topic的信息。
Producer:消息生產者,向Broker發送消息的客戶端
Consumer:消息消費者,從Broker讀取消息的客戶端
Broker:消息中間的處理節點,這里和kafka不同,kafka的Broker沒有主從的概念,都可以寫入請求以及備份其他節點數據,RocketMQ只有主Broker節點才能寫,一般也通過主節點讀,當主節點有故障或者一些其他特殊情況才會使用從節點讀,有點類似- 于mysql的主從架構。
Topic:消息主題,一級消息類型,生產者向其發送消息, 消費者讀取其消息。
Group:分為ProducerGroup,ConsumerGroup,代表某一類的生產者和消費者,一般來說同一個服務可以作為Group,同一個Group一般來說發送和消費的消息都是一樣的。
Tag:Kafka中沒有這個概念,Tag是屬于二級消息類型,一般來說業務有關聯的可以使用同一個Tag,比如訂單消息隊列,使用Topic_Order,Tag可以分為Tag_食品訂單,Tag_服裝訂單等等。
Queue: 在kafka中叫Partition,每個Queue內部是有序的,在RocketMQ中分為讀和寫兩種隊列,一般來說讀寫隊列數量一致,如果不一致就會出現很多問題。
很多朋友都在問什么是無狀態呢?狀態的有無實際上就是數據是否會做存儲,有狀態的話數據會被持久化,無狀態的服務可以理解就是一個內存服務,NameServer本身也是一個內存服務,所有數據都存儲在內存中,重啟之后都會丟失。
在RocketMQ中的每一條消息,都有一個Topic,用來區分不同的消息。一個主題一般會有多個消息的訂閱者,當生產者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生產者寫入的新消息。
在Topic中有分為了多個Queue,這其實是我們發送/讀取消息通道的最小單位,我們發送消息都需要指定某個寫入某個Queue,拉取消息的時候也需要指定拉取某個Queue,所以我們的順序消息可以基于我們的Queue維度保持隊列有序,如果想做到全局有序那么需要將Queue大小設置為1,這樣所有的數據都會在Queue中有序。
在上圖中我們的Producer會通過一些策略進行Queue的選擇:
非順序消息:非順序消息一般直接采用輪訓發送的方式進行發送。
我們同一組Consumer也會根據一些策略來選Queue,常見的比如平均分配或者一致性Hash分配。
要注意的是當Consumer出現下線或者上線的時候,這里需要做重平衡,也就是Rebalance,RocketMQ的重平衡機制如下:
定時拉取broker,topic的最新信息
每隔20s做重平衡
隨機選取當前Topic的一個主Broker,這里要注意的是不是每次重平衡所有主Broker都會被選中,因為會存在一個Broker再多個Broker的情況。
獲取當前Broker,當前ConsumerGroup的所有機器ID。
由于重平衡是定時做的,所以這里有可能會出現某個Queue同時被兩個Consumer消費,所以會出現消息重復投遞。
Kafka的重平衡機制和RocketMQ不同,Kafka的重平衡是通過Consumer和Coordinator聯系來完成的,當Coordinator感知到消費組的變化,會在心跳過程中發送重平衡的信號,然后由一個ConsumerLeader進行重平衡選擇,然后再由Coordinator將結果通知給所有的消費者。
在RocketMQ中Queue被分為讀和寫兩種,在最開始接觸RocketMQ的時候一直以為讀寫隊列數量配置不一致不會出現什么問題的,比如當消費者機器很多的時候我們配置很多讀的隊列,但是實際過程中發現會出現消息無法消費和根本沒有消息消費的情況。
當寫的隊列數量大于讀的隊列的數量,當大于讀隊列這部分ID的寫隊列的數據會無法消費,因為不會將其分配給消費者。
這個功能在RocketMQ在我看來明顯沒什么用,因為基本上都會設置為讀寫隊列大小一樣,這個為啥不直接將其進行統一,反而容易讓用戶配置不一樣出現錯誤。
這個問題在RocketMQ的Issue里也沒有收到好的答案。
一般來說消息隊列的消費模型分為兩種,基于推送的消息(push)模型和基于拉取(poll)的消息模型。
基于推送模型的消息系統,由消息代理記錄消費狀態。消息代理將消息推送到消費者后,標記這條消息為已經被消費,但是這種方式無法很好地保證消費的處理語義。比如當我們把已經把消息發送給消費者之后,由于消費進程掛掉或者由于網絡原因沒有收到這條消息,如果我們在消費代理將其標記為已消費,這個消息就永久丟失了。如果我們利用生產者收到消息后回復這種方法,消息代理需要記錄消費狀態,這種不可取。
用過RocketMQ的同學肯定不禁會想到,在RocketMQ中不是提供了兩種消費者嗎?
MQPullConsumer
?和?MQPushConsumer
?,其中?MQPushConsumer
?不就是我們的推模型嗎?其實這兩種模型都是客戶端主動去拉消息,其中的實現區別如下:
MQPullConsumer:每次拉取消息需要傳入拉取消息的offset和每次拉取多少消息量,具體拉取哪里的消息,拉取多少是由客戶端控制。
消費模式我們分為兩種,集群消費,廣播消費:
集群消費: 同一個GroupId都屬于一個集群,一般來說一條消息只會被任意一個消費者處理。
在Kafka中使用的原生的socket實現網絡通信,而RocketMQ使用的是Netty網絡框架,現在越來越多的中間件都不會直接選擇原生的socket,而是使用的Netty框架,主要得益于下面幾個原因:
API使用簡單,不需要關心過多的網絡細節,更專注于中間件邏輯。
性能高。
選擇框架是一方面,而想要保證網絡通信的高效,網絡線程模型也是一方面,我們常見的有1+N(1個Acceptor線程,N個IO線程),1+N+M(1個acceptor線程,N個IO線程,M個worker線程)等模型,RocketMQ使用的是1+N1+N2+M的模型,如下圖所示:
1個acceptor線程,N1個IO線程,N2個線程用來做Shake-hand,SSL驗證,編解碼;M個線程用來做業務處理。這樣的好處將編解碼,和SSL驗證等一些可能耗時的操作放在了一個單獨的線程池,不會占據我們業務線程和IO線程。
做為一個好的消息系統,高性能的存儲,高可用都不可少。
RocketMQ和Kafka的存儲核心設計有很大的不同,所以其在寫入性能方面也有很大的差別,這是16年阿里中間件團隊對RocketMQ和Kafka不同Topic下做的性能測試:
從圖上可以看出:
Kafka在Topic數量由64增長到256時,吞吐量下降了98.37%。
RocketMQ在Topic數量由64增長到256時,吞吐量只下降了16%。
這是為什么呢?kafka一個topic下面的所有消息都是以partition的方式分布式的存儲在多個節點上。同時在kafka的機器上,每個Partition其實都會對應一個日志目錄,在目錄下面會對應多個日志分段。所以如果Topic很多的時候Kafka雖然寫文件是順序寫,但實際上文件過多,會造成磁盤IO競爭非常激烈。
那RocketMQ為什么在多Topic的情況下,依然還能很好的保持較多的吞吐量呢?我們首先來看一下RocketMQ中比較關鍵的文件:
這里有四個目錄(這里的解釋就直接用RocketMQ官方的了):
commitLog:消息主體以及元數據的存儲主體,存儲Producer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件;
config:保存一些配置信息,包括一些Group,Topic以及Consumer消費offset等信息。
我們發現我們的消息主體數據并沒有像Kafka一樣寫入多個文件,而是寫入一個文件,這樣我們的寫入IO競爭就非常小,可以在很多Topic的時候依然保持很高的吞吐量。有同學說這里的ConsumeQueue寫是在不停的寫入呢,并且ConsumeQueue是以Queue維度來創建文件,那么文件數量依然很多,在這里ConsumeQueue的寫入的數據量很小,每條消息只有20個字節,30W條數據也才6M左右,所以其實對我們的影響相對Kafka的Topic之間影響是要小很多的。我們整個的邏輯可以如下:
Producer不斷的再往CommitLog添加新的消息,有一個定時任務ReputService會不斷的掃描新添加進來的CommitLog,然后不斷的去構建ConsumerQueue和Index。
注意:這里指的都是普通的硬盤,在SSD上面多個文件并發寫入和單個文件寫入影響不大。
讀取消息
Kafka中每個Partition都會是一個單獨的文件,所以當消費某個消息的時候,會很好的出現順序讀,我們知道OS從物理磁盤上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取,將數據放入PageCache,所以Kafka的讀取消息性能比較好。
RocketMQ讀取流程如下:
先讀取ConsumerQueue中的offset對應CommitLog物理的offset
ConsumerQueue也是每個Queue一個單獨的文件,并且其文件體積小,所以很容易利用PageCache提高性能。而CommitLog,由于同一個Queue的連續消息在CommitLog其實是不連續的,所以會造成隨機讀,RocketMQ對此做了幾個優化:
Mmap映射讀取,Mmap的方式減少了傳統IO將磁盤文件數據在操作系統內核地址空間的緩沖區和用戶應用程序地址空間的緩沖區之間來回進行拷貝的性能開銷
使用DeadLine調度算法+SSD存儲盤
3.6.2.1 集群模式
我們首先需要選擇一種集群模式,來適應我們可忍耐的可用程度,一般來說分為三種:
單Master:這種模式,可用性最低,但是成本也是最低,一旦宕機,所有都不可用。這種一般只適用于本地測試。
單Master多SLAVE:這種模式,可用性一般,如果主宕機,那么所有寫入都不可用,讀取依然可用,如果master磁盤損壞,可以依賴slave的數據。
多Master:這種模式,可用性一般,如果出現部分master宕機,那么這部分master上的消息都不可消費,也不可寫數據,如果一個Topic的隊列在多個Master上都有,那么可以保證沒有宕機的那部分可以正常消費,寫入。如果master的磁盤損壞會導致消息丟失。
一般來說投入生產環境的話都會選擇第四種,來保證最高的可用性。
3.6.2.2 消息的可用性
當我們選擇好了集群模式之后,那么我們需要關心的就是怎么去存儲和復制這個數據,rocketMQ對消息的刷盤提供了同步和異步的策略來滿足我們的,當我們選擇同步刷盤之后,如果刷盤超時會給返回FLUSH_DISK_TIMEOUT,如果是異步刷盤不會返回刷盤相關信息,選擇同步刷盤可以盡最大程度滿足我們的消息不會丟失。
除了存儲有選擇之后,我們的主從同步提供了同步和異步兩種模式來進行復制,當然選擇同步可以提升可用性,但是消息的發送RT時間會下降10%左右。
我們上面對于master-slave部署模式已經做了很多分析,我們發現,當master出現問題的時候,我們的寫入怎么都會不可用,除非恢復master,或者手動將我們的slave切換成master,導致了我們的Slave在多數情況下只有讀取的作用。RocketMQ在最近的幾個版本中推出了Dleger-RocketMQ,使用Raft協議復制CommitLog,并且自動進行選主,這樣master宕機的時候,寫入依然保持可用。
定時消息和延時消息在實際業務場景中使用的比較多,比如下面的一些場景:
訂單超時未支付自動關閉,因為在很多場景中下單之后庫存就被鎖定了,這里需要將其進行超時關閉。
需要一些延時的操作,比如一些兜底的邏輯,當做完某個邏輯之后,可以發送延時消息比如延時半個小時,進行兜底檢查補償。
在開源版本的RocketMQ中延時消息并不支持任意時間的延時,需要設置幾個固定的延時等級,目前默認設置為:?1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
?,從1s到2h分別對應著等級1到18,而阿里云中的版本(要付錢)是可以支持40天內的任何時刻(毫秒級別)。我們先看下在RocketMQ中定時任務原理圖:
Step1:Producer在自己發送的消息上設置好需要延時的級別。
Step2: Broker發現此消息是延時消息,將Topic進行替換成延時Topic,每個延時級別都會作為一個單獨的queue,將自己的Topic作為額外信息存儲。
Step3: 構建ConsumerQueue
Step4: 定時任務定時掃描每個延時級別的ConsumerQueue。
Step5: 拿到ConsumerQueue中的CommitLog的Offset,獲取消息,判斷是否已經達到執行時間
可以看見延時消息是利用新建單獨的Topic和Queue來實現的,如果我們要實現40天之內的任意時間度,基于這種方案,那么需要40?24?60?60?1000個queue,這樣的成本是非常之高的,那阿里云上面的支持任意時間是怎么實現的呢?這里猜測是持久化二級TimeWheel時間輪,二級時間輪用于替代我們的ConsumeQueue,保存Commitlog-Offset,然后通過時間輪不斷的取出當前已經到了的時間,然后再次投遞消息。具體的實現邏輯需要后續會單獨寫一篇文章。
事務消息同樣的也是RocketMQ中的一大特色,其可以幫助我們完成分布式事務的最終一致性,有關分布式事務相關的可以看我以前的很多文章都有很多詳細的介紹。
具體使用事務消息步驟如下:
Step1:調用sendMessageInTransaction發送事務消息
Step2: ?如果發送成功,則執行本地事務。
Step3: ?如果執行本地事務成功則發送commit,如果失敗則發送rollback。
事務消息的使用整個流程相對之前幾種消息使用比較復雜,下面是事務消息實現的原理圖:
Step1: 發送事務消息,這里也叫做halfMessage,會將Topic替換為HalfMessage的Topic。
Step2: 發送commit或者rollback,如果是commit這里會查詢出之前的消息,然后將消息復原成原Topic,并且發送一個OpMessage用于記錄當前消息可以刪除。如果是rollback這里會直接發送一個OpMessage刪除。
Step3: 在Broker有個處理事務消息的定時任務,定時對比halfMessage和OpMessage,如果有OpMessage且狀態為刪除,那么該條消息必定commit或者rollback,所以就可以刪除這條消息。
Step4: 如果事務超時(默認是6s),還沒有opMessage,那么很有可能commit信息丟了,這里會去反查我們的Producer本地事務狀態。
我們發現RocketMQ實現事務消息也是通過修改原Topic信息,和延遲消息一樣,然后模擬成消費者進行消費,做一些特殊的業務邏輯。當然我們還可以利用這種方式去做RocketMQ更多的擴展。
這里讓我們在回到文章中提到的幾個問題:
RocketMQ的topic和隊列是什么樣的,和Kafka的分區有什么不同?
RocketMQ網絡模型是什么樣的,和Kafka對比如何?
想必讀完這篇文章,你心中已經有答案
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。