91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

redis多路復用技術的示例分析

發布時間:2020-11-17 16:36:57 來源:億速云 閱讀:346 作者:小新 欄目:系統運維

小編給大家分享一下redis多路復用技術的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

redis 是一個單線程卻性能非常好的內存數據庫, 主要用來作為緩存系統。 redis 采用網絡IO多路復用技術來保證在多連接的時候, 系統的高吞吐量。

LINUX IO多路復用原理

redis的多路復用, 提供了select, epoll, evport, kqueue幾種選擇,在編譯的時候來選擇一種。

  • select是POSIX提供的, 一般的操作系統都有支撐;
  • epoll 是LINUX系統內核提供支持的;
  • evport是Solaris系統內核提供支持的;
  • kqueue是Mac 系統提供支持的;

我們一般運行的服務器都是LINUX系統上面, 并且我對Solaris和Mac系統不是很了解, 我們這里重點比較一下select、poll和epoll 3種多路復用的差異。

  • select: 單個進程所能打開的最大連接數有FD_SETSIZE宏定義, 其大小為1024或者2048; FD數目劇增后, 會帶來性能問題;消息傳遞從內核到與到用戶空間,需要copy數據;

    性能問題:
    (1)每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大
    (2)同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大

  • poll: 基本上與select一樣, 不通點在于沒有FD數目的限制, 因為底層實現不是一個數組, 而是鏈表;

  • epoll: FD連接數雖然有限制, 但是很大幾乎可以認為無限制;epoll內核中實現是根據每個fd上的callback函數來實現的,只有活躍的socket才會主動調用callback,所以在活躍socket較少的情況下,使用epoll沒有前面兩者的線性下降的性能問題; 內核和用戶通過共享內存來傳遞消息;

LINUX IO多路復用的接口

select 在LINUX的接口:

#include <sys/select.h>


/* According to earlier standards */

#include <sys/time.h>

#include <sys/types.h>

#include <unistd.h>


int select(int nfds, fd_set *readfds, fd_set *writefds,

                  fd_set *exceptfds, struct timeval *timeout);


void FD_CLR(int fd, fd_set *set);

int  FD_ISSET(int fd, fd_set *set);

void FD_SET(int fd, fd_set *set);

void FD_ZERO(fd_set *set);

select 函數的參數:
- nfds:fd_set的FD的個數, 采用位圖的方式記錄fd_set集合的FD狀態;
- readfds: fd_set 集合中來監控有哪些讀操作沒有被block, 如果有可讀,select
- writefds:fd_set 集合中來監控有哪些寫操作沒有被block;
- exceptfds: fd_set 集合中來監控有哪些except操作沒有被block;
- timeout: FD z最小被block的時間, 如果timeout的2個field都是0, 會立刻返回, 如果該參數是NULL, 會一直block;
select如果有一個或者多個讀操作, 寫操作, except操作不被block, 返回大于1的數值; 若果沒有不被block的FD, 但是某些FD block超時, 返回0; 如果錯誤出現, 返回-1;
FD_XXX函數, 是添加、刪除、清空以及判斷fd_set的工具函數。

select的pseudo 代碼:

while (1){

   int ret = select(streams[]);

   if (ret > 0 ) {

      for i in streams[] {

           if i has data {

              read or write streams[i];

           }

      }        

   } else if (ret == 0) {

      handle timeout FDs;

   }else {

      handle error

   }


}

epoll的LINUX的接口:

#include <sys/epoll.h>


//預定義的EVENT

enum EPOLL_EVENTS                                                                                                                                                                            

  {

    EPOLLIN = 0x001,

#define EPOLLIN EPOLLIN

    EPOLLPRI = 0x002,

#define EPOLLPRI EPOLLPRI

    EPOLLOUT = 0x004,

#define EPOLLOUT EPOLLOUT

    EPOLLRDNORM = 0x040,

#define EPOLLRDNORM EPOLLRDNORM

    EPOLLRDBAND = 0x080,

#define EPOLLRDBAND EPOLLRDBAND

    EPOLLWRNORM = 0x100,

#define EPOLLWRNORM EPOLLWRNORM

    EPOLLWRBAND = 0x200,

#define EPOLLWRBAND EPOLLWRBAND

    EPOLLMSG = 0x400,

#define EPOLLMSG EPOLLMSG

    EPOLLERR = 0x008,

#define EPOLLERR EPOLLERR

    EPOLLHUP = 0x010,

#define EPOLLHUP EPOLLHUP

    EPOLLRDHUP = 0x2000,

#define EPOLLRDHUP EPOLLRDHUP

    EPOLLWAKEUP = 1u << 29,

#define EPOLLWAKEUP EPOLLWAKEUP

    EPOLLONESHOT = 1u << 30,

#define EPOLLONESHOT EPOLLONESHOT

    EPOLLET = 1u << 31

#define EPOLLET EPOLLET

  };


int epoll_create(int size);

//創建epoll對象并回傳其描述符。


int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

//將要交由內核管控的文件描述符加入epoll對象并設置觸發條件。


int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

//等待已注冊之事件被觸發或計時終了。

