基于boost asio實現server端通信,采用one by one的同步處理方式,并且設置連接等待超時。下面給出了string和byte兩種數據類型的通信方式,可覆蓋基本通信場景需求。
#include "software.hpp"

#include <charconv>
#include <cstring>
#include <exception>
#include <iostream>
#include <system_error>

/**
 * @brief Program entry point: start the one-by-one server on the given port.
 *
 * Usage: <program> port
 *
 * @param[in] argc Number of command-line arguments (must be 2).
 * @param[in] argv argv[1] is the TCP port to listen on (1-65535).
 * @return -1 on a usage error, an invalid port, or when the server terminated
 *         with an exception; 0 only if software::server() returns normally.
 */
int main(int argc, char** argv)
{
    if (2 != argc) {
        std::cout << "Usage: " << argv[0] << " port" << std::endl;
        return -1;
    }

    // Parse the port strictly: atoi() would silently turn garbage into 0 and
    // let out-of-range values be narrowed when converted to unsigned short.
    unsigned short port = 0;
    const char* const first = argv[1];
    const char* const last = first + std::strlen(first);
    const auto [parsed_end, parse_error] = std::from_chars(first, last, port);
    if (parse_error != std::errc{} || parsed_end != last || port == 0) {
        std::cout << "Invalid port: " << argv[1] << " (expected 1-65535)" << std::endl;
        return -1;
    }

    try {
        boost::asio::io_context io;
        // Start the server on this host; it listens on the port given on the
        // command line and only comes back if an exception escapes.
        software::server(io, port);
    }
    catch (const std::exception& e) {
        std::cout << "main exception: " << e.what() << std::endl;
        return -1; // report the failure to the caller instead of exiting with 0
    }
    return 0;
}
#ifndef __SOFTWARE_HPP__ #define __SOFTWARE_HPP__ #include <string> #include <iostream> #include <boost/asio.hpp> namespace software { //! Session deadline duration constexpr auto SESSION_TIMEOUT = std::chrono::minutes(2); //! Protocol delimiter to software client;分隔符:接收來自client的string數據必須以"}\n"結尾 static constexpr char const* delimiter = "}"; namespace asio = boost::asio; using tcp = asio::ip::tcp; /** * @brief Session for software * Inherit @class enable_shared_from_this<> * in order to give the lifecycle to io context, * it'll causes the lifecycle automatically end when connection break * (async operation will return when connection break) * @code * asio::io_context io; * session sess(io, std::move(socket)); * io.run(); * @endcode */ class session { public: /* session constructor function */ session(asio::io_context& io, tcp::socket socket); /* session destructor function */ ~session(); private: /*! Async read session socket */ void do_read(); /*! Async wait deadline */ void async_deadline_wait(); /*! software on message handler */ void on_message(std::string&& message); private: tcp::socket socket_; // tcp socket std::string recv_data_; // recv buffer[string] asio::steady_timer deadline_; // wait deadline time,expire it will disconnect auto }; /** * @brief Start server to software(同步方式accept) * Will serve client one by one(同步方式) * @param[in] io The asio io context * @param[in] port The listen port */ inline void server(asio::io_context& io, unsigned short port) { std::cout<<"sync server start, listen port: " << port << std::endl; tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), port)); // 一次處理一個連接[one by one] while (true) { using namespace std; // client請求放在隊列中,循環逐個處理,處理完繼續阻塞 tcp::socket sock(io); acceptor.accept(sock); // 一開始會阻塞在這,等待software client連接 io.restart(); session sess(io, std::move(sock)); // io socket io.run(); // run until session async operations done,調用run()函數進入io事件循環 } } } // namespace software #endif
#include "software.hpp"

