91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

自定義RPC的完整實現---深入理解rpc內部原理

發布時間:2020-07-19 17:47:42 來源:網絡 閱讀:1087 作者:ck_god 欄目:編程語言

倘若不使用RPC遠端調用的情況下,代碼如下:

local.py

# coding:utf-8

# 本地調用除法運算的形式
class InvalidOperation(Exception):
    def __init__(self, message = None):
        self.message = message or 'involid operation'

def divide(num1, num2 = 1):
    if num2 == 0:
        raise InvalidOperation
    res = num1 / num2
    return res

try:
    val = divide(200, 100)
except InvalidOperation as e:
    print(e.message)
else:
    print(val)

接下來將使用RPC二進制的形式,遠程過程調用代碼如下。

service.py 中自定義需要實現消息協議、傳輸控制,并且實現客戶端存根clientStub和服務器端存根serverStub,服務器定義以及channel的定義。

import struct
from io import BytesIO
import socket

class InvalidOperation(BaseException):
    def __init__(self, message = None):
        self.message = message or 'involid operation'

class MethodProtocol(object):
    ''''
    解讀方法名
    '''
    def __init__(self, connection):
        self.conn = connection

    def _read_all(self, size):
        """
        幫助我們讀取二進制數據
        :param  size: 想要讀取的二進制數據大小
        :return:  二進制數據bytes
        """
        # self.conn
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # 有時候長度大于每次讀取的長度
            have = 0
            buff = b''
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have += l
                if l == 0:
                    # 表示客戶端已經關閉了
                    raise EOFError
            return buff

    def get_method_name(self):
        # 讀取字符串長度
        buff = self._read_all(4)
        length = struct.unpack('!I',buff)[0]

        # 讀取字符串
        buff = self._read_all(length)
        name = buff.decode()
        return name

class DivideProtocol(object):
    """
    divide過程消息協議轉換工具
    """
    def args_encode(self, num1, num2=1):
        """
        將原始調用的請求參數轉換打包成二進制消息數據
        :param num1: int
        :param num2: int
        :return: bytes 二進制消息數據
        """
        name = 'divide'

        # 處理函數名
        buff = struct.pack('!I', 6) # 無符號int
        buff += name.encode()

        # 處理參數1
        buff2 = struct.pack('!B', 1) # 無符號byte
        buff2 += struct.pack('!i', num1)

        # 處理參數2
        if num2 != 1:
            # 沒有傳參的時候
            buff2 += struct.pack('!B', 2)
            buff2 += struct.pack('!i', num2)

        # 處理參數邊界和組合成完整數據
        buff += struct.pack('!I',len(buff2))
        buff += buff2

        return buff

    def _read_all(self, size):
        """
        幫助我們讀取二進制數據
        :param  size: 想要讀取的二進制數據大小
        :return:  二進制數據bytes
        """
        # self.conn
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # 有時候長度大于每次讀取的長度
            have = 0
            buff = b''
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have +=  l
                if l == 0:
                    # 表示客戶端已經關閉了
                    raise EOFError
            return buff

    def args_decode(self, connection):
        """
        接受調用請求數據病進行解析
        :param connection: 鏈接請求數據 socket  BytesIO
        :return: 因為有多個參數,定義為字典
        """
        param_len_map = {
            1:4,
            2:4,
        }

        param_fmt_map = {
            1:'!i',
            2:'!i',
        }

        param_name_map = {
            1: 'num1',
            2: 'num2',
        }

        # 保存用來返回的參數字典
        args = {}

        self.conn = connection
        # 處理方法的名字,已經提前被處理,稍后處理

        # 處理消息邊界
        # 1) 讀取二進制數據----read  , ------ByteIO.read
        # 2) 將二進制數據轉換為python的數據類型
        buff = self._read_all(4)
        length = struct.unpack('!I',buff)[0]

        # 記錄已經讀取的長度值
        have = 0

        # 處理第一個參數
        # 解析參數序號
        buff = self._read_all(1)
        have += 1
        param_seq = struct.unpack('!B', buff)[0]

        # 解析參數值
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        have += param_len
        param_fmt = param_fmt_map[param_seq]
        param = struct.unpack(param_fmt,buff)[0]

        # 設置解析后的字典
        param_name = param_name_map[param_seq]
        args[param_name] = param

        if have >= length:
            return args
        # 處理第二個參數
        # 解析參數序號
        buff = self._read_all(1)
        param_seq = struct.unpack('!B', buff)[0]

        # 解析參數值
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        param_fmt = param_fmt_map[param_seq]
        param = struct.unpack(param_fmt, buff)[0]

        # 設置解析后的字典
        param_name = param_name_map[param_seq]
        args[param_name] = param
        return args

    def result_encode(self, result):
        """
        將原始結果數據轉換為消息協議二進制數據
        :param result:
        :return:
        """
        if  isinstance(result,float):
            # 處理返回值類型
            buff = struct.pack('!B', 1)
            buff += struct.pack('!f', result)
            return buff
        else:
            buff = struct.pack('!B', 2)
            # 處理返回值
            length = len(result.message)
            # 處理字符串長度
            buff += struct.pack('!I', length)
            buff += result.message.encode()
            return buff

    def result_decode(self, connection):
        """
        將返回值消息數據轉換為原始返回值
        :param connection: socket BytesIo
        :return: float InvalidOperation對象
        """
        self.conn = connection
        # 處理返回值類型
        buff = self._read_all(1)
        result_type = struct.unpack('!B', buff)[0]

        if result_type == 1:
            #正常情況
            buff = self._read_all(4)
            val = struct.unpack('!f', buff)[0]
            return val
        else:
            buff = self._read_all(4)
            length = struct.unpack('!I', buff)[0]
            # 讀取字符串
            buff = self._read_all(length)
            message = buff.decode(buff)
            return InvalidOperation(message)

