91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

PostgreSQL 源碼解讀(154)- 后臺進程#6(walsender#2)

發布時間:2020-08-17 10:38:21 來源:ITPUB博客 閱讀:326 作者:husthxd 欄目:關系型數據庫

本節繼續介紹PostgreSQL的后臺進程walsender,重點介紹的是調用棧中的exec_replication_command和StartReplication函數.
調用棧如下:


(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, 
    nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, 
    wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, 
    wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
    at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
    at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

一、數據結構

StringInfo
StringInfoData結構體保存關于擴展字符串的相關信息.


/*-------------------------
 * StringInfoData holds information about an extensible string.
 * StringInfoData結構體保存關于擴展字符串的相關信息.
 *      data    is the current buffer for the string (allocated with palloc).
 *      data    通過palloc分配的字符串緩存
 *      len     is the current string length.  There is guaranteed to be
 *              a terminating '\0' at data[len], although this is not very
 *              useful when the string holds binary data rather than text.
 *      len     是當前字符串的長度.保證以ASCII 0(\0)結束(data[len] = '\0').
 *              雖然如果存儲的是二進制數據而不是文本時不太好使.
 *      maxlen  is the allocated size in bytes of 'data', i.e. the maximum
 *              string size (including the terminating '\0' char) that we can
 *              currently store in 'data' without having to reallocate
 *              more space.  We must always have maxlen > len.
 *      maxlen  以字節為單位已分配的'data'的大小,限定了最大的字符串大小(包括結尾的ASCII 0)
 *              小于此尺寸的數據可以直接存儲而無需重新分配.
 *      cursor  is initialized to zero by makeStringInfo or initStringInfo,
 *              but is not otherwise touched by the stringinfo.c routines.
 *              Some routines use it to scan through a StringInfo.
 *      cursor  通過makeStringInfo或initStringInfo初始化為0,但不受stringinfo.c例程的影響.
 *              某些例程使用該字段掃描StringInfo
 *-------------------------
 */
typedef struct StringInfoData
{
    char       *data;
    int         len;
    int         maxlen;
    int         cursor;
} StringInfoData;
typedef StringInfoData *StringInfo;

二、源碼解讀

exec_replication_command
exec_replication_command執行復制命令,如cmd_string被識別為WalSender命令,返回T,否則返回F.
其主要邏輯如下:
1.執行相關初始化和校驗
2.切換內存上下文
3.初始化復制掃描器
4.執行事務相關的判斷或校驗
5.初始化輸入輸出消息
6.根據命令類型執行相應的命令
6.1命令類型為T_StartReplicationCmd,調用StartReplication


/*
 * Execute an incoming replication command.
 * 執行復制命令.
 *
 * Returns true if the cmd_string was recognized as WalSender command, false
 * if not.
 * 如cmd_string被識別為WalSender命令,返回T,否則返回F
 */
bool
exec_replication_command(const char *cmd_string)
{
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
    MemoryContext old_context;
    /*
     * If WAL sender has been told that shutdown is getting close, switch its
     * status accordingly to handle the next replication commands correctly.
     * 如果WAL sender已被通知關閉,切換狀態以應對接下來的復制命令.
     */
    if (got_STOPPING)
        WalSndSetState(WALSNDSTATE_STOPPING);
    /*
     * Throw error if in stopping mode.  We need prevent commands that could
     * generate WAL while the shutdown checkpoint is being written.  To be
     * safe, we just prohibit all new commands.
     * 如在stopping模式,則拋出錯誤.
     * 我們需要在shutdown checkpoint寫入期間禁止命令的產生.
     * 安全期間,禁止所有新的命令.
     */
    if (MyWalSnd->state == WALSNDSTATE_STOPPING)
        ereport(ERROR,
                (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
     * command arrives. Clean up the old stuff if there's anything.
     * CREATE_REPLICATION_SLOT ... LOGICAL 導出快照直至下個命令到達.
     * 如存在,則清理舊的stuff.
     * 
     */
    SnapBuildClearExportedSnapshot();
    //檢查中斷
    CHECK_FOR_INTERRUPTS();
    //命令上下文
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_SIZES);
    old_context = MemoryContextSwitchTo(cmd_context);
    //初始化復制掃描器
    replication_scanner_init(cmd_string);
    parse_rc = replication_yyparse();
    if (parse_rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 (errmsg_internal("replication command parser returned %d",
                                  parse_rc))));
    cmd_node = replication_parse_result;
    /*
     * Log replication command if log_replication_commands is enabled. Even
     * when it's disabled, log the command with DEBUG1 level for backward
     * compatibility. Note that SQL commands are not logged here, and will be
     * logged later if log_statement is enabled.
     * 如log_replication_commands啟用,則記錄復制命令在日志中.
     * 就算該選項被禁止,通過DEBUG1級別記錄日志.
     * 注意SQL命令不在這里記錄,在log_statement啟用的情況下在后續進行記錄.
     * 
     */
    if (cmd_node->type != T_SQLCmd)
        ereport(log_replication_commands ? LOG : DEBUG1,
                (errmsg("received replication command: %s", cmd_string)));
    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
     * called outside of transaction the snapshot should be cleared here.
     * CREATE_REPLICATION_SLOT ... LOGICAL導出快照.
     * 該命令如果在事務的外層被調用,那么快照應在這里清除.
     */
    if (!IsTransactionBlock())
        SnapBuildClearExportedSnapshot();
    /*
     * For aborted transactions, don't allow anything except pure SQL, the
     * exec_simple_query() will handle it correctly.
     * 對于廢棄的事務,除了純SQL外不允許其他命令,exec_simple_query()函數可以正確處理這種情況.
     */
    if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
        ereport(ERROR,
                (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                 errmsg("current transaction is aborted, "
                        "commands ignored until end of transaction block")));
    CHECK_FOR_INTERRUPTS();
    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once per command to reduce palloc overhead.
     * 為消息I/O分配緩存.
     * 每個命令執行一次以減少palloc的負載.
     */
    initStringInfo(&output_message);
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);
    /* Report to pgstat that this process is running */
    //向pgstat報告該進程正在運行.
    pgstat_report_activity(STATE_RUNNING, NULL);
    //根據命令類型執行相應的命令
    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
            //識別系統
            IdentifySystem();
            break;
        case T_BaseBackupCmd:
            //BASE_BACKUP
            PreventInTransactionBlock(true, "BASE_BACKUP");
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;
        case T_CreateReplicationSlotCmd:
            //創建復制slot
            CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
            break;
        case T_DropReplicationSlotCmd:
            //刪除復制slot
            DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
            break;
        case T_StartReplicationCmd:
            //START_REPLICATION
            {
                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
                PreventInTransactionBlock(true, "START_REPLICATION");
                if (cmd->kind == REPLICATION_KIND_PHYSICAL)
                    StartReplication(cmd);
                else
                    StartLogicalReplication(cmd);
                break;
            }
        case T_TimeLineHistoryCmd:
            //構造時間線歷史 TIMELINE_HISTORY
            PreventInTransactionBlock(true, "TIMELINE_HISTORY");
            SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
            break;
        case T_VariableShowStmt:
            //
            {
                DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
                VariableShowStmt *n = (VariableShowStmt *) cmd_node;
                GetPGVariable(n->name, dest);
            }
            break;
        case T_SQLCmd:
            //SQL命令
            if (MyDatabaseId == InvalidOid)
                ereport(ERROR,
                        (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
            /* Report to pgstat that this process is now idle */
            pgstat_report_activity(STATE_IDLE, NULL);
            /* Tell the caller that this wasn't a WalSender command. */
            return false;
        default:
            //其他命令
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
    }
    /* done */
    //執行完畢,回到原來的內存上下文中
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);
    /* Send CommandComplete message */
    //命令結束
    EndCommand("SELECT", DestRemote);
    /* Report to pgstat that this process is now idle */
    //報告狀態
    pgstat_report_activity(STATE_IDLE, NULL);
    return true;
}