epoll提供edge-triggered及level-triggered模式。在edge-trigger模式中,epoll_wait僅會在新的事件首次被加入epoll 對象時返回;于level-triggered模式下,epoll_wait在事件狀態未變更前將不斷被觸發。
舉例來說,倘若有一個已經于epoll注冊之管線接獲數據,epoll_wait將返回,并發出數據讀取的信號。現假設緩沖器的數據僅有部分被讀取并處理,在level-triggered模式下,任何對epoll_wait之調用都將即刻返回,直到緩沖器中的數據全部被讀取;然而,在edge-triggered的情境下,epoll_wait僅會于再次接收到新數據(亦即,新數據被寫入管線)時返回。

epoll的實現pseudo 代碼

epollfd = epoll_create()


while (1) {

    active_stream[] = epoll_wait(epollfd)

    for (i=0; i < len(active_stream[]); i++) {

        read or write active_stream[i]

    }

}

redis 多路復用的應用

接下來我們看一下, redis的多路復用如何實現的。整個redis的main函數包含如下3部分:
1、初始化Redis Server參數,這部分代碼通過initServerConfig實現。
2、初始化Redis Server,這部分代碼在initServer里面。
3、啟動事件輪詢器。

這里第一部分, 就是通過配置文件的參數來初始化server對象的參數, 和本文的主題沒有太大關系這里略過。
第二部分, 包含了創建輪詢器, 以及一個時間event隊列, 和file event數組。

void initServer(void) {

    ...

    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    if (server.el == NULL) {

        serverLog(LL_WARNING,

            "Failed creating the event loop. Error message: '%s'",

            strerror(errno));

     }

     ...

         /* Create the timer callback, this is our way to process many background

     * operations incrementally, like clients timeout, eviction of unaccessed

     * expired keys and so forth. */

    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {

        serverPanic("Can't create event loop timers.");

        exit(1);

    }


    /* Create an event handler for accepting new connections in TCP and Unix

     * domain sockets. */

    for (j = 0; j < server.ipfd_count; j++) {

        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,

            acceptTcpHandler,NULL) == AE_ERR)

            {

                serverPanic(

                    "Unrecoverable error creating server.ipfd file event.");

            }

    }

    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,

        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");



    /* Register a readable event for the pipe used to awake the event loop

     * when a blocked client in a module needs attention. */

    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,

        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {

            serverPanic(

                "Error registering the readable event for the module "

                "blocked clients subsystem.");

    }


第三部分, 是整個event loop部分:

int main() {

    // 第一部分

    // 第二部分入口

    initServer();

    ...

    aeSetBeforeSleepProc(server.el,beforeSleep);                                                                                                                                   

    aeSetAfterSleepProc(server.el,afterSleep);

    aeMain(server.el);

    aeDeleteEventLoop(server.el);

    return 0;

}


void aeMain(aeEventLoop *eventLoop) {                                                                                                                                              

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        if (eventLoop->beforesleep != NULL)

            eventLoop->beforesleep(eventLoop);

        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);

    }

}


 * The function returns the number of events processed. */

int aeProcessEvents(aeEventLoop *eventLoop, int flags)

{

         * some event fires. */

        numevents = aeApiPoll(eventLoop, tvp);


        /* After sleep callback. */

        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)

            eventLoop->aftersleep(eventLoop);


        for (j = 0; j < numevents; j++) {

            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;

            int fd = eventLoop->fired[j].fd;

            int rfired = 0;


        /* note the fe->mask & mask & ... code: maybe an already processed

             * event removed an element that fired and we still didn't

             * processed, so we check if the event is still valid. */

            if (fe->mask & mask & AE_READABLE) {

                rfired = 1;

                fe->rfileProc(eventLoop,fd,fe->clientData,mask);

            }

            if (fe->mask & mask & AE_WRITABLE) {

                if (!rfired || fe->wfileProc != fe->rfileProc)

                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);

            }

            processed++;

        }

    }

    /* Check time events */

    if (flags & AE_TIME_EVENTS)

        processed += processTimeEvents(eventLoop);


    return processed; /* return the number of processed file/time events */

}


static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {

    aeApiState *state = eventLoop->apidata;

    struct epoll_event ee = {0}; /* avoid valgrind warning */

    int mask = eventLoop->events[fd].mask & (~delmask);


    ee.events = 0;

    if (mask & AE_READABLE) ee.events |= EPOLLIN;

    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;

    ee.data.fd = fd;

    if (mask != AE_NONE) {

        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);

    } else {

        /* Note, Kernel < 2.6.9 requires a non null event pointer even for

         * EPOLL_CTL_DEL. */

        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);

    }       

}   

以上是redis多路復用技術的示例分析的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

红原县| 清丰县| 三门峡市| 东阳市| 边坝县| 广丰县| 临海市| 蛟河市| 丹凤县| 百色市| 昌邑市| 民权县| 嵊州市| 灵台县| 莱阳市| 六枝特区| 托克托县| 成武县| 稻城县| 湾仔区| 德清县| 瑞金市| 武冈市| 昭觉县| 开江县| 平江县| 荣昌县| 上栗县| 吉林市| 同德县| 巴楚县| 波密县| 汕尾市| 土默特左旗| 和静县| 当阳市| 综艺| 广宁县| 逊克县| 白银市| 黄石市|