namespace software
{

namespace
{
/**
 * @brief Shut down and close @p socket without throwing.
 *
 * The throwing overloads raise boost::system::system_error when the peer has
 * already reset the connection; thrown from a completion handler that would
 * escape io.run() and take the whole server down.
 */
void close_quietly(tcp::socket& socket)
{
    boost::system::error_code ignored;
    socket.shutdown(asio::socket_base::shutdown_both, ignored);
    socket.close(ignored);
}
} // namespace

/**
 * @brief Session constructor: starts the first read and the deadline wait.
 * @param[in] io     The io context
 * @param[in] socket The connected session socket
 */
session::session(asio::io_context& io, tcp::socket socket)
    : socket_(std::move(socket))
    , deadline_(io)
{
    // The peer may already be gone; use the non-throwing overload so that a
    // vanished client cannot abort the server from inside the constructor.
    boost::system::error_code ec;
    const auto peer = socket_.remote_endpoint(ec);
    if (!ec)
        std::cout << "session created: " << peer << std::endl;
    else
        std::cout << "session created: <unknown peer: " << ec.message() << ">" << std::endl;

    do_read();             // register the first async read (also arms the deadline)
    async_deadline_wait(); // start watching the per-request deadline
}

session::~session()
{
    std::cout << "session destruct!" << std::endl;
}

/**
 * @brief Asynchronously read one delimiter-terminated message into recv_data_.
 *
 * On success the message is passed to on_message(), removed from the buffer,
 * and the next read is registered. On error the session is closed.
 */
void session::do_read()
{
    auto handler = [this](boost::system::error_code ec, std::size_t length) {
        // Data received: hand the message to on_message().
        if (!ec && socket_.is_open() && length != 0) {
            on_message(recv_data_.substr(0, length));
            recv_data_.erase(0, length); // drop only the consumed message; extra bytes stay buffered
            do_read();                   // register the next async read
        }
        // Error (e.g. the client went away): shut the session down.
        else if (socket_.is_open()) {
            std::cout << "client offline, close session" << std::endl;
            close_quietly(socket_);
            deadline_.cancel(); // stop the deadline wait so io.run() can return
        }
    };

    std::cout << "server waiting message..." << std::endl;

    // Completes once the delimiter has been received.
    // NOTE(review): recv_data_ has no size cap, so a client that never sends
    // the delimiter can grow it without bound - consider
    // asio::dynamic_buffer(recv_data_, max_size).
    asio::async_read_until(socket_,
                           asio::dynamic_buffer(recv_data_),
                           delimiter, // read termination condition
                           handler);  // completion handler

    // Re-arm the deadline: the session is closed if no request arrives in time.
    deadline_.expires_after(SESSION_TIMEOUT);
}

/**
 * @brief Asynchronously wait for the session deadline.
 * @pre deadline_.expires_xxx() must have been called
 */
void session::async_deadline_wait()
{
    using namespace std::chrono;

    deadline_.async_wait(
        [this](boost::system::error_code) {
            // Session already closed by the read path: nothing left to watch.
            if (!socket_.is_open())
                return;

            // The handler also runs when expires_after() re-arms the timer, so
            // decide by the stored expiry time rather than by the error code.
            if (deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
                std::cout << "client no data more than <"
                          << duration_cast<milliseconds>(SESSION_TIMEOUT).count()
                          << "> ms, shutdown" << std::endl;
                close_quietly(socket_);
                return;
            }

            // Deadline was pushed back: keep waiting.
            async_deadline_wait();
        });
}

/**
 * @brief Message handler: prints the message and echoes a response.
 * @param[in] message The received message. It is an rvalue reference, so it
 *                    binds to the temporary built by the caller and keeps that
 *                    temporary alive for the duration of the call.
 */
void session::on_message(std::string&& message)
{
    try {
        // Print the received data.
        std::cout << "recv from client is: " << message << std::endl;

        // Respond to the client (synchronous write).
        const std::string send_buf = "hello client, you send data is: " + message;
        asio::write(socket_, asio::buffer(send_buf));
    }
    catch (const std::exception& ex) {
        std::cout << "some exception occured: " << ex.what() << std::endl;
    }
}

} // namespace software
在main函數中傳入io和port,調用 software.hpp中的server(asio::io_context& io, unsigned short port)函數。
在server()函數的while (true)循環體中accept來自client的連接,每次接收到一個client的連接會創建一個session對象,在session對象中處理本次的連接socket。注意,此處采用的是one by one的同步處理方式,只有上一個session處理完成才能處理下一個session的請求,但是同步發送的請求消息不會丟失,只是暫時不會處理和返回;總的來說,server會按照請求的順序進行one by one處理。
在on_message()消息處理函數中會對收到的string數據進行處理(上述程序中以打印代替),然后調用asio::write(socket_, asio::buffer(send_buf))將response發送給client。
編譯:g++ main.cpp software.cpp -o iotest -lpthread -lboost_system -std=c++17
執行:./iotest 11112 (監聽端口為11112)
  tips:client1和client2可同時與server建立連接并發送數據,但是server會按照連接建立的先后順序對client發送的請求進行one by one處理,比如client1先與server建立了連接,那么只有等到client1的所有請求執行完成才會處理client2發送的請求;在等待期間client2發送的請求不會處理,但不會丟失。
