Hi hackers,

I would like to propose an old patch for Postgres 11, developed off-list
by Oliver Ford, that allows DISTINCT aggregates such as
sum(DISTINCT nums) to be used as window functions. He has given me
permission to publish it and to continue its development.

I have rebased it onto the current git master branch and made the changes
necessary for it to work with Postgres 13devel.

It is a WIP because it has no tests yet (I will add them later). It works
for int, float, and numeric types. The DISTINCT check could probably be
rewritten to store the distinct elements in a hash table, which should
improve performance.

If committers find the implementation approach acceptable, I will respond
to all your design and review comments and continue working on it. I will
also add this patch to the March commitfest.

As an example of what the patch enables: with time-series data, current
Postgres gives an error:

postgres=# CREATE TABLE t_demo AS
postgres-#     SELECT   ordinality, day, date_part('week', day) AS week
postgres-#     FROM    generate_series('2020-01-02', '2020-01-15', '1 day'::interval)
postgres-#             WITH ORDINALITY AS day;
SELECT 14
postgres=# SELECT * FROM t_demo;
 ordinality |          day           | week
------------+------------------------+------
          1 | 2020-01-02 00:00:00+02 |    1
          2 | 2020-01-03 00:00:00+02 |    1
          3 | 2020-01-04 00:00:00+02 |    1
          4 | 2020-01-05 00:00:00+02 |    1
          5 | 2020-01-06 00:00:00+02 |    2
          6 | 2020-01-07 00:00:00+02 |    2
          7 | 2020-01-08 00:00:00+02 |    2
          8 | 2020-01-09 00:00:00+02 |    2
          9 | 2020-01-10 00:00:00+02 |    2
         10 | 2020-01-11 00:00:00+02 |    2
         11 | 2020-01-12 00:00:00+02 |    2
         12 | 2020-01-13 00:00:00+02 |    3
         13 | 2020-01-14 00:00:00+02 |    3
         14 | 2020-01-15 00:00:00+02 |    3
(14 rows)

