您好,登錄后才能下訂單哦!
本節繼續介紹聚合函數的實現,主要介紹了不使用Hash算法的情況下聚合函數的實現,在這種情況下會先排序后執行聚合,在ExecAgg節點執行前,已完成排序的操作.下面介紹在已完成排序的情況下聚合的實現,主要實現函數是ExecAgg->agg_retrieve_direct.
下面是不使用HashAggregate情況下GroupAggregate的計劃樹:
",,,,,"select bh,avg(c1),min(c1),max(c2) from t_agg_simple group by bh;",,,"psql"
2019-05-16 12:04:45.621 CST,"xdb","testdb",1545,"[local]",5cdce11a.609,5,"SELECT",2019-05-16 12:03:38 CST,3/4,0,LOG,00000,"plan:"," {PLANNEDSTMT
:commandType 1
:queryId 0
:hasReturning false
:hasModifyingCTE false
:canSetTag true
:transientPlan false
:dependsOnRole false
:parallelModeNeeded false
:jitFlags 0
:planTree
{AGG
:startup_cost 52.67
:total_cost 64.42
:plan_rows 200
:plan_width 98
:parallel_aware false
:parallel_safe true
:plan_node_id 0
:targetlist (...
)
:qual <>
:lefttree
{SORT
:startup_cost 52.67
:total_cost 54.52
:plan_rows 740
:plan_width 66
:parallel_aware false
:parallel_safe true
:plan_node_id 1
:targetlist (...
)
:qual <>
:lefttree
{SEQSCAN
:startup_cost 0.00
:total_cost 17.40
:plan_rows 740
:plan_width 66
:parallel_aware false
:parallel_safe true
:plan_node_id 2
:targetlist (...
)
:qual <>
:lefttree <>
:righttree <>
:initPlan <>
:extParam (b)
:allParam (b)
:scanrelid 1
}
:righttree <>
:initPlan <>
:extParam (b)
:allParam (b)
:numCols 1
:sortColIdx 1
:sortOperators 664
:collations 100
:nullsFirst false
}
:righttree <>
:initPlan <>
:extParam (b)
:allParam (b)
:aggstrategy 1
:aggsplit 0
:numCols 1
:grpColIdx 1
:grpOperators 98
:numGroups 200
:aggParams (b)
:groupingSets <>
:chain <>
}
:rtable (...
)
:resultRelations <>
:nonleafResultRelations <>
:rootResultRelations <>
:subplans <>
:rewindPlanIDs (b)
:rowMarks <>
:relationOids (o 270375)
:invalItems <>
:paramExecTypes <>
:utilityStmt <>
:stmt_location 0
:stmt_len 63
}
可以看到,在ExecAgg前會先執行ExecSort.
AggState
聚合函數執行時狀態結構體,內含AggStatePerAgg等結構體
/* ---------------------
* AggState information
*
* ss.ss_ScanTupleSlot refers to output of underlying plan.
* ss.ss_ScanTupleSlot指的是基礎計劃的輸出.
* (ss = ScanState,ps = PlanState)
*
* Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and
* ecxt_aggnulls arrays, which hold the computed agg values for the current
* input group during evaluation of an Agg node's output tuple(s). We
* create a second ExprContext, tmpcontext, in which to evaluate input
* expressions and run the aggregate transition functions.
* 注意:ss.ps.ps_ExprContext包含了ecxt_aggvalues和ecxt_aggnulls數組,
* 這兩個數組保存了在計算agg節點的輸出元組時當前輸入組已計算的agg值.
* ---------------------
*/
/* these structs are private in nodeAgg.c: */
//在nodeAgg.c中私有的結構體
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
typedef struct AggState
{
//第一個字段是NodeTag(繼承自ScanState)
ScanState ss; /* its first field is NodeTag */
//targetlist和quals中所有的Aggref
List *aggs; /* all Aggref nodes in targetlist & quals */
//鏈表的大小(可以為0)
int numaggs; /* length of list (could be zero!) */
//pertrans條目大小
int numtrans; /* number of pertrans items */
//Agg策略模式
AggStrategy aggstrategy; /* strategy mode */
//agg-splitting模式,參見nodes.h
AggSplit aggsplit; /* agg-splitting mode, see nodes.h */
//指向當前步驟數據的指針
AggStatePerPhase phase; /* pointer to current phase data */
//步驟數(包括0)
int numphases; /* number of phases (including phase 0) */
//當前步驟
int current_phase; /* current phase number */
//per-Aggref信息
AggStatePerAgg peragg; /* per-Aggref information */
//per-Trans狀態信息
AggStatePerTrans pertrans; /* per-Trans state information */
//長生命周期數據的ExprContexts(hashtable)
ExprContext *hashcontext; /* econtexts for long-lived data (hashtable) */
////長生命周期數據的ExprContexts(每一個GS使用)
ExprContext **aggcontexts; /* econtexts for long-lived data (per GS) */
//輸入表達式的ExprContext
ExprContext *tmpcontext; /* econtext for input expressions */
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
//當前活躍的aggcontext
ExprContext *curaggcontext; /* currently active aggcontext */
//當前活躍的aggregate(如存在)
AggStatePerAgg curperagg; /* currently active aggregate, if any */
#define FIELDNO_AGGSTATE_CURPERTRANS 16
//當前活躍的trans state
AggStatePerTrans curpertrans; /* currently active trans state, if any */
//輸入結束?
bool input_done; /* indicates end of input */
//Agg掃描結束?
bool agg_done; /* indicates completion of Agg scan */
//最后一個grouping set
int projected_set; /* The last projected grouping set */
#define FIELDNO_AGGSTATE_CURRENT_SET 20
//將要解析的當前grouping set
int current_set; /* The current grouping set being evaluated */
//當前投影操作的分組列
Bitmapset *grouped_cols; /* grouped cols in current projection */
//倒序的分組列鏈表
List *all_grouped_cols; /* list of all grouped cols in DESC order */
/* These fields are for grouping set phase data */
//-------- 下面的列用于grouping set步驟數據
//所有步驟中最大的sets大小
int maxsets; /* The max number of sets in any phase */
//所有步驟的數組
AggStatePerPhase phases; /* array of all phases */
//對于phases > 1,已排序的輸入信息
Tuplesortstate *sort_in; /* sorted input to phases > 1 */
//對于下一個步驟,輸入已拷貝
Tuplesortstate *sort_out; /* input is copied here for next phase */
//排序結果的slot
TupleTableSlot *sort_slot; /* slot for sort results */
/* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
//------- 下面的列用于AGG_PLAIN和AGG_SORTED模式:
//per-group指針的grouping set編號數組
AggStatePerGroup *pergroups; /* grouping set indexed array of per-group
* pointers */
//當前組的第一個元組拷貝
HeapTuple grp_firstTuple; /* copy of first tuple of current group */
/* these fields are used in AGG_HASHED and AGG_MIXED modes: */
//--------- 下面的列用于AGG_HASHED和AGG_MIXED模式:
//是否已填充hash表?
bool table_filled; /* hash table filled yet? */
//hash桶數?
int num_hashes;
//相應的哈希表數據數組
AggStatePerHash perhash; /* array of per-hashtable data */
//per-group指針的grouping set編號數組
AggStatePerGroup *hash_pergroup; /* grouping set indexed array of
* per-group pointers */
/* support for evaluation of agg input expressions: */
//---------- agg輸入表達式解析支持
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
//首先是->pergroups,然后是hash_pergroup
AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than
* ->hash_pergroup */
//投影實現機制
ProjectionInfo *combinedproj; /* projection machinery */
} AggState;
/* Primitive options supported by nodeAgg.c: */
//nodeag .c支持的基本選項
#define AGGSPLITOP_COMBINE 0x01 /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL 0x02 /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE 0x04 /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE 0x08 /* apply deserializefn to input */
/* Supported operating modes (i.e., useful combinations of these options): */
//支持的操作模式
typedef enum AggSplit
{
/* Basic, non-split aggregation: */
//基本 : 非split聚合
AGGSPLIT_SIMPLE = 0,
/* Initial phase of partial aggregation, with serialization: */
//部分聚合的初始步驟,序列化
AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
/* Final phase of partial aggregation, with deserialization: */
//部分聚合的最終步驟,反序列化
AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;
/* Test whether an AggSplit value selects each primitive option: */
//測試AggSplit選擇了哪些基本選項
#define DO_AGGSPLIT_COMBINE(as) (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as) (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as) (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)
agg_retrieve_direct
agg_retrieve_direct計算聚合的最終結果,適用于不使用Hash算法的情況.
/*
* ExecAgg for non-hashed case
* 適用于不使用Hash算法的情況.
*/
static TupleTableSlot *
agg_retrieve_direct(AggState *aggstate)
{
Agg *node = aggstate->phase->aggnode;//aggstate Node
ExprContext *econtext;//表達式解析上下文
ExprContext *tmpcontext;//臨時上下文
AggStatePerAgg peragg;//聚合
AggStatePerGroup *pergroups;//分組信息
TupleTableSlot *outerslot;//outer元組slot
TupleTableSlot *firstSlot;//第1個slot
TupleTableSlot *result;//結果元組
bool hasGroupingSets = aggstate->phase->numsets > 0;//是否有grouping set
int numGroupingSets = Max(aggstate->phase->numsets, 1);
int currentSet;
int nextSetSize;
int numReset;
int i;
/*
* get state info from node
* 獲取狀態信息
*
* econtext is the per-output-tuple expression context
* econtext是per-output-tuple表達式上下文
*
* tmpcontext is the per-input-tuple expression context
* tmpcontext是per-input-tuple表達式上下文
*/
econtext = aggstate->ss.ps.ps_ExprContext;
tmpcontext = aggstate->tmpcontext;
peragg = aggstate->peragg;
pergroups = aggstate->pergroups;
firstSlot = aggstate->ss.ss_ScanTupleSlot;
/*
* We loop retrieving groups until we find one matching
* aggstate->ss.ps.qual
* 循環檢索分組直至找到一個匹配aggstate->ss.ps.qual表達式的分組.
*
* For grouping sets, we have the invariant that aggstate->projected_set
* is either -1 (initial call) or the index (starting from 0) in
* gset_lengths for the group we just completed (either by projecting a
* row or by discarding it in the qual).
* 對于grouping set,aggstate->projected_set是個不變量,
* 要么是-1(初始調用),要么是已完成的分組在gset_lengths中的索引編號(從0開始)
* (通過投影一行或者在表達式中丟棄一行實現)
*/
while (!aggstate->agg_done)
{
//----------- 循環處理
/*
* Clear the per-output-tuple context for each group, as well as
* aggcontext (which contains any pass-by-ref transvalues of the old
* group). Some aggregate functions store working state in child
* contexts; those now get reset automatically without us needing to
* do anything special.
* 跟aggcontext(包含原分組通過引用傳遞的轉換值)一樣,每一個分組都會重置per-output-tuple上下文.
* 某些聚合函數在子上下文中存儲工作狀態,這種情況下,不需要做額外的工作,會自動重置.
*
* We use ReScanExprContext not just ResetExprContext because we want
* any registered shutdown callbacks to be called. That allows
* aggregate functions to ensure they've cleaned up any non-memory
* resources.
* 使用ReScanExprContext而不是ResetExprContext是因為我們希望所有已注冊的shutdown回調函數可以調用.
* 這可以允許聚合函數確保它們已清理了所有非內存類資源.
*/
ReScanExprContext(econtext);
/*
* Determine how many grouping sets need to be reset at this boundary.
* 確定有多少grouping sets在此邊界下需要重置.
*/
if (aggstate->projected_set >= 0 &&
aggstate->projected_set < numGroupingSets)
numReset = aggstate->projected_set + 1;
else
numReset = numGroupingSets;
/*
* numReset can change on a phase boundary, but that's OK; we want to
* reset the contexts used in _this_ phase, and later, after possibly
* changing phase, initialize the right number of aggregates for the
* _new_ phase.
* numReset可能在每個階段的邊界處出現變化,但這樣也不會出現問題.
* 我們希望重置在該階段的上下文,并在稍后在可能變化的階段之后,為新的階段初始化正確的聚合編號.
*/
for (i = 0; i < numReset; i++)
{
ReScanExprContext(aggstate->aggcontexts[i]);
}
/*
* Check if input is complete and there are no more groups to project
* in this phase; move to next phase or mark as done.
* 檢查輸入是否完成并且沒有更多的組在本階段用于投影.
* 移到下一個階段或者標記為已完成.
*/
if (aggstate->input_done == true &&
aggstate->projected_set >= (numGroupingSets - 1))
{
if (aggstate->current_phase < aggstate->numphases - 1)
{
//仍在處理中
initialize_phase(aggstate, aggstate->current_phase + 1);
aggstate->input_done = false;
aggstate->projected_set = -1;
numGroupingSets = Max(aggstate->phase->numsets, 1);
node = aggstate->phase->aggnode;
numReset = numGroupingSets;
}
else if (aggstate->aggstrategy == AGG_MIXED)
{
//照理,不會進入這個分支(AGG_MIXED不是Hash才有嗎?)
/*
* Mixed mode; we've output all the grouped stuff and have
* full hashtables, so switch to outputting those.
*/
initialize_phase(aggstate, 0);
aggstate->table_filled = true;
ResetTupleHashIterator(aggstate->perhash[0].hashtable,
&aggstate->perhash[0].hashiter);
select_current_set(aggstate, 0, true);
return agg_retrieve_hash_table(aggstate);
}
else
{
//已完成處理
aggstate->agg_done = true;
break;
}
}
/*
* Get the number of columns in the next grouping set after the last
* projected one (if any). This is the number of columns to compare to
* see if we reached the boundary of that set too.
* 在最后一次投影操作后獲得下一個grouping set的列數.
* 這是要比較的列數,看看我們是否也達到了集合的邊界。
*/
if (aggstate->projected_set >= 0 &&
aggstate->projected_set < (numGroupingSets - 1))
nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
else
nextSetSize = 0;
/*----------
* If a subgroup for the current grouping set is present, project it.
* 如果子分組已存在,則執行投影.
*
* We have a new group if:
* - we're out of input but haven't projected all grouping sets
* (checked above)
* OR
* - we already projected a row that wasn't from the last grouping
* set
* AND
* - the next grouping set has at least one grouping column (since
* empty grouping sets project only once input is exhausted)
* AND
* - the previous and pending rows differ on the grouping columns
* of the next grouping set
*
* 如果出現下面情況,則會有新的分組:
* - 已完成輸入處理,但仍未投影所有的grouping set(上面會執行檢查)
* - 已投影了一行,但這一行并不是從最后一個grouping set而來的
* 同時
* - 下一個grouping set至少有要一個grouping列(因為空grouping sets投影一次輸入就銷毀了)
* 同時
* - 上一個和接下來的行與下一個grouping set中的分組列不同
*----------
*/
tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
if (aggstate->input_done ||
(node->aggstrategy != AGG_PLAIN &&
aggstate->projected_set != -1 &&
aggstate->projected_set < (numGroupingSets - 1) &&
nextSetSize > 0 &&
!ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
tmpcontext)))
{
aggstate->projected_set += 1;
Assert(aggstate->projected_set < numGroupingSets);
Assert(nextSetSize > 0 || aggstate->input_done);
}
else
{
/*
* We no longer care what group we just projected, the next
* projection will always be the first (or only) grouping set
* (unless the input proves to be empty).
* 不再關心剛才已投影的分組,下一個投影通常會是第一個grouping set(除非輸入已驗證為空)
*/
aggstate->projected_set = 0;
/*
* If we don't already have the first tuple of the new group,
* fetch it from the outer plan.
* 如果不再有新分組的第一個元組,則從outer plan中提取一行.
*/
if (aggstate->grp_firstTuple == NULL)
{
//提取一行
outerslot = fetch_input_tuple(aggstate);
if (!TupIsNull(outerslot))
{
//成功提取一行
/*
* Make a copy of the first input tuple; we will use this
* for comparisons (in group mode) and for projection.
* 拷貝之
*/
aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
}
else
{
/* outer plan produced no tuples at all */
//不再產生新行
if (hasGroupingSets)
{
//----------- 存在grouping set
/*
* If there was no input at all, we need to project
* rows only if there are grouping sets of size 0.
* Note that this implies that there can't be any
* references to ungrouped Vars, which would otherwise
* cause issues with the empty output slot.
* 如果根本就不存在輸入,只需要在大小為0的grouping set上投影哪些行即可.
* 注意這意味著不能依賴未分組的Vars,否則的話會導致輸出slot為空.
*
* XXX: This is no longer true, we currently deal with
* this in finalize_aggregates().
* XXX: 這已不復存在,已在finalize_aggregates中進行處理.
*/
aggstate->input_done = true;
while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
{
aggstate->projected_set += 1;
if (aggstate->projected_set >= numGroupingSets)
{
/*
* We can't set agg_done here because we might
* have more phases to do, even though the
* input is empty. So we need to restart the
* whole outer loop.
* 就算輸入為空,但也不能在這里還設置agg_done為T,因為可能還有后續的階段需要處理.
* 因此需要重啟整個外循環.
*/
break;
}
}
if (aggstate->projected_set >= numGroupingSets)
continue;
}
else
{
aggstate->agg_done = true;
/* If we are grouping, we should produce no tuples too */
if (node->aggstrategy != AGG_PLAIN)
return NULL;
}
}
}
/*
* Initialize working state for a new input tuple group.
* 為新輸入的元組組初始化工作狀態.
*/
initialize_aggregates(aggstate, pergroups, numReset);
if (aggstate->grp_firstTuple != NULL)
{
/*
* Store the copied first input tuple in the tuple table slot
* reserved for it. The tuple will be deleted when it is
* cleared from the slot.
* 在元組表slot中拷貝存儲第一個輸入元組.
* 該元組在清理slot時會被刪除.
*/
ExecStoreTuple(aggstate->grp_firstTuple,
firstSlot,
InvalidBuffer,
true);
aggstate->grp_firstTuple = NULL; /* 不需要保留雙份指針. don't keep two pointers */
/* set up for first advance_aggregates call */
//為第一次advance_aggregates調用設置參數
tmpcontext->ecxt_outertuple = firstSlot;
/*
* Process each outer-plan tuple, and then fetch the next one,
* until we exhaust the outer plan or cross a group boundary.
* 處理每一個outer-plan元組,然后提取下一個,
* 直至outer plan已消耗完畢或者已跨越分組邊界.
*/
for (;;)
{
/*
* During phase 1 only of a mixed agg, we need to update
* hashtables as well in advance_aggregates.
* 只有在混合AGG的第一階段,我們還需要在advance_aggregates中更新哈希表.
*/
if (aggstate->aggstrategy == AGG_MIXED &&
aggstate->current_phase == 1)
{
lookup_hash_entries(aggstate);
}
/* Advance the aggregates (or combine functions) */
//推動聚合(或者組合函數)
advance_aggregates(aggstate);
/* Reset per-input-tuple context after each tuple */
//在每一個元組后重置per-input-tuple上下文
ResetExprContext(tmpcontext);
outerslot = fetch_input_tuple(aggstate);
if (TupIsNull(outerslot))
{
/* no more outer-plan tuples available */
//已無更多可用的outer slot
if (hasGroupingSets)
{
aggstate->input_done = true;
break;
}
else
{
aggstate->agg_done = true;
break;
}
}
/* set up for next advance_aggregates call */
//為下一次advance_aggregates調用作準備
tmpcontext->ecxt_outertuple = outerslot;
/*
* If we are grouping, check whether we've crossed a group
* boundary.
* 如果是分組,檢查是否已跨越分組邊界.
*/
if (node->aggstrategy != AGG_PLAIN)
{
tmpcontext->ecxt_innertuple = firstSlot;
if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
tmpcontext))
{
aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
break;
}
}
}
}
/*
* Use the representative input tuple for any references to
* non-aggregated input columns in aggregate direct args, the node
* qual, and the tlist. (If we are not grouping, and there are no
* input rows at all, we will come here with an empty firstSlot
* ... but if not grouping, there can't be any references to
* non-aggregated input columns, so no problem.)
* 對于聚合直接參數/節點表達式和投影列tlist中的非聚合輸入列的引用,使用代表性的輸入元組.
* (如果不是grouping而且沒有輸入元組,將使用空的firstSlot,但如果是非grouping,
* 不可能存在依賴非聚合輸入列,因此不會存在問題)
*/
econtext->ecxt_outertuple = firstSlot;
}
Assert(aggstate->projected_set >= 0);
currentSet = aggstate->projected_set;
//投影處理
prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
select_current_set(aggstate, currentSet, false);
finalize_aggregates(aggstate,
peragg,
pergroups[currentSet]);
/*
* If there's no row to project right now, we must continue rather
* than returning a null since there might be more groups.
* 如不需要馬上進行進行投影,必須繼續執行而不是返回NULL,因為還需要處理更多的groups.
*/
result = project_aggregates(aggstate);
if (result)
return result;
}
/* No more groups */
//DONE!
return NULL;
}
測試腳本
-- 禁用并行
set max_parallel_workers_per_gather=0;
-- 禁用hashagg
set enable_hashagg = off;
select bh,avg(c1),min(c1),max(c2) from t_agg_simple group by bh;
跟蹤分析
(gdb) b agg_retrieve_direct
Breakpoint 1 at 0x6ee511: file nodeAgg.c, line 1572.
(gdb) c
Continuing.
Breakpoint 1, agg_retrieve_direct (aggstate=0x268f640) at nodeAgg.c:1572
1572 Agg *node = aggstate->phase->aggnode;
輸入參數
(gdb) p *aggstate
$1 = {ss = {ps = {type = T_AggState, plan = 0x25af578, state = 0x268f428, ExecProcNode = 0x6ee438 <ExecAgg>,
ExecProcNodeReal = 0x6ee438 <ExecAgg>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x268faf0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x2690d50, ps_ExprContext = 0x268fa30, ps_ProjInfo = 0x2690e90, scandesc = 0x26907a0},
ss_currentRelation = 0x0, ss_currentScanDesc = 0x0, ss_ScanTupleSlot = 0x2690a78}, aggs = 0x25d4290, numaggs = 3,
numtrans = 3, aggstrategy = AGG_SORTED, aggsplit = AGGSPLIT_SIMPLE, phase = 0x2691290, numphases = 2, current_phase = 1,
peragg = 0x2690f28, pertrans = 0x26b24d0, hashcontext = 0x0, aggcontexts = 0x268f858, tmpcontext = 0x268f878,
curaggcontext = 0x268f970, curperagg = 0x0, curpertrans = 0x0, input_done = false, agg_done = false, projected_set = -1,
current_set = 0, grouped_cols = 0x0, all_grouped_cols = 0x0, maxsets = 1, phases = 0x2691258, sort_in = 0x0,
sort_out = 0x0, sort_slot = 0x0, pergroups = 0x25d4da0, grp_firstTuple = 0x0, table_filled = false, num_hashes = 0,
perhash = 0x0, hash_pergroup = 0x0, all_pergroups = 0x25d4da0, combinedproj = 0x0}
需要2個階段,分別是AGG_PLAIN/AGG_SORTED
(gdb) p aggstate->phases[0]
$2 = {aggstrategy = AGG_PLAIN, numsets = 0, gset_lengths = 0x0, grouped_cols = 0x0, eqfunctions = 0x0, aggnode = 0x0,
sortnode = 0x0, evaltrans = 0x0}
(gdb) p aggstate->phases[1]
$3 = {aggstrategy = AGG_SORTED, numsets = 0, gset_lengths = 0x0, grouped_cols = 0x0, eqfunctions = 0x25d4388,
aggnode = 0x25af578, sortnode = 0x0, evaltrans = 0x25d5488}
不存在grouping set.
變量numGroupingSets設置為1
(gdb) n
1580 bool hasGroupingSets = aggstate->phase->numsets > 0;
(gdb) p aggstate->phase->numsets
$5 = 0
(gdb) n
1581 int numGroupingSets = Max(aggstate->phase->numsets, 1);
(gdb) n
1594 econtext = aggstate->ss.ps.ps_ExprContext;
(gdb) p numGroupingSets
$6 = 1
設置內存上下文
(gdb) n
1595 tmpcontext = aggstate->tmpcontext;
(gdb)
1597 peragg = aggstate->peragg;
(gdb) p *econtext
$7 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x0,
ecxt_per_query_memory = 0x268f310, ecxt_per_tuple_memory = 0x26a6370, ecxt_param_exec_vals = 0x0,
ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x25d4d48, ecxt_aggnulls = 0x25d4d80, caseValue_datum = 0,
caseValue_isNull = true, domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x268f428, ecxt_callbacks = 0x0}
(gdb) p *tmpcontext
$8 = {type = T_ExprContext, ecxt_scantuple = 0x0, ecxt_innertuple = 0x0, ecxt_outertuple = 0x0,
ecxt_per_query_memory = 0x268f310, ecxt_per_tuple_memory = 0x2691320, ecxt_param_exec_vals = 0x0,
ecxt_param_list_info = 0x0, ecxt_aggvalues = 0x0, ecxt_aggnulls = 0x0, caseValue_datum = 0, caseValue_isNull = true,
domainValue_datum = 0, domainValue_isNull = true, ecxt_estate = 0x268f428, ecxt_callbacks = 0x0}
(gdb)
獲取聚合信息,一共有3個
(gdb) n
1598 pergroups = aggstate->pergroups;
(gdb)
1599 firstSlot = aggstate->ss.ss_ScanTupleSlot;
(gdb) p *peragg
$9 = {aggref = 0x26a02d0, transno = 0, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,
fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},
numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}
(gdb) p peragg[0]
$10 = {aggref = 0x26a02d0, transno = 0, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,
fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},
numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}
(gdb) p peragg[1]
$11 = {aggref = 0x26a0048, transno = 1, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,
fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},
numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}
(gdb) p peragg[2]
$12 = {aggref = 0x269fdc0, transno = 2, finalfn_oid = 1964, finalfn = {fn_addr = 0x978251 <int8_avg>, fn_oid = 1964,
fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x268f310,
fn_expr = 0x25d5190}, numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = -1, resulttypeByVal = false,
shareable = true}
分組只有一個
(gdb) p pergroups[0]
$14 = (AggStatePerGroup) 0x25d4dc0
(gdb) p *pergroups[0]
$15 = {transValue = 0, transValueIsNull = false, noTransValue = false}
進入循環
(gdb) n
1610 while (!aggstate->agg_done)
(gdb)
1624 ReScanExprContext(econtext);
(gdb)
重置內存上下文
(gdb)
1629 if (aggstate->projected_set >= 0 &&
(gdb)
1633 numReset = numGroupingSets;
(gdb)
1642 for (i = 0; i < numReset; i++)
(gdb)
1644 ReScanExprContext(aggstate->aggcontexts[i]);
(gdb)
1642 for (i = 0; i < numReset; i++)
(gdb)
檢查輸入是否已完成處理/本組已完成投影(實際不滿足條件)
(gdb)
1651 if (aggstate->input_done == true &&
(gdb)
1688 if (aggstate->projected_set >= 0 &&
(gdb) p aggstate->input_done
$16 = false
(gdb) p aggstate->projected_set
$17 = -1
設置待處理的元組,為NULL
(gdb)
1711 tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
(gdb)
1712 if (aggstate->input_done ||
(gdb)
(gdb) p *tmpcontext->ecxt_innertuple
Cannot access memory at address 0x0
如果子分組已存在,則執行投影(實際不滿足條件).
(gdb) n
1713 (node->aggstrategy != AGG_PLAIN &&
(gdb) p node->aggstrategy
$18 = AGG_SORTED
(gdb) n
1712 if (aggstate->input_done ||
(gdb)
1714 aggstate->projected_set != -1 &&
(gdb)
1713 (node->aggstrategy != AGG_PLAIN &&
(gdb)
1732 aggstate->projected_set = 0;
(gdb) p aggstate->input_done
$19 = false
(gdb) p aggstate->projected_set
$20 = -1
(gdb) p node->aggstrategy
$21 = AGG_SORTED
(gdb)
從outer plan中提取一行,并拷貝為首行
(gdb) n
1738 if (aggstate->grp_firstTuple == NULL)
(gdb) p aggstate->grp_firstTuple
$22 = (HeapTuple) 0x0
(gdb) n
1740 outerslot = fetch_input_tuple(aggstate);
(gdb)
1741 if (!TupIsNull(outerslot))
(gdb) p *outerslot
$23 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,
tts_tuple = 0x26909f8, tts_tupleDescriptor = 0x26907a0, tts_mcxt = 0x268f310, tts_buffer = 0, tts_nvalid = 0,
tts_values = 0x2690a18, tts_isnull = 0x2690a30, tts_mintuple = 0x26b8ad8, tts_minhdr = {t_len = 40, t_self = {ip_blkid = {
bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x26b8ad0}, tts_off = 0,
tts_fixedTupleDescriptor = true}
(gdb)
(gdb) n
1747 aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
(gdb)
為新輸入的元組組初始化工作狀態.
(gdb)
1797 initialize_aggregates(aggstate, pergroups, numReset);
把元組拷貝到內存上下文中,并執行聚合運算(advance_aggregates)
(gdb) n
1799 if (aggstate->grp_firstTuple != NULL)
(gdb)
1806 ExecStoreTuple(aggstate->grp_firstTuple,
(gdb)
1810 aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
(gdb)
1813 tmpcontext->ecxt_outertuple = firstSlot;
(gdb)
1825 if (aggstate->aggstrategy == AGG_MIXED &&
(gdb)
1832 advance_aggregates(aggstate);
(gdb)
1835 ResetExprContext(tmpcontext);
(gdb)
繼續提取行,拷貝到內存上下文中
(gdb) n
1837 outerslot = fetch_input_tuple(aggstate);
(gdb)
1838 if (TupIsNull(outerslot))
(gdb)
1853 tmpcontext->ecxt_outertuple = outerslot;
(gdb)
1859 if (node->aggstrategy != AGG_PLAIN)
(gdb)
1861 tmpcontext->ecxt_innertuple = firstSlot;
(gdb)
1862 if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
(gdb)
1869 }
執行聚合運算,并繼續提取下一行
825 if (aggstate->aggstrategy == AGG_MIXED &&
(gdb)
1832 advance_aggregates(aggstate);
(gdb)
1835 ResetExprContext(tmpcontext);
(gdb)
1837 outerslot = fetch_input_tuple(aggstate);
(gdb)
1838 if (TupIsNull(outerslot))
(gdb)
1853 tmpcontext->ecxt_outertuple = outerslot;
(gdb)
1859 if (node->aggstrategy != AGG_PLAIN)
(gdb)
1861 tmpcontext->ecxt_innertuple = firstSlot;
(gdb)
如果是分組,檢查是否已跨越分組邊界,如已越界在跳出循環.
1862 if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
(gdb)
1865 aggstate->grp_firstTuple = ExecCopySlotTuple(outerslot);
(gdb)
1866 break;
已獲得一行結果行,返回結果
(gdb)
1880 econtext->ecxt_outertuple = firstSlot;
(gdb) n
1883 Assert(aggstate->projected_set >= 0);
(gdb)
1885 currentSet = aggstate->projected_set;
(gdb)
1887 prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);
(gdb) p aggstate->projected_set
$24 = 0
(gdb) n
1889 select_current_set(aggstate, currentSet, false);
(gdb)
1893 pergroups[currentSet]);
(gdb)
1891 finalize_aggregates(aggstate,
(gdb)
1899 result = project_aggregates(aggstate);
(gdb)
1900 if (result)
(gdb)
1901 return result;
(gdb) p *result
$25 = {type = T_TupleTableSlot, tts_isempty = false, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,
tts_tuple = 0x0, tts_tupleDescriptor = 0x2690b38, tts_mcxt = 0x268f310, tts_buffer = 0, tts_nvalid = 4,
tts_values = 0x2690db0, tts_isnull = 0x2690dd0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}
(gdb)
DONE!
PostgreSQL 源碼解讀(178)- 查詢#95(聚合函數)#1相關數據結構
PostgreSQL 源碼解讀(186)- 查詢#102(聚合函數#7-advance_aggregates)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。