您好,登錄后才能下訂單哦!
本節介紹了PostgreSQL提交事務的具體實現邏輯,主要解析了函數CommitTransaction->RecordTransactionCommit的實現邏輯。
TransactionState
事務狀態結構體
/*
 * TransState
 *
 * Low-level transaction state, i.e. the state of a transaction as seen
 * from the server's perspective.
 */
typedef enum TransState
{
	/* idle */
	TRANS_DEFAULT,

	/* transaction starting */
	TRANS_START,

	/* inside a valid transaction */
	TRANS_INPROGRESS,

	/* commit in progress */
	TRANS_COMMIT,

	/* abort in progress */
	TRANS_ABORT,

	/* prepare in progress */
	TRANS_PREPARE
} TransState;
/*
 * TBlockState
 *
 * Transaction block state, i.e. the transaction state as driven by client
 * queries.
 *
 * Note: the subtransaction states (TBLOCK_SUB*) are used only for
 * non-topmost transactions; all other states appear only in the topmost
 * transaction.
 */
typedef enum TBlockState
{
	/* ---- not-in-transaction-block states ---- */

	/* idle */
	TBLOCK_DEFAULT,
	/* running single-query transaction */
	TBLOCK_STARTED,

	/* ---- transaction block states ---- */

	/* starting transaction block */
	TBLOCK_BEGIN,
	/* live transaction */
	TBLOCK_INPROGRESS,
	/* live transaction after implicit BEGIN */
	TBLOCK_IMPLICIT_INPROGRESS,
	/* live transaction inside parallel worker */
	TBLOCK_PARALLEL_INPROGRESS,
	/* COMMIT received */
	TBLOCK_END,
	/* failed xact, awaiting ROLLBACK */
	TBLOCK_ABORT,
	/* failed xact, ROLLBACK received */
	TBLOCK_ABORT_END,
	/* live xact, ROLLBACK received */
	TBLOCK_ABORT_PENDING,
	/* live xact, PREPARE received */
	TBLOCK_PREPARE,

	/* ---- subtransaction states ---- */

	/* starting a subtransaction */
	TBLOCK_SUBBEGIN,
	/* live subtransaction */
	TBLOCK_SUBINPROGRESS,
	/* RELEASE received */
	TBLOCK_SUBRELEASE,
	/* COMMIT received while TBLOCK_SUBINPROGRESS */
	TBLOCK_SUBCOMMIT,
	/* failed subxact, awaiting ROLLBACK */
	TBLOCK_SUBABORT,
	/* failed subxact, ROLLBACK received */
	TBLOCK_SUBABORT_END,
	/* live subxact, ROLLBACK received */
	TBLOCK_SUBABORT_PENDING,
	/* live subxact, ROLLBACK TO received */
	TBLOCK_SUBRESTART,
	/* failed subxact, ROLLBACK TO received */
	TBLOCK_SUBABORT_RESTART
} TBlockState;
/*
 * transaction state structure
 *
 * Holds the state of one transaction or subtransaction: its identifiers,
 * low-level (TransState) and high-level (TBlockState) state, nesting
 * information, per-transaction memory context and resource owner, the list
 * of subcommitted child XIDs, and saved settings from transaction entry.
 * Each entry carries a back link to its parent's state via "parent".
 */
