您好,登錄后才能下訂單哦!
如下所示:
安裝kafka支持庫pip install kafka-python
from kafka import KafkaProducer import json ''' 生產者demo 向test_lyl2主題中循環寫入10條json數據 注意事項:要寫入json數據需加上value_serializer參數,如下代碼 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'), bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'] ) for i in range(10): data={ "name":"李四", "age":23, "gender":"男", "id":i } producer.send('test_lyl2', data) producer.close()
from kafka import KafkaConsumer import json ''' 消費者demo 消費test_lyl2主題中的數據 注意事項:如需以json格式讀取數據需加上value_deserializer參數 ''' consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1", bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'], auto_offset_reset='earliest',value_deserializer=json.loads ) for message in consumer: print(message.value)
以上這篇對python操作kafka寫入json數據的簡單demo分享就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。