On 04/01/23 18:10, Ankit Kumar Pandey wrote:
On 29/12/22 20:58, Ankit Kumar Pandey wrote:
>
> On 24/12/22 18:22, Ankit Pandey wrote:
>> Hi,
>>
>> This is a PoC patch which implements the DISTINCT operation in window
>> aggregates (without ORDER BY and for single-column aggregation; the final
>> version may vary with respect to these limitations). The purpose of this
>> PoC is to get feedback on the approach used and the corresponding
>> implementation, including any nitpicking deemed reasonable.
>>
>> The distinct operation is mirrored from the implementation in nodeAgg.
>> The existing partitioning logic determines whether a row is in the
>> partition and, when distinct is required, all tuples for the aggregate
>> column are stored in a tuplesort. When finalize_windowaggregate gets
>> called, the tuples are sorted and duplicates are removed, followed by
>> calling the transition function on each tuple.
>> When distinct is not required, the above process is skipped: the
>> transition function gets called directly and nothing gets inserted
>> into the tuplesort.
>> Note: For each partition, tuplesort_begin and tuplesort_end are
>> invoked to reset the tuplesort, so at any time the maximum number of
>> tuples in the tuplesort equals the number of tuples in that partition.
>>
>> I have verified it for integer and interval column aggregates (to
>> rule out obvious issues related to data types).
>>
>> Sample cases:
>>
>> create table mytable(id int, name text);
>> insert into mytable values(1, 'A');
>> insert into mytable values(1, 'A');
>> insert into mytable values(5, 'B');
>> insert into mytable values(3, 'A');
>> insert into mytable values(1, 'A');
>>
>> select avg(distinct id) over (partition by name) from mytable;
>>         avg
>> --------------------
>> 2.0000000000000000
>> 2.0000000000000000
>> 2.0000000000000000
>> 2.0000000000000000
>> 5.0000000000000000
>>
>> select avg(id) over (partition by name) from mytable;
>>         avg
>> --------------------
>>  1.5000000000000000
>>  1.5000000000000000
>>  1.5000000000000000
>>  1.5000000000000000
>>  5.0000000000000000
>>
>> select avg(distinct id) over () from mytable;
>>         avg
>> --------------------
>>  3.0000000000000000
>>  3.0000000000000000
>>  3.0000000000000000
>>  3.0000000000000000
>>  3.0000000000000000
>>
>> select avg(distinct id)  from mytable;
>>         avg
>> --------------------
>>  3.0000000000000000
>>
>> This is my first-time contribution. Please let me know if anything
>> can be improved, as I'm eager to learn.
>>
>> Regards,
>> Ankit Kumar Pandey
>
> Hi all,
>
> I know everyone is busy with holidays (well, Happy Holidays!) but I
> will be glad if someone can take a quick look at this PoC and share
> thoughts.
>
> This is my first-time contribution, so I am pretty sure there will be
> some very obvious feedback (which will help me to move forward with
> this change).
>
>
Updated patch with latest master. The last patch was a year old.

Attaching patch with rebase from latest HEAD


Thanks,

Ankit
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 9240c691c1..7c07fb0684 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -155,13 +155,6 @@ typedef struct WindowStatePerAggData
 
 	int64		transValueCount;	/* number of currently-aggregated rows */
 
-	Datum		lastdatum;		/* used for single-column DISTINCT */
-	FmgrInfo	equalfnOne; /* single-column comparisons*/
-	Oid			*eq_ops;
-	Oid			*sort_ops;
-
-	bool sort_in;
-
 	/* Data local to eval_windowaggregates() */
 	bool		restart;		/* need to restart this agg in this cycle? */
 } WindowStatePerAggData;
@@ -171,7 +164,7 @@ static void initialize_windowaggregate(WindowAggState *winstate,
 									   WindowStatePerAgg peraggstate);
 static void advance_windowaggregate(WindowAggState *winstate,
 									WindowStatePerFunc perfuncstate,
-									WindowStatePerAgg peraggstate, Datum value, bool isNull);
+									WindowStatePerAgg peraggstate);
 static bool advance_windowaggregate_base(WindowAggState *winstate,
 										 WindowStatePerFunc perfuncstate,
 										 WindowStatePerAgg peraggstate);