StartReplication
StartReplication處理START_REPLICATION命令.
其主要邏輯如下:
1.執行相關初始化和校驗
2.選擇時間線
3.進入COPY模式
3.1設置狀態
3.2發送CopyBothResponse消息,啟動streaming
3.3初始化相關變量,如共享內存狀態等
3.4進入主循環(WalSndLoop)


/*
 * Handle START_REPLICATION command.
 * 處理START_REPLICATION命令
 *
 * At the moment, this never returns, but an ereport(ERROR) will take us back
 * to the main loop.
 * 該函數不會返回,但ereport(ERROR)調用可以回到主循環
 */
static void
StartReplication(StartReplicationCmd *cmd)
{
    StringInfoData buf;
    XLogRecPtr  FlushPtr;
    if (ThisTimeLineID == 0)
        //時間線校驗
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
    /*
     * We assume here that we're logging enough information in the WAL for
     * log-shipping, since this is checked in PostmasterMain().
     * 在這里,由于在PostmasterMain()假定已為log-shipping記錄了足夠多的信息
     *
     * NOTE: wal_level can only change at shutdown, so in most cases it is
     * difficult for there to be WAL data that we can still see that was
     * written at wal_level='minimal'.
     * 注意:wal_level只能在shutdown的情況下進行修改,
     *   因此在大多數情況下,很難看到在wal_level='minimal'的情況下的WAL數據.
     */
    if (cmd->slotname)
    {
        ReplicationSlotAcquire(cmd->slotname, true);
        //#define SlotIsLogical ( slot ) (slot->data.database != InvalidOid)
        if (SlotIsLogical(MyReplicationSlot))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     (errmsg("cannot use a logical replication slot for physical replication"))));
    }
    /*
     * Select the timeline. If it was given explicitly by the client, use
     * that. Otherwise use the timeline of the last replayed record, which is
     * kept in ThisTimeLineID.
     * 選擇時間線.
     * 如果通過客戶端明確給出,則使用該值.
     * 否則的話,使用最后重放記錄的時間線,在ThisTimeLineID中保存.
     */
    if (am_cascading_walsender)
    {
        /* this also updates ThisTimeLineID */
        //這也會更新ThisTimeLineID變量
        FlushPtr = GetStandbyFlushRecPtr();
    }
    else
        FlushPtr = GetFlushRecPtr();
    if (cmd->timeline != 0)
    {
        XLogRecPtr  switchpoint;
        sendTimeLine = cmd->timeline;
        if (sendTimeLine == ThisTimeLineID)
        {
            sendTimeLineIsHistoric = false;
            sendTimeLineValidUpto = InvalidXLogRecPtr;
        }
        else
        {
            List       *timeLineHistory;
            sendTimeLineIsHistoric = true;
            /*
             * Check that the timeline the client requested exists, and the
             * requested start location is on that timeline.
             * 檢查客戶端請求的時間線是否存在,請求的開始位置是否在該時間線上.
             */
            timeLineHistory = readTimeLineHistory(ThisTimeLineID);
            switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                         &sendTimeLineNextTLI);
            list_free_deep(timeLineHistory);
            /*
             * Found the requested timeline in the history. Check that
             * requested startpoint is on that timeline in our history.
             * 通過歷史文件找到請求的時間線.
             * 在歷史中檢查請求的開始點是否在時間線上.
             *
             * This is quite loose on purpose. We only check that we didn't
             * fork off the requested timeline before the switchpoint. We
             * don't check that we switched *to* it before the requested
             * starting point. This is because the client can legitimately
             * request to start replication from the beginning of the WAL
             * segment that contains switchpoint, but on the new timeline, so
             * that it doesn't end up with a partial segment. If you ask for
             * too old a starting point, you'll get an error later when we
             * fail to find the requested WAL segment in pg_wal.
             * 這是有意為之.我們只檢查在切換點之前沒有fork off的請求的時間線.
             * 我們不會檢查在請求的開始點之前的時間線.
             * 這是因為客戶端可以合法地請求從包含交換點的WAL端的開始處進行復制,
             *   在新的時間線上如此執行,以避免出現由于部分segment的問題導致出錯.
             * 如果客戶端請求一個較舊的開始點,在pg_wal中無法找到請求的WAL段時會報錯.
             *
             * XXX: we could be more strict here and only allow a startpoint
             * that's older than the switchpoint, if it's still in the same
             * WAL segment.
             * XXX: 我們可以更嚴格,如果仍然在同一個WAL segment中,那么可以只允許比切換點舊的開始點
             */
            if (!XLogRecPtrIsInvalid(switchpoint) &&
                switchpoint < cmd->startpoint)
            {
                ereport(ERROR,
                        (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
                                (uint32) (cmd->startpoint >> 32),
                                (uint32) (cmd->startpoint),
                                cmd->timeline),
                         errdetail("This server's history forked from timeline %u at %X/%X.",
                                   cmd->timeline,
                                   (uint32) (switchpoint >> 32),
                                   (uint32) (switchpoint))));
            }
            sendTimeLineValidUpto = switchpoint;
        }
    }
    else
    {
        sendTimeLine = ThisTimeLineID;
        sendTimeLineValidUpto = InvalidXLogRecPtr;
        sendTimeLineIsHistoric = false;
    }
    streamingDoneSending = streamingDoneReceiving = false;
    /* If there is nothing to stream, don't even enter COPY mode */
    //如果沒有任何東西需要stream,不需要啟動COPY命令
    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    {
        /*
         * When we first start replication the standby will be behind the
         * primary. For some applications, for example synchronous
         * replication, it is important to have a clear state for this initial
         * catchup mode, so we can trigger actions when we change streaming
         * state later. We may stay in this state for a long time, which is
         * exactly why we want to be able to monitor whether or not we are
         * still here.
         * 在首次啟動復制時,standby節點會落后于master節點.
         * 對于某些應用,比如同步復制,對于這種初始的catchup模式有一個干凈的狀態是十分重要的,
         *   因此在改變streaming狀態時我們可以觸發相關的動作.
         * 我們可以處于這種狀態很長時間,這正是我們希望有能力監控我們是否仍在這里的原因.
         */
        //設置狀態
        WalSndSetState(WALSNDSTATE_CATCHUP);
        /* Send a CopyBothResponse message, and start streaming */
        //發送CopyBothResponse消息,啟動streaming
        pq_beginmessage(&buf, 'W');//W->COPY命令?
        pq_sendbyte(&buf, 0);
        pq_sendint16(&buf, 0);
        pq_endmessage(&buf);
        pq_flush();
        /*
         * Don't allow a request to stream from a future point in WAL that
         * hasn't been flushed to disk in this server yet.
         * 不允許請求該服務器上一個尚未刷入到磁盤上的WAL未來位置.
         */
        if (FlushPtr < cmd->startpoint)
        {
            ereport(ERROR,
                    (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
                            (uint32) (cmd->startpoint >> 32),
                            (uint32) (cmd->startpoint),
                            (uint32) (FlushPtr >> 32),
                            (uint32) (FlushPtr))));
        }
        /* Start streaming from the requested point */
        //從請求點開始streaming
        sentPtr = cmd->startpoint;
        /* Initialize shared memory status, too */
        //初始化共享內存狀態
        SpinLockAcquire(&MyWalSnd->mutex);
        MyWalSnd->sentPtr = sentPtr;
        SpinLockRelease(&MyWalSnd->mutex);
        SyncRepInitConfig();
        /* Main loop of walsender */
        //walsender主循環,開始復制,激活復制
        replication_active = true;
        //主循環
        WalSndLoop(XLogSendPhysical);
        //完結后設置為非活動狀態
        replication_active = false;
        if (got_STOPPING)
            proc_exit(0);//退出
        //設置狀態
        WalSndSetState(WALSNDSTATE_STARTUP);
        Assert(streamingDoneSending && streamingDoneReceiving);
    }
    if (cmd->slotname)
        ReplicationSlotRelease();
    /*
     * Copy is finished now. Send a single-row result set indicating the next
     * timeline.
     * Copy命令已完結.發送單行結果集以提升下一個timeline
     */
    if (sendTimeLineIsHistoric)
    {
        char        startpos_str[8 + 1 + 8 + 1];
        DestReceiver *dest;
        TupOutputState *tstate;
        TupleDesc   tupdesc;
        Datum       values[2];
        bool        nulls[2];
        snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
                 (uint32) (sendTimeLineValidUpto >> 32),
                 (uint32) sendTimeLineValidUpto);
        dest = CreateDestReceiver(DestRemoteSimple);
        MemSet(nulls, false, sizeof(nulls));
        /*
         * Need a tuple descriptor representing two columns. int8 may seem
         * like a surprising data type for this, but in theory int4 would not
         * be wide enough for this, as TimeLineID is unsigned.
         */
        tupdesc = CreateTemplateTupleDesc(2);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
                                  INT8OID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
                                  TEXTOID, -1, 0);
        /* prepare for projection of tuple */
        tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
        values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
        values[1] = CStringGetTextDatum(startpos_str);
        /* send it to dest */
        do_tup_output(tstate, values, nulls);
        end_tup_output(tstate);
    }
    /* Send CommandComplete message */
    pq_puttextmessage('C', "START_STREAMING");
}

