您好,登錄后才能下訂單哦!
本篇內容主要講解“C++怎么實現RPC網絡通訊”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“C++怎么實現RPC網絡通訊”吧!
RPC指的是計算機A的進程調用另外一臺計算機B的進程,A上的進程被掛起,B上被調用的進程開始執行,當B執行完畢后將執行結果返回給A,A的進程繼續執行。調用方可以通過使用參數將信息傳送給被調用方,然后通過傳回的結果得到信息。這些傳遞的信息都是被加密過或者其他方式處理。這個過程對開發人員是透明的,因此RPC可以看作是本地過程調用的一種擴展,使被調用過程不必與調用過程位于同一物理機中。
RPC可以用于構建基于B/S模式的分布式應用程序:請求服務是一個客戶端、而服務提供程序是一臺服務器。和常規和本地的調用過程一樣,遠程過程調用是同步操作,在結果返回之前,需要暫時中止請求程序。
RPC的優點:
支持面向過程和面向線程的模型;
內部消息傳遞機制對用戶隱藏;
基于 RPC 模式的開發可以減少代碼重寫;
可以在本地環境和分布式環境中運行;
以ARM環境為例,我們拆解本地調用的過程,以下面代碼為例:
int selfIncrement(int a) { return a + 1; } int a = 10;
當執行到selfIncrement(a)時,首先把a存入寄存器R0,之后轉到函數地址selfIncrement,執行函數內的指令 ADD R0,#1。跳轉到函數的地址偏移量在編譯時確定。
但是如果這是一個遠程調用,selfIncrement函數存在于其他機器,為了實現遠程調用,請求方和服務方需要提供需要解決以下問題:
1. 網絡傳輸。
本地調用的參數存放在寄存器或棧中,在同一塊內存中,可以直接訪問到。遠程過程調用需要借助網絡來傳遞參數和需要調用的函數 ID。
2. 編解碼
請求方需要將參數轉化為字節流,服務提供方需要將字節流轉化為參數。
3. 函數映射表
服務提供方的函數需要有唯一的 ID 標識,請求方通過 ID 標識告知服務提供方需要調用哪個函數。
以上三個功能即為 RPC 的基本框架所必須包含的功能。
一次 RPC 調用的運行流程大致分為如下七步,具體如下圖所示。
1.客戶端調用客戶端存根程序,將參數傳入;
2.客戶端存根程序將參數轉化為標準格式,并編組進消息;
3.客戶端存根程序將消息發送到傳輸層,傳輸層將消息傳送至遠程服務器;
4.服務器的傳輸層將消息傳遞到服務器存根程序,存根程序對闡述進行解包,并使用本地調用的機制調用所需的函數;
5.運算完成之后,將結果返回給服務器存根,存根將結果編組為消息,之后發送給傳輸層;
6.服務器傳輸層將結果消息發送給客戶端傳輸層;
7.客戶端存根對返回消息解包,并返回給調用方。
服務端存根和客戶端存根可以看做是被封裝起來的細節,這些細節對于開發人員來說是透明的,但是在客戶端層面看到的是 “本地” 調用了 selfIncrement() 方法,在服務端層面,則需要封裝、網絡傳輸、解封裝等等操作。因此 RPC 可以看作是傳統本地過程調用的一種擴展,其使得被調用過程不必與調用過程位于同一物理機中。
RPC 的目標是做到在遠程機器上調用函數與本地調用函數一樣的體驗。 為了達到這個目的,需要實現網絡傳輸、序列化與反序列化、函數映射表等功能,其中網絡傳輸可以使用socket或其他,序列化和反序列化可以使用protobuf,函數映射表可以使用std::function。
lambda與std::function內容可以看:
C++11 匿名函數lambda的使用
C++11 std::function 基礎用法
lambda 表達式和 std::function 的功能是類似的,lambda 表達式可以轉換為 std::function,一般情況下,更多使用 lambda 表達式,只有在需要回調函數的情況下才會使用 std::function。
#include <iostream> #include <memory> #include <thread> #include <functional> #include <cstring> class RPCClient { public: using RPCCallback = std::function<void(const std::string&)>; RPCClient(const std::string& server_address) : server_address_(server_address) {} ~RPCClient() {} void Call(const std::string& method, const std::string& request, RPCCallback callback) { // 序列化請求數據 std::string data = Serialize(method, request); // 發送請求 SendRequest(data); // 開啟線程接收響應 std::thread t([this, callback]() { std::string response = RecvResponse(); // 反序列化響應數據 std::string result = Deserialize(response); callback(result); }); t.detach(); } private: std::string Serialize(const std::string& method, const std::string& request) { // 省略序列化實現 } void SendRequest(const std::string& data) { // 省略網絡發送實現 } std::string RecvResponse() { // 省略網絡接收實現 } std::string Deserialize(const std::string& response) { // 省略反序列化實現 } private: std::string server_address_; }; int main() { std::shared_ptr<RPCClient> client(new RPCClient("127.0.0.1:8000")); client->Call("Add", "1,2", [](const std::string& result) { std::cout << "Result: " << result << std::endl; }); return 0; }
這段代碼定義了RPCClient類來處理客戶端的請求任務,用到了lambda和std::function來處理函數調用,在Call中使用多線程技術。main中使用智能指針管理Rpcclient類,并調用了客戶端的Add函數。
127.0.0.1為本地地址,對開發來說需要使用本地地址自測,端口號為8000,需要選擇一個空閑端口來通信。
下面是服務端的實現
#include <iostream> #include <map> #include <functional> #include <memory> #include <thread> #include <mutex> // 使用第三方庫實現序列化和反序列化 #include <boost/serialization/serialization.hpp> #include <boost/serialization/map.hpp> using namespace std; // 定義RPC函數類型 using RPCCallback = std::function<std::string(const std::string&)>; class RPCHandler { public: void registerCallback(const std::string& name, RPCCallback callback) { std::unique_lock<std::mutex> lock(mtx_); callbacks_[name] = callback; } std::string handleRequest(const std::string& request) { // 反序列化請求 std::map<std::string, std::string> requestMap; std::istringstream is(request); boost::archive::text_iarchive ia(is); ia >> requestMap; // 查找并調用對應的回調函數 std::string name = requestMap["name"]; std::string args = requestMap["args"]; std::unique_lock<std::mutex> lock(mtx_); auto it = callbacks_.find(name); if (it == callbacks_.end()) { return "Error: Unknown function"; } RPCCallback callback = it->second; return callback(args); } private: std::map<std::string, RPCCallback> callbacks_; std::mutex mtx_; }; int main() { RPCHandler rpcHandler; // 注冊回調函數 rpcHandler.registerCallback("add", [](const std::string& args) { std::istringstream is(args); int a, b; is >> a >> b; int result = a + b; std::ostringstream os; os << result; return os.str(); }); rpcHandler.registerCallback("sub", [](const std::string& args) { std::istringstream is(args); int a, b; is >> a >> b; int result = a - b; std::ostringstream os; os << result; return os.str }); // 創建處理請求的線程 std::thread requestThread([&]() { while (true) { std::string request; std::cin >> request; std::string response = rpcHandler.handleRequest(request); std::cout << response << std::endl; } }); requestThread.join(); return 0; }
上面的代碼實現了一個簡單的C++ RPC服務端。主要實現了以下功能:
1.定義了RPC函數類型 RPCCallback,使用std::function<std::string(const std::string&)>表示。
2.RPCHandler類實現了注冊函數和處理請求的功能。
3.在main函數中創建了一個RPCHandler對象,并注冊了兩個函數"add" 和 "sub"。這些函數通過lambda表達式實現,并在被調用時通過std::istringstream讀取參數并返回結果。
4.創建了一個新線程requestThread來處理請求。在這個線程中,通過std::cin讀取請求,然后調用RPCHandler的handleRequest函數并使用std::cout輸出響應。
注意,這套代碼是最簡單的RPC機制,只能調用本地的資源,他還存在以下缺點:
1.代碼并沒有處理錯誤處理,如果請求格式不正確或函數不存在,服務端將會返回“Error: Unknown function”。
2.沒有使用網絡庫進行通信,所以只能在本機上使用。
3.沒有提供高效的并發性能,所有請求都在單獨的線程中處理。
4.沒有考慮RPC服務的可用性和高可用性,如果服務端崩潰或不可用,客戶端將無法繼續使用服務。
5.沒有考慮RPC服務的可擴展性,如果有大量請求需要處理,可能會導致性能問題。
6.使用了第三方庫Boost.Serialization來實現序列化和反序列化,如果不想使用第三方庫,可能需要自己實現序列化的功能。
下面我們一步一步完善它。
下面是 RPCHandler 類中加入錯誤處理的代碼示例:
class RPCHandler { public: // 其他代碼... std::string handleRequest(const std::string& request) { // 反序列化請求 std::map<std::string, std::string> requestMap; std::istringstream is(request); boost::archive::text_iarchive ia(is); ia >> requestMap; // 查找并調用對應的回調函數 std::string name = requestMap["name"]; std::string args = requestMap["args"]; std::unique_lock<std::mutex> lock(mtx_); auto it = callbacks_.find(name); if (it == callbacks_.end()) { return "Error: Unknown function"; } RPCCallback callback = it->second; try { return callback(args); } catch (const std::exception& e) { return "Error: Exception occurred: " + std::string(e.what()); } catch (...) { return "Error: Unknown exception occurred"; } } };
上面的代碼在 RPCHandler 類的 handleRequest 函數中加入了錯誤處理的代碼,它使用了 try-catch 語句來捕獲可能發生的異常。如果找不到對應的函數或發生了異常,會返回錯誤信息。這樣,如果請求格式不正確或函數不存在,服務端將會返回相應的錯誤信息。
加入網絡連接不需要動服務端的實現,只需要在main里創造套接字去鏈接就好:
int main() { io_context ioc; ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080)); RPCHandler rpcHandler; // 注冊函數 rpcHandler.registerCallback("add", [](const std::string& args) { std::istringstream is(args); int a, b; is >> a >> b; int result = a + b; std::ostringstream os; os << result; return os.str(); }); rpcHandler.registerCallback("sub", [](const std::string& args) { std::istringstream is(args); int a, b; is >> a >> b; int result = a - b; std::ostringstream os; os << result; return os.str(); }); // 等待連接 while (true) { ip::tcp::socket socket(ioc); acceptor.accept(socket); // 創建線程處理請求 std::thread requestThread([&](ip::tcp::socket socket) { while (true) { // 讀取請求 boost::asio::streambuf buf; read_until(socket, buf, '\n'); std::string request = boost::asio::buffer_cast<const char*>(buf.data()); request.pop_back(); // 處理請求 std::string response = rpcHandler.handleRequest(request); // 發送響應 write(socket, buffer(response + '\n')); } }, std::move(socket)); requestThread.detach(); } return 0; }
這是一個使用Boost.Asio庫實現的RPC服務端代碼示例。它使用了TCP協議監聽8080端口,等待客戶端的連接。當有客戶端連接時,創建一個新線程來處理請求。請求和響應通過網絡傳輸。
使用并發和異步機制,忽略重復代碼,實現如下:
class RPCHandler { public: // ... void handleConnection(ip::tcp::socket socket) { while (true) { // 讀取請求 boost::asio::streambuf buf; read_until(socket, buf, '\n'); std::string request = boost::asio::buffer_cast<const char*>(buf.data()); request.pop_back(); // 使用并行執行處理請求 std::vector<std::future<std::string>> futures; for (int i = 0; i < request.size(); i++) { futures.emplace_back(std::async(std::launch::async, &RPCHandler::handleRequest, this, request[i])); } // 等待所有請求處理完成并發送響應 for (auto& f : futures) { std::string response = f.get(); write(socket, buffer(response + '\n')); } } } };
這樣,請求會被分成多個部分并行處理,可以利用多核 CPU 的優勢提高服務端的并發性能。
main():
int main() { io_context ioc; ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080)); RPCHandler rpcHandler; // 注冊函數 rpcHandler.registerCallback("add", [](const std::string& args) { std::istringstream is(args); int a, b; is >> a >> b; int result = a + b; std::ostringstream os; os << result; return os.str(); }); rpcHandler.registerCallback("sub", [](const std::string& args) { std::istringstream is(args); int a, b; is >> a >> b; int result = a - b; std::ostringstream os; os << result; return os.str(); }); // 創建線程池 boost::thread_pool::executor pool(10); // 等待連接 while (true) { ip::tcp::socket socket(ioc); acceptor.accept(socket); // 將請求添加到線程池中處理 pool.submit(boost::bind(&RPCHandler::handleConnection, &rpcHandler, std::move(socket))); } return 0; }
在 main 函數中可以使用 boost::thread_pool::executor 來管理線程池,在線程池中提交任務來處理請求。這里的線程池大小設置為10,可以根據實際情況調整。
在其中使用了重試機制來保證客戶端能夠重新連接服務端:
class RPCClient { public: RPCClient(const std::string& address, int port) : address_(address), port_(port), socket_(io_context_) { connect(); } std::string call(const std::string& name, const std::string& args) { // 序列化請求 std::ostringstream os; boost::archive::text_oarchive oa(os); std::map<std::string, std::string> request; request["name"] = name; request["args"] = args; oa << request; std::string requestStr = os.str(); // 發送請求 write(socket_, buffer(requestStr + '\n')); // 讀取響應 boost::asio::streambuf buf; read_until(socket_, buf, '\n'); std::string response = boost::asio::buffer_cast<const char*>(buf.data()); response.pop_back(); return response; } private: void connect() { bool connected = false; while (!connected) { try { socket_.connect(ip::tcp::endpoint(ip::address::from_string(address_), port_)); connected = true; } catch (const std::exception& e) { std::cerr << "Error connecting to server: " << e.what() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); } } } std::string address_; int port_; io_context io_context_; ip::tcp::socket socket_; };
在這個示例中,當連接服務端失敗時,客戶端會在一定的時間間隔后重試連接,直到成功連接上服務端為止。
到此,相信大家對“C++怎么實現RPC網絡通訊”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。