您好,登錄后才能下訂單哦!
本篇內容主要講解“PostgreSQL中StartLogStreamer分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“PostgreSQL中StartLogStreamer分析”吧!
本節簡單介紹了PostgreSQL的備份工具pg_basebackup源碼中實際執行備份邏輯的BaseBackup中對WAL數據進行備份的實現函數StartLogStreamer.
logstreamer_param
WAL data streamer參數.
typedef struct { ////后臺連接 PGconn *bgconn; //開始位置 XLogRecPtr startptr; //目錄或者tar文件,依賴于使用的模式 char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */ //系統標識符 char *sysidentifier; //時間線 int timeline; } logstreamer_param;
StreamCtl
接收xlog流數據時的全局參數
/* * Global parameters when receiving xlog stream. For details about the individual fields, * see the function comment for ReceiveXlogStream(). * 接收xlog流數據時的全局參數. * 每個域字段的詳細解釋,參見ReceiveXlogStream()函數注釋. */ typedef struct StreamCtl { //streaming的開始位置 XLogRecPtr startpos; /* Start position for streaming */ //時間線 TimeLineID timeline; /* Timeline to stream data from */ //系統標識符 char *sysidentifier; /* Validate this system identifier and * timeline */ //standby超時信息 int standby_message_timeout; /* Send status messages this often */ //是否同步(寫入時是否馬上Flush WAL data) bool synchronous; /* Flush immediately WAL data on write */ //在已歸檔的數據中標記segment為已完成 bool mark_done; /* Mark segment as done in generated archive */ //刷新到磁盤上以確保數據的一致性狀態(是否已刷新到磁盤上) bool do_sync; /* Flush to disk to ensure consistent state of * data */ //在返回T時停止streaming stream_stop_callback stream_stop; /* Stop streaming when returns true */ //如有效,監測該socket中的輸入并檢查stream_stop()的返回 pgsocket stop_socket; /* if valid, watch for input on this socket * and check stream_stop() when there is any */ //如何寫WAL WalWriteMethod *walmethod; /* How to write the WAL */ //附加到部分接受文件的后綴 char *partial_suffix; /* Suffix appended to partially received files */ //使用的replication slot,如無則為NULL char *replication_slot; /* Replication slot to use, or NULL */ } StreamCtl;
StartLogStreamer
StartLogStreamer用于在備份時初始化后臺進程用于接收WAL.接收進程將創建自己的數據庫連接以并行的方式對文件進行streaming復制.
/* * Initiate background process for receiving xlog during the backup. * The background stream will use its own database connection so we can * stream the logfile in parallel with the backups. * 在備份時初始化后臺進程用于接收WAL. * 后臺stream進程將用自己的數據庫連接以使以并行的方式stream文件. */ static void StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) { //參數 logstreamer_param *param; uint32 hi, lo;//高位/低位 char statusdir[MAXPGPATH]; param = pg_malloc0(sizeof(logstreamer_param)); param->timeline = timeline; param->sysidentifier = sysidentifier; /* Convert the starting position */ //轉換開始位置(高低位轉換) if (sscanf(startpos, "%X/%X", &hi, &lo) != 2) { fprintf(stderr, _("%s: could not parse write-ahead log location \"%s\"\n"), progname, startpos); exit(1); } //開始位置,轉換為64bit的地址 param->startptr = ((uint64) hi) << 32 | lo; /* Round off to even segment position */ //按segment取整 param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz); #ifndef WIN32 //WIN32使用的代碼 /* Create our background pipe */ if (pipe(bgpipe) < 0) { fprintf(stderr, _("%s: could not create pipe for background process: %s\n"), progname, strerror(errno)); exit(1); } #endif /* Get a second connection */ //獲取第二個連接 param->bgconn = GetConnection(); if (!param->bgconn) /* Error message already written in GetConnection() */ exit(1); /* In post-10 cluster, pg_xlog has been renamed to pg_wal */ //在PG 10,pg_xlog已命名為pg_wal snprintf(param->xlog, sizeof(param->xlog), "%s/%s", basedir, PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ? "pg_xlog" : "pg_wal"); /* Temporary replication slots are only supported in 10 and newer */ //臨時復制slots只在PG10+支持 if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS) temp_replication_slot = false; /* * Create replication slot if requested * 如要求,則創建復制slot */ //static char *replication_slot = NULL; //static bool temp_replication_slot = true; if (temp_replication_slot && !replication_slot) //創建replication slot replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn)); if (temp_replication_slot || create_slot) { //創建replication slot if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL, temp_replication_slot, true, true, false)) exit(1); if (verbose) { //顯示診斷信息 if (temp_replication_slot) fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"), progname, replication_slot); else fprintf(stderr, _("%s: created replication slot \"%s\"\n"), progname, replication_slot); } } if (format == 'p') { /* * Create pg_wal/archive_status or pg_xlog/archive_status (and thus * pg_wal or pg_xlog) depending on the target server so we can write * to basedir/pg_wal or basedir/pg_xlog as the directory entry in the * tar file may arrive later. * 基于目標服務器創建pg_wal/archive_status或pg_xlog/archive_status, * 這樣可以寫入到basedir/pg_wal 貨 basedir/pg_xlog,可作為后續訪問的tar文件目錄條目 */ snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status", basedir, PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ? "pg_xlog" : "pg_wal"); if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST) { fprintf(stderr, _("%s: could not create directory \"%s\": %s\n"), progname, statusdir, strerror(errno)); exit(1); } } /* * Start a child process and tell it to start streaming. On Unix, this is * a fork(). On Windows, we create a thread. * 啟動子進程開始streaming. * 在UNIX平臺,是一個fork進程,在Windows平臺,創建線程. */ #ifndef WIN32 //UNIX:fork進程 bgchild = fork(); if (bgchild == 0) { //這是子進程,返回0 /* in child process */ //啟動新進程 exit(LogStreamerMain(param)); } else if (bgchild < 0) { fprintf(stderr, _("%s: could not create background process: %s\n"), progname, strerror(errno)); exit(1); } /* * Else we are in the parent process and all is well. * 在父進程中,返回的bgchild是子進程PID. */ atexit(kill_bgchild_atexit); #else /* WIN32 */ //WIN32:創建線程 bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL); if (bgchild == 0) { fprintf(stderr, _("%s: could not create background thread: %s\n"), progname, strerror(errno)); exit(1); } #endif }
LogStreamerMain
WAL流復制主函數,用于fork后的子進程調用
static int LogStreamerMain(logstreamer_param *param) { StreamCtl stream;//接收xlog流數據時的全局參數 in_log_streamer = true; //初始化StreamCtl結構體 MemSet(&stream, 0, sizeof(stream)); stream.startpos = param->startptr; stream.timeline = param->timeline; stream.sysidentifier = param->sysidentifier; stream.stream_stop = reached_end_position; #ifndef WIN32 stream.stop_socket = bgpipe[0]; #else stream.stop_socket = PGINVALID_SOCKET; #endif stream.standby_message_timeout = standby_message_timeout; stream.synchronous = false; stream.do_sync = do_sync; stream.mark_done = true; stream.partial_suffix = NULL; stream.replication_slot = replication_slot; if (format == 'p') stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync); else stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync); //接收數據 if (!ReceiveXlogStream(param->bgconn, &stream)) /* * Any errors will already have been reported in the function process, * but we need to tell the parent that we didn't shutdown in a nice * way. * 在函數執行過程中出現的錯誤已通過警告的方式發出, * 但仍需要告知父進程不能優雅的關閉本進程. */ return 1; if (!stream.walmethod->finish()) { fprintf(stderr, _("%s: could not finish writing WAL files: %s\n"), progname, strerror(errno)); return 1; } //結束連接 PQfinish(param->bgconn); //普通文件格式 if (format == 'p') FreeWalDirectoryMethod(); else FreeWalTarMethod(); //是否內存 pg_free(stream.walmethod); return 0; }
備份命令
pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
啟動gdb跟蹤
[xdb@localhost ~]$ gdb pg_basebackup GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7 Copyright (C) 2013 Free Software Foundation, Inc. License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html> This is free software: you are free to change and redistribute it. There is NO WARRANTY, to the extent permitted by law. Type "show copying" and "show warranty" for details. This GDB was configured as "x86_64-redhat-linux-gnu". For bug reporting instructions, please see: <http://www.gnu.org/software/gdb/bugs/>... Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done. (gdb) b StartLogStreamer Breakpoint 1 at 0x403e6b: file pg_basebackup.c, line 555. (gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v (gdb) r Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v [Thread debugging using libthread_db enabled] Using host libthread_db library "/lib64/libthread_db.so.1". Password: pg_basebackup: initiating base backup, waiting for checkpoint to complete pg_basebackup: checkpoint completed pg_basebackup: write-ahead log start point: 0/57000060 on timeline 16 pg_basebackup: starting background WAL receiver Breakpoint 1, StartLogStreamer (startpos=0x7fffffffdf60 "0/57000060", timeline=16, sysidentifier=0x61f1a0 "6666964067616600474") at pg_basebackup.c:555 555 param = pg_malloc0(sizeof(logstreamer_param)); (gdb)
輸入參數
startpos=0x7fffffffdf60 “0/57000060”,
timeline=16,
sysidentifier=0x61f1a0 “6666964067616600474”
構造參數
(gdb) n 556 param->timeline = timeline; (gdb) 557 param->sysidentifier = sysidentifier; (gdb) 560 if (sscanf(startpos, "%X/%X", &hi, &lo) != 2) (gdb) 567 param->startptr = ((uint64) hi) << 32 | lo; (gdb) p hi $1 = 0 (gdb) p lo $2 = 1459617888 (gdb) n 569 param->startptr -= XLogSegmentOffset(param->startptr, WalSegSz); (gdb) n 573 if (pipe(bgpipe) < 0) (gdb) p *param $3 = {bgconn = 0x0, startptr = 1459617792, xlog = '\000' <repeats 1023 times>, sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16} (gdb)
建立連接,創建replication slot
(gdb) n 583 param->bgconn = GetConnection(); (gdb) 584 if (!param->bgconn) (gdb) 591 PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ? (gdb) 589 snprintf(param->xlog, sizeof(param->xlog), "%s/%s", (gdb) 595 if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS) (gdb) 601 if (temp_replication_slot && !replication_slot) (gdb) 602 replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn)); (gdb) 603 if (temp_replication_slot || create_slot) (gdb) 605 if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL, (gdb) 609 if (verbose) (gdb) 611 if (temp_replication_slot) (gdb) 612 fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"), (gdb) pg_basebackup: created temporary replication slot "pg_basebackup_59378" 620 if (format == 'p') (gdb) (gdb) n 630 PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ? (gdb) 628 snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
創建備份目錄
(gdb) 633 if (pg_mkdir_p(statusdir, pg_dir_create_mode) != 0 && errno != EEXIST) (gdb) p *param $4 = {bgconn = 0x62a280, startptr = 1459617792, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16} (gdb) n 647 bgchild = fork(); (gdb) ############# [xdb@localhost backup]$ ls pg_wal
fork進程,父進程返回子進程的PID
(gdb) n 647 bgchild = fork(); (gdb) n Detaching after fork from child process 43001. 648 if (bgchild == 0) (gdb) p bgchild $5 = 43001 (gdb)
子進程(PID=43001)
[xdb@localhost backup]$ ps -ef|grep 43001 xdb 43001 42820 1 11:54 pts/1 00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v [xdb@localhost backup]$ ps -ef|grep 192.168.26.25 xdb 42820 42756 0 11:48 pts/1 00:00:00 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v xdb 43001 42820 0 11:54 pts/1 00:00:01 /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
完成調用
(gdb) n 653 else if (bgchild < 0) (gdb) 672 } (gdb) BaseBackup () at pg_basebackup.c:1937 1937 for (i = 0; i < PQntuples(res); i++) (gdb)
pg_wal目錄中的數據
[xdb@localhost backup]$ ls -l ./pg_wal/ total 16388 -rw-------. 1 xdb xdb 16777216 Mar 18 11:54 000000100000000000000057 -rw-------. 1 xdb xdb 217 Mar 18 11:54 00000010.history drwx------. 2 xdb xdb 35 Mar 18 11:54 archive_status [xdb@localhost backup]$
到此,相信大家對“PostgreSQL中StartLogStreamer分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。