typedef struct TransactionStateData
{
/* transaction ID */
TransactionId transactionId; /* my XID, or Invalid if none */
/* subtransaction ID */
SubTransactionId subTransactionId; /* my subxact ID */
/* savepoint name */
char *name; /* savepoint name, if any */
/* savepoint level */
int savepointLevel; /* savepoint level */
/* low-level transaction state (see TransState) */
TransState state; /* low-level state */
/* high-level transaction block state (see TBlockState) */
TBlockState blockState; /* high-level state */
/* nesting depth of this transaction */
int nestingLevel; /* transaction nesting depth */
/* nesting depth of the GUC context */
int gucNestLevel; /* GUC context nesting depth */
/* memory context with transaction lifetime */
MemoryContext curTransactionContext; /* my xact-lifetime context */
/* resource owner for query resources */
ResourceOwner curTransactionOwner; /* my query resources */
/* subcommitted child transaction IDs, kept in XID order */
TransactionId *childXids; /* subcommitted child XIDs, in XID order */
/* number of entries used in childXids[] */
int nChildXids; /* # of subcommitted child XIDs */
/* number of entries allocated for childXids[] */
int maxChildXids; /* allocated size of childXids[] */
/* previous CurrentUserId setting */
Oid prevUser; /* previous CurrentUserId setting */
/* previous SecurityRestrictionContext */
int prevSecContext; /* previous SecurityRestrictionContext */
/* read-only state of the transaction at entry time */
bool prevXactReadOnly; /* entry-time xact r/o state */
/* was the transaction started during recovery? */
bool startedInRecovery; /* did we start in recovery? */
/* has the XID already been included in a WAL record? */
bool didLogXid; /* has xid been included in WAL record? */
/* Enter/ExitParallelMode counter */
int parallelModeLevel; /* Enter/ExitParallelMode counter */
/* state of the parent transaction */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
/* TransactionState is a pointer to a TransactionStateData */
typedef TransactionStateData *TransactionState;
RecordTransactionCommit函數:向WAL寫入COMMIT Record,并返回該事務及其子事務中最新的XID;如果xact沒有XID,則返回InvalidTransactionId。
/*
 * RecordTransactionCommit
 *
 * Returns latest XID among xact and its children, or InvalidTransactionId
 * if the xact has no XID. (We compute that here just because it's easier.)
 *
 * Overview of the steps performed below:
 *   1. Collect the data for the commit record (pending file deletes,
 *      committed child XIDs, and invalidation messages).
 *   2. If no XID was assigned, no commit record is written; pending
 *      invalidation messages are logged via LogStandbyInvalidations(), and
 *      we jump to cleanup if no WAL was written at all.
 *   3. Otherwise, enter the commit critical section, write the commit
 *      record with XactLogCommitRecord(), and set commit timestamp data.
 *   4. Either flush WAL now (XLogFlush) and mark the transaction tree
 *      committed, or take the asynchronous-commit path.
 *   5. Leave the critical section, compute latestXid, wait for synchronous
 *      replication if required, and reset XactLastRecEnd.
 *
 * If you change this function, see RecordTransactionCommitPrepared also.
 */
static TransactionId
RecordTransactionCommit(void)
{
TransactionId xid = GetTopTransactionIdIfAny(); /* top-level XID, if one was assigned */
bool markXidCommitted = TransactionIdIsValid(xid); /* true iff xid is valid */
TransactionId latestXid = InvalidTransactionId; /* return value; stays invalid on the early "goto cleanup" path */
int nrels;
RelFileNode *rels;
int nchildren;
TransactionId *children;
int nmsgs = 0;
SharedInvalidationMessage *invalMessages = NULL;
bool RelcacheInitFileInval = false;
bool wrote_xlog;
/* Get data needed for commit record */
nrels = smgrGetPendingDeletes(true, &rels);
nchildren = xactGetCommittedChildren(&children);
if (XLogStandbyInfoActive())
nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
&RelcacheInitFileInval);
/* A nonzero XactLastRecEnd is taken to mean this transaction wrote WAL. */
wrote_xlog = (XactLastRecEnd != 0);
/*
 * If we haven't been assigned an XID yet, we neither can, nor do we want
 * to write a COMMIT record.
 */
if (!markXidCommitted)
{
/*
 * We expect that every smgrscheduleunlink is followed by a catalog
 * update, and hence XID assignment, so we shouldn't get here with any
 * pending deletes. Use a real test not just an Assert to check this,
 * since it's a bit fragile.
 */
if (nrels != 0)
elog(ERROR, "cannot commit a transaction that deleted files but has no xid");
/* Can't have child XIDs either; AssignTransactionId enforces this */
Assert(nchildren == 0);
/*
 * Transactions without an assigned xid can contain invalidation
 * messages (e.g. explicit relcache invalidations or catcache
 * invalidations for inplace updates); standbys need to process those.
 * We can't emit a commit record without an xid, and we don't want to
 * force assigning an xid, because that'd be problematic for e.g.
 * vacuum. Hence we emit a bespoke record for the invalidations. We
 * don't want to use that in case a commit record is emitted, so they
 * happen synchronously with commits (besides not wanting to emit more
 * WAL records).
 */
if (nmsgs != 0)
{
LogStandbyInvalidations(nmsgs, invalMessages,
RelcacheInitFileInval);
wrote_xlog = true; /* not strictly necessary */
}
/*
 * If we didn't create XLOG entries, we're done here; otherwise we
 * should trigger flushing those entries the same as a commit record
 * would. This will primarily happen for HOT pruning and the like; we
 * want these to be flushed to disk in due time.
 */
if (!wrote_xlog)
goto cleanup;
}
else
{
bool replorigin;
/*
 * Are we using the replication origins feature? Or, in other words,
 * are we replaying remote actions?
 */
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
replorigin_session_origin != DoNotReplicateId);
/*
 * Begin commit critical section and insert the commit XLOG record.
 */
/* Tell bufmgr and smgr to prepare for commit */
BufmgrCommit();
/*
 * Mark ourselves as within our "commit critical section". This
 * forces any concurrent checkpoint to wait until we've updated
 * pg_xact. Without this, it is possible for the checkpoint to set
 * REDO after the XLOG record but fail to flush the pg_xact update to
 * disk, leading to loss of the transaction commit if the system
 * crashes a little later.
 *
 * Note: we could, but don't bother to, set this flag in
 * RecordTransactionAbort. That's because loss of a transaction abort
 * is noncritical; the presumption would be that it aborted, anyway.
 *
 * It's safe to change the delayChkpt flag of our own backend without
 * holding the ProcArrayLock, since we're the only one modifying it.
 * This makes checkpoint's determination of which xacts are delayChkpt
 * a bit fuzzy, but it doesn't matter.
 */
START_CRIT_SECTION();
MyPgXact->delayChkpt = true;
SetCurrentTransactionStopTimestamp();
XactLogCommitRecord(xactStopTimestamp,
nchildren, children, nrels, rels,
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
MyXactFlags,
InvalidTransactionId, NULL /* plain commit */ );
if (replorigin)
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
XactLastRecEnd);
/*
 * Record commit timestamp. The value comes from plain commit
 * timestamp if there's no replication origin; otherwise, the
 * timestamp was already set in replorigin_session_origin_timestamp by
 * replication.
 *
 * We don't need to WAL-log anything here, as the commit record
 * written above already contains the data.
 */