postgres=# SELECT   *,
postgres-#         array_agg(DISTINCT week) OVER (ORDER BY day ROWS
postgres(#                                 BETWEEN 2 PRECEDING AND 2 FOLLOWING)
postgres-# FROM    t_demo;
ERROR:  DISTINCT is not implemented for window functions
LINE 2:         array_agg(DISTINCT week) OVER (ORDER BY day ROWS
                ^

Instead, you have to write something like this:

postgres=# SELECT  *, (SELECT array_agg(DISTINCT unnest) FROM unnest(x)) AS b
postgres-# FROM
postgres-# (
postgres(#         SELECT  *,
postgres(#                 array_agg(week) OVER (ORDER BY day ROWS
postgres(#                         BETWEEN 2 PRECEDING AND 2 FOLLOWING) AS x
postgres(#         FROM    t_demo
postgres(# ) AS a;
 ordinality |          day           | week |      x      |   b
------------+------------------------+------+-------------+-------
          1 | 2020-01-02 00:00:00+02 |    1 | {1,1,1}     | {1}
          2 | 2020-01-03 00:00:00+02 |    1 | {1,1,1,1}   | {1}
          3 | 2020-01-04 00:00:00+02 |    1 | {1,1,1,1,2} | {1,2}
          4 | 2020-01-05 00:00:00+02 |    1 | {1,1,1,2,2} | {1,2}
          5 | 2020-01-06 00:00:00+02 |    2 | {1,1,2,2,2} | {1,2}
          6 | 2020-01-07 00:00:00+02 |    2 | {1,2,2,2,2} | {1,2}
          7 | 2020-01-08 00:00:00+02 |    2 | {2,2,2,2,2} | {2}
          8 | 2020-01-09 00:00:00+02 |    2 | {2,2,2,2,2} | {2}
          9 | 2020-01-10 00:00:00+02 |    2 | {2,2,2,2,2} | {2}
         10 | 2020-01-11 00:00:00+02 |    2 | {2,2,2,2,3} | {2,3}
         11 | 2020-01-12 00:00:00+02 |    2 | {2,2,2,3,3} | {2,3}
         12 | 2020-01-13 00:00:00+02 |    3 | {2,2,3,3,3} | {2,3}
         13 | 2020-01-14 00:00:00+02 |    3 | {2,3,3,3}   | {2,3}
         14 | 2020-01-15 00:00:00+02 |    3 | {3,3,3}     | {3}
(14 rows)

With the attached patch, you get the desired result directly:

postgres=# SELECT   *,
postgres-#         array_agg(DISTINCT week) OVER (ORDER BY day ROWS
postgres(#                                 BETWEEN 2 PRECEDING AND 2 FOLLOWING)
postgres-# FROM    t_demo;
 ordinality |          day           | week | array_agg
------------+------------------------+------+-----------
          1 | 2020-01-02 00:00:00+02 |    1 | {1}
          2 | 2020-01-03 00:00:00+02 |    1 | {1}
          3 | 2020-01-04 00:00:00+02 |    1 | {1,2}
          4 | 2020-01-05 00:00:00+02 |    1 | {1,2}
          5 | 2020-01-06 00:00:00+02 |    2 | {1,2}
          6 | 2020-01-07 00:00:00+02 |    2 | {1,2}
          7 | 2020-01-08 00:00:00+02 |    2 | {2}
          8 | 2020-01-09 00:00:00+02 |    2 | {2}
          9 | 2020-01-10 00:00:00+02 |    2 | {2}
         10 | 2020-01-11 00:00:00+02 |    2 | {2,3}
         11 | 2020-01-12 00:00:00+02 |    2 | {2,3}
         12 | 2020-01-13 00:00:00+02 |    3 | {2,3}
         13 | 2020-01-14 00:00:00+02 |    3 | {2,3}
         14 | 2020-01-15 00:00:00+02 |    3 | {3}
(14 rows)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 4cc7da268d..66ab18bab6 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -34,9 +34,14 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/nbtree.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_aggregate.h"
+#include "catalog/pg_am.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+#include "commands/defrem.h"
+#include "common/int.h"
 #include "executor/executor.h"
 #include "executor/nodeWindowAgg.h"
 #include "miscadmin.h"
@@ -156,6 +161,12 @@ typedef struct WindowStatePerAggData
 
 	/* Data local to eval_windowaggregates() */
 	bool		restart;		/* need to restart this agg in this cycle? */
+
+	FmgrInfo	distinct_cmpfn; /* btree comparison function for DISTINCT */
+	int16		distinct_typlen;	/* typlen of the DISTINCT argument type */
+	void	   *distinctArr;	/* DISTINCT values seen in this agg cycle */
+	int64		distinctSize;	/* allocated entries (fixed-len) or bytes (varlena) */
+	int64		distinctCount;	/* number of values stored in distinctArr */
 } WindowStatePerAggData;
 
 static void initialize_windowaggregate(WindowAggState *winstate,
@@ -209,6 +220,36 @@ initialize_windowaggregate(WindowAggState *winstate,
 {
 	MemoryContext oldContext;
 
+	/*
+	 * For a DISTINCT aggregate, forget the values remembered during the
+	 * previous aggregation cycle and set up storage for new ones.
+	 */
+	if (perfuncstate->wfunc->windistinct)
+	{
+		if (peraggstate->distinctSize > 0)
+		{
+			pfree(peraggstate->distinctArr);
+			peraggstate->distinctArr = NULL;
+			peraggstate->distinctCount = 0;
+			peraggstate->distinctSize = 0;
+		}
+
+		/*
+		 * Fixed-length values get an initial array with room for 16 entries
+		 * (each sizeof(int64), which is at least as wide as a Datum).  Varlena
+		 * (typlen -1) storage is not allocated here.
+		 *
+		 * NOTE(review): this allocates in whatever memory context is current
+		 * on entry; confirm that it lives as long as the aggregate state.
+		 */
+		Assert(peraggstate->distinct_typlen > 0 || peraggstate->distinct_typlen == -1);
+		if (peraggstate->distinct_typlen > 0)
+		{
+			peraggstate->distinctArr = palloc0(sizeof(int64) * 16);
+			peraggstate->distinctSize = 16;
+		}
+	}
+
 	/*
 	 * If we're using a private aggcontext, we may reset it here.  But if the
 	 * context is shared, we don't know which other aggregates may still need
@@ -278,6 +307,86 @@ advance_windowaggregate(WindowAggState *winstate,
 		i++;
 	}
 
+	/*
+	 * DISTINCT: if a value equal to this row's first argument has already
+	 * been accumulated since the aggregate was last initialized, skip the
+	 * row; otherwise remember the value.  NULL inputs are not de-duplicated
+	 * here.  Remembered values are allocated in oldContext rather than in
+	 * the (presumably per-tuple) current context.
+	 */
+	if (wfuncstate->wfunc->windistinct && !fcinfo->args[1].isnull)
+	{
+		MemoryContext tupleContext = MemoryContextSwitchTo(oldContext);
+		Oid			collation = wfuncstate->wfunc->inputcollid;
+
+		if (peraggstate->distinct_typlen > 0)
+		{
+			/*
+			 * Fixed-length type: distinctArr is an array of Datums and
+			 * distinctSize counts entries.
+			 *
+			 * NOTE(review): for pass-by-reference fixed-length types (e.g.
+			 * uuid, interval) the stored Datum is a pointer that may not
+			 * outlive the current row; such values would need datumCopy().
+			 */
+			for (i = 0; i < peraggstate->distinctCount; i++)
+			{
+				if (DatumGetInt32(FunctionCall2Coll(&peraggstate->distinct_cmpfn,
+													collation,
+													fcinfo->args[1].value,
+													((Datum *) peraggstate->distinctArr)[i])) == 0)
+					return;
+			}
+
+			if (peraggstate->distinctCount == peraggstate->distinctSize)
+			{
+				peraggstate->distinctSize *= 2;
+				peraggstate->distinctArr = repalloc(peraggstate->distinctArr,
+													sizeof(Datum) * peraggstate->distinctSize);
+			}
+			((Datum *) peraggstate->distinctArr)[peraggstate->distinctCount] = fcinfo->args[1].value;
+		}
+		else if (peraggstate->distinct_typlen == -1)
+		{
+			/*
+			 * Varlena type: values are copied back to back into distinctArr,
+			 * each starting at a MAXALIGN'd offset so that 4-byte varlena
+			 * headers are suitably aligned when read back.  distinctSize is
+			 * the allocated size in bytes.
+			 */
+			uint64		len,
+						tmp_len = 0;
+			char	   *arr = (char *) peraggstate->distinctArr;
+			char	   *arg = (char *) DatumGetPointer(fcinfo->args[1].value);
+			uint64		varlen = VARSIZE_ANY(arg);
+
+			for (i = 0; i < peraggstate->distinctCount; i++)
+			{
+				uint64		curr_len;
+
+				if (DatumGetInt32(FunctionCall2Coll(&peraggstate->distinct_cmpfn,
+													collation,
+													fcinfo->args[1].value,
+													PointerGetDatum(arr))) == 0)
+					return;
+				curr_len = MAXALIGN(VARSIZE_ANY(arr));
+				tmp_len += curr_len;
+				arr += curr_len;
+			}
+
+			len = sh_pow2(tmp_len + varlen);
+			if (peraggstate->distinctCount == 0)
+				peraggstate->distinctArr = palloc0(len);
+			else if (len > peraggstate->distinctSize)
+				peraggstate->distinctArr = repalloc(peraggstate->distinctArr, len);
+			peraggstate->distinctSize = len;
+			memcpy((char *) peraggstate->distinctArr + tmp_len, arg, varlen);
+		}
+
+		peraggstate->distinctCount++;
+		MemoryContextSwitchTo(tupleContext);
+	}
+
 	if (peraggstate->transfn.fn_strict)
 	{
 		/*
@@ -428,6 +519,9 @@ advance_windowaggregate_base(WindowAggState *winstate,
 	ExprContext *econtext = winstate->tmpcontext;
 	ExprState  *filter = wfuncstate->aggfilter;
 
+	if (wfuncstate->wfunc->windistinct)
+		return false;
+
 	oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
 
 	/* Skip anything FILTERed out */
@@ -2610,8 +2704,6 @@ ExecReScanWindowAgg(WindowAggState *node)
 
 /*
  * initialize_peragg
- *
- * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
  */
 static WindowStatePerAggData *
 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
@@ -2651,6 +2743,49 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
 			 wfunc->winfnoid);
 	aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
 
+	/*
+	 * For DISTINCT, look up the btree comparison support function used to
+	 * detect duplicate inputs, and the argument type's typlen, which selects
+	 * how remembered values are stored.
+	 */
+	if (wfunc->windistinct)
+	{
+		Oid			argtype = wfunc->winaggargtype;
+		Oid			opclass,
+					opfamily,
+					opcintype,
+					cmp_func_oid;
+		HeapTuple	typeTuple;
+		Form_pg_type typeform;
+
+		opclass = GetDefaultOpClass(argtype, BTREE_AM_OID);
+		if (!OidIsValid(opclass))
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_FUNCTION),
+					 errmsg("could not identify a comparison function for type %s",
+							format_type_be(argtype))));
+
+		/*
+		 * The default opclass may be declared for a binary-coercible type
+		 * (e.g. text's opclass serves varchar), so look up the support
+		 * function by the opclass's input type rather than by argtype.
+		 */
+		opfamily = get_opclass_family(opclass);
+		opcintype = get_opclass_input_type(opclass);
+		cmp_func_oid = get_opfamily_proc(opfamily, opcintype, opcintype, BTORDER_PROC);
+		if (!OidIsValid(cmp_func_oid))
+			elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
+				 BTORDER_PROC, opcintype, opcintype, opfamily);
+		fmgr_info(cmp_func_oid, &peraggstate->distinct_cmpfn);
+
+		typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(argtype));
+		if (!HeapTupleIsValid(typeTuple))
+			elog(ERROR, "cache lookup failed for type %u", argtype);
+		typeform = (Form_pg_type) GETSTRUCT(typeTuple);
+		peraggstate->distinct_typlen = typeform->typlen;
+		ReleaseSysCache(typeTuple);
+	}
+
 	/*
 	 * Figure out whether we want to use the moving-aggregate implementation,
 	 * and collect the right set of fields from the pg_attribute entry.
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 54ad62bb7f..fb8058af24 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -1493,6 +1493,8 @@ _copyWindowFunc(const WindowFunc *from)
 	COPY_SCALAR_FIELD(winref);
 	COPY_SCALAR_FIELD(winstar);
 	COPY_SCALAR_FIELD(winagg);
+	COPY_SCALAR_FIELD(windistinct);
+	COPY_SCALAR_FIELD(winaggargtype);
 	COPY_LOCATION_FIELD(location);
 
 	return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 5b1ba143b1..24a03097a0 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -264,6 +264,8 @@ _equalWindowFunc(const WindowFunc *a, const WindowFunc *b)
 	COMPARE_SCALAR_FIELD(winref);
 	COMPARE_SCALAR_FIELD(winstar);
 	COMPARE_SCALAR_FIELD(winagg);
+	COMPARE_SCALAR_FIELD(windistinct);
+	COMPARE_SCALAR_FIELD(winaggargtype);
 	COMPARE_LOCATION_FIELD(location);
 
 	return true;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index d76fae44b8..c40dbd0cb4 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1162,6 +1162,8 @@ _outWindowFunc(StringInfo str, const WindowFunc *node)
 	WRITE_UINT_FIELD(winref);
 	WRITE_BOOL_FIELD(winstar);
 	WRITE_BOOL_FIELD(winagg);
+	WRITE_BOOL_FIELD(windistinct);
+	WRITE_OID_FIELD(winaggargtype);
 	WRITE_LOCATION_FIELD(location);
 }
 
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 551ce6c41c..aaa327ce87 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -653,6 +653,8 @@ _readWindowFunc(void)
 	READ_UINT_FIELD(winref);
 	READ_BOOL_FIELD(winstar);
 	READ_BOOL_FIELD(winagg);
+	READ_BOOL_FIELD(windistinct);
+	READ_OID_FIELD(winaggargtype);
 	READ_LOCATION_FIELD(location);
 
 	READ_DONE();
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 2d3ec22407..5cb87c7073 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2451,6 +2451,8 @@ eval_const_expressions_mutator(Node *node,
 				newexpr->winref = expr->winref;
 				newexpr->winstar = expr->winstar;
 				newexpr->winagg = expr->winagg;
+				newexpr->windistinct = expr->windistinct;
+				newexpr->winaggargtype = expr->winaggargtype;
 				newexpr->location = expr->location;
 
 				return (Node *) newexpr;
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 9c3b6ad916..c663403bfb 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -825,13 +825,21 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
 		wfunc->location = location;
 
 		/*
-		 * agg_star is allowed for aggregate functions but distinct isn't
+		 * In a window function, DISTINCT is allowed only for aggregate
+		 * functions and is handled in nodeWindowAgg.c separately from
+		 * when it occurs outside of a window function.
 		 */
 		if (agg_distinct)
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("DISTINCT is not implemented for window functions"),
-					 parser_errposition(pstate, location)));
+		{
+			if (!wfunc->winagg)
+				ereport(ERROR,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						 errmsg("DISTINCT is only allowed in aggregate window functions"),
+						 parser_errposition(pstate, location)));
+			wfunc->windistinct = true;
+			agg_distinct = false;
+			wfunc->winaggargtype = actual_arg_types[0];
+		}
 
 		/*
 		 * Reject attempt to call a parameterless aggregate without (*)
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 116e00bce4..bf4a86b0a9 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -9464,7 +9464,11 @@ get_windowfunc_expr(WindowFunc *wfunc, deparse_context *context)
 	if (wfunc->winstar)
 		appendStringInfoChar(buf, '*');
 	else
+	{
+		if (wfunc->windistinct)
+			appendStringInfoString(buf, "DISTINCT ");
 		get_rule_expr((Node *) wfunc->args, context, true);
+	}
 
 	if (wfunc->aggfilter != NULL)
 	{
diff --git a/src/include/common/int.h b/src/include/common/int.h
index a2972218e7..6fb2885977 100644
--- a/src/include/common/int.h
+++ b/src/include/common/int.h
@@ -434,4 +434,24 @@ pg_mul_u64_overflow(uint64 a, uint64 b, uint64 *result)
 #endif
 }
 
+/* Power-of-2 helpers, moved here from lib/simplehash.h so other code can use them */
+/* Return ceil(log2(num)); 0 when num <= 1.  Does not terminate if num > 2^63. */
+static inline uint64
+sh_log2(uint64 num)
+{
+	int			i;
+	uint64		limit;
+
+	for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
+		;
+	return i;
+}
+
+/* Return the smallest power of 2 that is >= num (1 when num <= 1) */
+static inline uint64
+sh_pow2(uint64 num)
+{
+	return ((uint64) 1) << sh_log2(num);
+}
+
 #endif							/* COMMON_INT_H */
diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h
index 5a6783f653..669d97cac2 100644
--- a/src/include/lib/simplehash.h
+++ b/src/include/lib/simplehash.h
@@ -57,6 +57,8 @@
  *	  backwards, unless they're empty or already at their optimal position.
  */
 
+#include "common/int.h"
+
 /* helpers */
 #define SH_MAKE_PREFIX(a) CppConcat(a,_)
 #define SH_MAKE_NAME(name) SH_MAKE_NAME_(SH_MAKE_PREFIX(SH_PREFIX),name)
@@ -215,27 +217,6 @@ SH_SCOPE void SH_STAT(SH_TYPE * tb);
 #ifndef SIMPLEHASH_H
 #define SIMPLEHASH_H
 
-/* FIXME: can we move these to a central location? */
-
-/* calculate ceil(log base 2) of num */
-static inline uint64
-sh_log2(uint64 num)
-{
-	int			i;
-	uint64		limit;
-
-	for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
-		;
-	return i;
-}
-
-/* calculate first power of 2 >= num */
-static inline uint64
-sh_pow2(uint64 num)
-{
-	return ((uint64) 1) << sh_log2(num);
-}
-
 #ifdef FRONTEND
 #define sh_error(...) pg_log_error(__VA_ARGS__)
 #define sh_log(...) pg_log_info(__VA_ARGS__)
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index d73be2ad46..710eea443f 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -379,6 +379,8 @@ typedef struct WindowFunc
 	Index		winref;			/* index of associated WindowClause */
 	bool		winstar;		/* true if argument list was really '*' */
 	bool		winagg;			/* is function a simple aggregate? */
+	bool		windistinct;	/* is function a DISTINCT aggregate? */
+	Oid			winaggargtype;	/* type of the first argument, used for DISTINCT */
 	int			location;		/* token location, or -1 if unknown */
 } WindowFunc;
 

Reply via email to