@@ -181,9 +174,6 @@ static void finalize_windowaggregate(WindowAggState *winstate,
 									 Datum *result, bool *isnull);
 
 static void eval_windowaggregates(WindowAggState *winstate);
-static void process_ordered_aggregate_single(WindowAggState *winstate, 
-											 WindowStatePerFunc perfuncstate,
-								 			 WindowStatePerAgg peraggstate);
 static void eval_windowfunction(WindowAggState *winstate,
 								WindowStatePerFunc perfuncstate,
 								Datum *result, bool *isnull);
@@ -241,7 +231,6 @@ initialize_windowaggregate(WindowAggState *winstate,
 	peraggstate->transValueIsNull = peraggstate->initValueIsNull;
 	peraggstate->transValueCount = 0;
 	peraggstate->resultValue = (Datum) 0;
-	peraggstate->lastdatum = (Datum) 0;
 	peraggstate->resultValueIsNull = true;
 }
 
@@ -252,21 +241,43 @@ initialize_windowaggregate(WindowAggState *winstate,
 static void
 advance_windowaggregate(WindowAggState *winstate,
 						WindowStatePerFunc perfuncstate,
-						WindowStatePerAgg peraggstate, Datum value, bool isNull)
+						WindowStatePerAgg peraggstate)
 {
 	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
+	WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
 	int			numArguments = perfuncstate->numArguments;
 	Datum		newVal;
+	ListCell   *arg;
 	int			i;
 	MemoryContext oldContext;
 	ExprContext *econtext = winstate->tmpcontext;
+	ExprState  *filter = wfuncstate->aggfilter;
 
 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
 
+	/* Skip anything FILTERed out */
+	if (filter)
+	{
+		bool		isnull;
+		Datum		res = ExecEvalExpr(filter, econtext, &isnull);
+
+		if (isnull || !DatumGetBool(res))
+		{
+			MemoryContextSwitchTo(oldContext);
+			return;
+		}
+	}
+
 	/* We start from 1, since the 0th arg will be the transition value */
+	i = 1;
+	foreach(arg, wfuncstate->args)
+	{
+		ExprState  *argstate = (ExprState *) lfirst(arg);
 
-	fcinfo->args[1].value = value;
-	fcinfo->args[1].isnull = isNull;
+		fcinfo->args[i].value = ExecEvalExpr(argstate, econtext,
+											 &fcinfo->args[i].isnull);
+		i++;
+	}
 
 	if (peraggstate->transfn.fn_strict)
 	{
@@ -575,10 +586,6 @@ finalize_windowaggregate(WindowAggState *winstate,
 
 	oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
 
-	/* Run transition function for distinct agg */
-	if (perfuncstate->wfunc->aggdistinct)
-		process_ordered_aggregate_single(winstate,  perfuncstate,  peraggstate);
-
 	/*
 	 * Apply the agg's finalfn if one is provided, else return transValue.
 	 */
@@ -660,16 +667,6 @@ eval_windowaggregates(WindowAggState *winstate)
 	TupleTableSlot *agg_row_slot;
 	TupleTableSlot *temp_slot;
 
-	ExprState 	*filter;
-	bool		isnull;
-	WindowFuncExprState *wfuncstate;
-	ListCell 	*arg;
-	Datum 		tuple;
-	ExprContext *aggecontext;
-	ListCell 	*lc;
-	Oid			inputTypes[FUNC_MAX_ARGS];
-	WindowStatePerFunc perfuncstate;
-
 	numaggs = winstate->numaggs;
 	if (numaggs == 0)
 		return;					/* nothing to do */
@@ -897,22 +894,6 @@ eval_windowaggregates(WindowAggState *winstate)
 		}
 	}
 
-	perfuncstate = &winstate->perfunc[wfuncno];
-	if (perfuncstate->wfunc->aggdistinct)
-	{
-		i = 0;
-		foreach(lc, perfuncstate->wfunc->args)
-		{
-			inputTypes[i++] = exprType((Node *) lfirst(lc));
-		}
-		winstate->sortstates =
-					tuplesort_begin_datum(inputTypes[0],
-										peraggstate->sort_ops[0],
-										perfuncstate->wfunc->inputcollid,
-										true,
-										work_mem, NULL, TUPLESORT_NONE);
-	}
-
 	/*
 	 * Non-restarted aggregates now contain the rows between aggregatedbase
 	 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
@@ -947,8 +928,7 @@ eval_windowaggregates(WindowAggState *winstate)
 		{
 			if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
 									 agg_row_slot))
-			break;			/* must be end of partition */
-				
+				break;			/* must be end of partition */
 		}
 
 		/*
@@ -956,16 +936,14 @@ eval_windowaggregates(WindowAggState *winstate)
 		 * current row is not in frame but there might be more in the frame.
 		 */
 		ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
