91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python如何實現對kafka的基本操作

發布時間:2021-10-18 15:25:40 來源:億速云 閱讀:215 作者:小新 欄目:編程語言

這篇文章主要為大家展示了“python如何實現對kafka的基本操作”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“python如何實現對kafka的基本操作”這篇文章吧。

-- coding:utf-8 --

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

"""生產者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一個消費者消費一個topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #獲取test主題的分區信息
print consumer.topics()  #獲取主題列表
print consumer.subscription()  #獲取當前消費者訂閱的主題
print consumer.assignment()  #獲取當前消費者topic、分區信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,從第1個偏移量消費
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一個消費者訂閱多個topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                      message.offset, message.key,
                                      message.value))
"""消費者(手動拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #從kafka獲取消息
        if message:
        print message
        time.sleep(1)

def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()

以上是“python如何實現對kafka的基本操作”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

太仓市| 陕西省| 天祝| 甘南县| 大连市| 乐至县| 交城县| 商河县| 桐柏县| 额尔古纳市| 鄂托克前旗| 建湖县| 嘉义市| 仁怀市| 嘉祥县| 格尔木市| 昂仁县| 全椒县| 海南省| 崇文区| 广昌县| 忻城县| 水富县| 盐山县| 孟连| 呈贡县| 昌宁县| 邵阳县| 惠州市| 垦利县| 沂源县| 车致| 沙雅县| 沐川县| 怀宁县| 六枝特区| 美姑县| 孙吴县| 九寨沟县| 定南县| 同心县|