您好,登錄后才能下訂單哦!
小編給大家分享一下如何實現nginx進程鎖,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
nginx是多進程并發模型應用,直白點就是:有多個worker都在監聽網絡請求,誰接收某個請求,那么后續的事務就由它來完成。如果沒有鎖的存在,那么就是這種場景,當一個請求被系統接入后,所以可以監聽該端口的進程,就會同時去處理該事務。當然了,系統會避免這種糟糕事情的發生,但也就出現了所謂的驚群。(不知道說得對不對,大概是那么個意思吧)
所以,為了避免出現同一時刻,有許多進程監聽,就應該該多個worker間有序地監聽socket. 為了讓多個worker有序,所以就有了本文要講的進程鎖的出現了,只有搶到鎖的進程才可以進行網絡請求的接入操作。
即如下過程:
// worker 核心事務框架 // ngx_event.c void ngx_process_events_and_timers(ngx_cycle_t *cycle) { ngx_uint_t flags; ngx_msec_t timer, delta; if (ngx_timer_resolution) { timer = NGX_TIMER_INFINITE; flags = 0; } else { timer = ngx_event_find_timer(); flags = NGX_UPDATE_TIME; #if (NGX_WIN32) /* handle signals from master in case of network inactivity */ if (timer == NGX_TIMER_INFINITE || timer > 500) { timer = 500; } #endif } if (ngx_use_accept_mutex) { // 為了一定的公平性,避免反復爭搶鎖 if (ngx_accept_disabled > 0) { ngx_accept_disabled--; } else { // 只有搶到鎖的進程,進行 socket 的 accept() 操作 // 其他worker則處理之前接入的請求,read/write操作 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; } if (ngx_accept_mutex_held) { flags |= NGX_POST_EVENTS; } else { if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { timer = ngx_accept_mutex_delay; } } } } // 其他核心事務處理 if (!ngx_queue_empty(&ngx_posted_next_events)) { ngx_event_move_posted_next(cycle); timer = 0; } delta = ngx_current_msec; (void) ngx_process_events(cycle, timer, flags); delta = ngx_current_msec - delta; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta); ngx_event_process_posted(cycle, &ngx_posted_accept_events); if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } if (delta) { ngx_event_expire_timers(); } ngx_event_process_posted(cycle, &ngx_posted_events); } // 獲取鎖,并注冊socket accept() 過程如下 ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { if (ngx_shmtx_trylock(&ngx_accept_mutex)) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked"); if (ngx_accept_mutex_held && ngx_accept_events == 0) { return NGX_OK; } if (ngx_enable_accept_events(cycle) == NGX_ERROR) { // 解鎖操作 ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; } ngx_accept_events = 0; ngx_accept_mutex_held = 1; return NGX_OK; } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held); if (ngx_accept_mutex_held) { if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) { return NGX_ERROR; } ngx_accept_mutex_held = 0; } return NGX_OK; }
其他的不必多說,核心即搶到鎖的worker,才可以進行accept操作。而沒有搶到鎖的worker, 則要主動釋放之前的accept()權力。從而達到,同一時刻,只有一個worker在處理accept事件。
鎖這種東西,一般都是編程語言自己定義好的接口,或者固定用法。
比如 java 中的 synchronized xxx, Lock 相關并發包鎖如 CountDownLatch, CyclicBarrier, ReentrantLock, ReentrantReadWriteLock, Semaphore...
比如 python 中的 threading.Lock(), threading.RLock()...
比如 php 中的 flock()...
之所以說是入門級,是因為這都是些接口api, 你只要按照使用規范,調一下就可以了,無需更多知識。但要想用好各細節,則實際不簡單。
nginx因為是使用C語言編寫的,所以肯定是更接近底層些的。能夠通過它的實現,來看鎖如何實現,應該能夠讓我們更能理解鎖的深層次含義。
一般地,鎖包含這么幾個大方向:鎖數據結構定義,上鎖邏輯,解鎖邏輯,以及一些通知機制,超時機制什么的。下面我們就其中幾個方向,看下nginx 實現:
首先要定義出鎖有些什么變量,然后實例化一個值,共享給多進程使用。
// event/ngx_event.c // 全局accept鎖變量定義 ngx_shmtx_t ngx_accept_mutex; // 這個鎖有一個 // atomic 使用 volatile 修飾實現 typedef volatile ngx_atomic_uint_t ngx_atomic_t; typedef struct { #if (NGX_HAVE_ATOMIC_OPS) // 有使用原子更新變量實現鎖,其背后是共享內存區域 ngx_atomic_t *lock; #if (NGX_HAVE_POSIX_SEM) ngx_atomic_t *wait; ngx_uint_t semaphore; sem_t sem; #endif #else // 有使用fd實現鎖,fd的背后是一個文件實例 ngx_fd_t fd; u_char *name; #endif ngx_uint_t spin; } ngx_shmtx_t; // 共享內存數據結構定義 typedef struct { u_char *addr; size_t size; ngx_str_t name; ngx_log_t *log; ngx_uint_t exists; /* unsigned exists:1; */ } ngx_shm_t;
有了鎖實例,就可以對其進行上鎖解鎖了。nginx有兩種鎖實現,主要是基于平臺的差異性決定的:基于文件或者基于共享內在實現。基于fd即基于文件的實現,這個還是有點重的操作。如下:
// ngx_shmtx.c ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { ngx_err_t err; err = ngx_trylock_fd(mtx->fd); if (err == 0) { return 1; } if (err == NGX_EAGAIN) { return 0; } #if __osf__ /* Tru64 UNIX */ if (err == NGX_EACCES) { return 0; } #endif ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name); return 0; } // core/ngx_shmtx.c // 1. 上鎖過程 ngx_err_t ngx_trylock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; } return 0; } // os/unix/ngx_file.c ngx_err_t ngx_lock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; // 調用系統提供的上鎖方法 if (fcntl(fd, F_SETLKW, &fl) == -1) { return ngx_errno; } return 0; } // 2. 解鎖實現 // core/ngx_shmtx.c void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { ngx_err_t err; err = ngx_unlock_fd(mtx->fd); if (err == 0) { return; } ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name); } // os/unix/ngx_file.c ngx_err_t ngx_unlock_fd(ngx_fd_t fd) { struct flock fl; ngx_memzero(&fl, sizeof(struct flock)); fl.l_type = F_UNLCK; fl.l_whence = SEEK_SET; if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; } return 0; }
重點就是 fcntl() 這個系統api的調用,無他。當然,站在一個旁觀者角度來看,實際就是因為多進程對文件的操作是可見的,所以達到進程鎖的目的。其中,tryLock 和 lock 存在一定的語義差異,即try時,會得到一些是否成功的標識,而直接進行lock時,則不能得到標識。一般會要求阻塞住請求
也許在有些地方,一個鎖實例的初始化,就是一個變量的簡單賦值而已。但在nginx有些不同。首先,需要保證各worker能看到相同的實例或者相當的實例。因為worker是從master處fork()出來的進程,所以只要在master中實例化好的鎖,必然可以保證各worker能拿到一樣的值。那么,到底是不是只是這樣呢?
// 共享鎖的初始化,在ngx master 中進行,后fork()到worker進程 // event/ngx_event.c static ngx_int_t ngx_event_module_init(ngx_cycle_t *cycle) { void ***cf; u_char *shared; size_t size, cl; // 定義一段共享內存 ngx_shm_t shm; ngx_time_t *tp; ngx_core_conf_t *ccf; ngx_event_conf_t *ecf; cf = ngx_get_conf(cycle->conf_ctx, ngx_events_module); ecf = (*cf)[ngx_event_core_module.ctx_index]; if (!ngx_test_config && ngx_process <= NGX_PROCESS_MASTER) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "using the \"%s\" event method", ecf->name); } ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); ngx_timer_resolution = ccf->timer_resolution; #if !(NGX_WIN32) { ngx_int_t limit; struct rlimit rlmt; if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "getrlimit(RLIMIT_NOFILE) failed, ignored"); } else { if (ecf->connections > (ngx_uint_t) rlmt.rlim_cur && (ccf->rlimit_nofile == NGX_CONF_UNSET || ecf->connections > (ngx_uint_t) ccf->rlimit_nofile)) { limit = (ccf->rlimit_nofile == NGX_CONF_UNSET) ? (ngx_int_t) rlmt.rlim_cur : ccf->rlimit_nofile; ngx_log_error(NGX_LOG_WARN, cycle->log, 0, "%ui worker_connections exceed " "open file resource limit: %i", ecf->connections, limit); } } } #endif /* !(NGX_WIN32) */ if (ccf->master == 0) { return NGX_OK; } if (ngx_accept_mutex_ptr) { return NGX_OK; } /* cl should be equal to or greater than cache line size */ cl = 128; size = cl /* ngx_accept_mutex */ + cl /* ngx_connection_counter */ + cl; /* ngx_temp_number */ #if (NGX_STAT_STUB) size += cl /* ngx_stat_accepted */ + cl /* ngx_stat_handled */ + cl /* ngx_stat_requests */ + cl /* ngx_stat_active */ + cl /* ngx_stat_reading */ + cl /* ngx_stat_writing */ + cl; /* ngx_stat_waiting */ #endif shm.size = size; ngx_str_set(&shm.name, "nginx_shared_zone"); shm.log = cycle->log; // 分配共享內存空間, 使用 mmap 實現 if (ngx_shm_alloc(&shm) != NGX_OK) { return NGX_ERROR; } shared = shm.addr; ngx_accept_mutex_ptr = (ngx_atomic_t *) shared; ngx_accept_mutex.spin = (ngx_uint_t) -1; // 基于共享文件或者內存賦值進程鎖,從而實現多進程控制 if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared, cycle->lock_file.data) != NGX_OK) { return NGX_ERROR; } ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl); (void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1); ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "counter: %p, %uA", ngx_connection_counter, *ngx_connection_counter); ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl); tp = ngx_timeofday(); ngx_random_number = (tp->msec << 16) + ngx_pid; #if (NGX_STAT_STUB) ngx_stat_accepted = (ngx_atomic_t *) (shared + 3 * cl); ngx_stat_handled = (ngx_atomic_t *) (shared + 4 * cl); ngx_stat_requests = (ngx_atomic_t *) (shared + 5 * cl); ngx_stat_active = (ngx_atomic_t *) (shared + 6 * cl); ngx_stat_reading = (ngx_atomic_t *) (shared + 7 * cl); ngx_stat_writing = (ngx_atomic_t *) (shared + 8 * cl); ngx_stat_waiting = (ngx_atomic_t *) (shared + 9 * cl); #endif return NGX_OK; } // core/ngx_shmtx.c // 1. 基于文件進程共享空間, 使用 fd ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { // 由master進程創建,所以是進程安全的操作,各worker直接使用即可 if (mtx->name) { // 如果已經創建好了,則 fd 已被賦值,不能創建了,直接共享fd即可 // fd 的背后是一個文件實例 if (ngx_strcmp(name, mtx->name) == 0) { mtx->name = name; return NGX_OK; } ngx_shmtx_destroy(mtx); } // 使用文件創建的方式鎖共享 mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN, NGX_FILE_DEFAULT_ACCESS); if (mtx->fd == NGX_INVALID_FILE) { ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno, ngx_open_file_n " \"%s\" failed", name); return NGX_ERROR; } // 創建完成即可刪除,后續只基于該fd實例做鎖操作 if (ngx_delete_file(name) == NGX_FILE_ERROR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, ngx_delete_file_n " \"%s\" failed", name); } mtx->name = name; return NGX_OK; } // 2. 基于共享內存的共享鎖的創建 // ngx_shmtx.c ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { mtx->lock = &addr->lock; if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; } mtx->spin = 2048; #if (NGX_HAVE_POSIX_SEM) mtx->wait = &addr->wait; if (sem_init(&mtx->sem, 1, 0) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed"); } else { mtx->semaphore = 1; } #endif return NGX_OK; } // os/unix/ngx_shmem.c ngx_int_t ngx_shm_alloc(ngx_shm_t *shm) { shm->addr = (u_char *) mmap(NULL, shm->size, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0); if (shm->addr == MAP_FAILED) { ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size); return NGX_ERROR; } return NGX_OK; }
基于fd的鎖實現,本質是基于其背后的文件系統的實現,因為文件系統是進程可見的,所以對于相同fd控制,就是對共同的鎖的控制了。
所謂共享內存,實際就是一塊公共的內存區域,它超出了進程的范圍(受操作系統管理)。就是前面我們看到的mmap()的創建,就是一塊共享內存。
// ngx_shmtx.c ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { // 直接對共享內存區域的值進行改變 // cas 改變成功即是上鎖成功。 return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)); } // shm版本的解鎖操作, cas 解析,帶通知 void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { if (mtx->spin != (ngx_uint_t) -1) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock"); } if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) { ngx_shmtx_wakeup(mtx); } } // 通知等待進程 static void ngx_shmtx_wakeup(ngx_shmtx_t *mtx) { #if (NGX_HAVE_POSIX_SEM) ngx_atomic_uint_t wait; if (!mtx->semaphore) { return; } for ( ;; ) { wait = *mtx->wait; if ((ngx_atomic_int_t) wait <= 0) { return; } if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) { break; } } ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wake %uA", wait); if (sem_post(&mtx->sem) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_post() failed while wake shmtx"); } #endif }
共享內存版本的鎖的實現,基本就是cas的對內存變量的設置。只是這個面向的內存,是共享區域的內存。
見過了許多的鎖,依然過不好這一關。
鎖到底是什么呢?事實上,鎖就是一個標識位。當有人看到這個標識位后,就主動停止操作,或者進行等等,從而使其看起來起到了鎖的作用。這個標識位,可以設置在某個對象中,也可以為設置在某個全局值中,還可以借助于各種存在介質,比如文件,比如redis,比如zk 。 這都沒有差別。因為問題關鍵不在存放在哪里,而在于如何安全地設置這個標識位。
要實現鎖,一般都需要要一個強有力的底層含義保證,比如cpu層面的cas操作,應用級別的隊列串行原子操作。。。
至于什么,內存鎖,文件鎖,高級鎖,都是有各自的應用場景。而要選好各種鎖,則變成了評價高低地關鍵。此時此刻,你應該能判斷出來的!
以上是“如何實現nginx進程鎖”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。