-
 		if (ret < 0)
 			break;
-
 		if (ret == 0)
 			goto next_tuple;
 
 		/* Set tuple context for evaluation of aggregate arguments */
 		winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
-		
+
 		/* Accumulate row into the aggregates */
 		for (i = 0; i < numaggs; i++)
 		{
@@ -977,46 +955,9 @@ eval_windowaggregates(WindowAggState *winstate)
 				continue;
 
 			wfuncno = peraggstate->wfuncno;
-			perfuncstate = &winstate->perfunc[wfuncno];
-
-			aggecontext = winstate->tmpcontext;
-
-			wfuncstate = perfuncstate->wfuncstate;
-			filter = wfuncstate->aggfilter;
-
-			oldContext = MemoryContextSwitchTo(aggecontext->ecxt_per_tuple_memory);
-			/* Skip anything FILTERed out for aggregates */
-			if (perfuncstate->plain_agg && wfuncstate->aggfilter)
-			{
-				Datum		res = ExecEvalExpr(filter, aggecontext, &isnull);
-
-				if (isnull || !DatumGetBool(res))
-				{
-					MemoryContextSwitchTo(oldContext);
-					continue;
-				}
-			}
-
-
-			foreach(arg, wfuncstate->args)
-			{
-				
-				ExprState  *argstate = (ExprState *) lfirst(arg);
-				tuple = ExecEvalExpr(argstate, aggecontext, &isnull);
-
-				if (perfuncstate->wfunc->aggdistinct)
-				{
-					tuplesort_putdatum(winstate->sortstates, tuple, isnull);
-					peraggstate->sort_in = true;
-				}
-				else
-				{
-					advance_windowaggregate(winstate, &winstate->perfunc[wfuncno], 
-											peraggstate, tuple, isnull);
-				}
-				
-			}
-			MemoryContextSwitchTo(oldContext);
+			advance_windowaggregate(winstate,
+									&winstate->perfunc[wfuncno],
+									peraggstate);
 		}
 
 next_tuple:
@@ -1072,67 +1013,6 @@ next_tuple:
 	}
 }
 
