您好,登錄后才能下訂單哦!
本篇內容主要講解“Nginx事件處理模塊怎么理解”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Nginx事件處理模塊怎么理解”吧!
一、事件模塊主流程講解
事件模塊主要處理兩類事件:① 定時任務 ② I/O事件。其中定時任務是指nginx通過ngx_add_timer添加的事件,原型函數如下:
ngx_event_timer.h static ngx_inline void ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer) { // 計算過期時間點 // 過期時間 = 現在時間 + 過期時間 key = ngx_current_msec + timer; // 設置超時時間到 ev->timer.key = key; // 添加的紅黑樹 ngx_rbtree_insert(&ngx_event_timer_rbtree, &ev->timer); } // 并設置了全局變量方便在任何地方調用 #define ngx_add_timer ngx_event_add_timer
在此列舉一個本文涉及的超時管理:
① 工作進程在爭搶互斥鎖時,因為只有一個進程能夠獲取鎖,如果其他進程無法獲取鎖,不可能一直等待,所以設置accept_mutex_delay,超過了設定的事件就會發生超時處理,也就放棄爭搶鎖二選擇去釋放資源。
ngx_event_accept.c void ngx_event_accept(ngx_event_t *ev) { if (ngx_use_accept_mutex) { // xxx }else{ ngx_add_timer(ev, ecf->accept_mutex_delay); } }
其次就是I/O事件了,很容易理解就是處理讀寫事件,那么I/O事件以epoll為例,先來看下nginx處理事件的主流程:
ngx_process_cycle.c static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) { for ( ;; ) { // 不斷循環處理事件 ngx_process_events_and_timers(cycle); } }
事件處理的入口在nginx進程模型的代碼里面,工作進程會一直循環取處理事件,其中的阻塞點就是epoll_wait,處理完一輪事件后又循環這個過程,直到master進程發來關閉的信號。接下來就是真正的開始處理事件的地方:
// 開始處理事件 void ngx_process_events_and_timers(ngx_cycle_t *cycle) { ngx_uint_t flags; ngx_msec_t timer, delta; // ngx_timer_resolution 用于決定使用何種超時策略 // 如果ngx_timer_resolution非零0,則設置timer未無限大 // 另外ngx_timer_resolution還有作用就是控制gettimeofday調用的頻率,不過在x86_64系統中影響已經可忽略了 // 此時采用定時方案,在規定的時間,默認500ms,對紅黑樹中的元素進行一次掃描 // 并處理超時的節點 if (ngx_timer_resolution) { // 先設置時間為無限待,后面會設置未500 timer = NGX_TIMER_INFINITE; flags = 0; } else { // 如果ngx_timer_resolution為0 // 采用超時的方案,首先計算出最快超時的時間,然后等待這個時間段取處理超時的事件, // 處理完超時任務,再次計算下一次的超時時間,不斷地循環處理。 // 查詢紅黑樹,返回超時時間最小的節點,即最左邊的元素 /** * while (node->left != sentinel) { node = node->left; } */ // nginx通過紅黑樹來維護所有的time節點 // 將超時檢測時間設置未最快發生超時的事件對象的超時時刻和當前時刻的差 // timer = (ngx_msec_int_t) (node->key - ngx_current_msec); timer = ngx_event_find_timer(); flags = NGX_UPDATE_TIME; #if (NGX_WIN32) /* handle signals from master in case of network inactivity */ // 如果timer是無限或者大于500,則設置未500 // 即如果選擇定時方案,會設置定時時間為500ms if (timer == NGX_TIMER_INFINITE || timer > 500) { timer = 500; } #endif } // 處理驚群現象(nginx是一個master,多個work競爭請求) // 驚群現象是指多個線程/進程同時監聽一個socket事件(nginx是多進程程序,共享80端口), // 當事件發生時,會喚醒所有等待的線程/進程,爭搶事件 // 但是最后只有一個線程/進程可以讀取事件成功 // 其他進程/線程爭搶失敗后重新等待或者其他操作,這種浪費資源的現象叫驚群 // 設置通過Accept互斥鎖來解決驚群現象 // 當nginx配置文件中worker的數量大于1時 // 且配置文件打開了accept_mutex時,ngx_use_accept_mutex會設為1 /* * nginx配置 events { accept_mutex on; #設置網路連接序列化,防止驚群現象發生,默認為on multi_accept on; #設置一個進程是否同時接受多個網絡連接,默認為off #use epoll; #事件驅動模型,select|poll|kqueue|epoll|resig|/dev/poll|eventport worker_connections 1024; #最大連接數,默認為512 } */ if (ngx_use_accept_mutex) { // ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; // 其中connection_n表示當前工作進程最大可承受連接數,free_connection_n表現當前空閑連接數 // 假設當前連接數為x,那么ngx_cycle->connection_n / 8 - connection_n + x => x - 7/8 * connection_n // 即當連接數超過最大數的7/8時,ngx_accept_disabled的值將大于0時,當前work處理的連接已經達到飽和 // 此時work不會競爭新的連接,nginx會任務當前函數已經經歷了一輪事件處理 // 即相應的負載減少了一點,并對ngx_accept_disabled自減 if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { // 如果活動連接數還沒有達到飽和,則獲取accept鎖 // 多個work,只有一個可以得到鎖, // 且獲取鎖的過程不是阻塞,獲取成功后,ngx_accept_mutex_held會設為1 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } // 如果ngx_accept_mutex_held = 1表示成功獲取到鎖 if (ngx_accept_mutex_held) { // 搶到了accept鎖的進程會被加上了NGX_POST_EVENTS標志 // 并加入到post_events隊列里 // 設置標志,延遲處理事件 flags |= NGX_POST_EVENTS; } else { if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } } } delta = ngx_current_msec; // 獲得互斥鎖的進程開始處理請求 // 主要調用epoll_wait等待事件 // 并根據事件類型加入到隊列ngx_posted_accept_events或者ngx_posted_events (void) ngx_process_events(cycle, timer, flags); // delta就是調用ngx_process_events消耗的時間 // ngx_current_msec的值是通過ngx_time_update函數得到的 // 如果不執行ngx_time_update,則delta依然為0 delta = ngx_current_msec - delta; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta); // 處理新建連接事件 ngx_event_process_posted(cycle, &ngx_posted_accept_events); // accept事件一旦處理完成,當前進程就會釋放互斥鎖 if (ngx_accept_mutex_held) { // 釋放鎖 ngx_shmtx_unlock(&ngx_accept_mutex); } if (delta) { // 不斷的從紅黑樹的節點中取出時間值最小的,判斷是否超時 // 如果超時就執行他們的事件函數,直到最小的時間不超時 // 處理超時事件 ngx_event_expire_timers(); } // 釋放鎖后開始處理ngx_posted_events隊列中的普通讀寫事件 ngx_event_process_posted(cycle, &ngx_posted_events); }
上一段代碼較長,這里解讀下多做了什么操作:
1. 首先是根據ngx_timer_resolution的值判斷當前定時策略,這個值是在nginx.conf文件可選配置:
timer_resolution 10ms
如果timer_resolution 非0,則選擇定時檢查方案,即設置定時時間,500ms,隔這個時間去紅黑樹中檢查時候有事件超時,如果有則處理超時事件。如果ngx_timer_resolution為0,則采用超時檢測方案,首先先計算最快超時的時間 timer = ngx_event_find_timer(),然后到了timer這個時間去處理超時事件,接著在計算一次timer,依次循環處理超時事件。
2. 根據ngx_use_accept_mutex判斷是否打開了互斥鎖配置,如果打開了互斥鎖配置則開始爭搶鎖,否則的話直接處理事件。首先先了解下爭搶鎖的過程,其核心就是函數ngx_trylock_accept_mutex:
// 各個工作線程會嘗試去獲取鎖 ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { // 非阻塞的方式獲取鎖,返回1表示獲取成功,0表示獲取失敗 if (ngx_shmtx_trylock(&ngx_accept_mutex)) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked"); if (ngx_accept_mutex_held && ngx_accept_events == 0) { return NGX_OK; } // 將cycle->listening中的端口信息添加到epoll事件中 // 即把監聽socket加入到epoll中進行監聽 if (ngx_enable_accept_events(cycle) == NGX_ERROR) { ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; } // 監聽完成后會獲取到 ngx_accept_events = 0; ngx_accept_mutex_held = 1; return NGX_OK; } // 獲取鎖失敗的情況 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held); // 獲取鎖失敗:本來就擁有鎖的情況 if (ngx_accept_mutex_held) { // 本來擁有鎖就直接將監聽套接口從自身的事件監聽機制刪除 if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) { return NGX_ERROR; } // 擁有鎖標志置位0 ngx_accept_mutex_held = 0; } return NGX_OK; }
代碼較累贅,還是畫個圖說明下流程:
拿到鎖后就開始處理事件了。開始處理之前先 flags |= NGX_POST_EVENTS延遲事件實際處理的事件,那么是如何延遲的呢?實際是在函數ngx_epoll_process_events中:
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { if (flags & NGX_POST_EVENTS) { queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; ngx_post_event(rev, queue); } else { rev->handler(rev); } }
我們已經直到影響flags的值是變量ngx_timer_resolution,其中有如下這一段代碼:
void ngx_process_events_and_timers(ngx_cycle_t *cycle) { if (ngx_timer_resolution) { flags = 0; } else { flags = NGX_UPDATE_TIME; } }
flags被設置為0或者1,所以執行 flags |= NGX_POST_EVENTS會導致flags & NGX_POST_EVENTS為非零,所以來自epoll_wait的事件會被緩存在隊列里面。
3. 接下來的操作就是要處理事件了,按照2中爭搶鎖的過程,搶到鎖的進程會用于事件的監聽套接口,調用ngx_process_events方法處理事件,實際上是調用了ngx_epoll_module.c中的ngx_epoll_process_events方法:
ngx_epoll_module.c static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { events = epoll_wait(ep, event_list, (int) nevents, timer); // 定時方案:回調函數會調用ngx_timer_signal_handler方法,并設置ngx_event_timer_alarm = 1,如果沒有執行回 // 調函數,則不會執行ngx_time_update。 // 超時方案:flags = 1,則flags & NGX_UPDATE_TIME一定為真,所以每次會調用該方法 if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) { // 更新ngx_current_msec的值 ngx_time_update(); } // 緩存事件到隊列 if (flags & NGX_POST_EVENTS) { queue = rev->accept ? &ngx_posted_accept_events : &ngx_posted_events; ngx_post_event(rev, queue); } else { rev->handler(rev); } }
代碼中可以看出,其中accept事件(即監聽端口上的可讀事件)會被緩存到隊列ngx_posted_accept_events,普通事件會被緩存到隊列ngx_posted_events。
4. 緩存完事件,接下來就是處理新建連接事件(accept事件),因為當前進程已經監聽了某個客戶端的端口,該端口的請求中的可讀事件先要處理下,該讀的數據讀完,即處理隊列ngx_posted_accept_events中的新建連接事件,如果在處理新建連接期間還有新的請求連接事件,會阻塞,等待下次進程獲取鎖后讀取。讀完可讀事件后就執行解鎖操作ngx_shmtx_unlock。
5. 鎖釋放完之后就處理連接套接口之后的連接事件了,即保存在隊列ngx_posted_events中的事件。
void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted) { ngx_queue_t *q; ngx_event_t *ev; while (!ngx_queue_empty(posted)) { q = ngx_queue_head(posted); ev = ngx_queue_data(q, ngx_event_t, queue); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", ev); ngx_delete_posted_event(ev); ev->handler(ev); } }
可以看出,就是不斷遍歷隊列,調用對應的handler處理事件。
二、介紹事件處理模型的初始化
因為我們上面的講解都是以epoll為例的,接下來就解釋下ngnix如何來選擇事件處理機制,首先先看下nginx.conf中的events模塊配置:
events { // 選擇使用的事件處理機制,這里使用epoll use epoll; // 是否同時接受多個網絡請求 multi_accept on; // 是否激活互斥鎖 accept_mutex on; // 設置最大可用連接數 worker_connections 65535; // 配置http連接的超時時間 keepalive_timeout 60; // 客戶端request請求中header的緩存大小 client_header_buffer_size 4k; // 靜態文件的緩存大小和緩存時間,比如html/css/image open_file_cache max=65535 inactive=60s; // 設置每次檢查緩存有效性的時間間隔 open_file_cache_valid 80s; // 靜態文件有效緩存時間內最少使用次數 open_file_cache_min_uses 1; // 設置是否允許緩存錯誤信息 open_file_cache_errors on; }
配置比較詳細,其實相關的也就是 use epoll配置。
首先nginx定義了統一的事件處理接口,封裝了各種事件處理機制(epoll/poll/select等)的執行函數:
ngx_event.h typedef struct { // 將一個事件(讀事件/寫事件)添加到事件驅動機制 ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 將一個事件(讀事件/寫事件)從事件驅動中上刪除 ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 啟用一個已經添加的事件(代碼暫時未使用) ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 禁用一個已經添加的事件(代碼暫時未使用) ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); // 將指定連接關聯的描述符加入到多路復用監控里 ngx_int_t (*add_conn)(ngx_connection_t *c); // 從多路復用監控里刪除指定連接關聯的描述符 ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags); // 僅在多線程環境下調用,目前未使用 ngx_int_t (*notify)(ngx_event_handler_pt handler); // 阻塞等待發生,并對發生的事件逐個處理 ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,ngx_uint_t flags); // 初始化事件驅動模塊 ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); // 回收資源 void (*done)(ngx_cycle_t *cycle); } ngx_event_actions_t;
對封裝的接口定義全局的變量,以及該接口函數也定義相應的全局函數
ngx_event_actions_t ngx_event_actions ngx_process_events ngx_event_actions.process_events ngx_done_events ngx_event_actions.done ngx_add_event ngx_event_actions.add ngx_del_event ngx_event_actions.del ngx_add_conn ngx_event_actions.add_conn ngx_del_conn ngx_event_actions.del_conn ngx_notify ngx_event_actions.notify
所以添加一個事件(讀/寫)只需要調用ngx_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)。那么重點就是如何對ngx_event_actions進行賦值?搜索event/modules會發現在各種事件處理機制的模塊里面都存在對ngx_event_actions的賦值:
D:\mycode\nginx\src\event\modules\ngx_devpoll_module.c: 186: ngx_event_actions = ngx_devpoll_module_ctx.actions; D:\mycode\nginx\src\event\modules\ngx_epoll_module.c: 189: ngx_event_actions = ngx_epoll_module_ctx.actions D:\mycode\nginx\src\event\modules\ngx_eventport_module.c: 279: ngx_event_actions = ngx_eventport_module_ctx.actions; D:\mycode\nginx\src\event\modules\ngx_kqueue_module.c: 224: ngx_event_actions = ngx_kqueue_module_ctx.actions; D:\mycode\nginx\src\event\modules\ngx_poll_module.c: 96: ngx_event_actions = ngx_poll_module_ctx.actions; D:\mycode\nginx\src\event\modules\ngx_select_module.c: 105: ngx_event_actions = ngx_select_module_ctx.actions; D:\mycode\nginx\src\event\modules\ngx_win32_select_module.c: 106: ngx_event_actions = ngx_select_module_ctx.actions;
而系統采用哪個賦值語句取決于用戶在events模塊的配置中的use epoll
具體初始化的過程:
static char * ngx_event_core_init_conf(ngx_cycle_t *cycle, void *conf) { ngx_conf_init_uint_value(ecf->use, module->ctx_index); } #define ngx_conf_init_size_value(conf, default) // 用戶沒有指定則設置默認的值 if (conf == NGX_CONF_UNSET_SIZE) { conf = default; }
以上配置項通過函數ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module)獲取到,即ecf->use的值是epoll值的序號,而序號的設置在ngx_module.c中:
ngx_int_t ngx_preinit_modules(void) { ngx_uint_t i; for (i = 0; ngx_modules[i]; i++) { ngx_modules[i]->index = i; ngx_modules[i]->name = ngx_module_names[i]; } ngx_modules_n = i; ngx_max_module = ngx_modules_n + NGX_MAX_DYNAMIC_MODULES; return NGX_OK; }
在ngx_preinit_modules會循環遍歷所有ngx_modules中的值,并對模塊的index值賦值,用于標識這個模塊,所以這里存在疑問的是ngx_modules的值來自于哪里?查看源碼,發現在nginx.c中存在賦值語句 modules = ngx_dlsym(handle, "ngx_modules"):
ngx_event.c: ngx_event_process_init(ngx_cycle_t *cycle) { // 返回events模塊的配置集合 ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module); // 遍歷所有模塊 for (m = 0; cycle->modules[m]; m++) { if (cycle->modules[m]->type != NGX_EVENT_MODULE) { continue; } // 如果不等于ecf->use的值就繼續遍歷 if (cycle->modules[m]->ctx_index != ecf->use) { continue; } //找到用戶指定的事件處理機制 module = cycle->modules[m]->ctx; // 執行該模塊的初始化函數 if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { /* fatal */ exit(2); } break; } }
所以我們可以知道nginx對事件具體的處理邏輯定義在event/modules各個處理機制的模塊里,我們取其中一個為例(ngx_epoll_module.c),在上面的賦值語句ngx_event_actions = ngx_epoll_module_ctx.actions,其中ngx_event_actions封裝了nginx統一的事件處理調用函數,而ngx_epoll_module_ctx則定義了epoll模塊的上下文信息,是ngx_event_module_t類型的靜態變量,ngx_epoll_module.c中有如下定義:
typedef struct { ngx_str_t *name; void *(*create_conf)(ngx_cycle_t *cycle); char *(*init_conf)(ngx_cycle_t *cycle, void *conf); ngx_event_actions_t actions; } ngx_event_module_t;
具體的賦值語句就是定義靜態變量ngx_epoll_module_ctx:
static ngx_event_module_t ngx_epoll_module_ctx = { &epoll_name, ngx_epoll_create_conf, /* create configuration */ ngx_epoll_init_conf, /* init configuration */ // 對actions的賦值 { // 添加事件(讀/寫事件) ngx_epoll_add_event, /* add an event */ ngx_epoll_del_event, /* delete an event */ ngx_epoll_add_event, /* enable an event */ ngx_epoll_del_event, /* disable an event */ ngx_epoll_add_connection, /* add an connection */ ngx_epoll_del_connection, /* delete an connection */ #if (NGX_HAVE_EVENTFD) ngx_epoll_notify, /* trigger a notify */ #else NULL, /* trigger a notify */ #endif ngx_epoll_process_events, /* process the events */ ngx_epoll_init, /* init the events */ ngx_epoll_done, /* done the events */ } };
到這里,就已經通過ngx_event_actions = ngx_epoll_module_ctx.actions賦值語句將epoll的處理函數賦值給了nginx的全局變量ngx_event_actions,所以當我們調用全局函數ngx_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)實際上就是調用ngx_epoll_add_event。
到此,相信大家對“Nginx事件處理模塊怎么理解”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。