將session類中的string recv_data_;替換成u_int8_t recv_data_[MAX_RECV_LEN];
on_message()數據處理函數變為:void on_message(const u_int8_t* recv_buf,std::size_t recv_len);
數據發送方式變為:socket_.async_send(asio::buffer(recv_buf,recv_len),[](error_code ec, size_t size){});
#ifndef __SOFTWARE_HPP__ #define __SOFTWARE_HPP__ #include <string> #include <iostream> #include <boost/asio.hpp> #define MAX_RECV_LEN 2048 namespace software { //! Session deadline duration constexpr auto SESSION_TIMEOUT = std::chrono::minutes(2); //! Protocol delimiter to software client;分隔符:接收來自client的string數據必須以"}\n"結尾 static constexpr char const* delimiter = "}"; namespace asio = boost::asio; using tcp = asio::ip::tcp; /** * @brief Session for software * Inherit @class enable_shared_from_this<> * in order to give the lifecycle to io context, * it'll causes the lifecycle automatically end when connection break * (async operation will return when connection break) * @code * asio::io_context io; * session sess(io, std::move(socket)); * io.run(); * @endcode */ class session { public: /* session constructor function */ session(asio::io_context& io, tcp::socket socket); /* session destructor function */ ~session(); private: /*! Async read session socket */ void do_read(); /*! Async wait deadline */ void async_deadline_wait(); /*! 
software on message handler */ void on_message(const u_int8_t* recv_buf,std::size_t recv_len); private: tcp::socket socket_; // tcp socket u_int8_t recv_data_[MAX_RECV_LEN]; // recv buffer[byte] asio::steady_timer deadline_; // wait deadline time,expire it will disconnect auto }; /** * @brief Start server to software(同步方式accept) * Will serve client one by one(同步方式) * @param[in] io The asio io context * @param[in] port The listen port */ inline void server(asio::io_context& io, unsigned short port) { std::cout<<"sync server start, listen port: " << port << std::endl; tcp::acceptor acceptor(io, tcp::endpoint(tcp::v4(), port)); // 一次處理一個連接[one by one] while (true) { using namespace std; // client請求放在隊列中,循環逐個處理,處理完繼續阻塞 tcp::socket sock(io); acceptor.accept(sock); // 一開始會阻塞在這,等待software client連接 io.restart(); session sess(io, std::move(sock)); // io socket io.run(); // run until session async operations done,調用run()函數進入io事件循環 } } } // namespace software #endif
#include "software.hpp"

#include <cstdio>
#include <cstring>