三、跟蹤分析

在主節點上用gdb跟蹤postmaster,在PostgresMain上設置斷點后啟動standby節點,進入斷點


[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1339     1  2 14:45 pts/0    00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

第一個命令是IDENTIFY_SYSTEM,第二個命令才是需要跟蹤的對象START_REPLICATION


(gdb) c
Continuing.
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

1.執行相關初始化和校驗


(gdb) n
1446        if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb) 
1454        SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0, 
  writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456        CHECK_FOR_INTERRUPTS();
(gdb)

2.切換內存上下文


(gdb) 
1458        cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb) 
1461        old_context = MemoryContextSwitchTo(cmd_context);
(gdb)

3.初始化復制掃描器


(gdb) 
1463        replication_scanner_init(cmd_string);
(gdb) n
1464        parse_rc = replication_yyparse();
(gdb) 
1465        if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb) 
(gdb) n
1471        cmd_node = replication_parse_result;
(gdb)
(gdb) 
1479        if (cmd_node->type != T_SQLCmd)
(gdb) n
1480            ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)

4.執行事務相關的判斷或校驗


(gdb) n
1487        if (!IsTransactionBlock())
(gdb) 
1488            SnapBuildClearExportedSnapshot();
(gdb) 
1494        if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb) 
1500        CHECK_FOR_INTERRUPTS();
(gdb)