-
-static void
-process_ordered_aggregate_single(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
-								 WindowStatePerAgg peraggstate)
-{
-	Datum	   newVal;
-	bool	   isNull;
-	MemoryContext workcontext = winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory;
-	MemoryContext oldContext;
-	Datum		oldVal = (Datum) 0;
-	bool		oldIsNull = true;
-	bool		haveOldVal = false;
-
-	if (peraggstate->sort_in){								
-		tuplesort_performsort(winstate->sortstates);
-
-
-		while (tuplesort_getdatum(winstate->sortstates,
-							  true, false, &newVal, &isNull, NULL))
-		{
-			MemoryContextReset(workcontext);
-			oldContext = MemoryContextSwitchTo(workcontext);
-
-			if (haveOldVal && DatumGetBool(FunctionCall2Coll(&peraggstate->equalfnOne,
-												perfuncstate->winCollation,
-												oldVal, newVal)))
-			{
-				MemoryContextSwitchTo(oldContext);
-				continue;
-			} 
-			else
-			{
-
-			advance_windowaggregate(winstate, perfuncstate,
-											peraggstate, newVal, isNull);
-			}
-			MemoryContextSwitchTo(oldContext);
-
-			if (!peraggstate->resulttypeByVal)
-			{
-				if (!oldIsNull && false)
-					pfree(DatumGetPointer(oldVal));
-				if (!isNull)
-					oldVal = datumCopy(newVal, true,
-									   peraggstate->resulttypeLen);
-			}
-			else
-				oldVal = newVal;
-
-			oldIsNull = isNull;
-			haveOldVal = true;
-			oldVal = newVal;
-			}
-
-		}
-		tuplesort_end(winstate->sortstates);
-		peraggstate->sort_in = false;
-
-}
-
-
 /*
  * eval_windowfunction
  *
@@ -3076,9 +2956,6 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
 	get_typlenbyval(aggtranstype,
 					&peraggstate->transtypeLen,
 					&peraggstate->transtypeByVal);
-	get_typlenbyval(wfunc->wintype,
-					&peraggstate->inputtypeLen,
-					&peraggstate->inputtypeByVal);
 
 	/*
 	 * initval is potentially null, so don't try to access it as a struct
@@ -3146,32 +3023,6 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
 	else
 		peraggstate->aggcontext = winstate->aggcontext;
 
-	/* Handle distinct operation in agg */
-	if (wfunc->aggdistinct)
-	{
-		int		numDistinctCols = list_length(wfunc->distinctargs);
-		peraggstate->eq_ops = palloc(numDistinctCols * sizeof(Oid));
-		peraggstate->sort_ops =  palloc(numDistinctCols * sizeof(Oid));
-		winstate->sortstates = (Tuplesortstate *)
-								palloc0(sizeof(Tuplesortstate *) * 1);
-
-		/* Initialize tuplesort to handle distinct operation */
-
-		i=0;
-		foreach(lc, wfunc->distinctargs)
-		{
-			peraggstate->eq_ops[i] = ((SortGroupClause *) lfirst(lc))->eqop;
-			peraggstate->sort_ops[i] = ((SortGroupClause *) lfirst(lc))->sortop;
-			i++;
-		}
-		fmgr_info(get_opcode(peraggstate->eq_ops[0]), &peraggstate->equalfnOne);
-		winstate->sortstates = tuplesort_begin_datum(inputTypes[0],
-											peraggstate->sort_ops[0],
-											wfunc->inputcollid,
-											true,
-											work_mem, NULL, TUPLESORT_NONE);
-	}
-	
 	ReleaseSysCache(aggTuple);
 
 	return peraggstate;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 4a755d0604..76e25118f9 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2445,8 +2445,6 @@ eval_const_expressions_mutator(Node *node,
 				newexpr->winref = expr->winref;
 				newexpr->winstar = expr->winstar;
 				newexpr->winagg = expr->winagg;
-				newexpr->aggdistinct = expr->aggdistinct;
-				newexpr->distinctargs = expr->distinctargs;
 				newexpr->location = expr->location;
 
 				return (Node *) newexpr;
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index c3c1f3f922..4fbf80c271 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1048,51 +1048,6 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		}
 	}
 
-	if (wfunc->aggdistinct){
-		List	   *argtypes = NIL;
-		List	   *tlist = NIL;
-		List	   *torder = NIL;
-		List	   *tdistinct = NIL;
-		AttrNumber	attno = 1;
-		ListCell   *lc;
-
-		foreach(lc, wfunc->args)
-		{
-			Expr	   *arg = (Expr *) lfirst(lc);
-			TargetEntry *tle;
-
-			/* We don't bother to assign column names to the entries */
-			tle = makeTargetEntry(arg, attno++, NULL, false);
-			tlist = lappend(tlist, tle);
-		}
-		torder = transformSortClause(pstate,
-									 NIL,
-									 &tlist,
-									 EXPR_KIND_ORDER_BY,
-									 true /* force SQL99 rules */ );
-
-		tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
-
-		foreach(lc, tdistinct)
-		{
-			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
-
-			if (!OidIsValid(sortcl->sortop))
-			{
-				Node	   *expr = get_sortgroupclause_expr(sortcl, tlist);
-
-				ereport(ERROR,
-						(errcode(ERRCODE_UNDEFINED_FUNCTION),
-							errmsg("could not identify an ordering operator for type %s",
-								format_type_be(exprType(expr))),
-							errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
-							parser_errposition(pstate, exprLocation(expr))));
-			}
-		}
-		wfunc->distinctargs = tdistinct;
-	}
-	
-
 	pstate->p_hasWindowFuncs = true;
 }
 
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 89a443eac5..ca14f06308 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -835,7 +835,15 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
 		wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
 		wfunc->aggfilter = agg_filter;
 		wfunc->location = location;
