您好,登錄后才能下訂單哦!
本篇內容介紹了“微服務開源框架TARS之有哪些基礎組件”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
先看下框架對TC_ThreadQueue
類的使用如下:
typedef TC_ThreadQueue<tagRecvData*, deque<tagRecvData*> > recv_queue; // 接收隊列 typedef TC_ThreadQueue<tagSendData*, deque<tagSendData*> > send_queue; // 發送隊列
TC_ThreadQueue
的實現比較簡單,在TARS的網絡層實現中可以發現這個類比較重要,因為從框架中收到的網絡包都會加入到這個緩存隊列里面,然后多業務線程 ServantHandle
會調用 waitForRecvQueue
從該隊列里面取網絡數據包,然后調用 dispatch
調用協議消息對應的處理函數,先看下框架對 TC_ThreadQueue
的實現:
/** * @brief 線程安全隊列 */ template<typename T, typename D = deque<T> > class TC_ThreadQueue { public: TC_ThreadQueue():_size(0){}; public: typedef D queue_type; /** * @brief 從頭部獲取數據, 沒有數據拋異常 * * @param t * @return bool: true, 獲取了數據, false, 無數據 */ T front(); /** * @brief 從頭部獲取數據, 沒有數據則等待. * * @param t * @param millsecond(wait = true時才生效) 阻塞等待時間(ms) * 0 表示不阻塞 * -1 永久等待 * @param wait, 是否wait * @return bool: true, 獲取了數據, false, 無數據 */ bool pop_front(T& t, size_t millsecond = 0, bool wait = true); ... ... }
TC_ThreadQueue
使用了C++11標準庫中的<mutex>
和<condition_variable>
用于實現線程鎖和 wait,如下,看下隊列的成員函數:push_front
在隊列前面加入數據,
template<typename T, typename D> void TC_ThreadQueue<T, D>::push_front(const T& t, bool notify) { if(notify) { std::unique_lock<std::mutex> lock(_mutex); _cond.notify_one(); _queue.push_front(t); ++_size; } else { std::lock_guard<std::mutex> lock (_mutex); _queue.push_front(t); ++_size; } }
如上圖調用push_front
函數的時候調用 std::unique_lock<std::mutex> lock(_mutex)
加鎖 ,避免網絡層接收數據和業務層取同一隊列的數據沖突,_cond.notify_one()
通知等待在該鎖上某一個線程醒過來,調用該函數之前必須加鎖,因為有數據過來了,例如網絡層有線程需要取包并進行分發處理。
再看一個成員函數pop_front
,從頭部獲取數據,沒有數據則等待。millisecond
阻塞等待時間(ms)
0
表示不阻塞
-1
永久等待
template<typename T, typename D> bool TC_ThreadQueue<T, D>::pop_front(T& t, size_t millsecond, bool wait) { if(wait) { std::unique_lock<std::mutex> lock(_mutex); if (_queue.empty()) { if (millsecond == 0) { return false; } if (millsecond == (size_t) -1) { _cond.wait(lock); } else { //超時了 if (_cond.wait_for(lock, std::chrono::milliseconds(millsecond)) == std::cv_status::timeout) { return false; } } } if (_queue.empty()) { return false; } t = _queue.front(); _queue.pop_front(); assert(_size > 0); --_size; return true; } else { std::lock_guard<std::mutex> lock (_mutex); if (_queue.empty()) { return false; } t = _queue.front(); _queue.pop_front(); assert(_size > 0); --_size; return true; } }
BindAdapter::waitForRecvQueue
的函數就是調用了pop_front
函數,用于等待接收隊列,函數原型如下:
bool TC_EpollServer::BindAdapter::waitForRecvQueue(uint32_t handleIndex, shared_ptr<RecvContext> &data) { bool bRet = getRecvQueue(handleIndex).pop_front(data); if (!bRet) { return bRet; } --_iRecvBufferSize; return bRet; }
這里BindAdapter::waitForRecvQueue
用于業務線程在等待服務器監聽的適配器收到網絡包后進行業務包的處理,這里傳入的handleIndex
表示接收隊列索引,獲取對應的_rbuffer
。
TC_ThreadLock
類的定義如下
typedef TC_Monitor<TC_ThreadMutex, TC_ThreadCond> TC_ThreadLock;
TC_Monitor
線程鎖監控模板類。通常線程鎖,都通過該類來使用,而不是直接用TC_ThreadMutex
、TC_ThreadRecMutex
。
類的定義template <class T, class P> class TC_Monitor
需要傳入兩個模板參數,TC_Monitor
包括以下成員變量:
mutable int _nnotify; // 上鎖的次數 mutable P _cond; // 條件變量 T _mutex; // 互斥鎖 /** * @brief 定義鎖控制對象 */ typedef TC_LockT<TC_Monitor<T, P> > Lock; typedef TC_TryLockT<TC_Monitor<T, P> > TryLock;
第一個參數 TC_ThreadMutex
代表線程鎖:同一個線程不可以重復加鎖 ,包含成員變量
mutable std::mutex _mutex
延伸閱讀,這里
tc_thread_mutex.h
還包括另外一個循環鎖類TC_ThreadRecMutex
,即一個線程可以加多次鎖,定義如下:
// 定義于tc_monitor.h中 typedef TC_Monitor<TC_ThreadRecMutex, TC_ThreadCond> TC_ThreadRecLock;
第二個參數 TC_ThreadCond
代表線程信號條件類:所有鎖可以在上面等待信號發生,包含線程條件成員變量:
mutable std::condition_variable_any _cond
結合實際的使用場景,TC_Monitor::timedWait()
會調用 TC_ThreadCond
對象的 timedWait
函數,下一步調用 chrono
庫的 milliseconds
;TC_ThreadCond::signal()
實現發送信號,等待在該條件上的一個線程會醒。
TC_LockT
類定義: template <typename T> class TC_LockT
鎖模板類,與其他具體鎖配合使用,構造時候加鎖,析夠的時候解鎖。
TC_LockT
構造函數,傳入互斥量初始化成員變量 _mutex
,TC_LockT
構造函數實現:
TC_LockT(const T& mutex) : _mutex(mutex) { _mutex.lock(); _acquired = true; }
到這里就可以看出 TC_Monitor
定義的 typedef TC_LockT<TC_Monitor<T, P> > Lock
,這里 Lock
類型的模板參數用的是 TC_Monitor
類。
實際使用場景如下:
Lock lock(*this);
TC_LockT
的構造函數,傳入參數 this
為 TC_Monitor
的子類對象,TC_LockT
的構造函數調用_mutex.lock()
;實際就是調用了 TC_Monitor
對象的 lock
函數,TC_Monitor
的 lock
函數實現:
void lock() const { _mutex.lock(); _nnotify = 0; }
這里 _mutex
為 TC_ThreadMutex
對象,進一步調用了 TC_ThreadRecMutex::lock()
成員函數,實現如下:
void TC_ThreadMutex::lock() const { _mutex.lock(); }
然后上面定義的lock棧變量退出函數的時候調用 TC_LockT
的析構函數:實現如下:
virtual ~TC_LockT() { if (_acquired) { _mutex.unlock(); //這里會調用TC_Monitor的unlock函數 } }
TC_Monitor
的 unlock
函數實現:
void unlock() const { notifyImpl(_nnotify); _mutex.unlock(); //這里會調用C++標準庫<mutex>中的unlock }
這里調用 notifyImpl
函數是因為 TC_Monitor
類不只可以實現簡單的互斥鎖功能,還可以實現條件變量Condition功能,其中 notifyImpl
的實現為
void notifyImpl(int nnotify) const { if(nnotify != 0) { if(nnotify == -1) { _cond.broadcast(); return; } else { while(nnotify > 0) { _cond.signal(); --nnotify; } } } }
還是老樣子,先看下項目實際對線程基類的使用。實際項目使用中,我們對 TC_Thread
又封裝了一下,實現了一個BasicThread
類,下面看下 BasicThread
的定義:
class BasicThread : public tars::TC_Thread, public tars::TC_ThreadLock { ... void terminate() { _bTerm = true; { Lock lock(*this); notifyAll(); } getThreadControl().join(); } }
BasicThread
類,繼承了 TC_Thread
和 TC_ThreadLock
,其中 TC_ThreadLock
第二點已經說明過了,所以這里重點看下 TC_Thread
類的使用,TC_Thread
的定義
class TC_Thread : public TC_Runable { ... /** * 使用了C++11標準線程庫std::thread, 構造函數傳參數threadEntry線程函數, * 返回 TC_ThreadControl(_th),其中_th為std::thread對象 */ TC_ThreadControl start(); static void threadEntry(TC_Thread *pThread); //靜態函數, 線程入口 virtual void run() = 0; ... }
下一步看下線程控制類 TC_ThreadControl
的定義:
class TC_ThreadControl { ... explicit TC_ThreadControl(std::thread *th); // 構造,傳入std::thread對象 void join(); // 調用std::thread的join()阻塞當前的線程,直到另外一個線程運行結束 static void sleep(); // 調用std::this_thread::sleep函數線程將暫停執行 ... }
下一步看下 TC_Runable
的定義:
class TC_Runable { public: virtual ~TC_Runable(){}; virtual void run() = 0; //定義了run純虛函數 };
最后看下實際項目中對線程類的使用
class AntiSdkSyncThread : public BasicThread //這里等于多繼承了TC_Thread和TC_ThreadLock兩個類 { void run() //實現基類的純虛函數 { Lock lock(*this); timedWait(10 * 1000); (間隔執行時間,實現了線程的定時執行功能) if(NULL != g_busi_interf) { Int32 ret = g_busi_interf->proc_(); //需要定期執行的函數 } } }
定義好了 AntiSdkSyncThread g_antiSdkSyncThread;
類,那么需要啟動線程的時候執行g_antiSdkSyncThread.start();
就會自然創建線程,并且 threadEntry
線程函數會調用 pThread->run()
多態函數,進程退出的時候調用 g_antiSdkSyncThread.terminate();
。
這里的智能指針可以放在容器中,且線程安全的智能指針,CPP11標準庫的auto_ptr
是不能放在容器中的,貌似已經被淘汰了,目前多數使用CPP11標準庫的shared_ptr
,不過需要編譯器支持CPP11。
TC_HandleBase
智能指針基類的定義如下,所有需要智能指針的類都需要從該對象繼承,其中使用了C++11標準庫中的<atomic>
進行原子計數。
class UTIL_DLL_API TC_HandleBase { public: /** * @brief 復制 * * @return TC_HandleBase& */ TC_HandleBase& operator=(const TC_HandleBase&) { return *this; } /** * @brief 增加計數 */ void incRef() { ++_atomic; } /** * @brief 減少計數 */ void decRef() { if((--_atomic) == 0 && !_bNoDelete) { _bNoDelete = true; delete this; } } /** * @brief 獲取計數. * * @return int 計數值 */ int getRef() const { return _atomic; } /** * @brief 設置不自動釋放. * * @param b 是否自動刪除,true or false */ void setNoDelete(bool b) { _bNoDelete = b; } protected: /** * @brief 構造函數 */ TC_HandleBase() : _atomic(0), _bNoDelete(false) { } /** * @brief 拷貝構造 */ TC_HandleBase(const TC_HandleBase&) : _atomic(0), _bNoDelete(false) { } /** * @brief 析構 */ virtual ~TC_HandleBase() { } protected: std::atomic<int> _atomic; // 引用計數 bool _bNoDelete; // 是否自動刪除 };
下一步看 TC_AutoPtr
智能指針模板類,可以放在容器中,且線程安全的智能指針,該智能指針通過引用計數實現,其構造函數和析構函數定義如下:
template<typename T> class TC_AutoPtr { TC_AutoPtr(T* p = 0) { _ptr = p; if(_ptr) { _ptr->incRef(); //構造函數 引用計算加1 } } ... ~TC_AutoPtr() { if(_ptr) { _ptr->decRef(); //析構函數 引用計算減1 } } }
struct ConnStruct : public TC_HandleBase{...} typedef TC_AutoPtr<ConnStruct> ConnStructPtr;
TC_AutoPtr
拷貝構造調用 _ptr->incRef();
這里 ptr
為 ConnStruct
,ConnStruct
繼承于TC_HandleBase
,等于調用了TC_HandleBaseT<int>::incRef() {++_atomic;}
引用計數原子操作加1、析構引用計數原子操作減1,當引用計數減少到0時根據設置的開關是否要進行刪除來決定是否觸發delete。
// 定義回調函數智能指針,其中SessionCallback父類繼承于TC_HandleBase typedef TC_AutoPtr<SessionCallback> SessionCallbackPtr; //創建回調類SessionCallbackPtr,并傳入初始化參數uin gameid等; SessionCallbackPtr cb = new SessionCallback(iUin, iGameId, iSeqID, iCmd,sSessionID, theServant, current, cs, this); //異步調用sessionserver遠程接口 getSessionPrx()->async_getSession(cb, iUin, iGameId);
接口返回完成,回調SessionCallback::callback_getSession(tars::Int32 ret, const MGComm::SessionValue& retValue)
函數,接收sessionserver
接口的返回的SessionValue
結構。
因為 SessionCallbackPtr
使用了智能指針,所以業務不需要去手動釋放前面 new
出來的 SessionCallbackPtr
,還是比較方便的。
TC_Mysql封裝好的mysql操作類,非線程安全,對于 insert/update 可以有更好的函數封裝,防止SQL注入
使用方式:
TC_Mysql mysql; //初始化mysql,init時不鏈接,請求時自動建立鏈接; //數據庫可以為空; //端口默認為3306 mysql.init("192.168.1.2", "pc", "pc@sn", "db_tars_demo");
通常用:void init(const TC_DBConf& tcDBConf);
直接初始化數據庫。例如:stDirectMysql.init(_stZoneDirectDBConf);
看下TC_DBConf
的定義
struct TC_DBConf { string _host; string _user; string _password; string _database; string _charset; int _port; int _flag; //客戶端標識 TC_DBConf() : _port(0) , _flag(0) {} /** * @brief 讀取數據庫配置. * * @param mpParam 存放數據庫配置的map * dbhost: 主機地址 * dbuser:用戶名 * dbpass:密碼 * dbname:數據庫名稱 * dbport:端口 */ void loadFromMap(const map<string, string> &mpParam) { map<string, string> mpTmp = mpParam; _host = mpTmp["dbhost"]; _user = mpTmp["dbuser"]; _password = mpTmp["dbpass"]; _database = mpTmp["dbname"]; _charset = mpTmp["charset"]; _port = atoi(mpTmp["dbport"].c_str()); _flag = 0; if(mpTmp["dbport"] == "") { _port = 3306; } } };
//進一步看下獲取數據的使用 TC_Mysql::MysqlData data; data = mysql.queryRecord("select * from t_app_users"); for(size_t i = 0; i < data.size(); i++) { //如果不存在ID字段,則拋出異常 cout << data[i]["ID"] << endl; }
查詢出來的mysql數據用MysqlData
封裝
class MysqlData { ... vector<map<string, string> >& data(); ... }
//插入數據,指定數據的類型:數值 或 字符串,對于字符串會自動轉義 map<string, pair<TC_Mysql::FT, string> > m; m["ID"] = make_pair(TC_Mysql::DB_INT, "2334"); m["USERID"] = make_pair(TC_Mysql::DB_STR, "abcttt"); m["APP"] = make_pair(TC_Mysql::DB_STR, "abcapbbp"); m["LASTTIME"] = make_pair(TC_Mysql::DB_INT, "now()"); mysql.replaceRecord("t_user_logs", m);
整個TARS核心就提供一個很完善的網絡框架,包括RPC功能,這里只介紹幾個常用的網絡組件。
提供socket的操作類;支持tcp/udp socket;支持本地域套接字。
再下一層TARS封裝了TC_TCPClient
和TC_UDPClient
兩個類用于實際操作tcp和udp應用。
例如:tcp客戶端
TC_TCPClient stRouterClient; stRouterClient.init(sIP, iPort, iTimeOut); // 這里傳入ip和端口然后調用sendRecv進行消息的收發 Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);
注意多線程使用的時候,不能多線程同時send/recv,小心串包。
提供網絡epoll的操作類,默認是ET模式,當狀態發生變化的時候才獲得通知,提供add、mod、del、wait等基礎操作。
提供關鍵成員函數init(const string &sIp, int iPort, int iTimeout)
,傳入 IP 端口 和 超時時間
TC_TCPClient
繼承于 TC_ClientSocket
提供成員函數:
sendRecv
(發送到服務器, 從服務器返回不超過iRecvLen的字節)
sendRecvBySep
( 發送倒服務器, 并等待服務器直到結尾字符, 包含結尾字符)
stRouterClient.init(sIP, iPort, iTimeOut); size_t iRecvLen = sizeof(recvBuf)-1; Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);
同理還有TC_UDPClient
實現UDP客戶端。
命令解析類;
通常用于解析命令行參數;
只支持雙—的參數形式
分析main
的輸入參數,支持以下形式的參數:
./main.exe --name=value --param1 param2 param3
TC_Option op; //解析命令行 op.decode(argc, argv); //獲取成對的參數,即獲取 - - 表示的所有參數對 map<string, string> mp = op.getMulti(); //表示非 – 的參數:即 param2, param3 vector<string> d = op.getSingle();
如果value,param有空格或者 --
,用引號括起來就可以了。
配置文件解析類(兼容wbl模式);
支持從string中解析配置文件;
支持生成配置文件;
解析出錯拋出異常;
采用[]獲取配置,如果無配置則拋出異常;
采用get獲取配置,不存在則返回空;
讀取配置文件是線程安全的,insert域等函數非線程安全
TC_Config config; config.parseFile(ServerConfig::BasePath + ServerConfig::ServerName + ".conf"); stTmpGameServerConfig.iGameId = TC_Common::strto<UInt32>(config["/Main/<GameId>"]);
配置文件樣例
<Main> GameId = 3001 ZoneId = 102 AsyncThreadCheckInterval = 1000 ... </Main>
使用get
方法例子:如果讀不到該配置,則返回默認值 sDefault
,即下面例子中的 20000000
stTmpGameServerConfig.iMaxRegNum = TC_Common::strto<Int32>(config.get("/Main/<MaxRegNum>", "20000000"));
TC_Functor
參考loki
庫的設計
仿函數對象調用方式, 即對上述的幾種方式都可以在右側添加一對圓括號,并在括號內部放一組合適的參數來調用,例如 a(p1,p2);
把整個調用(包括參數)封裝一個函數對象, 調用對象建立時就傳入了參數,調用的時候不用傳入參數,例如 A a(p1, p2); a();
簡單又好用的封裝,具體見下面使用例子自然明白:
void TestFunction3(const string &s, int i){ cout << "TestFunction3('">
C函數調用用wrapper封裝:
//調用封裝,構造的時候傳入參數 TC_Functor<void,TL::TLMaker<const string&, int>::Result>::wrapper_type fwrapper3(cmd3, s3, 10); fwrapper3(); //參數已經在構造的時候傳入,調用的時候不用傳參數了
說明:
void
: 函數的返回值
TL::TLMaker<const string&, int>::Result
: 代表參數類型
對于調用的封裝,注意對于傳引用類型,具體的調用時候要保證引用的對象存在。
struct TestMember { void mem3(const string &s, int i) { cout << "TestMember::mem3(" << s << "," << i << ") called" << endl; } } TC_Functor<void, TL::TLMaker<const string&, int>::Result > cmd3(&tm, &TestMember::mem3); cmd3("a", 33);
指向類成員函數的調用用wrapper封裝:
TC_Functor<void, TL::TLMaker<const string&, int>::Result >::wrapper_type fwrapper3(cmd3, "a", 10); fwrapper3();
服務初始化initialize的時候,一般會調用
addServantProtocol(sRouterObj, AppProtocol::parseStream<0, uint16_t, false>,iHeaderLen);
這里設置BindAdapter
的協議解析函數 protocol_functor _pf
為 parseStream
函數,如下:
/** * @param T * @param offset * @param netorder * @param in * @param out * @return int */ template<size_t offset, typename T, bool netorder> static TC_NetWorkBuffer::PACKET_TYPE parseStream(TC_NetWorkBuffer& in,vector<char>& out) { size_t len = offset + sizeof(T); if (in.getBufferLength() < len) { return TC_NetWorkBuffer::PACKET_LESS; } string header; in.getHeader(len, header); assert(header.size() == len); T iHeaderLen = 0; ::memcpy(&iHeaderLen, header.c_str() + offset, sizeof(T)); if (netorder) { iHeaderLen = net2host<T>(iHeaderLen); } //長度保護一下 if (iHeaderLen < (T)(len) || (uint32_t)iHeaderLen > TARS_NET_MAX_PACKAGE_SIZE) { return TC_NetWorkBuffer::PACKET_ERR; } if (in.getBufferLength() < (uint32_t)iHeaderLen) { return TC_NetWorkBuffer::PACKET_LESS; } in.getHeader(iHeaderLen, out); assert(out.size() == iHeaderLen); in.moveHeader(iHeaderLen); return TC_NetWorkBuffer::PACKET_FULL; }
注冊好解析函數之后,網絡層收包調用parseProtocol
函數
int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf) { ... TC_NetWorkBuffer::PACKET_TYPE b = _pBindAdapter->getProtocol()(rbuf, ro); //這里回調前面設置好的協議解析函數,從而實現協議解析 ... }
util/tc_hash_fun.h
中包含了對hash算法的實現,使用 hash_new
,可以對輸入的字節流進行hash得到相當均勻的hash值,使用方式如下
#include "util/tc_hash_fun.h" #include <iterator> #include <iostream> #include <sys/time.h> using namespace tars; using namespace std; int main(int argc, char* *argv[]) { unsigned int i = tars::hash_new<string>()("abcd"); cout << i << endl; return 0; }
class TC_Exception : public exception { /** * @brief 構造函數,提供了一個可以傳入errno的構造函數, * 異常拋出時直接獲取的錯誤信息 * * @param buffer 異常的告警信息 * @param err 錯誤碼, 可用strerror獲取錯誤信息 */ TC_Exception(const string &buffer, int err); }
“微服務開源框架TARS之有哪些基礎組件”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。