您好,登錄后才能下訂單哦!
這篇文章給大家介紹怎樣進行服務器程序的架構分析,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
下面將介紹我曾經做過的一個項目的服務器架構和服務器編程的一些重要細節。
程序運行環境
操作系統:Centos 7.0
編譯器:gcc/g++ 4.8.3、cmake 2.8.11
mysql數據庫:5.5.47
項目代碼管理工具:Visual Studio 2013
一、程序結構
該程序總共有 17 個線程,其中分為 9 個數據庫工作線程 D 和一個日志線程 L,6 個普通工作線程 W,一個主線程 M。(以下會用這些字母來代指這些線程)
(一)、數據庫工作線程的用途
9 個數據庫工作線程在線程啟動之初,與 mysql 建立連接,也就是說每個線程都與 mysql 保持一路連接,共 9 個數據庫連接。
每個數據庫工作線程同時存在兩個任務隊列,***個隊列 A 存放需要執行數據庫增刪查改操作的任務 sqlTask,第二個隊列 B 存放 sqlTask 執行完成后的結果。sqlTask 執行完成后立即放入結果隊列中,因而結果隊列中任務也是一個個的需要執行的任務。大致偽代碼如下:
void db_thread_func() { while (!m_bExit) { if (NULL != (pTask = m_sqlTask.Pop())) { //從m_sqlTask中取出的任務先執行完成后,pTask將攜帶結果數據 pTask->Execute(); //得到結果后,立刻將該任務放入結果任務隊列 m_resultTask.Push(pTask); continue; } sleep(1000); }//end while-loop }
現在的問題來了:
任務隊列 A 中的任務從何而來,目前只有消費者,沒有生產者,那么生產者是誰?
任務隊列 B 中的任務將去何方,目前只有生產者沒有消費者。
這兩個問題先放一會兒,等到后面我再來回答。
(二)工作線程和主線程
在介紹主線程和工作線程具體做什么時,我們介紹下服務器編程中常常抽象出來的幾個概念(這里以 tcp 連接為例):
TcpServer 即 Tcp 服務,服務器需要綁定ip地址和端口號,并在該端口號上偵聽客戶端的連接(往往由一個成員變量 TcpListener 來管理偵聽細節)。所以一個 TcpServer 要做的就是這些工作。除此之外,每當有新連接到來時,TcpServer 需要接收新連接,當多個新連接存在時,TcpServer 需要有條不紊地管理這些連接:連接的建立、斷開等,即產生和管理下文中說的TcpConnection 對象。
一個連接對應一個 TcpConnection 對象,TcpConnection 對象管理著這個連接的一些信息:如連接狀態、本端和對端的 ip 地址和端口號等。
數據通道對象 Channel,Channel 記錄了 socket 的句柄,因而是一個連接上執行數據收發的真正執行者,Channel 對象一般作為 TcpConnection 的成員變量。
TcpSession 對象,是將 Channel 收取的數據進行解包,或者對準備好的數據進行裝包,并傳給 Channel 發送。
歸納起來:一個 TcpServer 依靠 TcpListener 對新連接的偵聽和處理,依靠TcpConnection 對象對連接上的數據進行管理,TcpConnection 實際依靠 Channel 對數據進行收發,依靠 TcpSession 對數據進行裝包和解包。也就是說一個 TcpServer 存在一個 TcpListener,對應多個 TcpConnection,有幾個TcpConnection 就有幾個TcpSession,同時也就有幾個 Channel。
以上說的 TcpServer、TcpListener、TcpConnection、Channel 和 TcpSession 是服務器框架的網絡層。一個好的網絡框架,應該做到與業務代碼脫耦。即上層代碼只需要拿到數據,執行業務邏輯,而不用關注數據的收發和網絡數據包的封包和解包以及網絡狀態的變化(比如網絡斷開與重連)。
拿數據的發送來說:
當業務邏輯將數據交給 TcpSession,TcpSession 將數據裝好包后(裝***程后可以有一些加密或壓縮操作),交給 TcpConnection::SendData(),而TcpConnection::SendData() 實際是調用 Channel::SendData(),因為 Channel 含有 socket 句柄,所以Channel::SendData() 真正調用send()/sendto()/write() 方法將數據發出去。
對于數據的接收,稍微有一點不同:
通過 select()/poll()/epoll() 等IO multiplex技術,確定好了哪些 TcpConnection 上有數據到來后,激活該 TcpConnection 的 Channel 對象去調用recv()/recvfrom()/read() 來收取數據。數據收到以后,將數據交由 TcpSession來處理,最終交給業務層。注意數據收取、解包乃至交給業務層是一定要分開的。我的意思是:***不要解包并交給業務層和數據收取的邏輯放在一起。因為數據收取是 IO 操作,而解包和交給業務層是邏輯計算操作。IO 操作一般比邏輯計算要慢。到底如何安排要根據服務器業務來取舍,也就是說你要想好你的服務器程序的性能瓶頸在網絡 IO 還是邏輯計算,即使是網絡 IO,也可以分為上行操作和下行操作,上行操作即客戶端發數據給服務器,下行即服務器發數據給客戶端。有時候數據上行少,下行大。(如游戲服務器,一個 npc 移動了位置,上行是該客戶端通知服務器自己***位置,而下行確是服務器要告訴在場的每個客戶端)。
工作線程的流程:
while (!m_bQuit) { epoll_or_select_func(); handle_io_events(); handle_other_things(); }
其中 epoll_or_select_func() 即是上文所說的通過 select()/poll()/epoll() 等 IO multiplex 技術,確定好了哪些 TcpConnection 上有數據到來。我的服務器代碼中一般只會監測 socket 可讀事件,而不會監測 socket 可寫事件。至于如何發數據,文章后面會介紹。所以對于可讀事件,以 epoll 為例,這里需要設置的標識位是:
EPOLLIN 普通可讀事件(當連接正常時,產生這個事件,recv()/read()函數返回收到的字節數;當連接關閉,這兩個函數返回0,也就是說我們設置這個標識已經可以監測到新來數據和對端關閉事件)
EPOLLRDHUP 對端關閉事件(linux man 手冊上說這個事件可以監測對端關閉,但我實際調試時發送即使對端關閉也沒觸發這個事件,仍然是EPOLLIN,只不過此時調用recv()/read()函數,返回值會為0,所以實際項目中是否可以通過設置這個標識來監測對端關閉,仍然待考證)
EPOLLPRI 帶外數據
muduo 里面將 epoll_wait 的超時事件設置為 1 毫秒,我的另一個項目將 epoll_wait 超時時間設置為 10 毫秒。這兩個數值供大家參考。
這個項目中,工作線程和主線程都是上文代碼中的邏輯,主線程監聽偵聽socket 上的可讀事件,也就是監測是否有新連接來了。主線程和每個工作線程上都存在一個 epollfd。如果新連接來了,則在主線程的 handle_io_events() 中接受新連接。產生的新連接的socket句柄掛接到哪個線程的 epollfd 上呢?這里采取的做法是 round-robin 算法,即存在一個對象CWorkerThreadManager 記錄了各個工作線程上工作狀態。偽碼大致如下:
void attach_new_fd(int newsocketfd) { workerthread = get_next_worker_thread(next); workerthread.attach_to_epollfd(newsocketfd); ++next; if (next > max_worker_thread_num) next = 0; }
即先從***個工作線程的 epollfd 開始掛接新來 socket,接著累加索引,這樣下次就是第二個工作線程了。如果所以超出工作線程數目,則從***個工作重新開始。這里解決了新連接 socket “負載均衡”的問題。在實際代碼中還有個需要注意的細節就是:epoll_wait 的函數中的 struct epoll_event 數量開始到底要設置多少個才合理?存在的顧慮是,多了浪費,少了不夠用,我在曾經一個項目中直接用的是 4096:
const int EPOLL_MAX_EVENTS = 4096; const int dwSelectTimeout = 10000; struct epoll_event events[EPOLL_MAX_EVENTS]; int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000);
我在陳碩的 muduo 網絡庫中發現作者才用了一個比較好的思路,即動態擴張數量:開始是 n個,當發現有事件的 fd 數量已經到達 n 個后,將 struct epoll_event 數量調整成 2n 個,下次如果還不夠,則變成 4n 個,以此類推,作者巧妙地利用 stl::vector 在內存中的連續性來實現了這種思路:
//初始化代碼 std::vector<struct epoll_event> events_(16); //線程循環里面的代碼 while (m_bExit) { int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), 1); if (numEvents > 0) { if (static_cast<size_t>(numEvents) == events_.size()) { events_.resize(events_.size() * 2); } } }
讀到這里,你可能覺得工作線程所做的工作也不過就是調用 handle_io_events() 來接收網絡數據,其實不然,工作線程也可以做程序業務邏輯上的一些工作。也就是在handle_other_things() 里面。那如何將這些工作加到 handle_other_things() 中去做呢?寫一個隊列,任務先放入隊列,再讓 handle_other_things() 從隊列中取出來做?我在該項目中也借鑒了muduo庫的做法。即 handle_other_things() 中調用一系列函數指針,偽碼如下:
void do_other_things() { somefunc(); } //m_functors是一個stl::vector,其中每一個元素為一個函數指針 void somefunc() { for (size_t i = 0; i < m_functors.size(); ++i) { m_functors[i](); } m_functors.clear(); }
//m_functors是一個stl::vector,其中每一個元素為一個函數指針 void somefunc() { for (size_t i = 0; i < m_functors.size(); ++i) { m_functors[i](); } m_functors.clear(); }
當任務產生時,只要我們將執行任務的函數 push_back 到 m_functors 這個 stl::vector 對象中即可。但是問題來了,如果是其他線程產生的任務,兩個線程同時操作 m_functors,必然要加鎖,這也會影響效率。muduo 是這樣做的:
void add_task(const Functor& cb) { std::unique_lock<std::mutex> lock(mutex_); m_functors.push_back(cb); } void do_task() { std::vector<Functor> functors; { std::unique_lock<std::mutex> lock(mutex_); functors.swap(m_functors); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } }
看到沒有,利用一個棧變量 functors 將 m_functors 中的任務函數指針倒換(swap)過來了,這樣大大減小了對 m_functors 操作時的加鎖粒度。前后變化:變化前,相當于原來 A 給 B 多少東西,B 消耗多少,A 給的時候,B 不能消耗;B 消耗的時候A不能給。現在變成A將東西放到籃子里面去,B 從籃子里面拿,B 如果拿去一部分后,只有消耗完了才會來拿,或者 A 通知 B 去籃子里面拿,而 B 忙碌時,A 是不會通知 B 來拿,這個時候 A 只管將東西放在籃子里面就可以了。
bool bBusy = false; void add_task(const Functor& cb) { std::unique_lock<std::mutex> lock(mutex_); m_functors_.push_back(cb); //B不忙碌時只管往籃子里面加,不要通知B if (!bBusy) { wakeup_to_do_task(); } } void do_task() { bBusy = true; std::vector<Functor> functors; { std::unique_lock<std::mutex> lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } bBusy = false; }
看,多巧妙的做法!
因為每個工作線程都存在一個 m_functors,現在問題來了,如何將產生的任務均衡地分配給每個工作線程。這個做法類似上文中如何將新連接的 socket 句柄掛載到工作線程的 epollfd 上,也是 round-robin 算法。上文已經描述,此處不再贅述。
還有種情況,就是希望任務產生時,工作線程能夠立馬執行這些任務,而不是等 epoll_wait 超時返回之后。這個時候的做法,就是使用一些技巧喚醒epoll_wait,Linux 系統可以使用socketpair 或 timerevent、eventfd 等技巧。
問題 1 的答案是:業務層產生任務可能會交給數據庫任務隊列A,這里的業務層代碼可能就是工作線程中 do_other_things() 函數執行體中的調用。至于交給這個 9 個數據庫線程的哪一個的任務隊列,同樣采用了 round-robin 算法。所以就存在一個對象 CDbThreadManager來管理這九個數據庫線程。下面的偽碼是向數據庫工作線程中加入任務:
bool CDbThreadManager::AddTask(IMysqlTask* poTask ) { if (m_index >= m_dwThreadsCount) { m_index = 0; } return m_aoMysqlThreads[m_index++].AddTask(poTask); }
同理問題 2 中的消費者也可能就是 do_other_things() 函數執行體中的調用。
現在來說問題 3,業務層的數據產生后,經過 TcpSession 裝包后,需要發送的話,產生任務丟給工作線程的 do_other_things(),然后在相關的 Channel 里面發送,因為沒有監測該 socket 上的可寫事件,所以該數據可能調用 send() 或者 write() 時會阻塞,沒關系,sleep() 一會兒,繼續發送,一直嘗試,到數據發出去。偽碼如下:
bool Channel::Send() { int offset = 0; while (true) { int n = ::send(socketfd, buf + offset, length - offset); if (n == -1) { if (errno == EWOULDBLOCK) { ::sleep(100); continue; } } //對方關閉了socket,這端建議也關閉 else if (n == 0) { close(socketfd); return false; } offset += n; if (offset >= length) break; } return true; }
最后,還有一個模塊日志線程沒有介紹,高性能的日志實現方案目前并不常見。
關于怎樣進行服務器程序的架構分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。