class Channel(object):
    """
    用于客戶端建立網絡鏈接
    """
    def __init__(self, host, port):
        self.host = host
        self.port = port

    def get_connection(self):
        """
        獲取鏈接對象
        :return: 與服務器通訊的socket
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock

class Server(object):
    """
    RPC服務器
    """
    def __init__(self, host, port, handlers):
        sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

        # 地址復用
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.host = host
        self.port = port
        # 綁定地址
        sock.bind((self.host, self.port))

        #  因為在啟動的方法中才開啟監聽,所以不在此處開啟
        # sock.listen(128)
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        開啟服務器運行,提供RPC服務
        :return:
        """
        # 開啟服務器的監聽,等待客戶端的鏈接請求
        self.sock.listen(128)
        print("服務器開啟監聽,ip地址為%s,port為%d..." % (self.host,self.port))
        while True:
            # 不斷的接收客戶端的鏈接請求
            client_sock, client_addr = self.sock.accept()
            print("與客戶端%s建立連接" % str(client_addr))

            # 交個ServerStub,完成客戶端的具體的RPC的調用請求
            stub = ServerStub(client_sock, self.handlers)
            try:
                while True:
                    # 不斷的接收
                    stub.process()
            except EOFError:
                # 表示客戶端關閉了連接
                print('客戶端關閉了連接')
                client_sock.close()

class ClientStub(object):
    """
    用來幫助客戶端完成遠程過程調用 RPC調用

    stub = ClientStub()
    stub.divide(200, 100)
    """
    def __init__(self, channel):
        self.channel = channel
        self.conn = self.channel.get_connection()

    def divide(self, num1, num2 = 1):
        # 將調用的參數打包成消息協議的數據
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)
        # 將消息數據通過網絡發送給服務器
        self.conn.sendall(args)

        # 接受服務器返回的消息數據,并進行解析
        result = proto.result_decode(self.conn)

        # 將結果之(正常float 或 異常InvalidOperation)返回給客戶端
        if isinstance(result,float):
            return result
        else:
            raise result

