Hi hackers, I would like to propose an old patch for Postgres 11 that allows DISTINCT aggregates, such as select sum(distinct nums), to be used within a window function. It was developed separately by Oliver Ford, who has given me permission to publish it and to continue its development.
I have rebased it onto the current git master branch and made the changes needed for it to work with Postgres 13devel. It is still a WIP: it doesn't have tests yet (I will add them later), and it currently works for int, float, and numeric types. The distinct check could probably be rewritten to store the distinct elements in a hash table, which should improve performance. If committers find the approach acceptable, I will respond to all your design and review comments and continue working on it; I will also add this patch to the March commitfest. As an example of usage: given time-series data, current Postgres raises an error: postgres=# CREATE TABLE t_demo AS postgres-# SELECT ordinality, day, date_part('week', day) AS week postgres-# FROM generate_series('2020-01-02', '2020-01-15', '1 day'::interval) postgres-# WITH ORDINALITY AS day; SELECT 14 postgres=# SELECT * FROM t_demo; ordinality | day | week ------------+------------------------+------ 1 | 2020-01-02 00:00:00+02 | 1 2 | 2020-01-03 00:00:00+02 | 1 3 | 2020-01-04 00:00:00+02 | 1 4 | 2020-01-05 00:00:00+02 | 1 5 | 2020-01-06 00:00:00+02 | 2 6 | 2020-01-07 00:00:00+02 | 2 7 | 2020-01-08 00:00:00+02 | 2 8 | 2020-01-09 00:00:00+02 | 2 9 | 2020-01-10 00:00:00+02 | 2 10 | 2020-01-11 00:00:00+02 | 2 11 | 2020-01-12 00:00:00+02 | 2 12 | 2020-01-13 00:00:00+02 | 3 13 | 2020-01-14 00:00:00+02 | 3 14 | 2020-01-15 00:00:00+02 | 3 (14 rows) postgres=# SELECT *, postgres-# array_agg(DISTINCT week) OVER (ORDER BY day ROWS postgres(# BETWEEN 2 PRECEDING AND 2 FOLLOWING) postgres-# FROM t_demo; ERROR: DISTINCT is not implemented for window functions LINE 2: array_agg(DISTINCT week) OVER (ORDER BY day ROWS ^ So instead you have to write something like this: postgres=# SELECT *, (SELECT array_agg(DISTINCT unnest) FROM unnest(x)) AS b postgres-# FROM postgres-# ( postgres(# SELECT *, postgres(# array_agg(week) OVER 
(ORDER BY day ROWS postgres(# BETWEEN 2 PRECEDING AND 2 FOLLOWING) AS x postgres(# FROM t_demo postgres(# ) AS a; ordinality | day | week | x | b ------------+------------------------+------+-------------+------- 1 | 2020-01-02 00:00:00+02 | 1 | {1,1,1} | {1} 2 | 2020-01-03 00:00:00+02 | 1 | {1,1,1,1} | {1} 3 | 2020-01-04 00:00:00+02 | 1 | {1,1,1,1,2} | {1,2} 4 | 2020-01-05 00:00:00+02 | 1 | {1,1,1,2,2} | {1,2} 5 | 2020-01-06 00:00:00+02 | 2 | {1,1,2,2,2} | {1,2} 6 | 2020-01-07 00:00:00+02 | 2 | {1,2,2,2,2} | {1,2} 7 | 2020-01-08 00:00:00+02 | 2 | {2,2,2,2,2} | {2} 8 | 2020-01-09 00:00:00+02 | 2 | {2,2,2,2,2} | {2} 9 | 2020-01-10 00:00:00+02 | 2 | {2,2,2,2,2} | {2} 10 | 2020-01-11 00:00:00+02 | 2 | {2,2,2,2,3} | {2,3} 11 | 2020-01-12 00:00:00+02 | 2 | {2,2,2,3,3} | {2,3} 12 | 2020-01-13 00:00:00+02 | 3 | {2,2,3,3,3} | {2,3} 13 | 2020-01-14 00:00:00+02 | 3 | {2,3,3,3} | {2,3} 14 | 2020-01-15 00:00:00+02 | 3 | {3,3,3} | {3} (14 rows) With attached version, you will get the desired results: postgres=# SELECT *, postgres-# array_agg(DISTINCT week) OVER (ORDER BY day ROWS postgres(# BETWEEN 2 PRECEDING AND 2 FOLLOWING) postgres-# FROM t_demo; ordinality | day | week | array_agg ------------+------------------------+------+----------- 1 | 2020-01-02 00:00:00+02 | 1 | {1} 2 | 2020-01-03 00:00:00+02 | 1 | {1} 3 | 2020-01-04 00:00:00+02 | 1 | {1,2} 4 | 2020-01-05 00:00:00+02 | 1 | {1,2} 5 | 2020-01-06 00:00:00+02 | 2 | {1,2} 6 | 2020-01-07 00:00:00+02 | 2 | {1,2} 7 | 2020-01-08 00:00:00+02 | 2 | {2} 8 | 2020-01-09 00:00:00+02 | 2 | {2} 9 | 2020-01-10 00:00:00+02 | 2 | {2} 10 | 2020-01-11 00:00:00+02 | 2 | {2,3} 11 | 2020-01-12 00:00:00+02 | 2 | {2,3} 12 | 2020-01-13 00:00:00+02 | 3 | {2,3} 13 | 2020-01-14 00:00:00+02 | 3 | {2,3} 14 | 2020-01-15 00:00:00+02 | 3 | {3} (14 rows)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 4cc7da268d..66ab18bab6 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -34,9 +34,14 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/nbtree.h" #include "catalog/objectaccess.h" #include "catalog/pg_aggregate.h" +#include "catalog/pg_am.h" #include "catalog/pg_proc.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "common/int.h" #include "executor/executor.h" #include "executor/nodeWindowAgg.h" #include "miscadmin.h" @@ -156,6 +161,12 @@ typedef struct WindowStatePerAggData /* Data local to eval_windowaggregates() */ bool restart; /* need to restart this agg in this cycle? */ + + FmgrInfo distinct_cmpfn; + int16 distinct_typlen; + void *distinctArr; /* Array of DISTINCT values */ + int64 distinctSize; /* Current size of the working distinctArr */ + int64 distinctCount; /* Current number of values in the working distinctArr */ } WindowStatePerAggData; static void initialize_windowaggregate(WindowAggState *winstate, @@ -209,6 +220,24 @@ initialize_windowaggregate(WindowAggState *winstate, { MemoryContext oldContext; + if (perfuncstate->wfunc->windistinct) + { + if (peraggstate->distinctSize > 0) + { + pfree(peraggstate->distinctArr); + peraggstate->distinctCount = 0; + peraggstate->distinctSize = 0; + } + + /* If the type is a fixed length, allocate an initial array of size 16 */ + Assert(peraggstate->distinct_typlen > 0 || peraggstate->distinct_typlen == -1); + if (peraggstate->distinct_typlen > 0) + { + peraggstate->distinctArr = palloc0(sizeof(int64) * 16); + peraggstate->distinctSize = 16; + } + } + /* * If we're using a private aggcontext, we may reset it here. 
But if the * context is shared, we don't know which other aggregates may still need @@ -278,6 +307,68 @@ advance_windowaggregate(WindowAggState *winstate, i++; } + if (wfuncstate->wfunc->windistinct && !fcinfo->args[1].isnull) + { + MemoryContext tupleContext = MemoryContextSwitchTo(oldContext); + + if (peraggstate->distinct_typlen > 0) + { + for (i = 0; i < peraggstate->distinctCount; i++) + { + if (DatumGetInt32(FunctionCall2(&peraggstate->distinct_cmpfn, + fcinfo->args[1].value, + *((int64 *)peraggstate->distinctArr + i))) == 0) + return; + } + + if (peraggstate->distinctCount == peraggstate->distinctSize) + { + peraggstate->distinctSize *= 2; + peraggstate->distinctArr = (int64 *) repalloc(peraggstate->distinctArr, + sizeof(int64) * peraggstate->distinctSize); + } + *((int64 *)peraggstate->distinctArr + peraggstate->distinctCount) = fcinfo->args[1].value; + } + else if (peraggstate->distinct_typlen == -1) + { + uint64 len, tmp_len = 0; + int8 *arr = (int8 *) peraggstate->distinctArr; + int8 *arg = (int8 *) DatumGetPointer(fcinfo->args[1].value); + int64 varlen = VARSIZE_ANY(fcinfo->args[1].value); + + for (i = 0; i < peraggstate->distinctCount; i++) + { + uint64 curr_len; + if (DatumGetInt32(FunctionCall2(&peraggstate->distinct_cmpfn, + fcinfo->args[1].value, + PointerGetDatum(arr)) == 0)) + return; + curr_len = VARSIZE_ANY(arr); + tmp_len += curr_len; + arr += curr_len; + } + + if (peraggstate->distinctCount == 0) + { + len = sh_pow2(varlen); + peraggstate->distinctArr = (int8 *) palloc0(len); + } + else + { + len = sh_pow2(tmp_len + varlen); + if (len > peraggstate->distinctSize) + peraggstate->distinctArr = (int8 *) repalloc(peraggstate->distinctArr, + len); + } + arr = (int8 *) peraggstate->distinctArr + tmp_len; + peraggstate->distinctSize = len; + for (i = 0; i < varlen; i++) *arr++ = *arg++; + } + + peraggstate->distinctCount++; + MemoryContextSwitchTo(tupleContext); + } + if (peraggstate->transfn.fn_strict) { /* @@ -428,6 +519,9 @@ 
advance_windowaggregate_base(WindowAggState *winstate, ExprContext *econtext = winstate->tmpcontext; ExprState *filter = wfuncstate->aggfilter; + if (wfuncstate->wfunc->windistinct) + return false; + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); /* Skip anything FILTERed out */ @@ -2610,8 +2704,6 @@ ExecReScanWindowAgg(WindowAggState *node) /* * initialize_peragg - * - * Almost same as in nodeAgg.c, except we don't support DISTINCT currently. */ static WindowStatePerAggData * initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, @@ -2651,6 +2743,27 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->winfnoid); aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + if (wfunc->windistinct) + { + Oid opclass, + opfamily, + cmp_func_oid; + HeapTuple typeTuple; + Form_pg_type typeform; + + opclass = GetDefaultOpClass(wfunc->winaggargtype, BTREE_AM_OID); + opfamily = get_opclass_family(opclass); + cmp_func_oid = get_opfamily_proc(opfamily, wfunc->winaggargtype, wfunc->winaggargtype, BTORDER_PROC); + fmgr_info(cmp_func_oid, &peraggstate->distinct_cmpfn); + + typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(wfunc->winaggargtype)); + if (!HeapTupleIsValid(typeTuple)) + elog(ERROR, "cache lookup failed for type %u", wfunc->winaggargtype); + typeform = (Form_pg_type) GETSTRUCT(typeTuple); + peraggstate->distinct_typlen = typeform->typlen; + ReleaseSysCache(typeTuple); + } + /* * Figure out whether we want to use the moving-aggregate implementation, * and collect the right set of fields from the pg_attribute entry. 
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 54ad62bb7f..fb8058af24 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1493,6 +1493,8 @@ _copyWindowFunc(const WindowFunc *from) COPY_SCALAR_FIELD(winref); COPY_SCALAR_FIELD(winstar); COPY_SCALAR_FIELD(winagg); + COPY_SCALAR_FIELD(windistinct); + COPY_SCALAR_FIELD(winaggargtype); COPY_LOCATION_FIELD(location); return newnode; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 5b1ba143b1..24a03097a0 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -264,6 +264,8 @@ _equalWindowFunc(const WindowFunc *a, const WindowFunc *b) COMPARE_SCALAR_FIELD(winref); COMPARE_SCALAR_FIELD(winstar); COMPARE_SCALAR_FIELD(winagg); + COMPARE_SCALAR_FIELD(windistinct); + COMPARE_SCALAR_FIELD(winaggargtype); COMPARE_LOCATION_FIELD(location); return true; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index d76fae44b8..c40dbd0cb4 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -1162,6 +1162,8 @@ _outWindowFunc(StringInfo str, const WindowFunc *node) WRITE_UINT_FIELD(winref); WRITE_BOOL_FIELD(winstar); WRITE_BOOL_FIELD(winagg); + WRITE_BOOL_FIELD(windistinct); + WRITE_OID_FIELD(winaggargtype); WRITE_LOCATION_FIELD(location); } diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 551ce6c41c..aaa327ce87 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -653,6 +653,8 @@ _readWindowFunc(void) READ_UINT_FIELD(winref); READ_BOOL_FIELD(winstar); READ_BOOL_FIELD(winagg); + READ_BOOL_FIELD(windistinct); + READ_OID_FIELD(winaggargtype); READ_LOCATION_FIELD(location); READ_DONE(); diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 2d3ec22407..5cb87c7073 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -2451,6 +2451,8 
@@ eval_const_expressions_mutator(Node *node, newexpr->winref = expr->winref; newexpr->winstar = expr->winstar; newexpr->winagg = expr->winagg; + newexpr->windistinct = expr->windistinct; + newexpr->winaggargtype = expr->winaggargtype; newexpr->location = expr->location; return (Node *) newexpr; diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index 9c3b6ad916..c663403bfb 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -825,13 +825,21 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, wfunc->location = location; /* - * agg_star is allowed for aggregate functions but distinct isn't + * In a window function, DISTINCT is allowed only for aggregate + * functions and is handled in nodeWindowAgg.c separately from + * when it occurs outside of a window function. */ if (agg_distinct) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("DISTINCT is not implemented for window functions"), - parser_errposition(pstate, location))); + { + if (!wfunc->winagg) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("DISTINCT is only allowed in aggregate window functions"), + parser_errposition(pstate, location))); + wfunc->windistinct = true; + agg_distinct = false; + wfunc->winaggargtype = actual_arg_types[0]; + } /* * Reject attempt to call a parameterless aggregate without (*) diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 116e00bce4..bf4a86b0a9 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -9464,7 +9464,11 @@ get_windowfunc_expr(WindowFunc *wfunc, deparse_context *context) if (wfunc->winstar) appendStringInfoChar(buf, '*'); else + { + if (wfunc->windistinct) + appendStringInfoString(buf, "DISTINCT "); get_rule_expr((Node *) wfunc->args, context, true); + } if (wfunc->aggfilter != NULL) { diff --git a/src/include/common/int.h b/src/include/common/int.h index a2972218e7..6fb2885977 
100644 --- a/src/include/common/int.h +++ b/src/include/common/int.h @@ -434,4 +434,24 @@ pg_mul_u64_overflow(uint64 a, uint64 b, uint64 *result) #endif } + +/* calculate ceil(log base 2) of num */ +static inline uint64 +sh_log2(uint64 num) +{ + int i; + uint64 limit; + + for (i = 0, limit = 1; limit < num; i++, limit <<= 1) + ; + return i; +} + +/* calculate first power of 2 >= num */ +static inline uint64 +sh_pow2(uint64 num) +{ + return ((uint64) 1) << sh_log2(num); +} + #endif /* COMMON_INT_H */ diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h index 5a6783f653..669d97cac2 100644 --- a/src/include/lib/simplehash.h +++ b/src/include/lib/simplehash.h @@ -57,6 +57,8 @@ * backwards, unless they're empty or already at their optimal position. */ +#include "common/int.h" + /* helpers */ #define SH_MAKE_PREFIX(a) CppConcat(a,_) #define SH_MAKE_NAME(name) SH_MAKE_NAME_(SH_MAKE_PREFIX(SH_PREFIX),name) @@ -215,27 +217,6 @@ SH_SCOPE void SH_STAT(SH_TYPE * tb); #ifndef SIMPLEHASH_H #define SIMPLEHASH_H -/* FIXME: can we move these to a central location? */ - -/* calculate ceil(log base 2) of num */ -static inline uint64 -sh_log2(uint64 num) -{ - int i; - uint64 limit; - - for (i = 0, limit = 1; limit < num; i++, limit <<= 1) - ; - return i; -} - -/* calculate first power of 2 >= num */ -static inline uint64 -sh_pow2(uint64 num) -{ - return ((uint64) 1) << sh_log2(num); -} - #ifdef FRONTEND #define sh_error(...) pg_log_error(__VA_ARGS__) #define sh_log(...) pg_log_info(__VA_ARGS__) diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index d73be2ad46..710eea443f 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -379,6 +379,8 @@ typedef struct WindowFunc Index winref; /* index of associated WindowClause */ bool winstar; /* true if argument list was really '*' */ bool winagg; /* is function a simple aggregate? */ + bool windistinct; /* is function a DISTINCT aggregate? 
*/ + Oid winaggargtype; /* arg type of function, used for DISTINCT */ int location; /* token location, or -1 if unknown */ } WindowFunc;