On 29/12/22 20:58, Ankit Kumar Pandey wrote:
On 24/12/22 18:22, Ankit Pandey wrote:
Hi,
This is a PoC patch which implements the DISTINCT operation in window
aggregates (without ORDER BY and for single-column aggregation; the final
version may vary with respect to these limitations). The purpose of this
PoC is to get feedback on the approach used and on the corresponding
implementation; any nitpicking deemed reasonable is welcome.
Distinct operation is mirrored from implementation in nodeAgg.
Existing partitioning logic determines if row is in partition and
when distinct is required, all tuples for the aggregate column are
stored in tuplesort. When finalize_windowaggregate gets called,
tuples are sorted and duplicates are removed, followed by calling the
transition function on each tuple.
When distinct is not required, the above process is skipped and the
transition function gets called directly and nothing gets inserted
into tuplesort.
Note: For each partition, tuplesort_begin and tuplesort_end are called
to reset the tuplesort, so at any time the maximum number of tuples in the
tuplesort is equal to the number of tuples in that particular partition.
I have verified it for integer and interval column aggregates (to
rule out obvious issues related to data types).
Sample cases:
create table mytable(id int, name text);
insert into mytable values(1, 'A');
insert into mytable values(1, 'A');
insert into mytable values(5, 'B');
insert into mytable values(3, 'A');
insert into mytable values(1, 'A');
select avg(distinct id) over (partition by name) from mytable;
avg
--------------------
2.0000000000000000
2.0000000000000000
2.0000000000000000
2.0000000000000000
5.0000000000000000
select avg(id) over (partition by name) from mytable;
avg
--------------------
1.5000000000000000
1.5000000000000000
1.5000000000000000
1.5000000000000000
5.0000000000000000
select avg(distinct id) over () from mytable;
avg
--------------------
3.0000000000000000
3.0000000000000000
3.0000000000000000
3.0000000000000000
3.0000000000000000
select avg(distinct id) from mytable;
avg
--------------------
3.0000000000000000
This is my first contribution. Please let me know if anything
can be
improved, as I'm eager to learn.
Regards,
Ankit Kumar Pandey
Hi all,
I know everyone is busy with holidays (well, Happy Holidays!) but I
will be glad if someone can take a quick look at this PoC and share
thoughts.
This is my first contribution, so I am pretty sure there will be
some very obvious feedback (which will help me move forward with
this change).
Updated the patch against the latest master. The last patch was a year old.
--
Regards,
Ankit Kumar Pandey
From cf56d545bb837fc8f1a7630ea4417680256eddd4 Mon Sep 17 00:00:00 2001
From: Ankit Kumar Pandey <itsanki...@gmail.com>
Date: Wed, 23 Nov 2022 00:38:01 +0530
Subject: [PATCH] Implement distinct in Window Aggregates.
---
src/backend/executor/nodeWindowAgg.c | 227 +++++++++++++++++++++++----
src/backend/optimizer/util/clauses.c | 2 +
src/backend/parser/parse_agg.c | 45 ++++++
src/backend/parser/parse_func.c | 19 +--
src/include/nodes/execnodes.h | 1 +
src/include/nodes/primnodes.h | 2 +
6 files changed, 257 insertions(+), 39 deletions(-)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index d61d57e9a8..2d685f3be4 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -154,6 +154,14 @@ typedef struct WindowStatePerAggData
int64 transValueCount; /* number of currently-aggregated rows */
+ /* For DISTINCT in Aggregates */
+ Datum lastdatum; /* used for single-column DISTINCT */
+ FmgrInfo equalfnOne; /* single-column comparisons*/
+
+ Oid *eq_ops; /* used for equality check in DISTINCT */
+ Oid *sort_ops; /* used for sorting distinct columns */
+ bool sort_in; /* FLAG set true if data is stored in tuplesort */
+
/* Data local to eval_windowaggregates() */
bool restart; /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
@@ -163,7 +171,7 @@ static void initialize_windowaggregate(WindowAggState *winstate,
WindowStatePerAgg peraggstate);
static void advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate);
+ WindowStatePerAgg peraggstate, Datum value, bool isNull);
static bool advance_windowaggregate_base(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
@@ -173,6 +181,9 @@ static void finalize_windowaggregate(WindowAggState *winstate,
Datum *result, bool *isnull);
static void eval_windowaggregates(WindowAggState *winstate);
+static void process_ordered_windowaggregate_single(WindowAggState *winstate,
+ WindowStatePerFunc perfuncstate,
+ WindowStatePerAgg peraggstate);
static void eval_windowfunction(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
Datum *result, bool *isnull);
@@ -230,6 +241,7 @@ initialize_windowaggregate(WindowAggState *winstate,
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
peraggstate->transValueCount = 0;
peraggstate->resultValue = (Datum) 0;
+ peraggstate->lastdatum = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
@@ -240,43 +252,21 @@ initialize_windowaggregate(WindowAggState *winstate,
static void
advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
- WindowStatePerAgg peraggstate)
+ WindowStatePerAgg peraggstate, Datum value, bool isNull)
{
LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
- WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
int numArguments = perfuncstate->numArguments;
Datum newVal;
- ListCell *arg;
int i;
MemoryContext oldContext;
ExprContext *econtext = winstate->tmpcontext;
- ExprState *filter = wfuncstate->aggfilter;
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
- /* Skip anything FILTERed out */
- if (filter)
- {
- bool isnull;
- Datum res = ExecEvalExpr(filter, econtext, &isnull);
-
- if (isnull || !DatumGetBool(res))
- {
- MemoryContextSwitchTo(oldContext);
- return;
- }
- }
-
/* We start from 1, since the 0th arg will be the transition value */
- i = 1;
- foreach(arg, wfuncstate->args)
- {
- ExprState *argstate = (ExprState *) lfirst(arg);
- fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
- &fcinfo->args[i].isnull);
- i++;
- }
+ fcinfo->args[1].value = value;
+ fcinfo->args[1].isnull = isNull;
if (peraggstate->transfn.fn_strict)
{
@@ -585,6 +575,10 @@ finalize_windowaggregate(WindowAggState *winstate,
oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
+ /* Run transition function for distinct agg */
+ if (perfuncstate->wfunc->aggdistinct)
+ process_ordered_windowaggregate_single(winstate, perfuncstate, peraggstate);
+
/*
* Apply the agg's finalfn if one is provided, else return transValue.
*/
@@ -666,6 +660,16 @@ eval_windowaggregates(WindowAggState *winstate)
TupleTableSlot *agg_row_slot;
TupleTableSlot *temp_slot;
+ ExprState *filter;
+ bool isnull;
+ WindowFuncExprState *wfuncstate;
+ ListCell *arg;
+ Datum tuple;
+ ExprContext *aggecontext;
+ ListCell *lc;
+ Oid inputTypes[FUNC_MAX_ARGS];
+ WindowStatePerFunc perfuncstate;
+
numaggs = winstate->numaggs;
if (numaggs == 0)
return; /* nothing to do */
@@ -893,6 +897,23 @@ eval_windowaggregates(WindowAggState *winstate)
}
}
+ perfuncstate = &winstate->perfunc[wfuncno];
+ /* Initialize tuplesort for new partition */
+ if (perfuncstate->wfunc->aggdistinct)
+ {
+ i = 0;
+ foreach(lc, perfuncstate->wfunc->args)
+ {
+ inputTypes[i++] = exprType((Node *) lfirst(lc));
+ }
+ winstate->sortstates =
+ tuplesort_begin_datum(inputTypes[0],
+ peraggstate->sort_ops[0],
+ perfuncstate->wfunc->inputcollid,
+ true,
+ work_mem, NULL, TUPLESORT_NONE);
+ }
+
/*
* Non-restarted aggregates now contain the rows between aggregatedbase
* (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
@@ -928,6 +949,7 @@ eval_windowaggregates(WindowAggState *winstate)
if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
agg_row_slot))
break; /* must be end of partition */
+
}
/*
@@ -935,14 +957,16 @@ eval_windowaggregates(WindowAggState *winstate)
* current row is not in frame but there might be more in the frame.
*/
ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
+
if (ret < 0)
break;
+
if (ret == 0)
goto next_tuple;
/* Set tuple context for evaluation of aggregate arguments */
winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
-
+
/* Accumulate row into the aggregates */
for (i = 0; i < numaggs; i++)
{
@@ -954,9 +978,51 @@ eval_windowaggregates(WindowAggState *winstate)
continue;
wfuncno = peraggstate->wfuncno;
- advance_windowaggregate(winstate,
- &winstate->perfunc[wfuncno],
- peraggstate);
+ perfuncstate = &winstate->perfunc[wfuncno];
+
+ aggecontext = winstate->tmpcontext;
+
+ wfuncstate = perfuncstate->wfuncstate;
+ filter = wfuncstate->aggfilter;
+
+ oldContext = MemoryContextSwitchTo(aggecontext->ecxt_per_tuple_memory);
+
+ /* Skip anything FILTERed out for aggregates */
+ if (perfuncstate->plain_agg && wfuncstate->aggfilter)
+ {
+ Datum res = ExecEvalExpr(filter, aggecontext, &isnull);
+
+ if (isnull || !DatumGetBool(res))
+ {
+ MemoryContextSwitchTo(oldContext);
+ continue;
+ }
+ }
+
+ /* Fetch tuple and either put them in tuplesort for removal
+ * of duplicates and running partition later or run transition
+ * function right away
+ */
+ foreach(arg, wfuncstate->args)
+ {
+
+ ExprState *argstate = (ExprState *) lfirst(arg);
+ tuple = ExecEvalExpr(argstate, aggecontext, &isnull);
+
+ /* Store in tuplesort */
+ if (perfuncstate->wfunc->aggdistinct)
+ {
+ tuplesort_putdatum(winstate->sortstates, tuple, isnull);
+ peraggstate->sort_in = true;
+ }
+ else
+ {
+ advance_windowaggregate(winstate, &winstate->perfunc[wfuncno],
+ peraggstate, tuple, isnull);
+ }
+
+ }
+ MemoryContextSwitchTo(oldContext);
}
next_tuple:
@@ -1012,6 +1078,75 @@ next_tuple:
}
}
+/*
+ * process_ordered_windowaggregate_single
+ *		Feed the DISTINCT inputs collected for one window aggregate into its
+ *		transition function.  Parallel to process_ordered_aggregate_single
+ *		in nodeAgg.c.
+ *
+ * winstate:     window node state; owns the tuplesort (winstate->sortstates)
+ * perfuncstate: per-function state of the aggregate being finalized
+ * peraggstate:  per-aggregate state of the aggregate being finalized
+ *
+ * If any datums were put into the tuplesort (peraggstate->sort_in), they are
+ * sorted and scanned in order.  A datum equal to its predecessor is skipped;
+ * every other datum is passed to advance_windowaggregate().  Two NULLs count
+ * as duplicates of each other, and the equality function is never called
+ * with a NULL input.
+ *
+ * The tuplesort is ended before returning, whether or not it held any data,
+ * and the pointer is cleared so that it cannot be ended a second time.
+ */
+static void
+process_ordered_windowaggregate_single(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
+									   WindowStatePerAgg peraggstate)
+{
+	Datum		newVal;
+	bool		isNull;
+
+	/*
+	 * Scratch context for the equality and transition function calls.  It
+	 * must differ from the caller's current context, because the copy kept
+	 * in oldVal is allocated in the caller's context and has to survive the
+	 * per-datum reset below.
+	 */
+	MemoryContext workcontext = winstate->tmpcontext->ecxt_per_tuple_memory;
+	MemoryContext oldContext;
+	Datum		oldVal = (Datum) 0;
+	bool		oldIsNull = true;
+	bool		haveOldVal = false;
+
+	if (peraggstate->sort_in)
+	{
+		int16		inputTypeLen;
+		bool		inputTypeByVal;
+
+		/*
+		 * oldVal holds an aggregate *input*, so it has to be copied and freed
+		 * according to the argument's type, not the aggregate's result type.
+		 */
+		get_typlenbyval(exprType((Node *) linitial(perfuncstate->wfunc->args)),
+						&inputTypeLen, &inputTypeByVal);
+
+		tuplesort_performsort(winstate->sortstates);
+
+		/*
+		 * The datum is fetched without copying, so newVal is only valid until
+		 * the next tuplesort_getdatum() call.
+		 */
+		while (tuplesort_getdatum(winstate->sortstates,
+								  true, false, &newVal, &isNull, NULL))
+		{
+			MemoryContextReset(workcontext);
+			oldContext = MemoryContextSwitchTo(workcontext);
+
+			/* Skip this datum if it duplicates the previous one */
+			if (haveOldVal &&
+				((oldIsNull && isNull) ||
+				 (!oldIsNull && !isNull &&
+				  DatumGetBool(FunctionCall2Coll(&peraggstate->equalfnOne,
+												 perfuncstate->winCollation,
+												 oldVal, newVal)))))
+			{
+				MemoryContextSwitchTo(oldContext);
+				continue;
+			}
+
+			/* Run transition function over each unique datum */
+			advance_windowaggregate(winstate, perfuncstate,
+									peraggstate, newVal, isNull);
+			MemoryContextSwitchTo(oldContext);
+
+			/*
+			 * Forget the old value, if any, and remember the new one for the
+			 * next equality check.  By-reference values are copied because
+			 * the tuplesort may recycle newVal's storage on the next fetch.
+			 */
+			if (!inputTypeByVal)
+			{
+				if (!oldIsNull)
+					pfree(DatumGetPointer(oldVal));
+				if (!isNull)
+					oldVal = datumCopy(newVal, inputTypeByVal, inputTypeLen);
+			}
+			else
+				oldVal = newVal;
+
+			oldIsNull = isNull;
+			haveOldVal = true;
+		}
+
+		/* Release the last remembered by-reference value */
+		if (!inputTypeByVal && !oldIsNull)
+			pfree(DatumGetPointer(oldVal));
+	}
+
+	/* Clear up the tuplesort; the next partition will use a new one */
+	if (winstate->sortstates != NULL)
+	{
+		tuplesort_end(winstate->sortstates);
+		winstate->sortstates = NULL;
+	}
+	peraggstate->sort_in = false;
+}
+
+
/*
* eval_windowfunction
*
@@ -2947,6 +3082,9 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
get_typlenbyval(aggtranstype,
&peraggstate->transtypeLen,
&peraggstate->transtypeByVal);
+ get_typlenbyval(wfunc->wintype,
+ &peraggstate->inputtypeLen,
+ &peraggstate->inputtypeByVal);
/*
* initval is potentially null, so don't try to access it as a struct
@@ -3014,6 +3152,35 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
else
peraggstate->aggcontext = winstate->aggcontext;
+ /* Handle distinct operation in agg */
+ if (wfunc->aggdistinct)
+ {
+ int numDistinctCols = list_length(wfunc->distinctargs);
+ peraggstate->eq_ops = palloc(numDistinctCols * sizeof(Oid));
+ peraggstate->sort_ops = palloc(numDistinctCols * sizeof(Oid));
+ /* Use single tuplesort for all partitions by rinsing it again and again */
+ winstate->sortstates = (Tuplesortstate *)
+ palloc0(sizeof(Tuplesortstate *) * 1);
+
+ /* Initialize tuplesort operators namely sort operator to sort tuples
+ * before running equality op to remove/skip duplicates
+ */
+
+ i=0;
+ foreach(lc, wfunc->distinctargs)
+ {
+ peraggstate->eq_ops[i] = ((SortGroupClause *) lfirst(lc))->eqop;
+ peraggstate->sort_ops[i] = ((SortGroupClause *) lfirst(lc))->sortop;
+ i++;
+ }
+ fmgr_info(get_opcode(peraggstate->eq_ops[0]), &peraggstate->equalfnOne);
+ winstate->sortstates = tuplesort_begin_datum(inputTypes[0],
+ peraggstate->sort_ops[0],
+ wfunc->inputcollid,
+ true,
+ work_mem, NULL, TUPLESORT_NONE);
+ }
+
ReleaseSysCache(aggTuple);
return peraggstate;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index aa584848cf..2513dc2981 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2443,6 +2443,8 @@ eval_const_expressions_mutator(Node *node,
newexpr->winref = expr->winref;
newexpr->winstar = expr->winstar;
newexpr->winagg = expr->winagg;
+ newexpr->aggdistinct = expr->aggdistinct;
+ newexpr->distinctargs = expr->distinctargs;
newexpr->location = expr->location;
return (Node *) newexpr;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index f7a1046026..87e6ce1626 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1047,6 +1047,51 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
}
}
+ if (wfunc->aggdistinct){
+ List *argtypes = NIL;
+ List *tlist = NIL;
+ List *torder = NIL;
+ List *tdistinct = NIL;
+ AttrNumber attno = 1;
+ ListCell *lc;
+
+ foreach(lc, wfunc->args)
+ {
+ Expr *arg = (Expr *) lfirst(lc);
+ TargetEntry *tle;
+
+ /* We don't bother to assign column names to the entries */
+ tle = makeTargetEntry(arg, attno++, NULL, false);
+ tlist = lappend(tlist, tle);
+ }
+ torder = transformSortClause(pstate,
+ NIL,
+ &tlist,
+ EXPR_KIND_ORDER_BY,
+ true /* force SQL99 rules */ );
+
+ tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
+
+ foreach(lc, tdistinct)
+ {
+ SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
+
+ if (!OidIsValid(sortcl->sortop))
+ {
+ Node *expr = get_sortgroupclause_expr(sortcl, tlist);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("could not identify an ordering operator for type %s",
+ format_type_be(exprType(expr))),
+ errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
+ parser_errposition(pstate, exprLocation(expr))));
+ }
+ }
+ wfunc->distinctargs = tdistinct;
+ }
+
+
pstate->p_hasWindowFuncs = true;
}
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index ca14f06308..5b5dc9d938 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -835,15 +835,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
wfunc->aggfilter = agg_filter;
wfunc->location = location;
-
- /*
- * agg_star is allowed for aggregate functions but distinct isn't
- */
- if (agg_distinct)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("DISTINCT is not implemented for window functions"),
- parser_errposition(pstate, location)));
+ wfunc->aggdistinct = agg_distinct;
/*
* Reject attempt to call a parameterless aggregate without (*)
@@ -856,6 +848,15 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
NameListToString(funcname)),
parser_errposition(pstate, location)));
+ /*
+ * DISTINCT is not implemented for window aggregates whose window has ORDER BY
+ */
+ if (agg_distinct && over->orderClause)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DISTINCT is not implemented for aggregate functions with ORDER BY"),
+ parser_errposition(pstate, location)));
+
/*
* ordered aggs not allowed in windows yet
*/
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 2cd0a4f472..dfcadc8693 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2523,6 +2523,7 @@ typedef struct WindowAggState
* date for current row */
bool grouptail_valid; /* true if grouptailpos is known up to
* date for current row */
+ Tuplesortstate *sortstates;
TupleTableSlot *first_part_slot; /* first tuple of current or next
* partition */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 83e40e56d3..2991f04d7f 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -495,6 +495,8 @@ typedef struct WindowFunc
Index winref; /* index of associated WindowClause */
bool winstar; /* true if argument list was really '*' */
bool winagg; /* is function a simple aggregate? */
+ bool aggdistinct; /* do we need distinct values for aggregation? */
+ List *distinctargs;
int location; /* token location, or -1 if unknown */
} WindowFunc;
--
2.37.2