您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“如何解決python3中pika之連接斷開的問題”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“如何解決python3中pika之連接斷開的問題”這篇文章吧。
問題描述
在消費rabbitMQ隊列時, 每次進入回調函數內需要進行一些比較耗時的操作;操作完成后給rabbitMQ server發送ack信號以dequeue本條消息。
問題就發生在發送ack操作時, 程序提示鏈接已被斷開或socket error。
源碼示例
#!/usr/bin #coding: utf-8 import pika import time USER = 'guest' PWD = 'guest' TEST_QUEUE = 'just4test' def callback(ch, method, properties, body): print(body) time.sleep(600) ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest") ch.basic_ack(delivery_tag = method.delivery_tag) def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest") chan.basic_consume(callback, queue=TEST_QUEUE) chan.start_consuming() if __name__ == "__main__": test_main()
運行一段時間后, 就會報錯:
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None [CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed [ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')
問題排查
猜測:pika客戶端沒有及時發送心跳,連接被server斷開
一開始修改了heartbeat_interval參數值, 示例如下:
def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5, credentials=pika.PlainCredentials(USER, PWD))) # ....
修改后運行依然報錯,后來想想應該單線程被一直占用,pika無法發送心跳;
于是又加了個心跳線程, 示例如下:
#!/usr/bin #coding: utf-8 import pika import time import logging import threading USER = 'guest' PWD = 'guest' TEST_QUEUE = 'just4test' class Heartbeat(threading.Thread): def __init__(self, connection): super(Heartbeat, self).__init__() self.lock = threading.Lock() self.connection = connection self.quitflag = False self.stopflag = True self.setDaemon(True) def run(self): while not self.quitflag: time.sleep(10) self.lock.acquire() if self.stopflag : self.lock.release() continue try: self.connection.process_data_events() except Exception as ex: logging.warn("Error format: %s"%(str(ex))) self.lock.release() return self.lock.release() def startHeartbeat(self): self.lock.acquire() if self.quitflag==True: self.lock.release() return self.stopflag=False self.lock.release() def callback(ch, method, properties, body): logging.info("recv_body:%s" % body) time.sleep(600) ch.basic_ack(delivery_tag = method.delivery_tag) def test_main(): s_conn = pika.BlockingConnection( pika.ConnectionParameters('127.0.0.1', heartbeat_interval=10, socket_timeout=5, credentials=pika.PlainCredentials(USER, PWD))) chan = s_conn.channel() chan.queue_declare(queue=TEST_QUEUE) chan.basic_consume(callback, queue=TEST_QUEUE) heartbeat = Heartbeat(s_conn) heartbeat.start() #開啟心跳線程 heartbeat.startHeartbeat() chan.start_consuming() if __name__ == "__main__": test_main()
嘗試運行,結果還是不行,不得不安靜下來思考自己是不是想錯了。
去看它的api,看到heartbeat_interval的解析:
:param int heartbeat_interval: How often to send heartbeats. Min between this value and server's proposal will be used. Use 0 to deactivate heartbeats and None to accept server's proposal.
按這樣說法,應該還是沒有把心跳值給設置好。上面的程序期望是10秒發一次心跳,但是理論上發送心跳的間隔會比10秒多一點。所以艾瑪,我應該是把heartbeat_interval的作用搞錯了, 它是指超過這個時間間隔不發心跳或不給server任何信息,server就會斷開連接, 而不是說pika會按這個間隔來發心跳。 結果我把heartbeat_interval值設置高一點(比實際發送心跳/信息的間隔更長),比如上面設置成60秒,就正常運行了。
如果不指定heartbeat_interval, 它默認為None, 意味著按rabbitMQ server的配置來檢測心跳是否正常。
如果設置heartbeat_interval=0, 意味著不檢測心跳,server端將不會主動斷開連接。
以上是“如何解決python3中pika之連接斷開的問題”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。