您好,登錄后才能下訂單哦!
小編給大家分享一下nginx如何動態修改upstream ngx_http_dyups_module,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
nginx 動態修改upstream不reload nginx模塊,ngx_http_dyups_module分析。
主要圍繞https://github.com/yzprofile/ngx_http_dyups_module/blob/master/ngx_http_dyups_module.c進行分析記錄下來。
開整......
在create_main_conf的時候初始化這個數組
static void * ngx_http_dyups_create_main_conf(ngx_conf_t *cf)
{
...
if (ngx_array_init(&dmcf->dy_upstreams, cf->pool, 1024, sizeof(ngx_http_dyups_srv_conf_t)) != NGX_OK)
{
return NULL;
}
...
}
static ngx_http_module_t ngx_http_dyups_module_ctx = {
ngx_http_dyups_pre_conf, /* preconfiguration */
ngx_http_dyups_init, /* postconfiguration */
ngx_http_dyups_create_main_conf, /* create main configuration */
ngx_http_dyups_init_main_conf, /* init main configuration */
ngx_http_dyups_create_srv_conf, /* create server configuration */
NULL, /* merge server configuration */
NULL, /* create location configuration */
NULL /* merge location configuration */
};
在dyups init的時把upstream中的conf取出來放進去。
初始化dy_upstream鏈以及全局ngx_http_dyups_deleted_upstream。
static ngx_int_t ngx_http_dyups_init(ngx_conf_t *cf)
{
...
dmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_dyups_module);
umcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_upstream_module);
uscfp = umcf->upstreams.elts;
for (i = 0; i < umcf->upstreams.nelts; i++) {
duscf = ngx_array_push(&dmcf->dy_upstreams);
// 清零
ngx_memzero(duscf, sizeof(ngx_http_dyups_srv_conf_t));
duscf->pool = NULL;
// 賦值
duscf->upstream = uscfp[i];
duscf->dynamic = (uscfp[i]->port == 0
&& uscfp[i]->srv_conf && uscfp[i]->servers
&& uscfp[i]->flags & NGX_HTTP_UPSTREAM_CREATE);
duscf->deleted = 0;
// 賦值index
duscf->idx = i;
}
...
}
shm初始化是在ngx_http_dyups_init_main_conf函數中實現的,同時設置了read_mesg的超時時間,并且指定了大小。
static char *ngx_http_dyups_init_main_conf(ngx_conf_t *cf, void *conf)
{
...
if (dmcf->read_msg_timeout == NGX_CONF_UNSET_MSEC) {
// 一秒一次
dmcf->read_msg_timeout = 1000;
}
if (dmcf->shm_size == NGX_CONF_UNSET_UINT) {
dmcf->shm_size = 2 * 1024 * 1024;
}
return ngx_http_dyups_init_shm(cf, conf);
...
}
static char *ngx_http_dyups_init_shm(ngx_conf_t *cf, void *conf)
{
...
shm_zone = ngx_shared_memory_add(cf, &dmcf->shm_name, dmcf->shm_size,
&ngx_http_dyups_module);
shm_zone->data = cf->pool;
// 加進去的這個名頭的共享內存塊的init函數會在初始化的時候統一調用
shm_zone->init = ngx_http_dyups_init_shm_zone;
...
}
static ngx_int_t ngx_http_dyups_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
{
...
shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
sh = ngx_slab_alloc(shpool, sizeof(ngx_dyups_shctx_t));
if (sh == NULL) {
return NGX_ERROR;
}
// 全局變量,sh和shpool
ngx_dyups_global_ctx.sh = sh;
ngx_dyups_global_ctx.shpool = shpool;
// 初始化msg->queue
ngx_queue_init(&sh->msg_queue);
sh->version = 0;
sh->status = NULL;
...
}
該函數在啟動進程時候調用,設定了一些定時器。
初始化共享內存,判斷如果是非正常退出的,那么重新加載upstream配置。
static ngx_int_t ngx_http_dyups_init_process(ngx_cycle_t *cycle)
{
...
// 設定定時器來定時read msg,同步信息
timer = &ngx_dyups_global_ctx.msg_timer;
timer->handler = ngx_http_dyups_read_msg;
ngx_add_timer(timer, dmcf->read_msg_timeout);
// 拿到全局的pool和sh
shpool = ngx_dyups_global_ctx.shpool;
sh = ngx_dyups_global_ctx.sh;
ngx_shmtx_lock(&shpool->mutex);
// 初始化的時候肯定是NULL,,申請對應數量進程數的內存
if (sh->status == NULL) {
sh->status = ngx_slab_alloc_locked(shpool,
sizeof(ngx_dyups_status_t) * ccf->worker_processes);
if (sh->status == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
ngx_memzero(sh->status,
sizeof(ngx_dyups_status_t) * ccf->worker_processes);
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
ngx_shmtx_unlock(&shpool->mutex);
// 判斷version,如果不是0的話,說明version已經在同步中被++了,所以是進程掛掉再被拉起來
if (sh->version != 0) {
//...
}
最核心的是ngx_http_dyups_read_msg函數,里面的是ngx_http_dyups_read_msg_locked函數
static void ngx_http_dyups_read_msg_locked(ngx_event_t *ev)
{
...
sh = ngx_dyups_global_ctx.sh;
shpool = ngx_dyups_global_ctx.shpool;
for (i = 0; i < ccf->worker_processes; i++) {
status = &sh->status[i];
if (status->pid == 0 || status->pid == ngx_pid) {
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0,
"[dyups] process %P update time %ui",
status->pid, status->time);
// 遍歷全部進程,將對應的pid賦值
status->pid = ngx_pid;
status->time = now;
break;
}
}
// 遍歷消息隊列
for (q = ngx_queue_last(&sh->msg_queue);
q != ngx_queue_sentinel(&sh->msg_queue);
q = ngx_queue_prev(q))
{
// 如果該msg的count和進程數一致,就是大家都同步過了,把這個msg刪掉
if (msg->count == ccf->worker_processes) {
t = ngx_queue_next(q); ngx_queue_remove(q); q = t;
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] destroy msg %V:%V",&msg->name, &msg->content);
ngx_dyups_destroy_msg(shpool, msg);
continue;
}
found = 0;
for (i = 0; i < msg->count; i++) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0,"[dyups] msg pids [%P]", msg->pid[i]);
if (msg->pid[i] == ngx_pid) {
found = 1;
break;
}
}
// 如果發現該進程了,就說明已經同步過了。
if (found) {
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] msg %V count %ui found", &msg->name, msg->count);
continue;
}
// 如果沒發現的話,count++,pid更新
msg->pid[i] = ngx_pid;
msg->count++;
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] msg %V count %ui", &msg->name, msg->count);
// 取出來name和content
name = msg->name;
content = msg->content;
// 執行同步
rc = ngx_dyups_sync_cmd(pool, &name, &content, msg->flag);
if (rc != NGX_OK) {
ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "[dyups] read msg error, may cause the " "config inaccuracy,name:%V, content:%V",&name, &content);
}
}
...
}
static ngx_int_t ngx_dyups_sync_cmd(ngx_pool_t *pool, ngx_str_t *name, ngx_str_t *content, ngx_uint_t flag)
{
...
} else if (flag == NGX_DYUPS_ADD) {
body.start = body.pos = content->data;
body.end = body.last = content->data + content->len;
body.temporary = 1;
rc = ngx_dyups_do_update(name, &body, &rv);
ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "[dyups] sync add: %V rv: %V rc: %i", name, &rv, rc);
if (rc != NGX_HTTP_OK) {
return NGX_ERROR;
}
return NGX_OK;
}
...
}
同步其他進程接受的信息,如果是當前進程處理的就要把信息添加到消息隊列中。
ngx_int_t ngx_dyups_update_upstream(ngx_str_t *name, ngx_buf_t *buf, ngx_str_t *rv)
{
...
ngx_http_dyups_read_msg_locked(timer);
// 沙箱測試配置
status = ngx_dyups_sandbox_update(buf, rv);
if (status != NGX_HTTP_OK) {
goto finish;
}
status = ngx_dyups_do_update(name, buf, rv);
if (status == NGX_HTTP_OK) {
//把操作發到隊列中去
if (ngx_http_dyups_send_msg(name, buf, NGX_DYUPS_ADD)) {
ngx_str_set(rv, "alert: update success "
"but not sync to other process");
status = NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
...
}
ngx_http_dyups_send_msg函數分析
static ngx_int_t ngx_http_dyups_send_msg(ngx_str_t *name, ngx_buf_t *body, ngx_uint_t flag)
{
...
// 初始化整個msg,將name和body填充進去
sh->version++;
ngx_queue_insert_head(&sh->msg_queue, &msg->queue);
...
}
在update之前先find尋找對應的upstream。
static ngx_http_dyups_srv_conf_t * ngx_dyups_find_upstream(ngx_str_t *name, ngx_int_t *idx)
{
...
duscfs = dumcf->dy_upstreams.elts;
for (i = 0; i < dumcf->dy_upstreams.nelts; i++) {
duscf = &duscfs[i];
uscf = duscf->upstream;
if (uscf->host.len != name->len
|| ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len)
!= 0)
{
continue;
}
*idx = i;
return duscf;
}
...
}
如果尋找到了idx賦值。
一旦發現尋找到了對應name的dy_upstream就先判斷。
然后調用的是ngx_dyups_mark_upstream_delete函數
static void ngx_dyups_mark_upstream_delete(ngx_http_dyups_srv_conf_t *duscf)
{
...
// 獲取umcf和uscf
uscf = duscf->upstream;
umcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
ngx_http_upstream_module);
// us獲取這個dynamic upstream下的servers
us = uscf->servers->elts;
for (i = 0; i < uscf->servers->nelts; i++) {
// 標志位置1
us[i].down = 1;
#if (NGX_HTTP_UPSTREAM_CHECK)
if (us[i].addrs) {
// 關閉peer,看宏定義主要關閉健康檢查的peer
ngx_http_upstream_check_delete_dynamic_peer(&uscf->host,
us[i].addrs);
}
#endif
}
// 將upstream對應的index的配置變成一個dummy配置
uscfp[duscf->idx] = &ngx_http_dyups_deleted_upstream;
#if (NGX_HTTP_UPSTREAM_RBTREE)
ngx_rbtree_delete(&umcf->rbtree, &uscf->node);
#endif
duscf->deleted = NGX_DYUPS_DELETING;
...
}
其中最重要的是check_delete_dynamic_peer
void ngx_http_upstream_check_delete_dynamic_peer(ngx_str_t *name,ngx_addr_t *peer_addr)
{
...
/* 一堆比較 找到choosen*/
chosen = &peer[i];
chosen->shm->ref--;
if (chosen->shm->ref <= 0 && chosen->shm->delete != PEER_DELETED) {
ngx_http_upstream_check_clear_dynamic_peer_shm(chosen->shm);
chosen->shm->delete = PEER_DELETED;
}
ngx_shmtx_unlock(&chosen->shm->mutex);
ngx_http_upstream_check_clear_peer(chosen);
...
}
這樣子刪完一次之后,再find一次,idx大概率就變成-1了,就可以進行創建
static ngx_int_t ngx_dyups_do_update(ngx_str_t *name, ngx_buf_t *buf, ngx_str_t *rv)
{
...
if (idx == -1) {
duscf = ngx_array_push(&dumcf->dy_upstreams);
uscfp = ngx_array_push(&umcf->upstreams);
ngx_memzero(duscf, sizeof(ngx_http_dyups_srv_conf_t));
// 這里為了獲取在umcf中的新upstream的index值。
idx = umcf->upstreams.nelts - 1;
}
duscf->idx = idx;
rc = ngx_dyups_init_upstream(duscf, name, idx);
rc = ngx_dyups_add_server(duscf, buf);
...
}
最重要的就是init_upstream和add_server。
首先是init ipstream,他的傳參其實是dy_srv_conf_t。upstream的name,以及upstream鏈表中對應的index
static ngx_int_t ngx_dyups_init_upstream(ngx_http_dyups_srv_conf_t *duscf, ngx_str_t *name, ngx_uint_t index)
{
...
umcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
ngx_http_upstream_module);
uscfp = umcf->upstreams.elts;
/*初始化uscf 也就是upstream的各個結構體*/
uscfp[index] = uscf; // 賦值
duscf->dynamic = 1;
duscf->upstream = uscf;
ctx = ngx_pcalloc(duscf->pool, sizeof(ngx_http_conf_ctx_t));
// 存放ctx
duscf->ctx = ctx;
// insert進去uscf
uscf->node.key = ngx_crc32_short(uscf->host.data, uscf->host.len);
ngx_rbtree_insert(&umcf->rbtree, &uscf->node);
...
}
static ngx_int_t ngx_dyups_add_server(ngx_http_dyups_srv_conf_t *duscf, ngx_buf_t *buf)
{
...
ngx_dyups_parse_upstream(&cf, buf)
...
}
static char * ngx_dyups_parse_upstream(ngx_conf_t *cf, ngx_buf_t *buf)
{
...
b = *buf;
ngx_memzero(&conf_file, sizeof(ngx_conf_file_t));
conf_file.file.fd = NGX_INVALID_FILE;
conf_file.buffer = &b;
cf->conf_file = &conf_file;
return ngx_conf_parse(cf, NULL);
...
}
static ngx_int_t ngx_dyups_do_delete(ngx_str_t *name, ngx_str_t *rv)
{
...
duscf = ngx_dyups_find_upstream(name, &dumy);
// 如果查出來的是NULL,或者是一個已經被標記刪除,或者徹底刪除的,就說明要刪這個有異常
if (duscf == NULL || duscf->deleted) {
ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0, "[dyups] not find upstream %V %p", name, duscf);
ngx_str_set(rv, "not found uptream");
return NGX_HTTP_NOT_FOUND;
}
// 沒問題的話就執行正常刪除
ngx_dyups_mark_upstream_delete(duscf);
...
}
find upstream做了很多事還做了一部分的刪除操作。
static ngx_http_dyups_srv_conf_t * ngx_dyups_find_upstream(ngx_str_t *name, ngx_int_t *idx)
{
...
dumcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, ngx_http_dyups_module);
duscfs = dumcf->dy_upstreams.elts;
for (i = 0; i < dumcf->dy_upstreams.nelts; i++) {
// 這里是在mark_upstream中被標記的
if (duscf->deleted == NGX_DYUPS_DELETING) {
// 確認可以刪除,主要看這個ref的引用計數
if (*(duscf->ref) == 0) {
ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "[dyups] free dynamic upstream in find upstream" " %ui", duscf->idx);
duscf->deleted = NGX_DYUPS_DELETED;
if (duscf->pool) {
ngx_destroy_pool(duscf->pool);
duscf->pool = NULL;
}
}
}
// 如果是deleted或者是deleting,算是沒有找到,除非遍歷完沒找到返回一個deleted。
if (duscf->deleted == NGX_DYUPS_DELETING) {
continue;
}
if (duscf->deleted == NGX_DYUPS_DELETED) {
*idx = i;
duscf_del = duscf;
continue;
}
// 如果找到了就正常返回
if (uscf->host.len != name->len
|| ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len)
!= 0)
{
continue;
}
*idx = i;
return duscf;
}
...
}
dyups的peer init get和free函數。
static ngx_int_t ngx_http_dyups_init_peer(ngx_http_request_t *r,
ngx_http_upstream_srv_conf_t *us)
{
...
// 設置上下文
ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_dyups_ctx_t));
if (ctx == NULL) {
return NGX_ERROR;
}
// scf指向對應的dscf,ctx的data指向了自己
ctx->scf = dscf;
ctx->data = r->upstream->peer.data;
ctx->get = r->upstream->peer.get;
ctx->free = r->upstream->peer.free;
r->upstream->peer.data = ctx;
r->upstream->peer.get = ngx_http_dyups_get_peer;
r->upstream->peer.free = ngx_http_dyups_free_peer;
// 與客戶端的連接的這個pool注冊一個銷毀函數
cln = ngx_pool_cleanup_add(r->pool, 0);
if (cln == NULL) {
return NGX_ERROR;
}
// 引用計數加一
dscf->ref++;
// 等調用這個就會將ref--。
cln->handler = ngx_http_dyups_clean_request;
cln->data = &dscf->ref;
...
}
// 這個函數是在ngx_dyups_add_server初始化upstream中被賦值的
uscf->peer.init = ngx_http_dyups_init_peer;
// 調用的位置是在這里
static void ngx_http_upstream_init_request(ngx_http_request_t *r)
{
...
if (uscf->peer.init(r, uscf) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_http_upstream_connect(r, u);
...
}
static ngx_int_t ngx_http_dyups_get_peer(ngx_peer_connection_t *pc, void *data)
{
ngx_http_dyups_ctx_t *ctx = data;
// 就用之前的peer get來。
return ctx->get(pc, ctx->data);
}
static void ngx_http_dyups_free_peer(ngx_peer_connection_t *pc, void *data,
ngx_uint_t state)
{
ngx_http_dyups_ctx_t *ctx = data;
ngx_pool_cleanup_t *cln;
/* upstream connect failed */
if (pc->connection == NULL) {
goto done;
}
if (pc->cached) {
goto done;
}
// free的時候先給ref++,等調用handler后ref--
ctx->scf->ref++;
// pool設置一個銷毀pool的數據結構,賦值給pool->cleanup
cln = ngx_pool_cleanup_add(pc->connection->pool, 0);
if (cln == NULL) {
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "[dyups] dynamic upstream free peer may cause memleak %i",ctx->scf->ref);
goto done;
}
// 銷毀的遞歸函數
cln->handler = ngx_http_dyups_clean_request;
cln->data = &ctx->scf->ref;
done:
// 結束后調用之前保存的free
ctx->free(pc, ctx->data, state);
}
static void ngx_http_dyups_clean_request(void *data)
{
ngx_uint_t *ref = data;
(*ref)--;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0, "[dyups] http clean request count %i", *ref);
}
pool在被destroy的時候,會調用這個handler將引用計數減掉。
void ngx_destroy_pool(ngx_pool_t *pool)
{
ngx_pool_t *p, *n;
ngx_pool_large_t *l;
ngx_pool_cleanup_t *c;
for (c = pool->cleanup; c; c = c->next) {
if (c->handler) {
ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0,
"run cleanup: %p", c);
c->handler(c->data);
}
}
...
}
以上是“nginx如何動態修改upstream ngx_http_dyups_module”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。