namespace software
{

namespace
{
/**
 * @brief Shut down and close @p socket without throwing.
 *
 * The throwing overloads raise boost::system::system_error when the peer has
 * already reset the connection; thrown from a completion handler that would
 * escape io.run() and take the whole server down.
 */
void close_quietly(tcp::socket& socket)
{
    boost::system::error_code ignored;
    socket.shutdown(asio::socket_base::shutdown_both, ignored);
    socket.close(ignored);
}
} // namespace

/**
 * @brief Session constructor: starts the first read and the deadline wait.
 * @param[in] io     The io context
 * @param[in] socket The connected session socket
 */
session::session(asio::io_context& io, tcp::socket socket)
    : socket_(std::move(socket))
    , deadline_(io)
{
    // The peer may already be gone; use the non-throwing overload so that a
    // vanished client cannot abort the server from inside the constructor.
    boost::system::error_code ec;
    const auto peer = socket_.remote_endpoint(ec);
    if (!ec)
        std::cout << "session created: " << peer << std::endl;
    else
        std::cout << "session created: <unknown peer: " << ec.message() << ">" << std::endl;

    do_read();             // register the first async read (also arms the deadline)
    async_deadline_wait(); // start watching the per-request deadline
}

session::~session()
{
    std::cout << "session destruct!" << std::endl;
}

/**
 * @brief Asynchronously receive some bytes from the client into recv_data_.
 *
 * On success the received bytes are passed to on_message() and the next read
 * is registered. On error the session is closed.
 */
void session::do_read()
{
    auto handler = [this](boost::system::error_code ec, std::size_t length) {
        // Data received: hand the bytes to on_message().
        if (!ec && socket_.is_open() && length != 0) {
            on_message(recv_data_, length);
            // Safe to wipe here: on_message() has finished sending by the time it returns.
            std::memset(recv_data_, 0, sizeof(recv_data_));
            do_read(); // register the next async read
        }
        // Error (e.g. the client went away): shut the session down.
        else if (socket_.is_open()) {
            std::cout << "client offline, close session" << std::endl;
            close_quietly(socket_);
            deadline_.cancel(); // stop the deadline wait so io.run() can return
        }
    };

    std::cout << "server waiting message..." << std::endl;

    // Completes as soon as some bytes (at most the buffer size) have arrived.
    socket_.async_receive(asio::buffer(recv_data_), handler);

    // Re-arm the deadline: the session is closed if no request arrives in time.
    deadline_.expires_after(SESSION_TIMEOUT);
}

/**
 * @brief Asynchronously wait for the session deadline.
 * @pre deadline_.expires_xxx() must have been called
 */
void session::async_deadline_wait()
{
    using namespace std::chrono;

    deadline_.async_wait(
        [this](boost::system::error_code) {
            // Session already closed by the read path: nothing left to watch.
            if (!socket_.is_open())
                return;

            // The handler also runs when expires_after() re-arms the timer, so
            // decide by the stored expiry time rather than by the error code.
            if (deadline_.expiry() <= asio::steady_timer::clock_type::now()) {
                std::cout << "client no data more than <"
                          << duration_cast<milliseconds>(SESSION_TIMEOUT).count()
                          << "> ms, shutdown" << std::endl;
                close_quietly(socket_);
                return;
            }

            // Deadline was pushed back: keep waiting.
            async_deadline_wait();
        });
}

/**
 * @brief Message handler: prints the received bytes as hex and echoes them back.
 * @param[in] recv_buf The received byte array address
 * @param[in] recv_len The received byte length
 */
void session::on_message(const u_int8_t* recv_buf, std::size_t recv_len)
{
    try {
        // Print the received data.
        std::cout << "recv data length is: " << recv_len << " data is: ";
        for (std::size_t i = 0; i < recv_len; ++i)
            std::printf("%x ", static_cast<unsigned int>(recv_buf[i]));
        std::cout << std::endl;

        // Respond to the client. The write is synchronous on purpose: recv_buf
        // points into recv_data_, which the caller clears and reuses for the
        // next receive right after this function returns, so an asynchronous
        // send could still be reading the buffer while it is overwritten.
        // asio::write also guarantees that all recv_len bytes are sent.
        asio::write(socket_, asio::buffer(recv_buf, recv_len));
    }
    catch (const std::exception& ex) {
        std::cout << "some exception occured: " << ex.what() << std::endl;
    }
}

} // namespace software
編譯:g++ main.cpp software.cpp -o iotest -lpthread -lboost_system -std=c++17
執行:./iotest 11112 (監聽端口為11112)