class ServerStub(object):
    """
    服務端存根
    幫助服務端完成遠端過程調用
    """
    def __init__(self, connection, handlers):
        """
        :param connection: 與客戶端的鏈接
        :param handlers: 真正的本地函數路由
        此處不以map的形式處理,實現類的形式
        class Handler:
            @staticmethod
            def divide():
                pass
            @staticmethod
            def add():
                pass
        """
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.process_map = {
            'divide': self._process_divide,
            'add': self._process_add
        }
        self.handlers = handlers

    def process(self):
        """
        當服務端接受了客戶的鏈接,建立好鏈接后,完成遠端調用的處理
        :return:
        """
        # 接收消息數據,并解析方法的名字
        name = self.method_proto.get_method_name()
        # 根據解析獲得的方法名,調用相應的過程協議,接收并解析消息數據
        self.process_map[name]()

    def _process_divide(self):
        """
        處理除法過程調用
        :return:
        """
        proto = DivideProtocol()
        args = proto.args_decode(self.conn)
        # args = {'num1':xxx, 'num2':xxx}
        # 除法過程的本地調用------------------->>>>>>>>>
        # 將本地調用過程的返回值(包括可能的異常)打包成消息協議的數據,通過網絡返回給客戶端
        try:
            val = self.handlers.divide(**args)
        except InvalidOperation as e:
            ret_message = proto.result_encode(e)
        else:
            ret_message = proto.result_encode(val)
        self.conn.sendall(ret_message)

    def _process_add(self):
        """
        處理加法過程調用
        此方法暫時不識閑
        :return:
        """
        pass

if __name__ == '__main__':
    # 目的:消息協議測試,模擬網絡傳輸
    # 構造消息數據
    proto = DivideProtocol()

    # 測試一
    # divide(200,100)
    # message = proto.args_encode(200,100)

    # 測試二
    message = proto.args_encode(200)

    conn = BytesIO()
    conn.write(message)
    conn.seek(0)

    # 解析消息數據
    method_proto = MethodProtocal(conn)
    name = method_proto.get_method_name()
    print(name)

    args = proto.args_decode(conn)
    print(args)

接下來,只需要創建服務器實例和使用客戶端發起請求

server.py

from services import InvalidOperation
from services import Server

class Handlers:
    @staticmethod
    def divide(num1, num2 = 1):
        if num2 == 0:
            raise InvalidOperation('ck_god_err')
        val = num1/num2
        return val

if __name__ == '__main__':
    # 開啟服務器
    _server = Server('127.0.0.1', 8000, Handlers)
    _server.serve()

client.py

ffrom services import ClientStub
from services import Channel
from services import InvalidOperation

# 創建與服務器的連接
channel = Channel('127.0.0.1', 8000)

# 創建用于rpc調用的工具
stub = ClientStub(channel)

# 進行調用
for i in range(5):
    try:
        # val = stub.divide(i * 100,100)
        # val = stub.divide(i * 100)
        val = stub.divide( 100, 0)
    except InvalidOperation as e:
        print(e.message)
    else:
        print(val)

當然如果有必要的話,可以在services.py添加如下代碼,改為多線程的方式,自己再重寫創建實例就可以調用了。

class ThreadServer(object):
    """
    多線成RPC服務器
    """
    def __init__(self, host, port, handlers):
        sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

        # 地址復用
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.host = host
        self.port = port
        # 綁定地址
        sock.bind((self.host, self.port))

        #  因為在啟動的方法中才開啟監聽,所以不在此處開啟
        # sock.listen(128)
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        開啟服務器運行,提供RPC服務
        :return:
        """
        # 開啟服務器的監聽,等待客戶端的鏈接請求
        self.sock.listen(128)
        print("服務器開啟監聽,ip地址為%s,port為%d..." % (self.host,self.port))
        while True:
            # 不斷的接收客戶端的鏈接請求
            client_sock, client_addr = self.sock.accept()
            print("與客戶端%s建立連接" % str(client_addr))
            t = threading.Thread(target= self.handle, args=(client_sock,))
            t.start()

    # 子線程函數
    def handle(self,client_sock):
        """
        子線程調用的方法,用來處理一個客戶段的請求
        :return: 
        """
        # 交個ServerStub,完成客戶端的具體的RPC的調用請求
        stub = ServerStub(client_sock, self.handlers)
        try:
            while True:
                # 不斷的接收
                stub.process()
        except EOFError:
            # 表示客戶端關閉了連接
            print('客戶端關閉了連接')
            client_sock.close()
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

札达县| 兰西县| 谢通门县| 天水市| 常熟市| 临城县| 武鸣县| 六盘水市| 秦皇岛市| 时尚| 屯门区| 广元市| 临沧市| 石屏县| 洱源县| 灌阳县| 高要市| 华亭县| 青海省| 临沧市| 龙江县| 镇远县| 普兰店市| 康保县| 许昌县| 额敏县| 宜川县| 邓州市| 武清区| 四子王旗| 黄梅县| 中方县| 安宁市| 武平县| 资源县| 彭山县| 定日县| 南平市| 宁海县| 微山县| 山西省|