您好,登錄后才能下訂單哦!
這篇文章主要介紹“怎么基于sqlite實現kafka延時消息”,在日常操作中,相信很多人在怎么基于sqlite實現kafka延時消息問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么基于sqlite實現kafka延時消息”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
延時消息(或者說定時消息)是業務系統里一個常見的功能點。常用業務場景如:
1) 訂單超時取消
2) 離線超過指定時間的用戶,召回通知
3) 手機消失多久后通知監護人……
現流行的實現方案主要有:
1)數據庫定時輪詢,掃描到達到延時時間的記錄,業務處理,刪除該記錄
2)jdk 自帶延時隊列(DelayQueue),或優化的時間輪算法
3)redis 有序集合
4)支持延時消息的分布式消息隊列
但以上方案,都存在各種缺陷:
1)定時輪詢間隔小,則對數據庫造成很大壓力,分布式微服務架構不好適配。
2)jdk 自帶延時隊列,占用內存高,服務重啟則丟失消息,分布式微服務架構不好適配。
3)redis 有序集合比較合適,但內存貴,分布式微服務架構不好適配。
4)現在主流的 RocketMQ 不支持任意延時時間的延時消息,RabbitMQ或ActiveMQ 性能不夠好,發送配置麻煩,kafka不支持延時消息。
因此,我想實現一個適配分布式微服務架構、高性能、方便業務系統使用的延時消息轉發中間件。
要保證高性能,推薦使用 kafka 或者 RocketMQ 做分布式消息隊列。當前是基于 sqlite 實現 kafka 延時消息。
當前實現思路是基于kafka的,實際適用于任意MQ產品。
1)業務系統先推送延時消息到統一延時消息隊列
2)定時讀取延時消息隊列的延時消息,保存于本地,提交偏移量
3)定時掃描本地到達延時期限的消息,轉發到實際業務消息隊列
4)刪除本地延時消息
1)一個業務處理流程使用一個sqlite數據庫文件,可并發執行提高性能。
2)使用雪花算法生成 id 。
3)沒有延時消息時,線程休眠一定時間,減低kafka集群、和本地io壓力。
4)本地存儲使用 sqlite。
1)kafka-client
2)sqlite
3)slf4j+log4j2
4)jackson
測試機器: i5-6500,16GB內存,機械硬盤
延時消息大小: 1kb
并發處理數:1
已本地簡單測試,性能表現:
1) 1個并發處理數就可以達到1秒存儲、轉發、刪除 約15000條延時消息,2 個可以達到 30000條/s ……
2) 一次性處理1萬條記錄,是經過多次對比試驗得出的合適批次大小
也測試了其它兩個本地存儲方案的性能:
1)直接存讀 json 文件,讀寫性能太差(約1200條記錄/s,慢在頻繁創建、打開、關閉文件,隨機磁盤io);
2)RocksDB 存讀,寫入性能非常好(97000條記錄/s),但篩選到期延時消息性能太差了,在數據量大于100w時,表現不如 sqlite,而且運行時占用內存、cpu 資源非常高。
1)jdk 1.8
2)kafka 1.1.0
可以自行替換為符合實際kafka版本的jar包(不會有沖突的,jar包版本和kafka服務版本不一致可能會有異常[無法拉取消息、提交失敗等])。
可修改pom.xml內的 kafka_version
<kafka_version>1.1.0</kafka_version>
重新打包即可。當前程序可以獨立部署,對現有工程項目無侵入性。
1)在項目根目錄執行 maven 打包后,會生成 dev_ops 文件
2)在 dev_ops 目錄下執行 java -jar kafka_delay_sqlite-20220102.jar 即可啟動程序
3)如需修改配置,可在dev_ops目錄內創建kafka.properties文件,設置自定義配置
默認配置如下:
# kafka 連接url [ip:port,ip:port……] kafka.url=127.0.0.1:9092 # 延時消息本地存儲路徑,建議使用絕對值 kafka.delay.store.path=/data/kafka_delay # 統一延時消息topic kafka.delay.topic=common_delay_msg # 消費者組id kafka.delay.group.id=common_delay_app # 并發處理數。限制條件: workers 小于等于topic分區數 kafka.delay.workers=2
4)業務方發送 kafka 消息到 topic (common_delay_msg)
消息體參數說明:
{ "topic": "實際業務topic", "messageKey": "消息的key,影響發送到那個分區", "message": "業務消息內容", "delayTime": 1641470704 }
delayTime: 指定延時時限,秒級別時間戳
消息體案例:
{ "topic": "cancel_order", "messageKey": "123456", "message": "{\"orderId\":123456789123456,\"userId\":\"yhh\"}", "delayTime": 1641470704 }
復制 延時消息保存目錄 到新機器,重啟部署、啟動程序即可。(該配置項所在目錄 kafka.delay.store.path=/data/kafka_delay)
日志默認輸出到 /logs/kafka_delay/ ,日志輸出方式為異步輸出。
system.log 記錄了系統 info 級別以上的日志,info級別日志不是立刻輸出的,所以程序重啟時,可能會丟失部分日志
exception.log 記錄了系統 warn 級別以上的日志,日志配置為立即輸出,程序正常重啟,不會丟失日志,重點關注這個日志即可。
如需自定義日志配置,可以在 log4j2.xml 進行配置。
如果要進行本地調試,可以解開注釋,否則控制臺沒有日志輸出:
<Root level="info"> <!--非本地調試環境下,建議注釋掉 console_appender--> <!--<AppenderRef ref="console_appender"/>--> <AppenderRef ref="system_log_appender"/> <AppenderRef ref="system_error_log_appender"/> </Root>
1) 由于設置了線程空閑時休眠機制,延時消息最大可能會推遲8秒鐘發送。
如果覺得延遲時間比較大,可以自行修改源碼的配置,重新打包即可。
KafkaUtils.subscribe()
MsgTransferTask.run()
2) 當前程序嚴格依賴于系統時鐘,注意配置程序部署服務器的時鐘和業務服務器時鐘一致
3) 建議配置統一延時消息隊列(common_delay_msg)的分區數為 2 的倍數
4) 每個 kafka.delay.workers 約需要 200 mb 內存,默認配置為2 , jvm 建議配置 1 GB 以上內存,避免頻繁gc 。
workers 增大后,不要再減小,否則會導致部分 sqlite 數據庫沒有線程訪問,消息丟失。
并發處理數越大,延時消息處理效率越高,但需要注意不要大于topic的分區數。
需要自行測試多少個并發處理數就會達到磁盤io、網絡帶寬上限。
當前程序主要瓶頸在于磁盤io和網絡帶寬,實際內存和cpu資源占用極低。
5) 程序運行時,不要操作延時消息保存目錄即里面的文件
6) 當前配置為正常情況下不會拋棄消息模式,但程序重啟時,存在重復發送消息的可能,下游業務系統需要做好冪等性處理。
如果kafka集群異常,當前配置為重新發送16次,如果仍不能恢復過來,則拋棄當前消息,實際生產環境里,基本不可能出現該場景。
如果確定消息不能拋棄,需要自行修改源碼(MsgTransferTask.run,KafkaUtils.send(……)),重新打包、部署。
7) 程序出現未知異常(sqlite被手動修改、磁盤滿了……),會直接結束程序運行。
到此,關于“怎么基于sqlite實現kafka延時消息”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。