您好,登錄后才能下訂單哦!
本節是ExecHashJoin函數介紹的第二部分,主要介紹了ExecHashJoin中依賴的其他函數的實現邏輯,包括ExecHashTableCreate、ExecChooseHashTableSize等。
Plan
所有計劃節點通過將Plan結構作為第一個字段從Plan結構“派生”。這確保了在將節點轉換為計劃節點時,一切都能正常工作。(在執行器中以通用方式傳遞時,節點指針經常被轉換為Plan *)
/* ----------------
* Plan node
*
* All plan nodes "derive" from the Plan structure by having the
* Plan structure as the first field. This ensures that everything works
* when nodes are cast to Plan's. (node pointers are frequently cast to Plan*
* when passed around generically in the executor)
* 所有計劃節點通過將Plan結構作為第一個字段從Plan結構“派生”。
* 這確保了在將節點轉換為計劃節點時,一切都能正常工作。
* (在執行器中以通用方式傳遞時,節點指針經常被轉換為Plan *)
*
* We never actually instantiate any Plan nodes; this is just the common
* abstract superclass for all Plan-type nodes.
* 從未實例化任何Plan節點;這只是所有Plan-type節點的通用抽象超類。
* ----------------
*/
typedef struct Plan
{
NodeTag type;//節點類型
/*
* 成本估算信息;estimated execution costs for plan (see costsize.c for more info)
*/
Cost startup_cost; /* 啟動成本;cost expended before fetching any tuples */
Cost total_cost; /* 總成本;total cost (assuming all tuples fetched) */
/*
* 優化器估算信息;planner's estimate of result size of this plan step
*/
double plan_rows; /* 行數;number of rows plan is expected to emit */
int plan_width; /* 平均行大小(Byte為單位);average row width in bytes */
/*
* 并行執行相關的信息;information needed for parallel query
*/
bool parallel_aware; /* 是否參與并行執行邏輯?engage parallel-aware logic? */
bool parallel_safe; /* 是否并行安全;OK to use as part of parallel plan? */
/*
* Plan類型節點通用的信息.Common structural data for all Plan types.
*/
int plan_node_id; /* unique across entire final plan tree */
List *targetlist; /* target list to be computed at this node */
List *qual; /* implicitly-ANDed qual conditions */
struct Plan *lefttree; /* input plan tree(s) */
struct Plan *righttree;
List *initPlan; /* Init Plan nodes (un-correlated expr
* subselects) */
/*
* Information for management of parameter-change-driven rescanning
* parameter-change-driven重掃描的管理信息.
*
* extParam includes the paramIDs of all external PARAM_EXEC params
* affecting this plan node or its children. setParam params from the
* node's initPlans are not included, but their extParams are.
*
* allParam includes all the extParam paramIDs, plus the IDs of local
* params that affect the node (i.e., the setParams of its initplans).
* These are _all_ the PARAM_EXEC params that affect this node.
*/
Bitmapset *extParam;
Bitmapset *allParam;
} Plan;
JoinState
Hash/NestLoop/Merge Join的基類
/* ----------------
* JoinState information
*
* Superclass for state nodes of join plans.
* Hash/NestLoop/Merge Join的基類
* ----------------
*/
typedef struct JoinState
{
PlanState ps;//基類PlanState
JoinType jointype;//連接類型
//在找到一個匹配inner tuple的時候,如需要跳轉到下一個outer tuple,則該值為T
bool single_match; /* True if we should skip to next outer tuple
* after finding one inner match */
//連接條件表達式(除了ps.qual)
ExprState *joinqual; /* JOIN quals (in addition to ps.qual) */
} JoinState;
HashJoinState
Hash Join運行期狀態結構體
/* these structs are defined in executor/hashjoin.h: */
typedef struct HashJoinTupleData *HashJoinTuple;
typedef struct HashJoinTableData *HashJoinTable;
typedef struct HashJoinState
{
JoinState js; /* 基類;its first field is NodeTag */
ExprState *hashclauses;//hash連接條件
List *hj_OuterHashKeys; /* 外表條件鏈表;list of ExprState nodes */
List *hj_InnerHashKeys; /* 內表連接條件;list of ExprState nodes */
List *hj_HashOperators; /* 操作符OIDs鏈表;list of operator OIDs */
HashJoinTable hj_HashTable;//Hash表
uint32 hj_CurHashValue;//當前的Hash值
int hj_CurBucketNo;//當前的bucket編號
int hj_CurSkewBucketNo;//行傾斜bucket編號
HashJoinTuple hj_CurTuple;//當前元組
TupleTableSlot *hj_OuterTupleSlot;//outer relation slot
TupleTableSlot *hj_HashTupleSlot;//Hash tuple slot
TupleTableSlot *hj_NullOuterTupleSlot;//用于外連接的outer虛擬slot
TupleTableSlot *hj_NullInnerTupleSlot;//用于外連接的inner虛擬slot
TupleTableSlot *hj_FirstOuterTupleSlot;//
int hj_JoinState;//JoinState狀態
bool hj_MatchedOuter;//是否匹配
bool hj_OuterNotEmpty;//outer relation是否為空
} HashJoinState;
HashJoinTable
Hash表數據結構
typedef struct HashJoinTableData
{
int nbuckets; /* 內存中的hash桶數;# buckets in the in-memory hash table */
int log2_nbuckets; /* 2的對數(nbuckets必須是2的冪);its log2 (nbuckets must be a power of 2) */
int nbuckets_original; /* 首次hash時的桶數;# buckets when starting the first hash */
int nbuckets_optimal; /* 優化后的桶數(每個批次);optimal # buckets (per batch) */
int log2_nbuckets_optimal; /* 2的對數;log2(nbuckets_optimal) */
/* buckets[i] is head of list of tuples in i'th in-memory bucket */
//bucket [i]是內存中第i個桶中的元組鏈表的head item
union
{
/* unshared array is per-batch storage, as are all the tuples */
//未共享數組是按批處理存儲的,所有元組均如此
struct HashJoinTupleData **unshared;
/* shared array is per-query DSA area, as are all the tuples */
//共享數組是每個查詢的DSA區域,所有元組均如此
dsa_pointer_atomic *shared;
} buckets;
bool keepNulls; /*如不匹配則存儲NULL元組,該值為T;true to store unmatchable NULL tuples */
bool skewEnabled; /*是否使用傾斜優化?;are we using skew optimization? */
HashSkewBucket **skewBucket; /* 傾斜的hash表桶數;hashtable of skew buckets */
int skewBucketLen; /* skewBucket數組大小;size of skewBucket array (a power of 2!) */
int nSkewBuckets; /* 活動的傾斜桶數;number of active skew buckets */
int *skewBucketNums; /* 活動傾斜桶數組索引;array indexes of active skew buckets */
int nbatch; /* 批次數;number of batches */
int curbatch; /* 當前批次,第一輪為0;current batch #; 0 during 1st pass */
int nbatch_original; /* 在開始inner掃描時的批次;nbatch when we started inner scan */
int nbatch_outstart; /* 在開始outer掃描時的批次;nbatch when we started outer scan */
bool growEnabled; /* 關閉nbatch增加的標記;flag to shut off nbatch increases */
double totalTuples; /* 從inner plan獲得的元組數;# tuples obtained from inner plan */
double partialTuples; /* 通過hashjoin獲得的inner元組數;# tuples obtained from inner plan by me */
double skewTuples; /* 傾斜元組數;# tuples inserted into skew tuples */
/*
* These arrays are allocated for the life of the hash join, but only if
* nbatch > 1. A file is opened only when we first write a tuple into it
* (otherwise its pointer remains NULL). Note that the zero'th array
* elements never get used, since we will process rather than dump out any
* tuples of batch zero.
* 這些數組在散列連接的生命周期內分配,但僅當nbatch > 1時分配。
* 只有當第一次將元組寫入文件時,文件才會打開(否則它的指針將保持NULL)。
* 注意,第0個數組元素永遠不會被使用,因為批次0的元組永遠不會轉儲.
*/
BufFile **innerBatchFile; /* 每個批次的inner虛擬臨時文件緩存;buffered virtual temp file per batch */
BufFile **outerBatchFile; /* 每個批次的outer虛擬臨時文件緩存;buffered virtual temp file per batch */
/*
* Info about the datatype-specific hash functions for the datatypes being
* hashed. These are arrays of the same length as the number of hash join
* clauses (hash keys).
* 有關正在散列的數據類型的特定于數據類型的散列函數的信息。
* 這些數組的長度與散列連接子句(散列鍵)的數量相同。
*/
FmgrInfo *outer_hashfunctions; /* outer hash函數FmgrInfo結構體;lookup data for hash functions */
FmgrInfo *inner_hashfunctions; /* inner hash函數FmgrInfo結構體;lookup data for hash functions */
bool *hashStrict; /* 每個hash操作符是嚴格?is each hash join operator strict? */
Size spaceUsed; /* 元組使用的當前內存空間大小;memory space currently used by tuples */
Size spaceAllowed; /* 空間使用上限;upper limit for space used */
Size spacePeak; /* 峰值的空間使用;peak space used */
Size spaceUsedSkew; /* 傾斜哈希表的當前空間使用情況;skew hash table's current space usage */
Size spaceAllowedSkew; /* 傾斜哈希表的使用上限;upper limit for skew hashtable */
MemoryContext hashCxt; /* 整個散列連接存儲的上下文;context for whole-hash-join storage */
MemoryContext batchCxt; /* 該批次存儲的上下文;context for this-batch-only storage */
/* used for dense allocation of tuples (into linked chunks) */
//用于密集分配元組(到鏈接塊中)
HashMemoryChunk chunks; /* 整個批次使用一個鏈表;one list for the whole batch */
/* Shared and private state for Parallel Hash. */
//并行hash使用的共享和私有狀態
HashMemoryChunk current_chunk; /* 后臺進程的當前chunk;this backend's current chunk */
dsa_area *area; /* 用于分配內存的DSA區域;DSA area to allocate memory from */
ParallelHashJoinState *parallel_state;//并行執行狀態
ParallelHashJoinBatchAccessor *batches;//并行訪問器
dsa_pointer current_chunk_shared;//當前chunk的開始指針
} HashJoinTableData;
typedef struct HashJoinTableData *HashJoinTable;
HashJoinTupleData
Hash連接元組數據
/* ----------------------------------------------------------------
* hash-join hash table structures
*
* Each active hashjoin has a HashJoinTable control block, which is
* palloc'd in the executor's per-query context. All other storage needed
* for the hashjoin is kept in private memory contexts, two for each hashjoin.
* This makes it easy and fast to release the storage when we don't need it
* anymore. (Exception: data associated with the temp files lives in the
* per-query context too, since we always call buffile.c in that context.)
* 每個活動的hashjoin都有一個可散列的控制塊,它在執行程序的每個查詢上下文中都是通過palloc分配的。
* hashjoin所需的所有其他存儲都保存在私有內存上下文中,每個hashjoin有兩個。
* 當不再需要它的時候,這使得釋放它變得簡單和快速。
* (例外:與臨時文件相關的數據也存在于每個查詢上下文中,因為在這種情況下總是調用buffile.c。)
*
* The hashtable contexts are made children of the per-query context, ensuring
* that they will be discarded at end of statement even if the join is
* aborted early by an error. (Likewise, any temporary files we make will
* be cleaned up by the virtual file manager in event of an error.)
* hashtable上下文是每個查詢上下文的子上下文,確保在語句結束時丟棄它們,即使連接因錯誤而提前中止。
* (同樣,如果出現錯誤,虛擬文件管理器將清理創建的任何臨時文件。)
*
* Storage that should live through the entire join is allocated from the
* "hashCxt", while storage that is only wanted for the current batch is
* allocated in the "batchCxt". By resetting the batchCxt at the end of
* each batch, we free all the per-batch storage reliably and without tedium.
* 通過整個連接的存儲空間應從“hashCxt”分配,而只需要當前批處理的存儲空間在“batchCxt”中分配。
* 通過在每個批處理結束時重置batchCxt,可以可靠地釋放每個批處理的所有存儲,而不會感到單調乏味。
*
* During first scan of inner relation, we get its tuples from executor.
* If nbatch > 1 then tuples that don't belong in first batch get saved
* into inner-batch temp files. The same statements apply for the
* first scan of the outer relation, except we write tuples to outer-batch
* temp files. After finishing the first scan, we do the following for
* each remaining batch:
* 1. Read tuples from inner batch file, load into hash buckets.
* 2. Read tuples from outer batch file, match to hash buckets and output.
* 在內部關系的第一次掃描中,從執行者那里得到了它的元組。
* 如果nbatch > 1,那么不屬于第一批的元組將保存到批內臨時文件中。
* 相同的語句適用于外關系的第一次掃描,但是我們將元組寫入外部批處理臨時文件。
* 完成第一次掃描后,我們對每批剩余的元組做如下處理:
* 1.從內部批處理文件讀取元組,加載到散列桶中。
* 2.從外部批處理文件讀取元組,匹配哈希桶和輸出。
*
* It is possible to increase nbatch on the fly if the in-memory hash table
* gets too big. The hash-value-to-batch computation is arranged so that this
* can only cause a tuple to go into a later batch than previously thought,
* never into an earlier batch. When we increase nbatch, we rescan the hash
* table and dump out any tuples that are now of a later batch to the correct
* inner batch file. Subsequently, while reading either inner or outer batch
* files, we might find tuples that no longer belong to the current batch;
* if so, we just dump them out to the correct batch file.
* 如果內存中的哈希表太大,可以動態增加nbatch。
* 散列值到批處理的計算是這樣安排的:
* 這只會導致元組進入比以前認為的更晚的批處理,而不會進入更早的批處理。
* 當增加nbatch時,重新掃描哈希表,并將現在屬于后面批處理的任何元組轉儲到正確的內部批處理文件。
* 隨后,在讀取內部或外部批處理文件時,可能會發現不再屬于當前批處理的元組;
* 如果是這樣,只需將它們轉儲到正確的批處理文件即可。
* ----------------------------------------------------------------
*/
/* these are in nodes/execnodes.h: */
/* typedef struct HashJoinTupleData *HashJoinTuple; */
/* typedef struct HashJoinTableData *HashJoinTable; */
typedef struct HashJoinTupleData
{
/* link to next tuple in same bucket */
//link同一個桶中的下一個元組
union
{
struct HashJoinTupleData *unshared;
dsa_pointer shared;
} next;
uint32 hashvalue; /* 元組的hash值;tuple's hash code */
/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;
#define HJTUPLE_OVERHEAD MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup) \
((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))
ExecHashTableCreate
ExecHashTableCreate函數初始化hashjoin需要使用的hashtable.
/*----------------------------------------------------------------------------------------------------
HJ_BUILD_HASHTABLE 階段
-----------------------------------------------------------------------------------------------------*/
/* ----------------
* these are defined to avoid confusion problems with "left"
* and "right" and "inner" and "outer". The convention is that
* the "left" plan is the "outer" plan and the "right" plan is
* the inner plan, but these make the code more readable.
* 這些定義是為了避免“左”和“右”以及“內”和“外”的混淆問題。
* 約定是,“左”計劃是“外部”計劃,“右”計劃是內部計劃,但是這些計劃使代碼更具可讀性。
* ----------------
*/
#define innerPlan(node) (((Plan *)(node))->righttree)
#define outerPlan(node) (((Plan *)(node))->lefttree)
/* ----------------------------------------------------------------
* ExecHashTableCreate
*
* create an empty hashtable data structure for hashjoin.
* 初始化hashjoin需要使用的hashtable.
* ----------------------------------------------------------------
*/
HashJoinTable
ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
{
Hash *node;
HashJoinTable hashtable;
Plan *outerNode;
size_t space_allowed;
int nbuckets;
int nbatch;
double rows;
int num_skew_mcvs;
int log2_nbuckets;
int nkeys;
int i;
ListCell *ho;
MemoryContext oldcxt;
/*
* Get information about the size of the relation to be hashed (it's the
* "outer" subtree of this node, but the inner relation of the hashjoin).
* Compute the appropriate size of the hash table.
* 獲取有關要散列的關系大小的信息(它是該節點的“outer”子樹,hashjoin的inner relation)。
* 計算哈希表的適當大小。
*/
node = (Hash *) state->ps.plan;//獲取Hash節點
outerNode = outerPlan(node);//獲取outer relation Plan節點
/*
* If this is shared hash table with a partial plan, then we can't use
* outerNode->plan_rows to estimate its size. We need an estimate of the
* total number of rows across all copies of the partial plan.
* 如果這是帶有部分計劃(并行處理)的共享哈希表,那么不能使用outerNode->plan_rows來估計它的大小。
* 需要估算跨部分計劃的所有副本的行總數。
*/
rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;//獲取總行數
ExecChooseHashTableSize(rows, outerNode->plan_width,
OidIsValid(node->skewTable),
state->parallel_state != NULL,
state->parallel_state != NULL ?
state->parallel_state->nparticipants - 1 : 0,
&space_allowed,
&nbuckets, &nbatch, &num_skew_mcvs);//計算Hash Table的大小尺寸
/* nbuckets must be a power of 2 */
//nbuckets(hash桶數)必須是2的n次方
log2_nbuckets = my_log2(nbuckets);
Assert(nbuckets == (1 << log2_nbuckets));
/*
* Initialize the hash table control block.
* 初始化hash表的控制塊
*
* The hashtable control block is just palloc'd from the executor's
* per-query memory context. Everything else should be kept inside the
* subsidiary hashCxt or batchCxt.
* hashtable控件塊是從執行程序的每個查詢內存上下文中調取的。
* 其他內容都應該保存在附屬hashCxt或batchCxt中。
*/
hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));//分配內存
hashtable->nbuckets = nbuckets;//桶數
hashtable->nbuckets_original = nbuckets;
hashtable->nbuckets_optimal = nbuckets;
hashtable->log2_nbuckets = log2_nbuckets;
hashtable->log2_nbuckets_optimal = log2_nbuckets;
hashtable->buckets.unshared = NULL;
hashtable->keepNulls = keepNulls;
hashtable->skewEnabled = false;
hashtable->skewBucket = NULL;
hashtable->skewBucketLen = 0;
hashtable->nSkewBuckets = 0;
hashtable->skewBucketNums = NULL;
hashtable->nbatch = nbatch;
hashtable->curbatch = 0;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
hashtable->growEnabled = true;
hashtable->totalTuples = 0;
hashtable->partialTuples = 0;
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
hashtable->spaceAllowed = space_allowed;
hashtable->spaceUsedSkew = 0;
hashtable->spaceAllowedSkew =
hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
hashtable->chunks = NULL;
hashtable->current_chunk = NULL;
hashtable->parallel_state = state->parallel_state;
hashtable->area = state->ps.state->es_query_dsa;
hashtable->batches = NULL;
#ifdef HJDEBUG
printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
hashtable, nbatch, nbuckets);
#endif
/*
* Create temporary memory contexts in which to keep the hashtable working
* storage. See notes in executor/hashjoin.h.
* 創建臨時內存上下文,以便在其中保持散列表的相關信息。
* 參見executor/hashjoin.h中的注釋。
*/
hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
"HashTableContext",
ALLOCSET_DEFAULT_SIZES);
hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
"HashBatchContext",
ALLOCSET_DEFAULT_SIZES);
/* Allocate data that will live for the life of the hashjoin */
//分配內存,切換至hashCxt
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
/*
* Get info about the hash functions to be used for each hash key. Also
* remember whether the join operators are strict.
* 獲取關于每個散列鍵要使用的散列函數的信息。
* 還要記住連接操作符是否嚴格。
*/
nkeys = list_length(hashOperators);//鍵值數
hashtable->outer_hashfunctions =
(FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));//outer relation所使用的hash函數
hashtable->inner_hashfunctions =
(FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));//inner relation所使用的hash函數
hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));//是否嚴格的操作符
i = 0;
foreach(ho, hashOperators)//遍歷hash操作符
{
Oid hashop = lfirst_oid(ho);//hash操作符
Oid left_hashfn;//左函數
Oid right_hashfn;//右函數
//獲取與給定操作符兼容的標準哈希函數的OID,并根據需要對其LHS和/或RHS數據類型進行操作。
if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))//獲取hash函數
elog(ERROR, "could not find hash function for hash operator %u",
hashop);
fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
hashtable->hashStrict[i] = op_strict(hashop);
i++;
}
if (nbatch > 1 && hashtable->parallel_state == NULL)//批次>1而且并行狀態為NULL
{
/*
* allocate and initialize the file arrays in hashCxt (not needed for
* parallel case which uses shared tuplestores instead of raw files)
* 在hashCxt中分配和初始化文件數組(對于使用共享tuplestore而不是原始文件的并行情況不需要)
*/
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));//用于緩存該批次的inner relation的tuple
hashtable->outerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));//用于緩存該批次的outerr relation的tuple
/* The files will not be opened until needed... */
/* ... but make sure we have temp tablespaces established for them */
//這些文件需要時才會打開……
//…但是要確保為它們建立了臨時表空間
PrepareTempTablespaces();
}
MemoryContextSwitchTo(oldcxt);//切換回原內存上下文
if (hashtable->parallel_state)//并行處理
{
ParallelHashJoinState *pstate = hashtable->parallel_state;
Barrier *build_barrier;
/*
* Attach to the build barrier. The corresponding detach operation is
* in ExecHashTableDetach. Note that we won't attach to the
* batch_barrier for batch 0 yet. We'll attach later and start it out
* in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
* and then loaded while hashing (the standard hybrid hash join
* algorithm), and we'll coordinate that using build_barrier.
*/
build_barrier = &pstate->build_barrier;
BarrierAttach(build_barrier);
/*
* So far we have no idea whether there are any other participants,
* and if so, what phase they are working on. The only thing we care
* about at this point is whether someone has already created the
* SharedHashJoinBatch objects and the hash table for batch 0. One
* backend will be elected to do that now if necessary.
*/
if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING))
{
pstate->nbatch = nbatch;
pstate->space_allowed = space_allowed;
pstate->growth = PHJ_GROWTH_OK;
/* Set up the shared state for coordinating batches. */
ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
/*
* Allocate batch 0's hash table up front so we can load it
* directly while hashing.
*/
pstate->nbuckets = nbuckets;
ExecParallelHashTableAlloc(hashtable, 0);
}
/*
* The next Parallel Hash synchronization point is in
* MultiExecParallelHash(), which will progress it all the way to
* PHJ_BUILD_DONE. The caller must not return control from this
* executor node between now and then.
*/
}
else//非并行處理
{
/*
* Prepare context for the first-scan space allocations; allocate the
* hashbucket array therein, and set each bucket "empty".
* 為第一次掃描空間分配準備上下文;在其中分配hashbucket數組,并將每個bucket設置為“空”。
*/
MemoryContextSwitchTo(hashtable->batchCxt);//切換上下文
hashtable->buckets.unshared = (HashJoinTuple *)
palloc0(nbuckets * sizeof(HashJoinTuple));//分配內存空間
/*
* Set up for skew optimization, if possible and there's a need for
* more than one batch. (In a one-batch join, there's no point in
* it.)
* 如需要多個批處理,設置傾斜優化。(在單批處理連接中,這是沒有意義的。)
*/
if (nbatch > 1)
ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
MemoryContextSwitchTo(oldcxt);//切換上下文
}
return hashtable;//返回Hash表
}
/*
* This routine fills a FmgrInfo struct, given the OID
* of the function to be called.
* 給定要調用的函數的OID,這個例程填充一個FmgrInfo結構體。
*
* The caller's CurrentMemoryContext is used as the fn_mcxt of the info
* struct; this means that any subsidiary data attached to the info struct
* (either by fmgr_info itself, or later on by a function call handler)
* will be allocated in that context. The caller must ensure that this
* context is at least as long-lived as the info struct itself. This is
* not a problem in typical cases where the info struct is on the stack or
* in freshly-palloc'd space. However, if one intends to store an info
* struct in a long-lived table, it's better to use fmgr_info_cxt.
* 調用方的CurrentMemoryContext用作info結構體的fn_mcxt;
* 這意味著附加到info結構體的任何附屬數據(通過fmgr_info本身,或者稍后通過函數調用處理程序)將在該上下文中分配。
* 調用者必須確保這個上下文的生命周期至少與info結構本身一樣。
* 在信息結構位于堆棧上或在新palloc空間中的典型情況下,這不是一個問題。
* 但是,如果希望在long-lived表中存儲信息結構,最好使用fmgr_info_cxt。
*/
void
fmgr_info(Oid functionId, FmgrInfo *finfo)
{
fmgr_info_cxt_security(functionId, finfo, CurrentMemoryContext, false);
}
ExecChooseHashTableSize
ExecChooseHashTableSize函數根據給定要散列的關系的估計大小(行數和平均行寬),計算適當的散列表大小。
/*
* Compute appropriate size for hashtable given the estimated size of the
* relation to be hashed (number of rows and average row width).
* 給定要散列的關系的估計大小(行數和平均行寬),計算適當的散列表大小。
*
* This is exported so that the planner's costsize.c can use it.
* 這些信息已導出以便計劃器costsize.c可以使用
*/
/* Target bucket loading (tuples per bucket) */
#define NTUP_PER_BUCKET 1
void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
bool try_combined_work_mem,
int parallel_workers,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
int *num_skew_mcvs)
{
int tupsize;//元組大小
double inner_rel_bytes;//inner relation大小
long bucket_bytes;//桶大小
long hash_table_bytes;//hash table大小
long skew_table_bytes;//傾斜表大小
long max_pointers;//最大的指針數
long mppow2;//
int nbatch = 1;//批次
int nbuckets;//桶數
double dbuckets;//
/* Force a plausible relation size if no info */
//如relation大小沒有信息,則設定為默認值1000.0
if (ntuples <= 0.0)
ntuples = 1000.0;
/*
* Estimate tupsize based on footprint of tuple in hashtable... note this
* does not allow for any palloc overhead. The manipulations of spaceUsed
* don't count palloc overhead either.
* 根據哈希表中tuple的占用空間估計tupsize…
* 注意,這不允許任何palloc開銷。使用的空間操作也不包括palloc開銷。
*/
tupsize = HJTUPLE_OVERHEAD +
MAXALIGN(SizeofMinimalTupleHeader) +
MAXALIGN(tupwidth);//估算元組大小
inner_rel_bytes = ntuples * tupsize;//inner relation大小
/*
* Target in-memory hashtable size is work_mem kilobytes.
* 目標內存中的散列表大小為work_mem KB。
*/
hash_table_bytes = work_mem * 1024L;
/*
* Parallel Hash tries to use the combined work_mem of all workers to
* avoid the need to batch. If that won't work, it falls back to work_mem
* per worker and tries to process batches in parallel.
* 并行散列試圖使用所有worker的所有work_mem來避免分批處理。
* 如果這不起作用,它將返回到每個worker的work_mem,并嘗試并行處理批處理。
*/
if (try_combined_work_mem)//嘗試融合work_mem
hash_table_bytes += hash_table_bytes * parallel_workers;
*space_allowed = hash_table_bytes;
/*
* If skew optimization is possible, estimate the number of skew buckets
* that will fit in the memory allowed, and decrement the assumed space
* available for the main hash table accordingly.
* 如果可以進行傾斜優化,估算允許內存中容納的傾斜桶的數量,并相應地減少主哈希表的假定可用空間。
*
* We make the optimistic assumption that each skew bucket will contain
* one inner-relation tuple. If that turns out to be low, we will recover
* at runtime by reducing the number of skew buckets.
* 我們樂觀地假設,每個傾斜桶將包含一個內部關系元組。
* 如果結果很低,將通過減少傾斜桶的數量在運行時進行恢復。
*
* hashtable->skewBucket will have up to 8 times as many HashSkewBucket
* pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
* will round up to the next power of 2 and then multiply by 4 to reduce
* collisions.
* hashtable->skewBucket的指針數量將是允許的mcv數量的8倍,
* 因為ExecHashBuildSkewHash將四舍五入到下一個2次方,然后乘以4以減少沖突。
*/
if (useskew)
{
//傾斜優化
skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
/*----------
* Divisor is:
* size of a hash tuple +
* worst-case size of skewBucket[] per MCV +
* size of skewBucketNums[] entry +
* size of skew bucket struct itself
*----------
*/
*num_skew_mcvs = skew_table_bytes / (tupsize +
(8 * sizeof(HashSkewBucket *)) +
sizeof(int) +
SKEW_BUCKET_OVERHEAD);
if (*num_skew_mcvs > 0)
hash_table_bytes -= skew_table_bytes;
}
else
*num_skew_mcvs = 0;//不使用傾斜優化,默認為0
/*
* Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
* memory is filled, assuming a single batch; but limit the value so that
* the pointer arrays we'll try to allocate do not exceed work_mem nor
* MaxAllocSize.
* 設置nbuckets,假設為單批處理,當內存被填滿時,實現NTUP_PER_BUCKET的平均桶負載;
* 但是要限制這個值,以便試圖分配的指針數組不會超過work_mem或MaxAllocSize。
*
* Note that both nbuckets and nbatch must be powers of 2 to make
* ExecHashGetBucketAndBatch fast.
* 注意,nbucket和nbatch都必須是2的冪,才能使ExecHashGetBucketAndBatch更快。
*/
max_pointers = *space_allowed / sizeof(HashJoinTuple);//最大指針數
max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));//控制上限
/* If max_pointers isn't a power of 2, must round it down to one */
//如果max_pointer不是2的冪,則必須四舍五入到符合規則的某個值(如110.1 --> 128)
mppow2 = 1L << my_log2(max_pointers);
if (max_pointers != mppow2)
max_pointers = mppow2 / 2;
/* Also ensure we avoid integer overflow in nbatch and nbuckets */
/* (this step is redundant given the current value of MaxAllocSize) */
//還要確保在nbatch和nbucket中避免整數溢出
//(鑒于MaxAllocSize的當前值,此步驟是多余的)
max_pointers = Min(max_pointers, INT_MAX / 2);//設定上限
dbuckets = ceil(ntuples / NTUP_PER_BUCKET);//取整
dbuckets = Min(dbuckets, max_pointers);//設定上限
nbuckets = (int) dbuckets;//桶數
/* don't let nbuckets be really small, though ... */
//但是,不要讓nbucket非常小……
nbuckets = Max(nbuckets, 1024);//設定下限(1024)
/* ... and force it to be a power of 2. */
//2的冪
nbuckets = 1 << my_log2(nbuckets);
/*
* If there's not enough space to store the projected number of tuples and
* the required bucket headers, we will need multiple batches.
* 如果沒有足夠的空間來存儲預計的元組數量和所需的bucket headers,將需要多個批處理。
*/
bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
if (inner_rel_bytes + bucket_bytes > hash_table_bytes)//inner relation大小 + 桶數大于可用空間
{
/* We'll need multiple batches */
//需要多批次
long lbuckets;
double dbatch;
int minbatch;
long bucket_size;
/*
* If Parallel Hash with combined work_mem would still need multiple
* batches, we'll have to fall back to regular work_mem budget.
* 如果合并了work_mem的并行散列仍然需要多個批處理,將不得不回到常規的work_mem預算。
*/
if (try_combined_work_mem)
{
ExecChooseHashTableSize(ntuples, tupwidth, useskew,
false, parallel_workers,
space_allowed,
numbuckets,
numbatches,
num_skew_mcvs);
return;
}
/*
* Estimate the number of buckets we'll want to have when work_mem is
* entirely full. Each bucket will contain a bucket pointer plus
* NTUP_PER_BUCKET tuples, whose projected size already includes
* overhead for the hash code, pointer to the next tuple, etc.
* 估計work_mem完全用完時需要的桶數。
* 每個桶將包含一個桶指針和NTUP_PER_BUCKET元組,
* 其投影大小已經包括哈希碼的開銷、指向下一個元組的指針等等。
*/
bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));//桶大小
lbuckets = 1L << my_log2(hash_table_bytes / bucket_size);
lbuckets = Min(lbuckets, max_pointers);
nbuckets = (int) lbuckets;
nbuckets = 1 << my_log2(nbuckets);
bucket_bytes = nbuckets * sizeof(HashJoinTuple);
/*
* Buckets are simple pointers to hashjoin tuples, while tupsize
* includes the pointer, hash code, and MinimalTupleData. So buckets
* should never really exceed 25% of work_mem (even for
* NTUP_PER_BUCKET=1); except maybe for work_mem values that are not
* 2^N bytes, where we might get more because of doubling. So let's
* look for 50% here.
* Buckets是指向hashjoin元組的簡單指針,而tupsize包含指針、散列代碼和MinimalTupleData。
* 所以Buckets的實際大小不應該超過work_mem的25%(即使對于NTUP_PER_BUCKET=1);
* 除了work_mem值不是2 ^ N個字節這個原因外,翻倍可能會得到更多的,這里試著使用50%
*/
Assert(bucket_bytes <= hash_table_bytes / 2);
/* Calculate required number of batches. */
//計算批次數
dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
dbatch = Min(dbatch, max_pointers);
minbatch = (int) dbatch;
nbatch = 2;
while (nbatch < minbatch)
nbatch <<= 1;
}
Assert(nbuckets > 0);
Assert(nbatch > 0);
*numbuckets = nbuckets;
*numbatches = nbatch;
}
測試腳本如下
testdb=# set enable_nestloop=false;
SET
testdb=# set enable_mergejoin=false;
SET
testdb=# explain verbose select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je
testdb(# from t_grxx gr inner join t_jfxx jf
testdb(# on gr.dwbh = dw.dwbh
testdb(# and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Sort (cost=14828.83..15078.46 rows=99850 width=47)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
Sort Key: dw.dwbh
-> Hash Join (cost=3176.00..6537.55 rows=99850 width=47)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
Hash Cond: ((gr.grbh)::text = (jf.grbh)::text)
-> Hash Join (cost=289.00..2277.61 rows=99850 width=32)
Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm
Inner Unique: true
Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
-> Seq Scan on public.t_grxx gr (cost=0.00..1726.00 rows=100000 width=16)
Output: gr.dwbh, gr.grbh, gr.xm, gr.xb, gr.nl
-> Hash (cost=164.00..164.00 rows=10000 width=20)
Output: dw.dwmc, dw.dwbh, dw.dwdz
-> Seq Scan on public.t_dwxx dw (cost=0.00..164.00 rows=10000 width=20)
Output: dw.dwmc, dw.dwbh, dw.dwdz
-> Hash (cost=1637.00..1637.00 rows=100000 width=20)
Output: jf.ny, jf.je, jf.grbh
-> Seq Scan on public.t_jfxx jf (cost=0.00..1637.00 rows=100000 width=20)
Output: jf.ny, jf.je, jf.grbh
(20 rows)
啟動gdb,設置斷點,進入ExecHashTableCreate
(gdb) b ExecHashTableCreate
Breakpoint 1 at 0x6fc75d: file nodeHash.c, line 449.
(gdb) c
Continuing.
Breakpoint 1, ExecHashTableCreate (state=0x1e3cbc8, hashOperators=0x1e59890, keepNulls=false) at nodeHash.c:449
449 node = (Hash *) state->ps.plan;
獲取相關信息
449 node = (Hash *) state->ps.plan;
(gdb) n
450 outerNode = outerPlan(node);
(gdb)
457 rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
(gdb)
462 state->parallel_state != NULL ?
(gdb)
459 ExecChooseHashTableSize(rows, outerNode->plan_width,
(gdb)
獲取Hash節點;
outer節點為順序掃描SeqScan節點
inner(構造hash表的relation)行數為10000
(gdb) p *node
$1 = {plan = {type = T_Hash, startup_cost = 164, total_cost = 164, plan_rows = 10000, plan_width = 20,
parallel_aware = false, parallel_safe = true, plan_node_id = 4, targetlist = 0x1e4bf90, qual = 0x0,
lefttree = 0x1e493e8, righttree = 0x0, initPlan = 0x0, extParam = 0x0, allParam = 0x0}, skewTable = 16977,
skewColumn = 1, skewInherit = false, rows_total = 0}
(gdb) p *outerNode
$2 = {type = T_SeqScan, startup_cost = 0, total_cost = 164, plan_rows = 10000, plan_width = 20, parallel_aware = false,
parallel_safe = true, plan_node_id = 5, targetlist = 0x1e492b0, qual = 0x0, lefttree = 0x0, righttree = 0x0,
initPlan = 0x0, extParam = 0x0, allParam = 0x0}
(gdb) p rows
$3 = 10000
(gdb)
進入ExecChooseHashTableSize函數
(gdb) step
ExecChooseHashTableSize (ntuples=10000, tupwidth=20, useskew=true, try_combined_work_mem=false, parallel_workers=0,
space_allowed=0x7ffdcf148540, numbuckets=0x7ffdcf14853c, numbatches=0x7ffdcf148538, num_skew_mcvs=0x7ffdcf148534)
at nodeHash.c:677
677 int nbatch = 1;
ExecChooseHashTableSize->計算元組大小(56B)/inner relation大小(約560K)/hash表空間(16M)
(gdb) n
682 if (ntuples <= 0.0)
(gdb)
690 tupsize = HJTUPLE_OVERHEAD +
(gdb)
693 inner_rel_bytes = ntuples * tupsize;
(gdb)
698 hash_table_bytes = work_mem * 1024L;
(gdb)
705 if (try_combined_work_mem)
(gdb) p tupsize
$4 = 56
(gdb) p inner_rel_bytes
$5 = 560000
(gdb) p hash_table_bytes
$6 = 16777216
ExecChooseHashTableSize->使用數據傾斜優化(所需空間從Hash Table中獲取)
(gdb) n
708 *space_allowed = hash_table_bytes;
(gdb)
724 if (useskew)
(gdb)
726 skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
(gdb) p useskew
$8 = true
(gdb) p hash_table_bytes
$9 = 16441672
(gdb) p skew_table_bytes
$10 = 335544
(gdb) p num_skew_mcvs
$11 = (int *) 0x7ffdcf148534
(gdb) p *num_skew_mcvs
$12 = 2396
(gdb)
ExecChooseHashTableSize->獲取最大指針數目(2097152)
(gdb) n
756 max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
(gdb)
758 mppow2 = 1L << my_log2(max_pointers);
(gdb) n
759 if (max_pointers != mppow2)
(gdb) p max_pointers
$13 = 2097152
(gdb) p mppow2
$15 = 2097152
ExecChooseHashTableSize->計算Hash桶數
(gdb) n
764 max_pointers = Min(max_pointers, INT_MAX / 2);
(gdb)
766 dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
(gdb)
767 dbuckets = Min(dbuckets, max_pointers);
(gdb)
768 nbuckets = (int) dbuckets;
(gdb)
770 nbuckets = Max(nbuckets, 1024);
(gdb)
772 nbuckets = 1 << my_log2(nbuckets);
(gdb)
778 bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
(gdb) n
779 if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
(gdb)
834 Assert(nbuckets > 0);
(gdb) p dbuckets
$16 = 10000
(gdb) p nbuckets
$17 = 16384
(gdb) p bucket_bytes
$18 = 131072
ExecChooseHashTableSize->只需要一個批次,賦值,返回
835 Assert(nbatch > 0);
(gdb)
837 *numbuckets = nbuckets;
(gdb)
838 *numbatches = nbatch;
(gdb)
839 }
(gdb)
(gdb)
ExecHashTableCreate (state=0x1e3cbc8, hashOperators=0x1e59890, keepNulls=false) at nodeHash.c:468
468 log2_nbuckets = my_log2(nbuckets);
初始化Hash表
468 log2_nbuckets = my_log2(nbuckets);
(gdb) p nbuckets
$19 = 16384
(gdb) n
469 Assert(nbuckets == (1 << log2_nbuckets));
(gdb)
478 hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
(gdb)
479 hashtable->nbuckets = nbuckets;
...
分配內存上下文
...
(gdb)
522 hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
(gdb)
526 hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
(gdb)
532 oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
(gdb)
切換上下文,并初始化hash函數
(gdb)
532 oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
(gdb) n
538 nkeys = list_length(hashOperators);
(gdb)
540 (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
(gdb) p nkeys
$20 = 1
(gdb) n
539 hashtable->outer_hashfunctions =
(gdb)
542 (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
(gdb)
541 hashtable->inner_hashfunctions =
(gdb)
543 hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
(gdb)
544 i = 0;
初始化Hash操作符
(gdb) n
545 foreach(ho, hashOperators)
(gdb)
547 Oid hashop = lfirst_oid(ho);
(gdb)
551 if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
(gdb)
554 fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
(gdb)
555 fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
(gdb)
556 hashtable->hashStrict[i] = op_strict(hashop);
(gdb)
557 i++;
(gdb)
545 foreach(ho, hashOperators)
(gdb) p *hashtable->hashStrict
$21 = true
(gdb) n
560 if (nbatch > 1 && hashtable->parallel_state == NULL)
分配hash桶內存空間
gdb) n
575 MemoryContextSwitchTo(oldcxt);
(gdb)
577 if (hashtable->parallel_state)
(gdb)
631 MemoryContextSwitchTo(hashtable->batchCxt);
(gdb)
634 palloc0(nbuckets * sizeof(HashJoinTuple));
(gdb)
633 hashtable->buckets.unshared = (HashJoinTuple *)
(gdb) p nbuckets
$23 = 16384
構造完成,返回hash表
(gdb) n
641 if (nbatch > 1)
(gdb)
644 MemoryContextSwitchTo(oldcxt);
(gdb)
647 return hashtable;
(gdb)
648 }
(gdb)
ExecHashJoinImpl (pstate=0x1e3c048, parallel=false) at nodeHashjoin.c:282
282 node->hj_HashTable = hashtable;
(gdb)
DONE!
Hash Joins: Past, Present and Future/PGCon 2017
A Look at How Postgres Executes a Tiny Join - Part 1
A Look at How Postgres Executes a Tiny Join - Part 2
Assignment 2 Symmetric Hash Join
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。