5.初始化輸入輸出消息


(gdb) 
1506        initStringInfo(&output_message);
(gdb) 
1507        initStringInfo(&reply_message);
(gdb) 
1508        initStringInfo(&tmpbuf);
(gdb) 
1511        pgstat_report_activity(STATE_RUNNING, NULL);

6.根據命令類型執行相應的命令
6.1命令類型為T_StartReplicationCmd,調用StartReplication


(gdb) n
1513        switch (cmd_node->type)
(gdb) 
1534                    StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb) 
1536                    PreventInTransactionBlock(true, "START_REPLICATION");
(gdb) 
1538                    if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb) 
1539                        StartReplication(cmd);

進入StartReplication


1539                        StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532     if (ThisTimeLineID == 0)
(gdb)

1.執行相關初始化和校驗


(gdb) n
546     if (cmd->slotname)
(gdb) 
560     if (am_cascading_walsender)
(gdb)

2.選擇時間線


(gdb) n
568     if (cmd->timeline != 0)
(gdb) 
572         sendTimeLine = cmd->timeline;
(gdb) 
573         if (sendTimeLine == ThisTimeLineID)
(gdb) 
575             sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576             sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb) 
634     streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16, 
  startpoint = 1560281088, options = 0x0}
(gdb)

3.進入COPY模式


