您好,登錄后才能下訂單哦!
本節介紹了ExecProcNode的其中一個Real函數(ExecHashJoin)。ExecHashJoin函數實現了Hash Join算法。
Plan
所有計劃節點通過將Plan結構作為第一個字段從Plan結構“派生”。這確保了在將節點轉換為計劃節點時,一切都能正常工作。(在執行器中以通用方式傳遞時,節點指針經常被轉換為Plan *)
/* ----------------
* Plan node
*
* All plan nodes "derive" from the Plan structure by having the
* Plan structure as the first field. This ensures that everything works
* when nodes are cast to Plan's. (node pointers are frequently cast to Plan*
* when passed around generically in the executor)
* 所有計劃節點通過將Plan結構作為第一個字段從Plan結構“派生”。
* 這確保了在將節點轉換為計劃節點時,一切都能正常工作。
* (在執行器中以通用方式傳遞時,節點指針經常被轉換為Plan *)
*
* We never actually instantiate any Plan nodes; this is just the common
* abstract superclass for all Plan-type nodes.
* 從未實例化任何Plan節點;這只是所有Plan-type節點的通用抽象超類。
* ----------------
*/
typedef struct Plan
{
NodeTag type;//節點類型
/*
* 成本估算信息;estimated execution costs for plan (see costsize.c for more info)
*/
Cost startup_cost; /* 啟動成本;cost expended before fetching any tuples */
Cost total_cost; /* 總成本;total cost (assuming all tuples fetched) */
/*
* 優化器估算信息;planner's estimate of result size of this plan step
*/
double plan_rows; /* 行數;number of rows plan is expected to emit */
int plan_width; /* 平均行大小(Byte為單位);average row width in bytes */
/*
* 并行執行相關的信息;information needed for parallel query
*/
bool parallel_aware; /* 是否參與并行執行邏輯?engage parallel-aware logic? */
bool parallel_safe; /* 是否并行安全;OK to use as part of parallel plan? */
/*
* Plan類型節點通用的信息.Common structural data for all Plan types.
*/
int plan_node_id; /* unique across entire final plan tree */
List *targetlist; /* target list to be computed at this node */
List *qual; /* implicitly-ANDed qual conditions */
struct Plan *lefttree; /* input plan tree(s) */
struct Plan *righttree;
List *initPlan; /* Init Plan nodes (un-correlated expr
* subselects) */
/*
* Information for management of parameter-change-driven rescanning
* parameter-change-driven重掃描的管理信息.
*
* extParam includes the paramIDs of all external PARAM_EXEC params
* affecting this plan node or its children. setParam params from the
* node's initPlans are not included, but their extParams are.
*
* allParam includes all the extParam paramIDs, plus the IDs of local
* params that affect the node (i.e., the setParams of its initplans).
* These are _all_ the PARAM_EXEC params that affect this node.
*/
Bitmapset *extParam;
Bitmapset *allParam;
} Plan;
JoinState
Hash/NestLoop/Merge Join的基類
/* ----------------
* JoinState information
*
* Superclass for state nodes of join plans.
* Hash/NestLoop/Merge Join的基類
* ----------------
*/
typedef struct JoinState
{
PlanState ps;//基類PlanState
JoinType jointype;//連接類型
//在找到一個匹配inner tuple的時候,如需要跳轉到下一個outer tuple,則該值為T
bool single_match; /* True if we should skip to next outer tuple
* after finding one inner match */
//連接條件表達式(除了ps.qual)
ExprState *joinqual; /* JOIN quals (in addition to ps.qual) */
} JoinState;
HashJoinState
Hash Join運行期狀態結構體
/* these structs are defined in executor/hashjoin.h: */
typedef struct HashJoinTupleData *HashJoinTuple;
typedef struct HashJoinTableData *HashJoinTable;
typedef struct HashJoinState
{
JoinState js; /* 基類;its first field is NodeTag */
ExprState *hashclauses;//hash連接條件
List *hj_OuterHashKeys; /* 外表條件鏈表;list of ExprState nodes */
List *hj_InnerHashKeys; /* 內表連接條件;list of ExprState nodes */
List *hj_HashOperators; /* 操作符OIDs鏈表;list of operator OIDs */
HashJoinTable hj_HashTable;//Hash表
uint32 hj_CurHashValue;//當前的Hash值
int hj_CurBucketNo;//當前的bucket編號
int hj_CurSkewBucketNo;//行傾斜bucket編號
HashJoinTuple hj_CurTuple;//當前元組
TupleTableSlot *hj_OuterTupleSlot;//outer relation slot
TupleTableSlot *hj_HashTupleSlot;//Hash tuple slot
TupleTableSlot *hj_NullOuterTupleSlot;//用于外連接的outer虛擬slot
TupleTableSlot *hj_NullInnerTupleSlot;//用于外連接的inner虛擬slot
TupleTableSlot *hj_FirstOuterTupleSlot;//
int hj_JoinState;//JoinState狀態
bool hj_MatchedOuter;//是否匹配
bool hj_OuterNotEmpty;//outer relation是否為空
} HashJoinState;
ExecHashJoin函數實現了Hash Join算法,實際實現的函數是ExecHashJoinImpl.
ExecHashJoinImpl函數把Hash Join劃分為多個階段/狀態(有限狀態機),保存在HashJoinState->hj_JoinState字段中,這些狀態分別是分別為HJ_BUILD_HASHTABLE/HJ_NEED_NEW_OUTER/HJ_SCAN_BUCKET/HJ_FILL_OUTER_TUPLE/HJ_FILL_INNER_TUPLES/HJ_NEED_NEW_BATCH.
HJ_BUILD_HASHTABLE:創建Hash表;
HJ_NEED_NEW_OUTER:掃描outer relation,計算外表連接鍵的hash值,把相匹配元組放在合適的bucket中;
HJ_SCAN_BUCKET:掃描bucket,匹配的tuple返回
HJ_FILL_OUTER_TUPLE:當前outer relation元組已耗盡,因此檢查是否發出一個虛擬的外連接元組。
HJ_FILL_INNER_TUPLES:已完成一個批處理,但做的是右外連接/完全連接,填充虛擬連接元組
HJ_NEED_NEW_BATCH:開啟下一批次
注意:在work_mem不足以裝下Hash Table時,分批執行.每個批次執行時,會把outer relation與inner relation匹配(指hash值一樣)的tuple會存儲起來,放在合適的批次文件中(hashtable->outerBatchFile[batchno]),以避免多次的outer relation掃描.
#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
/* ----------------------------------------------------------------
* ExecHashJoin
*
* Parallel-oblivious version.
* Parallel-oblivious版本。
* ----------------------------------------------------------------
*/
static TupleTableSlot * /* 返回元組或者NULL;return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
/*
* On sufficiently smart compilers this should be inlined with the
* parallel-aware branches removed.
* 在足夠智能的編譯器上,應該內聯刪除并行感知分支。
*/
return ExecHashJoinImpl(pstate, false);
}
/*
* States of the ExecHashJoin state machine
*/
#define HJ_BUILD_HASHTABLE 1
#define HJ_NEED_NEW_OUTER 2
#define HJ_SCAN_BUCKET 3
#define HJ_FILL_OUTER_TUPLE 4
#define HJ_FILL_INNER_TUPLES 5
#define HJ_NEED_NEW_BATCH 6
/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
HashJoinState *hjstate,
uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
HashJoinState *hjstate,
uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
BufFile *file,
uint32 *hashvalue,
TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *node);
/* ----------------------------------------------------------------
* ExecHashJoinImpl
*
* This function implements the Hybrid Hashjoin algorithm. It is marked
* with an always-inline attribute so that ExecHashJoin() and
* ExecParallelHashJoin() can inline it. Compilers that respect the
* attribute should create versions specialized for parallel == true and
* parallel == false with unnecessary branches removed.
* 這個函數實現了混合Hash Join算法。
* 它被標記為一個always-inline的屬性(pg_attribute_always_inline),
* 以便ExecHashJoin()和ExecParallelHashJoin()可以內聯它。
* 可以識別該屬性的編譯器應該創建專門針對parallel == true和parallel == false的版本,去掉不必要的分支。
*
* Note: the relation we build hash table on is the "inner"
* the other one is "outer".
* 注意:在inner上創建hash表,另外一個參與連接的成為outer
* ----------------------------------------------------------------
*/
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
HashJoinState *node = castNode(HashJoinState, pstate);
PlanState *outerNode;
HashState *hashNode;
ExprState *joinqual;
ExprState *otherqual;
ExprContext *econtext;
HashJoinTable hashtable;
TupleTableSlot *outerTupleSlot;
uint32 hashvalue;
int batchno;
ParallelHashJoinState *parallel_state;
/*
* get information from HashJoin node
* 從HashJon Node中獲取信息
*/
joinqual = node->js.joinqual;
otherqual = node->js.ps.qual;
hashNode = (HashState *) innerPlanState(node);
outerNode = outerPlanState(node);
hashtable = node->hj_HashTable;
econtext = node->js.ps.ps_ExprContext;
parallel_state = hashNode->parallel_state;
/*
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle.
* 重置每個元組內存上下文以釋放在前一個元組處理周期中分配的所有表達式計算存儲。
*/
ResetExprContext(econtext);
/*
* run the hash join state machine
* 執行hash join狀態機
*/
for (;;)
{
/*
* It's possible to iterate this loop many times before returning a
* tuple, in some pathological cases such as needing to move much of
* the current batch to a later batch. So let's check for interrupts
* each time through.
* 在返回元組之前,可以多次迭代此循環,在某些"變態"的情況下,
* 例如需要將當前批處理的大部分轉移到下一批處理。
* 所以需要每次檢查中斷。
*/
CHECK_FOR_INTERRUPTS();
switch (node->hj_JoinState)
{
case HJ_BUILD_HASHTABLE://-->HJ_BUILD_HASHTABLE階段
/*
* First time through: build hash table for inner relation.
* 第一次的處理邏輯:為inner relation建立hash表
*/
Assert(hashtable == NULL);
/*
* If the outer relation is completely empty, and it's not
* right/full join, we can quit without building the hash
* table. However, for an inner join it is only a win to
* check this when the outer relation's startup cost is less
* than the projected cost of building the hash table.
* Otherwise it's best to build the hash table first and see
* if the inner relation is empty. (When it's a left join, we
* should always make this check, since we aren't going to be
* able to skip the join on the strength of an empty inner
* relation anyway.)
* 如果外部關系是空的,并且它不是右外/完全連接,可以在不構建哈希表的情況下退出。
* 但是,對于內連接,只有當外部關系的啟動成本小于構建哈希表的預期成本時,才需要檢查這一點。
* 否則,最好先構建哈希表,看看內部關系是否為空。
* (當它是左外連接時,應該始終進行檢查,因為無論如何,都不能基于空的內部關系跳過連接。)
*
* If we are rescanning the join, we make use of information
* gained on the previous scan: don't bother to try the
* prefetch if the previous scan found the outer relation
* nonempty. This is not 100% reliable since with new
* parameters the outer relation might yield different
* results, but it's a good heuristic.
* 如果需要重新掃描連接,將利用上次掃描結果中獲得的信息:
* 如果上次掃描發現外部關系非空,則不必嘗試預取。
* 但這不是100%可靠的,因為有了新的參數,外部關系可能會產生不同的結果,但這是一個很好的啟發式。
*
* The only way to make the check is to try to fetch a tuple
* from the outer plan node. If we succeed, we have to stash
* it away for later consumption by ExecHashJoinOuterGetTuple.
* 進行檢查的唯一方法是從外部plan節點獲取一個元組。
* 如果成功了,就必須通過ExecHashJoinOuterGetTuple將其存儲起來,以便以后使用。
*/
if (HJ_FILL_INNER(node))
{
/* no chance to not build the hash table */
//不構建哈希表是不可能的了
node->hj_FirstOuterTupleSlot = NULL;
}
else if (parallel)
{
/*
* The empty-outer optimization is not implemented for
* shared hash tables, because no one participant can
* determine that there are no outer tuples, and it's not
* yet clear that it's worth the synchronization overhead
* of reaching consensus to figure that out. So we have
* to build the hash table.
* 對于共享哈希表,并沒有實現空外關系優化,因為沒有任何參與者可以確定沒有外部元組,
* 而且還不清楚是否值得為了解決這個問題而進行同步開銷。
* 所以我們要建立哈希表。
*/
node->hj_FirstOuterTupleSlot = NULL;
}
else if (HJ_FILL_OUTER(node) ||
(outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
!node->hj_OuterNotEmpty))
{
node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
if (TupIsNull(node->hj_FirstOuterTupleSlot))
{
node->hj_OuterNotEmpty = false;
return NULL;
}
else
node->hj_OuterNotEmpty = true;
}
else
node->hj_FirstOuterTupleSlot = NULL;
/*
* Create the hash table. If using Parallel Hash, then
* whoever gets here first will create the hash table and any
* later arrivals will merely attach to it.
* 創建哈希表。
* 如果使用并行哈希,那么最先到達這里的worker將創建哈希表,之后到達的只會附加到它上面。
*/
hashtable = ExecHashTableCreate(hashNode,
node->hj_HashOperators,
HJ_FILL_INNER(node));
node->hj_HashTable = hashtable;
/*
* Execute the Hash node, to build the hash table. If using
* Parallel Hash, then we'll try to help hashing unless we
* arrived too late.
* 執行哈希節點,以構建哈希表。
* 如果使用并行哈希,那么將嘗試協助哈希運算,除非太晚了。
*/
hashNode->hashtable = hashtable;
(void) MultiExecProcNode((PlanState *) hashNode);
/*
* If the inner relation is completely empty, and we're not
* doing a left outer join, we can quit without scanning the
* outer relation.
* 如果內部關系是空的,并且沒有執行左外連接,可以在不掃描外部關系的情況下退出。
*/
if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
return NULL;
/*
* need to remember whether nbatch has increased since we
* began scanning the outer relation
* 需要記住自開始掃描外部關系以來nbatch是否增加了
*/
hashtable->nbatch_outstart = hashtable->nbatch;
/*
* Reset OuterNotEmpty for scan. (It's OK if we fetched a
* tuple above, because ExecHashJoinOuterGetTuple will
* immediately set it again.)
* 掃描前重置OuterNotEmpty。
* (在其上獲取一個tuple是可以的,因為ExecHashJoinOuterGetTuple將立即再次設置它。)
*/
node->hj_OuterNotEmpty = false;//重置OuterNotEmpty為F
if (parallel)
{
//啟用并行
Barrier *build_barrier;
build_barrier = ¶llel_state->build_barrier;
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
{
/*
* If multi-batch, we need to hash the outer relation
* up front.
* 如果是多批處理,需要預先散列外部關系。
*/
if (hashtable->nbatch > 1)
ExecParallelHashJoinPartitionOuter(node);
BarrierArriveAndWait(build_barrier,
WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
}
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
/* Each backend should now select a batch to work on. */
//每一個后臺worker需選擇批次
hashtable->curbatch = -1;
node->hj_JoinState = HJ_NEED_NEW_BATCH;
continue;//下一循環
}
else
//非并行執行,設置hj_JoinState狀態
node->hj_JoinState = HJ_NEED_NEW_OUTER;
/* FALL THRU */
case HJ_NEED_NEW_OUTER://-->HJ_NEED_NEW_OUTER階段
/*
* We don't have an outer tuple, try to get the next one
* 沒有外部元組,試著獲取下一個
*/
if (parallel)
outerTupleSlot =
ExecParallelHashJoinOuterGetTuple(outerNode, node,
&hashvalue);//并行執行
else
outerTupleSlot =
ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);//普通執行
if (TupIsNull(outerTupleSlot))
{
//如outerTupleSlot為NULL
/* end of batch, or maybe whole join */
//完成此批數據處理,或者可能是全連接
if (HJ_FILL_INNER(node))//hj_NullOuterTupleSlot != NULL
{
/* set up to scan for unmatched inner tuples */
//不匹配的行,填充NULL(外連接)
ExecPrepHashTableForUnmatched(node);
node->hj_JoinState = HJ_FILL_INNER_TUPLES;
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;//需要下一個批次
continue;
}
//設置變量
econtext->ecxt_outertuple = outerTupleSlot;
node->hj_MatchedOuter = false;
/*
* Find the corresponding bucket for this tuple in the main
* hash table or skew hash table.
* 在主哈希表或斜哈希表中為這個元組找到對應的bucket。
*/
node->hj_CurHashValue = hashvalue;
//獲取Hash Bucket并處理此批次
ExecHashGetBucketAndBatch(hashtable, hashvalue,
&node->hj_CurBucketNo, &batchno);
//Hash傾斜優化(某個值的數據特別多)
node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
hashvalue);
node->hj_CurTuple = NULL;
/*
* The tuple might not belong to the current batch (where
* "current batch" includes the skew buckets if any).
* 元組可能不屬于當前批處理(其中“當前批處理”包括傾斜桶-如果有的話)。
*/
if (batchno != hashtable->curbatch &&
node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
{
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
* 需要將這個外部元組推遲到稍后的批處理。保存在相應的外部批處理文件中。
* 也就是說,INNER和OUTER屬于此批次的數據都可能存儲在外存中
*/
Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
hashvalue,
&hashtable->outerBatchFile[batchno]);
/* Loop around, staying in HJ_NEED_NEW_OUTER state */
//循環,保持HJ_NEED_NEW_OUTER狀態
continue;
}
/* OK, let's scan the bucket for matches */
//已完成此階段,切換至HJ_SCAN_BUCKET狀態
node->hj_JoinState = HJ_SCAN_BUCKET;
/* FALL THRU */
case HJ_SCAN_BUCKET://-->HJ_SCAN_BUCKET階段
/*
* Scan the selected hash bucket for matches to current outer
* 掃描選定的散列桶,查找與當前外部匹配的散列桶
*/
if (parallel)
{
//并行處理
if (!ExecParallelScanHashBucket(node, econtext))
{
/* out of matches; check for possible outer-join fill */
// 無法匹配,檢查可能的外連接填充,狀態切換為HJ_FILL_OUTER_TUPLE
node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
continue;
}
}
else
{
//非并行執行
if (!ExecScanHashBucket(node, econtext))
{
/* out of matches; check for possible outer-join fill */
node->hj_JoinState = HJ_FILL_OUTER_TUPLE;//同上
continue;
}
}
/*
* We've got a match, but still need to test non-hashed quals.
* ExecScanHashBucket already set up all the state needed to
* call ExecQual.
* 發現一個匹配,但仍然需要測試非散列的quals。
* ExecScanHashBucket已經設置了調用ExecQual所需的所有狀態。
*
* If we pass the qual, then save state for next call and have
* ExecProject form the projection, store it in the tuple
* table, and return the slot.
* 如果我們傳遞了qual,那么將狀態保存為下一次調用,
* 并讓ExecProject形成投影,將其存儲在tuple table中,并返回slot。
*
* Only the joinquals determine tuple match status, but all
* quals must pass to actually return the tuple.
* 只有連接條件joinquals確定元組匹配狀態,但所有條件quals必須通過才能返回元組。
*/
if (joinqual == NULL || ExecQual(joinqual, econtext))
{
node->hj_MatchedOuter = true;
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
/* In an antijoin, we never return a matched tuple */
//反連接,則不能返回匹配的元組
if (node->js.jointype == JOIN_ANTI)
{
node->hj_JoinState = HJ_NEED_NEW_OUTER;
continue;
}
/*
* If we only need to join to the first matching inner
* tuple, then consider returning this one, but after that
* continue with next outer tuple.
* 如果只需要連接到第一個匹配的內表元組,那么可以考慮返回這個元組,
* 但是在此之后可以繼續使用下一個外表元組。
*/
if (node->js.single_match)
node->hj_JoinState = HJ_NEED_NEW_OUTER;
if (otherqual == NULL || ExecQual(otherqual, econtext))
return ExecProject(node->js.ps.ps_ProjInfo);//執行投影操作
else
InstrCountFiltered2(node, 1);//其他條件不匹配
}
else
InstrCountFiltered1(node, 1);//連接條件不匹配
break;
case HJ_FILL_OUTER_TUPLE://-->HJ_FILL_OUTER_TUPLE階段
/*
* The current outer tuple has run out of matches, so check
* whether to emit a dummy outer-join tuple. Whether we emit
* one or not, the next state is NEED_NEW_OUTER.
* 當前外部元組已耗盡匹配項,因此檢查是否發出一個虛擬的外連接元組。
* 不管是否發出一個,下一個狀態是NEED_NEW_OUTER。
*/
node->hj_JoinState = HJ_NEED_NEW_OUTER;//切換狀態為HJ_NEED_NEW_OUTER
if (!node->hj_MatchedOuter &&
HJ_FILL_OUTER(node))
{
/*
* Generate a fake join tuple with nulls for the inner
* tuple, and return it if it passes the non-join quals.
* 為內部元組生成一個帶有null的假連接元組,并在滿足非連接條件quals時返回它。
*/
econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
if (otherqual == NULL || ExecQual(otherqual, econtext))
return ExecProject(node->js.ps.ps_ProjInfo);//投影操作
else
InstrCountFiltered2(node, 1);
}
break;
case HJ_FILL_INNER_TUPLES://-->HJ_FILL_INNER_TUPLES階段
/*
* We have finished a batch, but we are doing right/full join,
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
* 已經完成了一個批處理,但是做的是右外/完全連接,
所以必須在繼續下一個批處理之前發出散列表中任何不匹配的內部元組。
*/
if (!ExecScanHashTableForUnmatched(node, econtext))
{
/* no more unmatched tuples */
//不存在更多不匹配的元組,切換狀態為HJ_NEED_NEW_BATCH(開始下一批次)
node->hj_JoinState = HJ_NEED_NEW_BATCH;
continue;
}
/*
* Generate a fake join tuple with nulls for the outer tuple,
* and return it if it passes the non-join quals.
* 為外表元組生成一個帶有null的假連接元組,并在滿足非連接條件quals時返回它。
*/
econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
if (otherqual == NULL || ExecQual(otherqual, econtext))
return ExecProject(node->js.ps.ps_ProjInfo);
else
InstrCountFiltered2(node, 1);
break;
case HJ_NEED_NEW_BATCH://-->HJ_NEED_NEW_BATCH階段
/*
* Try to advance to next batch. Done if there are no more.
* 盡量提前到下一批。如果沒有了,就結束。
*/
if (parallel)
{
//并行處理
if (!ExecParallelHashJoinNewBatch(node))
return NULL; /* end of parallel-aware join */
}
else
{
//非并行處理
if (!ExecHashJoinNewBatch(node))
return NULL; /* end of parallel-oblivious join */
}
node->hj_JoinState = HJ_NEED_NEW_OUTER;//切換狀態
break;
default://非法的JoinState
elog(ERROR, "unrecognized hashjoin state: %d",
(int) node->hj_JoinState);
}
}
}
測試腳本如下
testdb=# explain verbose select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je
testdb(# from t_grxx gr inner join t_jfxx jf
testdb(# on gr.dwbh = dw.dwbh
testdb(# and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Sort (cost=14828.83..15078.46 rows=99850 width=47)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
Sort Key: dw.dwbh
-> Hash Join (cost=3176.00..6537.55 rows=99850 width=47)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
Hash Cond: ((gr.grbh)::text = (jf.grbh)::text)
-> Hash Join (cost=289.00..2277.61 rows=99850 width=32)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm
Inner Unique: true
Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
-> Seq Scan on public.t_grxx gr (cost=0.00..1726.00 rows=100000 width=16)
Output: gr.dwbh, gr.grbh, gr.xm, gr.xb, gr.nl
-> Hash (cost=164.00..164.00 rows=10000 width=20)
Output: dw.dwmc, dw.dwbh, dw.dwdz
-> Seq Scan on public.t_dwxx dw (cost=0.00..164.00 rows=10000 width=20)
Output: dw.dwmc, dw.dwbh, dw.dwdz
-> Hash (cost=1637.00..1637.00 rows=100000 width=20)
Output: jf.ny, jf.je, jf.grbh
-> Seq Scan on public.t_jfxx jf (cost=0.00..1637.00 rows=100000 width=20)
Output: jf.ny, jf.je, jf.grbh
(20 rows)
啟動gdb,設置斷點,進入ExecHashJoin
(gdb) b ExecHashJoin
Breakpoint 1 at 0x70292e: file nodeHashjoin.c, line 565.
(gdb) c
Continuing.
Breakpoint 1, ExecHashJoin (pstate=0x2ee1a88) at nodeHashjoin.c:565
565 return ExecHashJoinImpl(pstate, false);
繼續執行,進入第2個Hash Join,即t_grxx & t_dwxx的連接
(gdb) n
Breakpoint 1, ExecHashJoin (pstate=0x2ee1d98) at nodeHashjoin.c:565
565 return ExecHashJoinImpl(pstate, false);
查看輸入參數,ExecProcNode=ExecProcNodeReal=ExecHashJoin
(gdb) p *pstate
$8 = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>,
ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0}
(gdb)
pstate的lefttree對應的是SeqScan,righttree對應的是Hash,即左樹(outer relation)為t_grxx的順序掃描運算生成的relation,右樹(inner relation)為t_dwxx的順序掃描運算生成的relation(在此relation上創建Hash Table)
(gdb) p *pstate->lefttree
$6 = {type = T_SeqScanState, plan = 0x2fa8ff0, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>,
ExecProcNodeReal = 0x71578d <ExecSeqScan>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2ee27d8, ps_ExprContext = 0x2ee2188, ps_ProjInfo = 0x0, scandesc = 0x7f0710d02bd0}
(gdb) p *pstate->righttree
$9 = {type = T_HashState, plan = 0x2faaf60, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>,
ExecProcNodeReal = 0x6fc015 <ExecHash>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2af0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2ee3278, ps_ExprContext = 0x2ee2a30, ps_ProjInfo = 0x0, scandesc = 0x0}
進入ExecHashJoinImpl函數
(gdb) step
ExecHashJoinImpl (pstate=0x2ee1d98, parallel=false) at nodeHashjoin.c:167
167 HashJoinState *node = castNode(HashJoinState, pstate);
賦值,查看HashJoinState等變量值
(gdb) n
182 joinqual = node->js.joinqual;
(gdb) n
183 otherqual = node->js.ps.qual;
(gdb)
184 hashNode = (HashState *) innerPlanState(node);
(gdb)
185 outerNode = outerPlanState(node);
(gdb)
186 hashtable = node->hj_HashTable;
(gdb)
187 econtext = node->js.ps.ps_ExprContext;
(gdb)
188 parallel_state = hashNode->parallel_state;
(gdb)
194 ResetExprContext(econtext);
(gdb) p *node
$10 = {js = {ps = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>,
ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0},
jointype = JOIN_INNER, single_match = true, joinqual = 0x0}, hashclauses = 0x2f21430, hj_OuterHashKeys = 0x2f22230,
hj_InnerHashKeys = 0x2f22740, hj_HashOperators = 0x2f227a0, hj_HashTable = 0x0, hj_CurHashValue = 0, hj_CurBucketNo = 0,
hj_CurSkewBucketNo = -1, hj_CurTuple = 0x0, hj_OuterTupleSlot = 0x2f212f0, hj_HashTupleSlot = 0x2ee3278,
hj_NullOuterTupleSlot = 0x0, hj_NullInnerTupleSlot = 0x0, hj_FirstOuterTupleSlot = 0x0, hj_JoinState = 1,
hj_MatchedOuter = false, hj_OuterNotEmpty = false}
(gdb) p *otherqual
Cannot access memory at address 0x0
(gdb) p *hashNode
$11 = {ps = {type = T_HashState, plan = 0x2faaf60, state = 0x2ee1758, ExecProcNode = 0x6e4bde <ExecProcNodeFirst>,
ExecProcNodeReal = 0x6fc015 <ExecHash>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2af0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2ee3278, ps_ExprContext = 0x2ee2a30, ps_ProjInfo = 0x0, scandesc = 0x0}, hashtable = 0x0,
hashkeys = 0x2f22740, shared_info = 0x0, hinstrument = 0x0, parallel_state = 0x0}
(gdb) p *hashtable
Cannot access memory at address 0x0
(gdb) p parallel_state
$12 = (ParallelHashJoinState *) 0x0
(gdb)
進入HJ_BUILD_HASHTABLE處理邏輯,創建Hash表
(gdb) p node->hj_JoinState
$13 = 1
HJ_BUILD_HASHTABLE->執行相關判斷,本例為內連接,因此不存在FILL_OUTER等情況
(gdb) n
216 Assert(hashtable == NULL);
(gdb)
241 if (HJ_FILL_INNER(node))
(gdb)
246 else if (parallel)
(gdb)
258 else if (HJ_FILL_OUTER(node) ||
(gdb)
259 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
(gdb)
HJ_BUILD_HASHTABLE->outer node的啟動成本低于創建Hash表的總成本而且outer relation為空(初始化node->hj_OuterNotEmpty為false),那么嘗試獲取outer relation的第一個元組,如為NULL,則可快速返回NULL,否則設置node->hj_OuterNotEmpty標記為T
258 else if (HJ_FILL_OUTER(node) ||
(gdb)
260 !node->hj_OuterNotEmpty))
(gdb)
259 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
(gdb)
262 node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
(gdb)
263 if (TupIsNull(node->hj_FirstOuterTupleSlot))
(gdb)
269 node->hj_OuterNotEmpty = true;
HJ_BUILD_HASHTABLE->創建Hash Table
(gdb) n
263 if (TupIsNull(node->hj_FirstOuterTupleSlot))
(gdb)
281 HJ_FILL_INNER(node));
(gdb)
279 hashtable = ExecHashTableCreate(hashNode,
(gdb)
HJ_BUILD_HASHTABLE->Hash Table(HashJoinTable結構體)的內存結構
bucket數量為16384(16K),取對數結果為14(即log2_nbuckets/log2_nbuckets_optimal的結果值)
skewEnabled為F,沒有啟用傾斜優化
(gdb) p *hashtable
$14 = {nbuckets = 16384, log2_nbuckets = 14, nbuckets_original = 16384, nbuckets_optimal = 16384,
log2_nbuckets_optimal = 14, buckets = {unshared = 0x2fb1260, shared = 0x2fb1260}, keepNulls = false, skewEnabled = false,
skewBucket = 0x0, skewBucketLen = 0, nSkewBuckets = 0, skewBucketNums = 0x0, nbatch = 1, curbatch = 0,
nbatch_original = 1, nbatch_outstart = 1, growEnabled = true, totalTuples = 0, partialTuples = 0, skewTuples = 0,
innerBatchFile = 0x0, outerBatchFile = 0x0, outer_hashfunctions = 0x3053b68, inner_hashfunctions = 0x3053bc0,
hashStrict = 0x3053c18, spaceUsed = 0, spaceAllowed = 16777216, spacePeak = 0, spaceUsedSkew = 0,
spaceAllowedSkew = 335544, hashCxt = 0x3053a50, batchCxt = 0x2f8b170, chunks = 0x0, current_chunk = 0x0, area = 0x0,
parallel_state = 0x0, batches = 0x0, current_chunk_shared = 9187201950435737471}
HJ_BUILD_HASHTABLE->使用的Hash函數
(gdb) p *hashtable->inner_hashfunctions
$15 = {fn_addr = 0x4c8a0a <hashtext>, fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002',
fn_extra = 0x0, fn_mcxt = 0x3053a50, fn_expr = 0x0}
(gdb) p *hashtable->outer_hashfunctions
$16 = {fn_addr = 0x4c8a0a <hashtext>, fn_oid = 400, fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002',
fn_extra = 0x0, fn_mcxt = 0x3053a50, fn_expr = 0x0}
HJ_BUILD_HASHTABLE->賦值,并執行此Hash Node節點,結果總元組數為10000
(gdb) n
289 hashNode->hashtable = hashtable;
(gdb)
290 (void) MultiExecProcNode((PlanState *) hashNode);
(gdb)
297 if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
(gdb) p hashtable->totalTuples
$18 = 10000
HJ_BUILD_HASHTABLE->批次數為1,只需要執行1個批次即可
(gdb) n
304 hashtable->nbatch_outstart = hashtable->nbatch;
(gdb) p hashtable->nbatch
$19 = 1
HJ_BUILD_HASHTABLE->重置OuterNotEmpty為F
(gdb) n
311 node->hj_OuterNotEmpty = false;
(gdb)
313 if (parallel)
HJ_BUILD_HASHTABLE->非并行執行,切換狀態為HJ_NEED_NEW_OUTER
(gdb)
313 if (parallel)
(gdb) n
340 node->hj_JoinState = HJ_NEED_NEW_OUTER;
HJ_NEED_NEW_OUTER->獲取(執行ExecHashJoinOuterGetTuple)下一個outer relation的一個元組
349 if (parallel)
(gdb) n
354 outerTupleSlot =
(gdb)
357 if (TupIsNull(outerTupleSlot))
(gdb) p *outerTupleSlot
$20 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = true,
tts_tuple = 0x2f88300, tts_tupleDescriptor = 0x7f0710d02bd0, tts_mcxt = 0x2ee1640, tts_buffer = 507, tts_nvalid = 1,
tts_values = 0x2ee22a8, tts_isnull = 0x2ee22d0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 2, tts_fixedTupleDescriptor = true}
HJ_NEED_NEW_OUTER->設置相關變量
(gdb) n
371 econtext->ecxt_outertuple = outerTupleSlot;
(gdb)
372 node->hj_MatchedOuter = false;
(gdb)
378 node->hj_CurHashValue = hashvalue;
(gdb)
379 ExecHashGetBucketAndBatch(hashtable, hashvalue,
(gdb) p hashvalue
$21 = 2324234220
(gdb) n
381 node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
(gdb)
383 node->hj_CurTuple = NULL;
(gdb) p *node
$22 = {js = {ps = {type = T_HashJoinState, plan = 0x2faaff8, state = 0x2ee1758, ExecProcNode = 0x70291d <ExecHashJoin>,
ExecProcNodeReal = 0x70291d <ExecHashJoin>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x2ee2070, righttree = 0x2ee2918, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2f20d98, ps_ExprContext = 0x2ee1fb0, ps_ProjInfo = 0x2ee3550, scandesc = 0x0},
jointype = JOIN_INNER, single_match = true, joinqual = 0x0}, hashclauses = 0x2f21430, hj_OuterHashKeys = 0x2f22230,
hj_InnerHashKeys = 0x2f22740, hj_HashOperators = 0x2f227a0, hj_HashTable = 0x2f88ee8, hj_CurHashValue = 2324234220,
hj_CurBucketNo = 16364, hj_CurSkewBucketNo = -1, hj_CurTuple = 0x0, hj_OuterTupleSlot = 0x2f212f0,
hj_HashTupleSlot = 0x2ee3278, hj_NullOuterTupleSlot = 0x0, hj_NullInnerTupleSlot = 0x0, hj_FirstOuterTupleSlot = 0x0,
hj_JoinState = 2, hj_MatchedOuter = false, hj_OuterNotEmpty = true}
(gdb) p *econtext
$25 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x2ee2248,
ecxt_per_query_memory = 0x2ee1640, ecxt_per_tuple_memory = 0x2f710c0, ecxt_param_exec_vals = 0x0,
ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x0, ecxt_aggnulls = 0x0, caseValue_datum = 0, caseValue_isNull = true,
domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x2ee1758, ecxt_callbacks = 0x0}
(gdb) p *node->hj_HashTupleSlot
$26 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,
tts_tuple = 0x0, tts_tupleDescriptor = 0x2ee3060, tts_mcxt = 0x2ee1640, tts_buffer = 0, tts_nvalid = 0,
tts_values = 0x2ee32d8, tts_isnull = 0x2ee32f0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}
HJ_NEED_NEW_OUTER->切換狀態為HJ_SCAN_BUCKET,開始掃描Hash Table
(gdb) n
407 node->hj_JoinState = HJ_SCAN_BUCKET;
(gdb)
HJ_SCAN_BUCKET->不匹配,切換狀態為HJ_FILL_OUTER_TUPLE
(gdb)
416 if (parallel)
(gdb) n
427 if (!ExecScanHashBucket(node, econtext))
(gdb)
430 node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
(gdb)
431 continue;
(gdb)
HJ_FILL_OUTER_TUPLE->切換狀態為HJ_NEED_NEW_OUTER
不管是否獲得/發出一個元組,下一個狀態是NEED_NEW_OUTER
209 switch (node->hj_JoinState)
(gdb)
483 node->hj_JoinState = HJ_NEED_NEW_OUTER;
HJ_FILL_OUTER_TUPLE->由于不是外連接,無需FILL,回到HJ_NEED_NEW_OUTER處理邏輯
(gdb) n
485 if (!node->hj_MatchedOuter &&
(gdb)
486 HJ_FILL_OUTER(node))
(gdb)
485 if (!node->hj_MatchedOuter &&
(gdb)
549 }
(gdb)
HJ_SCAN_BUCKET->在SCAN_BUCKET成功掃描的位置設置斷點
(gdb) b nodeHashjoin.c:441
Breakpoint 3 at 0x7025c3: file nodeHashjoin.c, line 441.
(gdb) c
Continuing.
Breakpoint 3, ExecHashJoinImpl (pstate=0x2ee1d98, parallel=false) at nodeHashjoin.c:447
447 if (joinqual == NULL || ExecQual(joinqual, econtext))
HJ_SCAN_BUCKET->存在匹配的元組,設置相關標記
(gdb) n
449 node->hj_MatchedOuter = true;
(gdb)
450 HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
(gdb)
453 if (node->js.jointype == JOIN_ANTI)
(gdb) n
464 if (node->js.single_match)
(gdb)
465 node->hj_JoinState = HJ_NEED_NEW_OUTER;
(gdb)
HJ_SCAN_BUCKET->執行投影操作并返回
467 if (otherqual == NULL || ExecQual(otherqual, econtext))
(gdb)
468 return ExecProject(node->js.ps.ps_ProjInfo);
(gdb)
總的來說,Hash Join的實現是創建inner relation的Hash Table,然后獲取outer relation的元組,如匹配則執行投影操作返回相應的元組,除了創建HT外,其他步驟不斷的變換狀態執行,直至滿足Portal要求的元組數量為止.
Hash Joins: Past, Present and Future/PGCon 2017
A Look at How Postgres Executes a Tiny Join - Part 1
A Look at How Postgres Executes a Tiny Join - Part 2
Assignment 2 Symmetric Hash Join
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。