if (!replorigin || replorigin_session_origin_timestamp == 0)
replorigin_session_origin_timestamp = xactStopTimestamp;
TransactionTreeSetCommitTsData(xid, nchildren, children,
replorigin_session_origin_timestamp,
replorigin_session_origin, false);
}
/*
 * Check if we want to commit asynchronously. We can allow the XLOG flush
 * to happen asynchronously if synchronous_commit=off, or if the current
 * transaction has not performed any WAL-logged operation or didn't assign
 * an xid. The transaction can end up not writing any WAL, even if it has
 * an xid, if it only wrote to temporary and/or unlogged tables. It can
 * end up having written WAL without an xid if it did HOT pruning. In
 * case of a crash, the loss of such a transaction will be irrelevant;
 * temp tables will be lost anyway, unlogged tables will be truncated and
 * HOT pruning will be done again later. (Given the foregoing, you might
 * think that it would be unnecessary to emit the XLOG record at all in
 * this case, but we don't currently try to do that. It would certainly
 * cause problems at least in Hot Standby mode, where the
 * KnownAssignedXids machinery requires tracking every XID assignment. It
 * might be OK to skip it only when wal_level < replica, but for now we
 * don't.)
 *
 * However, if we're doing cleanup of any non-temp rels or committing any
 * command that wanted to force sync commit, then we must flush XLOG
 * immediately. (We must not allow asynchronous commit if there are any
 * non-temp tables to be deleted, because we might delete the files before
 * the COMMIT record is flushed to disk. We do allow asynchronous commit
 * if all to-be-deleted tables are temporary though, since they are lost
 * anyway if we crash.)
 */