(gdb) n
637     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)

3.1設置狀態


648         WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)

3.2發送CopyBothResponse消息,啟動streaming


(gdb) n
651         pq_beginmessage(&buf, 'W');
(gdb) 
652         pq_sendbyte(&buf, 0);
(gdb) 
653         pq_sendint16(&buf, 0);
(gdb) 
654         pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0:  0
(gdb) x/32hb buf->data
0x1df53b0:  0   0   0   127 127 127 127 127
0x1df53b8:  127 127 127 127 127 127 127 127
0x1df53c0:  127 127 127 127 127 127 127 127
0x1df53c8:  127 127 127 127 127 127 127 127
(gdb)

3.3初始化相關變量,如共享內存狀態等


(gdb) n
655         pq_flush();
(gdb) 
661         if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672         sentPtr = cmd->startpoint;
(gdb) 
675         SpinLockAcquire(&MyWalSnd->mutex);
(gdb) 
676         MyWalSnd->sentPtr = sentPtr;
(gdb) 
677         SpinLockRelease(&MyWalSnd->mutex);
(gdb) 
679         SyncRepInitConfig();
(gdb) 
682         replication_active = true;

3.4進入主循環(WalSndLoop)


(gdb) 
684         WalSndLoop(XLogSendPhysical);
(gdb)

DONE!

四、參考資料

PG Source Code

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

安乡县| 米林县| 县级市| 乌拉特前旗| 宁安市| 洛川县| 大埔区| 昭通市| 定南县| 吉木萨尔县| 丽江市| 海安县| 治县。| 房山区| 庐江县| 渭南市| 乳源| 延庆县| 滨海县| 镇沅| 泉州市| 丰顺县| 南充市| 东乌| 芷江| 白沙| 日喀则市| 乐都县| 汉中市| 北海市| 济南市| 奈曼旗| 天峻县| 昭平县| 隆林| 汝城县| 繁峙县| 炉霍县| 德化县| 买车| 琼海市|