您好,登錄后才能下訂單哦!
這篇文章主要講解了Python如何通過zookeeper實現分布式服務,內容清晰明了,對此有興趣的小伙伴可以學習一下,相信大家閱讀完之后會有幫助。
借助zookeeper可以實現服務器的注冊與發現,有需求的時候調用zookeeper來發現可用的服務器,將任務均勻分配到各個服務器上去.
這樣可以方便的隨任務的繁重程度對服務器進行彈性擴容,客戶端和服務端是非耦合的,也可以隨時增加客戶端.
zk_server.py
import threading import json import socket import sys from kazoo.client import KazooClient # TCP服務端綁定端口開啟監聽,同時將自己注冊到zk class ZKServer(object): def __init__(self, host, port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.host = host self.port = port self.sock.bind((host, port)) self.zk = None def serve(self): """ 開始服務,每次獲取得到一個信息,都新建一個線程處理 """ self.sock.listen(128) self.register_zk() print("開始監聽") while True: conn, addr = self.sock.accept() print("建立鏈接%s" % str(addr)) t = threading.Thread(target=self.handle, args=(conn, addr)) t.start() # 具體的處理邏輯,只要接收到數據就立即投入工作,下次沒有數據本次鏈接結束 def handle(self, conn, addr): while True: data=conn.recv(1024) if not data or data.decode('utf-8') == 'exit': break print(data.decode('utf-8')) conn.close() print('My work is done!!!') # 將自己注冊到zk,臨時節點,所以連接不能中斷 def register_zk(self): """ 注冊到zookeeper """ self.zk = KazooClient(hosts='127.0.0.1:2181') self.zk.start() self.zk.ensure_path('/rpc') # 創建根節點 value = json.dumps({'host': self.host, 'port': self.port}) # 創建服務子節點 self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True) if __name__ == '__main__': if len(sys.argv) < 3: print("usage:python server.py [host] [port]") exit(1) host = sys.argv[1] port = sys.argv[2] server = ZKServer(host, int(port)) server.serve()
zk_client.py
import random import sys import time import json import socket from kazoo.client import KazooClient # 客戶端連接zk,并從zk獲取可用的服務器列表 class ZKClient(object): def __init__(self): self._zk = KazooClient(hosts='127.0.0.1:2181') self._zk.start() self._get_servers() def _get_servers(self, event=None): """ 從zookeeper獲取服務器地址信息列表 """ servers = self._zk.get_children('/rpc', watch=self._get_servers) # print(servers) self._servers = [] for server in servers: data = self._zk.get('/rpc/' + server)[0] if data: addr = json.loads(data.decode()) self._servers.append(addr) def _get_server(self): """ 隨機選出一個可用的服務器 """ return random.choice(self._servers) def get_connection(self): """ 提供一個可用的tcp連接 """ sock = None while True: server = self._get_server() print('server:%s' % server) try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((server['host'], server['port'])) except ConnectionRefusedError: time.sleep(1) continue else: break return sock if __name__ == '__main__': # 模擬多個客戶端批量生成任務,推送給服務器執行 client = ZKClient() for i in range(40): sock = client.get_connection() sock.send(bytes(str(i), encoding='utf8')) sock.close() time.sleep(1)
看完上述內容,是不是對Python如何通過zookeeper實現分布式服務有進一步的了解,如果還想學習更多內容,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。