91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么基于sqlite實現kafka延時消息

發布時間:2022-01-10 10:45:20 來源:億速云 閱讀:140 作者:iii 欄目:開發技術

這篇文章主要介紹“怎么基于sqlite實現kafka延時消息”,在日常操作中,相信很多人在怎么基于sqlite實現kafka延時消息問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么基于sqlite實現kafka延時消息”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

    1、需求

    延時消息(或者說定時消息)是業務系統里一個常見的功能點。常用業務場景如:

    1) 訂單超時取消

    2) 離線超過指定時間的用戶,召回通知

    3) 手機消失多久后通知監護人……

    現流行的實現方案主要有:

    1)數據庫定時輪詢,掃描到達到延時時間的記錄,業務處理,刪除該記錄

    2)jdk 自帶延時隊列(DelayQueue),或優化的時間輪算法

    3)redis 有序集合

    4)支持延時消息的分布式消息隊列

    但以上方案,都存在各種缺陷:

    1)定時輪詢間隔小,則對數據庫造成很大壓力,分布式微服務架構不好適配。

    2)jdk 自帶延時隊列,占用內存高,服務重啟則丟失消息,分布式微服務架構不好適配。

    3)redis 有序集合比較合適,但內存貴,分布式微服務架構不好適配。

    4)現在主流的 RocketMQ 不支持任意延時時間的延時消息,RabbitMQ或ActiveMQ 性能不夠好,發送配置麻煩,kafka不支持延時消息。

    因此,我想實現一個適配分布式微服務架構、高性能、方便業務系統使用的延時消息轉發中間件。

    2、實現思路

    要保證高性能,推薦使用 kafka 或者 RocketMQ 做分布式消息隊列。當前是基于 sqlite 實現 kafka 延時消息。

    當前實現思路是基于kafka的,實際適用于任意MQ產品。

    2.1 整體實現思路

    怎么基于sqlite實現kafka延時消息

    2.2 程序業務邏輯

    1)業務系統先推送延時消息到統一延時消息隊列

    2)定時讀取延時消息隊列的延時消息,保存于本地,提交偏移量

    3)定時掃描本地到達延時期限的消息,轉發到實際業務消息隊列

    4)刪除本地延時消息

    2.3 實現細節

    1)一個業務處理流程使用一個sqlite數據庫文件,可并發執行提高性能。

    2)使用雪花算法生成 id 。

    3)沒有延時消息時,線程休眠一定時間,減低kafka集群、和本地io壓力。

    4)本地存儲使用 sqlite。

    2.4 依賴框架

    1)kafka-client

    2)sqlite

    3)slf4j+log4j2

    4)jackson

    3、性能測試

    測試機器: i5-6500,16GB內存,機械硬盤

    延時消息大小: 1kb

    并發處理數:1

    已本地簡單測試,性能表現:

    1) 1個并發處理數就可以達到1秒存儲、轉發、刪除 約15000條延時消息,2 個可以達到 30000條/s ……

    2) 一次性處理1萬條記錄,是經過多次對比試驗得出的合適批次大小

    也測試了其它兩個本地存儲方案的性能:

    1)直接存讀 json 文件,讀寫性能太差(約1200條記錄/s,慢在頻繁創建、打開、關閉文件,隨機磁盤io);

    2)RocksDB 存讀,寫入性能非常好(97000條記錄/s),但篩選到期延時消息性能太差了,在數據量大于100w時,表現不如 sqlite,而且運行時占用內存、cpu 資源非常高。

    4、部署

    4.1 系統環境依賴

    1)jdk 1.8

    2)kafka 1.1.0

    可以自行替換為符合實際kafka版本的jar包(不會有沖突的,jar包版本和kafka服務版本不一致可能會有異常[無法拉取消息、提交失敗等])。

    可修改pom.xml內的 kafka_version

    <kafka_version>1.1.0</kafka_version>

    重新打包即可。當前程序可以獨立部署,對現有工程項目無侵入性。

    4.2 安裝

    1)在項目根目錄執行 maven 打包后,會生成 dev_ops 文件

    2)在 dev_ops 目錄下執行 java -jar kafka_delay_sqlite-20220102.jar 即可啟動程序

    3)如需修改配置,可在dev_ops目錄內創建kafka.properties文件,設置自定義配置

    默認配置如下:

    # kafka 連接url [ip:port,ip:port……]
    kafka.url=127.0.0.1:9092
    # 延時消息本地存儲路徑,建議使用絕對值
    kafka.delay.store.path=/data/kafka_delay
    # 統一延時消息topic
    kafka.delay.topic=common_delay_msg
    # 消費者組id
    kafka.delay.group.id=common_delay_app
    # 并發處理數。限制條件: workers 小于等于topic分區數
    kafka.delay.workers=2

    4)業務方發送 kafka 消息到 topic (common_delay_msg)

    消息體參數說明:

    {
      "topic": "實際業務topic",
      "messageKey": "消息的key,影響發送到那個分區",
      "message": "業務消息內容",
      "delayTime": 1641470704
    }

    delayTime: 指定延時時限,秒級別時間戳

    消息體案例:

    {
      "topic": "cancel_order",
      "messageKey": "123456",
      "message": "{\"orderId\":123456789123456,\"userId\":\"yhh\"}",
      "delayTime": 1641470704
    }

    4.3 程序遷移

    復制 延時消息保存目錄 到新機器,重啟部署、啟動程序即可。(該配置項所在目錄 kafka.delay.store.path=/data/kafka_delay)

    4.4 排查日志

    日志默認輸出到 /logs/kafka_delay/ ,日志輸出方式為異步輸出。

    system.log 記錄了系統 info 級別以上的日志,info級別日志不是立刻輸出的,所以程序重啟時,可能會丟失部分日志

    exception.log 記錄了系統 warn 級別以上的日志,日志配置為立即輸出,程序正常重啟,不會丟失日志,重點關注這個日志即可。

    如需自定義日志配置,可以在 log4j2.xml 進行配置。

    如果要進行本地調試,可以解開注釋,否則控制臺沒有日志輸出:

            <Root level="info">
                <!--非本地調試環境下,建議注釋掉 console_appender-->
                <!--<AppenderRef ref="console_appender"/>-->
                <AppenderRef ref="system_log_appender"/>
                <AppenderRef ref="system_error_log_appender"/>
            </Root>

    5、注意事項

    1) 由于設置了線程空閑時休眠機制,延時消息最大可能會推遲8秒鐘發送。

    如果覺得延遲時間比較大,可以自行修改源碼的配置,重新打包即可。

    KafkaUtils.subscribe()

    MsgTransferTask.run()

    2) 當前程序嚴格依賴于系統時鐘,注意配置程序部署服務器的時鐘和業務服務器時鐘一致

    3) 建議配置統一延時消息隊列(common_delay_msg)的分區數為 2 的倍數

    4) 每個 kafka.delay.workers 約需要 200 mb 內存,默認配置為2 , jvm 建議配置 1 GB 以上內存,避免頻繁gc 。

    workers 增大后,不要再減小,否則會導致部分 sqlite 數據庫沒有線程訪問,消息丟失。

    并發處理數越大,延時消息處理效率越高,但需要注意不要大于topic的分區數。

    需要自行測試多少個并發處理數就會達到磁盤io、網絡帶寬上限。

    當前程序主要瓶頸在于磁盤io和網絡帶寬,實際內存和cpu資源占用極低。

    5) 程序運行時,不要操作延時消息保存目錄即里面的文件

    6) 當前配置為正常情況下不會拋棄消息模式,但程序重啟時,存在重復發送消息的可能,下游業務系統需要做好冪等性處理。

    如果kafka集群異常,當前配置為重新發送16次,如果仍不能恢復過來,則拋棄當前消息,實際生產環境里,基本不可能出現該場景。

    如果確定消息不能拋棄,需要自行修改源碼(MsgTransferTask.run,KafkaUtils.send(&hellip;&hellip;)),重新打包、部署。

    7) 程序出現未知異常(sqlite被手動修改、磁盤滿了&hellip;&hellip;),會直接結束程序運行。

    到此,關于“怎么基于sqlite實現kafka延時消息”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    泸水县| 南靖县| 五台县| 介休市| 会宁县| 宜君县| 策勒县| 元阳县| 天柱县| 南昌县| 葫芦岛市| 车致| 彭泽县| 保山市| 绵阳市| 磐安县| 澄迈县| 三门县| 浦北县| 元谋县| 新龙县| 永宁县| 尖扎县| 元氏县| 金平| 香港| 越西县| 龙门县| 新密市| 松滋市| 甘南县| 晋江市| 滦平县| 平罗县| 周口市| 高唐县| 双江| 文昌市| 青浦区| 新营市| 金沙县|