-		wfunc->aggdistinct = agg_distinct;
+
+		/*
+		 * agg_star is allowed for aggregate functions but distinct isn't
+		 */
+		if (agg_distinct)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("DISTINCT is not implemented for window functions"),
+					 parser_errposition(pstate, location)));
 
 		/*
 		 * Reject attempt to call a parameterless aggregate without (*)
@@ -848,16 +856,6 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
 							NameListToString(funcname)),
 					 parser_errposition(pstate, location)));
 
-		/*
-		 * Distinct is not implemented for aggregates with filter
-		 */
-
-		if (agg_distinct && agg_filter)
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("DISTINCT is not implemented for aggregate functions with FILTER"),
-					 parser_errposition(pstate, location)));
-
 		/*
 		 * ordered aggs not allowed in windows yet
 		 */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4301db31d8..bc67cb9ed8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2529,7 +2529,6 @@ typedef struct WindowAggState
 									 * date for current row */
 	bool		grouptail_valid;	/* true if grouptailpos is known up to
 									 * date for current row */
-	Tuplesortstate *sortstates;
 
 	TupleTableSlot *first_part_slot;	/* first tuple of current or next
 										 * partition */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index f5dd8f2d07..4220c63ab7 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -558,8 +558,6 @@ typedef struct WindowFunc
 	/* true if argument list was really '*' */
 	bool		winstar pg_node_attr(query_jumble_ignore);
 	/* is function a simple aggregate? */
-	bool		aggdistinct;    /* do we need distinct values for aggregation? */
-	List		*distinctargs;
 	bool		winagg pg_node_attr(query_jumble_ignore);
 	/* token location, or -1 if unknown */
 	int			location;
diff --git a/src/test/regress/expected/window.out b/src/test/regress/expected/window.out
index 04247ce4a3..747608e3c1 100644
--- a/src/test/regress/expected/window.out
+++ b/src/test/regress/expected/window.out
@@ -1775,22 +1775,6 @@ select first_value(salary) over(order by enroll_date range between unbounded pre
         5000 |       4500 |   4200 | 01-01-2008
 (10 rows)
 
--- with DISTINCT in agg functionn
-select depname, count(DISTINCT enroll_date) over (PARTITION BY depname) FROM empsalary;
-  depname  | count 
------------+-------
- develop   |     4
- develop   |     4
- develop   |     4
- develop   |     4
- develop   |     4
- personnel |     2
- personnel |     2
- sales     |     3
- sales     |     3
- sales     |     3
-(10 rows)
-
 -- RANGE offset PRECEDING/FOLLOWING with null values
 select x, y,
        first_value(y) over w,
diff --git a/src/test/regress/sql/window.sql b/src/test/regress/sql/window.sql
index a88d1d092c..1009b438de 100644
--- a/src/test/regress/sql/window.sql
+++ b/src/test/regress/sql/window.sql
@@ -434,9 +434,6 @@ select first_value(salary) over(order by enroll_date range between unbounded pre
 	exclude current row),
 	salary, enroll_date from empsalary;
 
--- with DISTINCT in agg functionn
-select depname, count(DISTINCT enroll_date) over (PARTITION BY depname) FROM empsalary;
-
 -- RANGE offset PRECEDING/FOLLOWING with null values
 select x, y,
        first_value(y) over w,

Reply via email to