您好,登錄后才能下訂單哦!
本篇文章為大家展示了在Rabbitmq中利用heartbea實現心跳檢測機制的原理是什么,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
前言
使用rabbitmq的時候,當你客戶端與rabbitmq服務器之間一段時間沒有流量,服務器將會斷開與客戶端之間tcp連接。
而你將在服務器上看這樣的日志:
missed heartbeats from client, timeout: xxs
這個間隔時間就是心跳間隔。
heartbeat通常用來檢測通信的對端是否存活(未正常關閉socket連接而異常crash)。其基本原理是檢測對應的socket連接上數據的收發是否正常,如果一段時間內沒有收發數據,則向對端發送一個心跳檢測包,如果一段時間內沒有回應則認為心跳超時,即認為對端可能異常crash了。
rabbitmq也不例外,heatbeat在客戶端和服務端之間用于檢測對端是否正常,即客戶端與服務端之間的tcp鏈接是否正常。
關于rabbitmq心跳
1.heartbeat檢測時間間隔可在配置文件rabbitmq.config中增加配置項{heartbeat,Timeout}進行配置,其中Timeout指定時間間隔,單位為秒,另外客戶端也可以配置heartbeat時間。
如果服務端沒有配置
默認代理心跳時間:
RabbitMQ 3.2.2:580秒
RabbitMQ 3.5.5:60秒
2.官方建議不要禁用心跳,且建議心跳時間為60秒。
3.心跳每 heartbeat timeout / 2 秒發送一次,服務器兩次沒有接收到則斷開tcp連接,以前的連接將失效,客戶端需要重新連接。
4.如果你使用Java, .NET and Erlang clients,服務器與客戶端會協商heartbeat時間
如果其中一個值為0,則使用兩者中較大的一個
否則,使用兩者中較小的一個
兩個值都為0,則表示要禁用心跳,則服務端與客戶端維持此tcp連接,不會斷開。
注意:在python客戶端上直接設置為0,則禁用心跳。
禁用心跳在python客戶端該如何設置:
在py3:ConnectionParameters設置heartbeat_interval=0即可。
在py2:ConnectionParameters設置heartbeat=0即可。
5.連接上的任何流量(傳輸的有效數據、確認等)都將被計入有效心跳,當然也包括心跳幀。
6.我在網上看到有人問到這個問題:
為什么服務端宕機,在心跳檢測機制下,服務器側斷開連接,而客戶端這邊不能檢測到tcp斷開,我測試過,客戶端確實不能檢測到tcp連接斷開,只有當客戶端在這個tcp有操作后,才能檢測到,當然在一個斷開的tcp連接上做操作會報錯(如發送消息)。
import pika import time credit = pika.PlainCredentials(username='cloud', password='cloud') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.32.1.12', credentials=credit)) channel = connection.channel() while True: connect_close = connection.is_closed connect_open = connection.is_open channel_close = channel.is_closed channel_open = channel.is_open print("connection is_closed ", connect_close) print("connection is_open ", connect_open) print("channel is_closed ", channel_close) print("channel is_open ", channel_open) print("") time.sleep(5)
7.一些RabbitMQ客戶端(Bunny,Java,.NET,Objective-C,Swift)提供了一種在網絡故障后自動恢復連接的機制,而pika只能通過檢測連接異常后再重新創建連接的方式。
示例代碼:通過檢測連接異常,重新創建連接:
import pika while True: try: connection = pika.BlockingConnection() channel = connection.channel() channel.basic_consume('test', on_message_callback) channel.start_consuming() # Don't recover if connection was closed by broker except pika.exceptions.ConnectionClosedByBroker: break # Don't recover on channel errors except pika.exceptions.AMQPChannelError: break # Recover on all other connection errors except pika.exceptions.AMQPConnectionError: continue
你也可以使用操作重試庫,例如 retry。
from retry import retry @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3)) def consume(): connection = pika.BlockingConnection() channel = connection.channel() channel.basic_consume('test', on_message_callback) try: channel.start_consuming() # Don't recover connections closed by server except pika.exceptions.ConnectionClosedByBroker: pass consume()
heartbeat的實現
rabbitmq在收到來自客戶端的connection.tune-ok信令后,啟用心跳檢測,rabbitmq會為每個tcp連接創建兩個進程用于心跳檢測,一個進程定時檢測tcp連接上是否有數據發送(這里的發送是指rabbitmq發送數據給客戶端),如果一段時間內沒有數據發送給客戶端,則發送一個心跳包給客戶端,然后循環進行下一次檢測;另一個進程定時檢測tcp連接上是否有數據的接收,如果一段時間內沒有收到任何數據,則判定為心跳超時,最終會關閉tcp連接。另外,rabbitmq的流量控制機制可能會暫停heartbeat檢測,這里不展開描述。
涉及的源碼:
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> %%數據發送檢測進程 {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock, SendFun, heartbeat_sender, start_heartbeat_sender), %%數據接收檢測進程 {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, ReceiveFun, heartbeat_receiver, start_heartbeat_receiver), {Sender, Receiver}. start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for %% nearly 2 * TimeoutSec before sending a heartbeat in the %% boundary case heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> SendFun(), continue end}). start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out %% between 2 and 3 intervals after the last data has been %% received heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> ReceiveFun(), stop end}). heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, Deb, {StatVal, SameCount} = State) -> Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end, receive ... %% 定時檢測 after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> %% 收發數據有變化 if NewStatVal =/= StatVal -> %%重新開始檢測 Recurse({NewStatVal, 0}); %%未達到指定次數, 發送為0, 接收為1 SameCount < Threshold -> %%計數加1, 再次檢測 Recurse({NewStatVal, SameCount + 1}); %%heartbeat超時 true -> %%對于發送檢測超時, 向客戶端發送heartbeat包 %%對于接收檢測超時, 向父進程發送超時通知 %%由父進程觸發tcp關閉等操作 case Handler() of %%接收檢測超時 stop -> ok; %%發送檢測超時 continue -> Recurse({NewStatVal, 0}) end; ...
收發檢測的時候利用了inet模塊的getstat,查看socket的統計信息
recv_oct: 查看socket上接收的字節數
send_oct: 查看socket上發送的字節數
上述內容就是在Rabbitmq中利用heartbea實現心跳檢測機制的原理是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。