if ((wrote_xlog && markXidCommitted &&
synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
forceSyncCommit || nrels > 0)
{
/* Synchronous path: flush WAL up to the end of our last record. */
XLogFlush(XactLastRecEnd);
/*
 * Now we may update the CLOG, if we wrote a COMMIT record above
 */
if (markXidCommitted)
TransactionIdCommitTree(xid, nchildren, children);
}
else
{
/*
 * Asynchronous commit case:
 *
 * This enables possible committed transaction loss in the case of a
 * postmaster crash because WAL buffers are left unwritten. Ideally we
 * could issue the WAL write without the fsync, but some
 * wal_sync_methods do not allow separate write/fsync.
 *
 * Report the latest async commit LSN, so that the WAL writer knows to
 * flush this commit.
 */
XLogSetAsyncXactLSN(XactLastRecEnd);
/*
 * We must not immediately update the CLOG, since we didn't flush the
 * XLOG. Instead, we store the LSN up to which the XLOG must be
 * flushed before the CLOG may be updated.
 */
if (markXidCommitted)
TransactionIdAsyncCommitTree(xid, nchildren, children, XactLastRecEnd);
}
/*
 * If we entered a commit critical section, leave it now, and let
 * checkpoints proceed.
 */
if (markXidCommitted)
{
MyPgXact->delayChkpt = false;
END_CRIT_SECTION();
}
/* Compute latestXid while we have the child XIDs handy */
latestXid = TransactionIdLatest(xid, nchildren, children);
/*
 * Wait for synchronous replication, if required. Similar to the decision
 * above about using committing asynchronously we only want to wait if
 * this backend assigned an xid and wrote WAL. No need to wait if an xid
 * was assigned due to temporary/unlogged tables or due to HOT pruning.
 *
 * Note that at this stage we have marked clog, but still show as running
 * in the procarray and continue to hold locks.
 */
if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd, true);
/* remember end of last commit record */
XactLastCommitEnd = XactLastRecEnd;
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
cleanup:
/* Clean up local data */
if (rels)
pfree(rels);
/* latestXid is still InvalidTransactionId if we jumped here early */
return latestXid;
}
插入數據,執行commit
10:57:56 (xdb@[local]:5432)testdb=# begin;
BEGIN
10:57:59 (xdb@[local]:5432)testdb=#* insert into t_session1 values(1);
INSERT 0 1
10:58:01 (xdb@[local]:5432)testdb=#* commit;
啟動gdb,設置斷點
(gdb) b RecordTransactionCommit
Breakpoint 2 at 0x547528: file xact.c, line 1141.
(gdb) c
Continuing.
Breakpoint 2, RecordTransactionCommit () at xact.c:1141
1141 TransactionId xid = GetTopTransactionIdIfAny();
(gdb)
查看調用棧
(gdb) bt
#0 RecordTransactionCommit () at xact.c:1141
#1 0x00000000005483f2 in CommitTransaction () at xact.c:2070
#2 0x0000000000549078 in CommitTransactionCommand () at xact.c:2831
#3 0x00000000008c8ea9 in finish_xact_command () at postgres.c:2523
#4 0x00000000008c6b5d in exec_simple_query (query_string=0x2c97ec8 "commit;") at postgres.c:1170
#5 0x00000000008cae70 in PostgresMain (argc=1, argv=0x2cc3dc8, dbname=0x2cc3c30 "testdb", username=0x2c94ba8 "xdb")
at postgres.c:4182
#6 0x000000000082642b in BackendRun (port=0x2cb9c00) at postmaster.c:4361
#7 0x0000000000825b8f in BackendStartup (port=0x2cb9c00) at postmaster.c:4033
#8 0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#9 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x2c92b60) at postmaster.c:1379
#10 0x00000000007488ef in main (argc=1, argv=0x2c92b60) at main.c:228
(gdb)
獲取事務ID
(gdb) p xid
$3 = 2411
(gdb)
初始化其他變量,此時markXidCommitted為true
(gdb) n
1143 TransactionId latestXid = InvalidTransactionId;
(gdb)
1148 int nmsgs = 0;
(gdb)
1149 SharedInvalidationMessage *invalMessages = NULL;
(gdb)
1150 bool RelcacheInitFileInval = false;
(gdb)
1154 nrels = smgrGetPendingDeletes(true, &rels);
(gdb)
1155 nchildren = xactGetCommittedChildren(&children);
(gdb)
1156 if (XLogStandbyInfoActive())
(gdb)
1159 wrote_xlog = (XactLastRecEnd != 0);
(gdb)
1165 if (!markXidCommitted)
(gdb) p latestXid
$4 = 0
(gdb) p markXidCommitted
$5 = true
(gdb) p nrels
$6 = 0
(gdb) p nchildren
$7 = 0
(gdb) p wrote_xlog
$8 = true
(gdb)
markXidCommitted為true,進入相應的處理邏輯(else分支).
開始進入提交關鍵部分并插入commit XLOG記錄。
(gdb) n
1214 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
(gdb)
1221 BufmgrCommit();
(gdb) p replorigin
$9 = false
(gdb)
進入提交關鍵區域(critical section),設置delayChkpt標志,并設置當前事務的結束時間戳
(gdb) n
1240 START_CRIT_SECTION();
(gdb)
1241 MyPgXact->delayChkpt = true;
(gdb)
1243 SetCurrentTransactionStopTimestamp();
(gdb) p *MyPgXact
$10 = {xid = 2411, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = true, nxids = 0 '\000'}
(gdb)
插入XLOG
(gdb) n
1245 XactLogCommitRecord(xactStopTimestamp,
(gdb)
1252 if (replorigin)
(gdb)
設置事務的提交時間戳數據(commit timestamp)
(gdb)
1267 if (!replorigin || replorigin_session_origin_timestamp == 0)
(gdb)
1268 replorigin_session_origin_timestamp = xactStopTimestamp;
(gdb)
1270 TransactionTreeSetCommitTsData(xid, nchildren, children,
(gdb)
1300 if ((wrote_xlog && markXidCommitted &&
(gdb)
同步刷新XLOG
(gdb)
1301 synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
(gdb)
1300 if ((wrote_xlog && markXidCommitted &&
(gdb)
1304 XLogFlush(XactLastRecEnd);
(gdb)
1309 if (markXidCommitted)
(gdb)
更新CLOG,如果我們在上面已寫入了COMMIT WAL Record.
(gdb)
1310 TransactionIdCommitTree(xid, nchildren, children);
(gdb)
1309 if (markXidCommitted)
(gdb)
退出提交關鍵區域
(gdb)
1340 if (markXidCommitted)
(gdb)
1342 MyPgXact->delayChkpt = false;
(gdb)
1343 END_CRIT_SECTION();
(gdb)
計算最后的latestXid
(gdb)
1347 latestXid = TransactionIdLatest(xid, nchildren, children);
(gdb) n
1358 if (wrote_xlog && markXidCommitted)
(gdb) p latestXid
$11 = 2411
(gdb)
如需要則等待同步復制,然后記錄最后commit記錄的位置,并重置XactLastRecEnd
(gdb) n
1359 SyncRepWaitForLSN(XactLastRecEnd, true);
(gdb)
1362 XactLastCommitEnd = XactLastRecEnd;
(gdb)
1365 XactLastRecEnd = 0;
(gdb)
1368 if (rels)
(gdb)
1371 return latestXid;
(gdb) p XactLastCommitEnd
$12 = 5522364896
(gdb)
返回,完成調用
(gdb) n
1372 }
(gdb)
CommitTransaction () at xact.c:2087
2087 TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
(gdb)
DONE!
How Postgres Makes Transactions Atomic
PG Source Code
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。