Alexander Pyhalov писал 2022-01-17 15:26:
Zhihong Yu писал 2022-01-17 11:43:
Hi,
+ FdwScanPrivateConvertors
+ * Generate attinmeta if there are some converters:
I think it would be better if converter is spelled the same way across
the patch.
For build_conv_list():
+ if (IS_UPPER_REL(foreignrel))
You can return NIL for !IS_UPPER_REL(foreignrel) - this would save
indentation for the body of the func.
Hi.
Updated patch.
Sorry, missed attachment.
--
Best regards,
Alexander Pyhalov,
Postgres Professional
From 4408f98a67872efd3c09a3bf89e7cbf88db2a8b2 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <a.pyha...@postgrespro.ru>
Date: Thu, 14 Oct 2021 17:30:34 +0300
Subject: [PATCH] Partial aggregates push down
---
contrib/postgres_fdw/deparse.c | 57 ++++-
.../postgres_fdw/expected/postgres_fdw.out | 185 ++++++++++++++++-
contrib/postgres_fdw/postgres_fdw.c | 196 +++++++++++++++++-
contrib/postgres_fdw/sql/postgres_fdw.sql | 27 ++-
src/backend/catalog/pg_aggregate.c | 28 ++-
src/backend/commands/aggregatecmds.c | 23 +-
src/backend/utils/adt/numeric.c | 96 +++++++++
src/bin/pg_dump/pg_dump.c | 20 +-
src/include/catalog/pg_aggregate.dat | 106 +++++-----
src/include/catalog/pg_aggregate.h | 10 +-
src/include/catalog/pg_proc.dat | 6 +
src/test/regress/expected/oidjoins.out | 1 +
12 files changed, 672 insertions(+), 83 deletions(-)
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index bf12eac0288..6b12b7bf76b 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -197,6 +197,7 @@ static bool is_subquery_var(Var *node, RelOptInfo *foreignrel,
static void get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
int *relno, int *colno);
+static bool partial_agg_ok(Aggref *agg);
/*
* Examine each qual clause in input_conds, and classify them into two groups,
@@ -832,8 +833,10 @@ foreign_expr_walker(Node *node,
if (!IS_UPPER_REL(glob_cxt->foreignrel))
return false;
- /* Only non-split aggregates are pushable. */
- if (agg->aggsplit != AGGSPLIT_SIMPLE)
+ if ((agg->aggsplit != AGGSPLIT_SIMPLE) && (agg->aggsplit != AGGSPLIT_INITIAL_SERIAL))
+ return false;
+
+ if (agg->aggsplit == AGGSPLIT_INITIAL_SERIAL && !partial_agg_ok(agg))
return false;
/* As usual, it must be shippable. */
@@ -3349,7 +3352,7 @@ deparseAggref(Aggref *node, deparse_expr_cxt *context)
bool use_variadic;
/* Only basic, non-split aggregation accepted. */
- Assert(node->aggsplit == AGGSPLIT_SIMPLE);
+ Assert(node->aggsplit == AGGSPLIT_SIMPLE || node->aggsplit == AGGSPLIT_INITIAL_SERIAL);
/* Check if need to print VARIADIC (cf. ruleutils.c) */
use_variadic = node->aggvariadic;
@@ -3819,3 +3822,51 @@ get_relation_column_alias_ids(Var *node, RelOptInfo *foreignrel,
/* Shouldn't get here */
elog(ERROR, "unexpected expression in subquery output");
}
+
+/*
+ * partial_agg_ok
+ *		Check whether the partial aggregate 'agg' is fine to push down.
+ *
+ * Only normal aggregates (no DISTINCT, ORDER BY or VARIADIC) marked
+ * aggpartialpushdownsafe in pg_aggregate qualify.  In addition, either the
+ * transition type is INTERNAL and a partial converter is defined, or the
+ * aggregate's result type equals its transition type (no converter used).
+ */
+static bool
+partial_agg_ok(Aggref *agg)
+{
+	HeapTuple	aggtup;
+	Form_pg_aggregate aggform;
+	bool		ok;
+
+	Assert(agg->aggsplit == AGGSPLIT_INITIAL_SERIAL);
+
+	/* We don't support complex partial aggregates */
+	if (agg->aggdistinct || agg->aggvariadic ||
+		agg->aggkind != AGGKIND_NORMAL || agg->aggorder != NIL)
+		return false;
+
+	aggtup = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(agg->aggfnoid));
+	if (!HeapTupleIsValid(aggtup))
+		elog(ERROR, "cache lookup failed for aggregate %u", agg->aggfnoid);
+	aggform = (Form_pg_aggregate) GETSTRUCT(aggtup);
+
+	if (!aggform->aggpartialpushdownsafe)
+	{
+		/* Only aggregates marked as pushdown safe are allowed */
+		ok = false;
+	}
+	else if (agg->aggtranstype == INTERNALOID)
+	{
+		/* Serialized transition state needs a partial converter */
+		ok = OidIsValid(aggform->aggpartialconverterfn);
+	}
+	else
+	{
+		/* No converter is used here, so the types must match exactly */
+		ok = (get_func_rettype(agg->aggfnoid) == agg->aggtranstype);
+	}
+
+	ReleaseSysCache(aggtup);
+	return ok;
+}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 7d6f7d9e3df..549bb9bae61 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -9336,13 +9336,13 @@ RESET enable_partitionwise_join;
-- ===================================================================
-- test partitionwise aggregates
-- ===================================================================
-CREATE TABLE pagg_tab (a int, b int, c text) PARTITION BY RANGE(a);
+CREATE TABLE pagg_tab (a int, b int, c text, d numeric) PARTITION BY RANGE(a);
CREATE TABLE pagg_tab_p1 (LIKE pagg_tab);
CREATE TABLE pagg_tab_p2 (LIKE pagg_tab);
CREATE TABLE pagg_tab_p3 (LIKE pagg_tab);
-INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
-INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
-INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
+INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
+INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
+INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
-- Create foreign partitions
CREATE FOREIGN TABLE fpagg_tab_p1 PARTITION OF pagg_tab FOR VALUES FROM (0) TO (10) SERVER loopback OPTIONS (table_name 'pagg_tab_p1');
CREATE FOREIGN TABLE fpagg_tab_p2 PARTITION OF pagg_tab FOR VALUES FROM (10) TO (20) SERVER loopback OPTIONS (table_name 'pagg_tab_p2');
@@ -9401,8 +9401,8 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
-- Should have all the columns in the target list for the given relation
EXPLAIN (VERBOSE, COSTS OFF)
SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
- QUERY PLAN
-------------------------------------------------------------------------
+ QUERY PLAN
+---------------------------------------------------------------------------
Sort
Output: t1.a, (count(((t1.*)::pagg_tab)))
Sort Key: t1.a
@@ -9413,21 +9413,21 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
Filter: (avg(t1.b) < '22'::numeric)
-> Foreign Scan on public.fpagg_tab_p1 t1
Output: t1.a, t1.*, t1.b
- Remote SQL: SELECT a, b, c FROM public.pagg_tab_p1
+ Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p1
-> HashAggregate
Output: t1_1.a, count(((t1_1.*)::pagg_tab))
Group Key: t1_1.a
Filter: (avg(t1_1.b) < '22'::numeric)
-> Foreign Scan on public.fpagg_tab_p2 t1_1
Output: t1_1.a, t1_1.*, t1_1.b
- Remote SQL: SELECT a, b, c FROM public.pagg_tab_p2
+ Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p2
-> HashAggregate
Output: t1_2.a, count(((t1_2.*)::pagg_tab))
Group Key: t1_2.a
Filter: (avg(t1_2.b) < '22'::numeric)
-> Foreign Scan on public.fpagg_tab_p3 t1_2
Output: t1_2.a, t1_2.*, t1_2.b
- Remote SQL: SELECT a, b, c FROM public.pagg_tab_p3
+ Remote SQL: SELECT a, b, c, d FROM public.pagg_tab_p3
(25 rows)
SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
@@ -9463,6 +9463,173 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700
-> Foreign Scan on fpagg_tab_p3 pagg_tab_2
(15 rows)
+-- It's unsafe to push down having clause when there are partial aggregates
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------
+ Sort
+ Output: pagg_tab.b, (max(pagg_tab.a)), (count(*))
+ Sort Key: pagg_tab.b
+ -> Finalize HashAggregate
+ Output: pagg_tab.b, max(pagg_tab.a), count(*)
+ Group Key: pagg_tab.b
+ Filter: (sum(pagg_tab.a) < 700)
+ -> Append
+ -> Partial HashAggregate
+ Output: pagg_tab.b, PARTIAL max(pagg_tab.a), PARTIAL count(*), PARTIAL sum(pagg_tab.a)
+ Group Key: pagg_tab.b
+ -> Foreign Scan on public.fpagg_tab_p1 pagg_tab
+ Output: pagg_tab.b, pagg_tab.a
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p1
+ -> Partial HashAggregate
+ Output: pagg_tab_1.b, PARTIAL max(pagg_tab_1.a), PARTIAL count(*), PARTIAL sum(pagg_tab_1.a)
+ Group Key: pagg_tab_1.b
+ -> Foreign Scan on public.fpagg_tab_p2 pagg_tab_1
+ Output: pagg_tab_1.b, pagg_tab_1.a
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p2
+ -> Partial HashAggregate
+ Output: pagg_tab_2.b, PARTIAL max(pagg_tab_2.a), PARTIAL count(*), PARTIAL sum(pagg_tab_2.a)
+ Group Key: pagg_tab_2.b
+ -> Foreign Scan on public.fpagg_tab_p3 pagg_tab_2
+ Output: pagg_tab_2.b, pagg_tab_2.a
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p3
+(26 rows)
+
+-- Partial aggregates are fine to push down without having clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b ORDER BY 1;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------
+ Sort
+ Output: pagg_tab.b, (max(pagg_tab.a)), (count(*))
+ Sort Key: pagg_tab.b
+ -> Finalize HashAggregate
+ Output: pagg_tab.b, max(pagg_tab.a), count(*)
+ Group Key: pagg_tab.b
+ -> Append
+ -> Foreign Scan
+ Output: pagg_tab.b, (PARTIAL max(pagg_tab.a)), (PARTIAL count(*))
+ Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
+ Remote SQL: SELECT b, max(a), count(*) FROM public.pagg_tab_p1 GROUP BY 1
+ -> Foreign Scan
+ Output: pagg_tab_1.b, (PARTIAL max(pagg_tab_1.a)), (PARTIAL count(*))
+ Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1)
+ Remote SQL: SELECT b, max(a), count(*) FROM public.pagg_tab_p2 GROUP BY 1
+ -> Foreign Scan
+ Output: pagg_tab_2.b, (PARTIAL max(pagg_tab_2.a)), (PARTIAL count(*))
+ Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2)
+ Remote SQL: SELECT b, max(a), count(*) FROM public.pagg_tab_p3 GROUP BY 1
+(19 rows)
+
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b ORDER BY 1;
+ b | max | count
+----+-----+-------
+ 0 | 20 | 60
+ 1 | 21 | 60
+ 2 | 22 | 60
+ 3 | 23 | 60
+ 4 | 24 | 60
+ 5 | 25 | 60
+ 6 | 26 | 60
+ 7 | 27 | 60
+ 8 | 28 | 60
+ 9 | 29 | 60
+ 10 | 20 | 60
+ 11 | 21 | 60
+ 12 | 22 | 60
+ 13 | 23 | 60
+ 14 | 24 | 60
+ 15 | 25 | 60
+ 16 | 26 | 60
+ 17 | 27 | 60
+ 18 | 28 | 60
+ 19 | 29 | 60
+ 20 | 20 | 60
+ 21 | 21 | 60
+ 22 | 22 | 60
+ 23 | 23 | 60
+ 24 | 24 | 60
+ 25 | 25 | 60
+ 26 | 26 | 60
+ 27 | 27 | 60
+ 28 | 28 | 60
+ 29 | 29 | 60
+ 30 | 20 | 60
+ 31 | 21 | 60
+ 32 | 22 | 60
+ 33 | 23 | 60
+ 34 | 24 | 60
+ 35 | 25 | 60
+ 36 | 26 | 60
+ 37 | 27 | 60
+ 38 | 28 | 60
+ 39 | 29 | 60
+ 40 | 20 | 60
+ 41 | 21 | 60
+ 42 | 22 | 60
+ 43 | 23 | 60
+ 44 | 24 | 60
+ 45 | 25 | 60
+ 46 | 26 | 60
+ 47 | 27 | 60
+ 48 | 28 | 60
+ 49 | 29 | 60
+(50 rows)
+
+-- Partial aggregates are fine to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(*), sum(d) FROM pagg_tab;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------
+ Finalize Aggregate
+ Output: max(pagg_tab.a), count(*), sum(pagg_tab.d)
+ -> Append
+ -> Foreign Scan
+ Output: (PARTIAL max(pagg_tab.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab.d))
+ Relations: Aggregate on (public.fpagg_tab_p1 pagg_tab)
+ Remote SQL: SELECT max(a), count(*), sum(d) FROM public.pagg_tab_p1
+ -> Foreign Scan
+ Output: (PARTIAL max(pagg_tab_1.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_1.d))
+ Relations: Aggregate on (public.fpagg_tab_p2 pagg_tab_1)
+ Remote SQL: SELECT max(a), count(*), sum(d) FROM public.pagg_tab_p2
+ -> Foreign Scan
+ Output: (PARTIAL max(pagg_tab_2.a)), (PARTIAL count(*)), (PARTIAL sum(pagg_tab_2.d))
+ Relations: Aggregate on (public.fpagg_tab_p3 pagg_tab_2)
+ Remote SQL: SELECT max(a), count(*), sum(d) FROM public.pagg_tab_p3
+(15 rows)
+
+SELECT max(a), count(*), sum(d) FROM pagg_tab;
+ max | count | sum
+-----+-------+-------
+ 29 | 3000 | 58500
+(1 row)
+
+-- Shouldn't try to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(distinct b) FROM pagg_tab;
+ QUERY PLAN
+---------------------------------------------------------------
+ Aggregate
+ Output: max(pagg_tab.a), count(DISTINCT pagg_tab.b)
+ -> Append
+ -> Foreign Scan on public.fpagg_tab_p1 pagg_tab_1
+ Output: pagg_tab_1.a, pagg_tab_1.b
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p1
+ -> Foreign Scan on public.fpagg_tab_p2 pagg_tab_2
+ Output: pagg_tab_2.a, pagg_tab_2.b
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p2
+ -> Foreign Scan on public.fpagg_tab_p3 pagg_tab_3
+ Output: pagg_tab_3.a, pagg_tab_3.b
+ Remote SQL: SELECT a, b FROM public.pagg_tab_p3
+(12 rows)
+
+SELECT max(a), count(distinct b) FROM pagg_tab;
+ max | count
+-----+-------
+ 29 | 50
+(1 row)
+
-- ===================================================================
-- access rights and superuser
-- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 09a3f5e23cb..f7701162def 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -18,6 +18,8 @@
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/pg_class.h"
+#include "catalog/pg_aggregate.h"
+#include "catalog/pg_proc.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
@@ -48,6 +50,7 @@
#include "utils/rel.h"
#include "utils/sampling.h"
#include "utils/selfuncs.h"
+#include "utils/syscache.h"
PG_MODULE_MAGIC;
@@ -80,7 +83,12 @@ enum FdwScanPrivateIndex
* String describing join i.e. names of relations being joined and types
* of join, added when the scan is join
*/
- FdwScanPrivateRelations
+ FdwScanPrivateRelations,
+
+ /*
+ * List of functions to convert partial aggregate result
+ */
+ FdwScanPrivateConverters
};
/*
@@ -143,6 +151,7 @@ typedef struct PgFdwScanState
/* extracted fdw_private data */
char *query; /* text of SELECT command */
List *retrieved_attrs; /* list of retrieved attribute numbers */
+ List *conv_list; /* list of converters */
/* for remote query execution */
PGconn *conn; /* connection for the scan */
@@ -474,6 +483,7 @@ static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
static void deallocate_query(PgFdwModifyState *fmstate);
+static List *build_conv_list(RelOptInfo *foreignrel);
static List *build_remote_returning(Index rtindex, Relation rel,
List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -510,6 +520,7 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
Relation rel,
AttInMetadata *attinmeta,
List *retrieved_attrs,
+ List *conv_list,
ForeignScanState *fsstate,
MemoryContext temp_context);
static void conversion_error_callback(void *arg);
@@ -517,7 +528,7 @@ static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
JoinPathExtraData *extra);
static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
- Node *havingQual);
+ Node *havingQual, bool partial);
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
RelOptInfo *rel);
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
@@ -526,7 +537,8 @@ static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
static void add_foreign_grouping_paths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *grouped_rel,
- GroupPathExtraData *extra);
+ GroupPathExtraData *extra,
+ bool partial);
static void add_foreign_ordered_paths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *ordered_rel);
@@ -541,7 +553,6 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_i);
static int get_batch_size_option(Relation rel);
-
/*
* Foreign-data wrapper handler function: return a struct with pointers
* to my callback routines.
@@ -1233,6 +1244,7 @@ postgresGetForeignPlan(PlannerInfo *root,
List *local_exprs = NIL;
List *params_list = NIL;
List *fdw_scan_tlist = NIL;
+ List *fdw_conv_list = NIL;
List *fdw_recheck_quals = NIL;
List *retrieved_attrs;
StringInfoData sql;
@@ -1336,6 +1348,9 @@ postgresGetForeignPlan(PlannerInfo *root,
/* Build the list of columns to be fetched from the foreign server. */
fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
+ /* Build the list of converters for partial aggregates. */
+ fdw_conv_list = build_conv_list(foreignrel);
+
/*
* Ensure that the outer plan produces a tuple whose descriptor
* matches our scan tuple slot. Also, remove the local conditions
@@ -1415,6 +1430,8 @@ postgresGetForeignPlan(PlannerInfo *root,
if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
fdw_private = lappend(fdw_private,
makeString(fpinfo->relation_name));
+ if (IS_UPPER_REL(foreignrel))
+ fdw_private = lappend(fdw_private, fdw_conv_list);
/*
* Create the ForeignScan node for the given relation.
@@ -1433,6 +1450,48 @@ postgresGetForeignPlan(PlannerInfo *root,
outer_plan);
}
+/*
+ * get_rcvd_attinmeta
+ *		Build input metadata for result rows that pass through converters.
+ *
+ * For every column with a converter, the copied descriptor's type is replaced
+ * by the converter's single argument type before the metadata is generated.
+ */
+static AttInMetadata *
+get_rcvd_attinmeta(TupleDesc tupdesc, List *conv_list)
+{
+	TupleDesc	rcvd_tupdesc;
+	int			attno;
+
+	Assert(conv_list != NIL);
+
+	rcvd_tupdesc = CreateTupleDescCopy(tupdesc);
+	for (attno = 0; attno < rcvd_tupdesc->natts; attno++)
+	{
+		Oid			conv_oid = list_nth_oid(conv_list, attno);
+		HeapTuple	proctup;
+		Form_pg_proc procform;
+
+		/* Columns without a converter keep their original type */
+		if (!OidIsValid(conv_oid))
+			continue;
+
+		proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(conv_oid));
+		if (!HeapTupleIsValid(proctup))
+			elog(ERROR, "cache lookup failed for function %u", conv_oid);
+		procform = (Form_pg_proc) GETSTRUCT(proctup);
+
+		if (procform->pronargs != 1)
+			elog(ERROR, "converter %s is expected to have one argument",
+				 NameStr(procform->proname));
+
+		TupleDescAttr(rcvd_tupdesc, attno)->atttypid = procform->proargtypes.values[0];
+		ReleaseSysCache(proctup);
+	}
+
+	return TupleDescGetAttInMetadata(rcvd_tupdesc);
+}
+
/*
* Construct a tuple descriptor for the scan tuples handled by a foreign join.
*/
@@ -1545,6 +1604,12 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
FdwScanPrivateSelectSql));
fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
FdwScanPrivateRetrievedAttrs);
+
+ if (list_length(fsplan->fdw_private) > FdwScanPrivateConverters)
+ fsstate->conv_list = (List *) list_nth(fsplan->fdw_private,
+ FdwScanPrivateConverters);
+
+
fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
FdwScanPrivateFetchSize));
@@ -1571,7 +1636,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
}
- fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
+ if (fsstate->conv_list != NIL)
+ fsstate->attinmeta = get_rcvd_attinmeta(fsstate->tupdesc, fsstate->conv_list);
+ else
+ fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
/*
* Prepare for processing of parameters used in remote query, if any.
@@ -3822,6 +3890,7 @@ fetch_more_data(ForeignScanState *node)
fsstate->rel,
fsstate->attinmeta,
fsstate->retrieved_attrs,
+ fsstate->conv_list,
node,
fsstate->temp_cxt);
}
@@ -4309,6 +4378,7 @@ store_returning_result(PgFdwModifyState *fmstate,
fmstate->rel,
fmstate->attinmeta,
fmstate->retrieved_attrs,
+ NIL,
NULL,
fmstate->temp_cxt);
@@ -4603,6 +4673,7 @@ get_returning_data(ForeignScanState *node)
dmstate->rel,
dmstate->attinmeta,
dmstate->retrieved_attrs,
+ NIL,
node,
dmstate->temp_cxt);
ExecStoreHeapTuple(newtup, slot, false);
@@ -5183,6 +5254,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
astate->rel,
astate->attinmeta,
astate->retrieved_attrs,
+ NIL,
NULL,
astate->temp_cxt);
@@ -6083,7 +6155,7 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
*/
static bool
foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
- Node *havingQual)
+ Node *havingQual, bool partial)
{
Query *query = root->parse;
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
@@ -6097,6 +6169,11 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
if (query->groupingSets)
return false;
+
+ /* It's unsafe to push down HAVING clauses together with partial aggregates */
+ if (partial && havingQual)
+ return false;
+
/* Get the fpinfo of the underlying scan relation. */
ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
@@ -6336,6 +6413,7 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
/* Ignore stages we don't support; and skip any duplicate calls. */
if ((stage != UPPERREL_GROUP_AGG &&
+ stage != UPPERREL_PARTIAL_GROUP_AGG &&
stage != UPPERREL_ORDERED &&
stage != UPPERREL_FINAL) ||
output_rel->fdw_private)
@@ -6350,7 +6428,11 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
{
case UPPERREL_GROUP_AGG:
add_foreign_grouping_paths(root, input_rel, output_rel,
- (GroupPathExtraData *) extra);
+ (GroupPathExtraData *) extra, false);
+ break;
+ case UPPERREL_PARTIAL_GROUP_AGG:
+ add_foreign_grouping_paths(root, input_rel, output_rel,
+ (GroupPathExtraData *) extra, true);
break;
case UPPERREL_ORDERED:
add_foreign_ordered_paths(root, input_rel, output_rel);
@@ -6375,7 +6457,8 @@ postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
static void
add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *grouped_rel,
- GroupPathExtraData *extra)
+ GroupPathExtraData *extra,
+ bool partial)
{
Query *parse = root->parse;
PgFdwRelationInfo *ifpinfo = input_rel->fdw_private;
@@ -6391,8 +6474,9 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
!root->hasHavingQual)
return;
- Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
- extra->patype == PARTITIONWISE_AGGREGATE_FULL);
+ Assert(((extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
+ extra->patype == PARTITIONWISE_AGGREGATE_FULL) && !partial) ||
+ (extra->patype == PARTITIONWISE_AGGREGATE_PARTIAL && partial));
/* save the input_rel as outerrel in fpinfo */
fpinfo->outerrel = input_rel;
@@ -6412,7 +6496,7 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
* Use HAVING qual from extra. In case of child partition, it will have
* translated Vars.
*/
- if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual))
+ if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual, partial))
return;
/*
@@ -7108,6 +7192,30 @@ complete_pending_request(AsyncRequest *areq)
TupIsNull(areq->result) ? 0.0 : 1.0);
}
+/*
+ * Interface to fmgr to call converters; strict ones yield NULL on NULL input
+ */
+static Datum
+call_converter(Oid converter, Oid collation, Datum value, bool isnull, bool *res_isnull)
+{
+	LOCAL_FCINFO(fcinfo, 1);
+	FmgrInfo	flinfo;
+	Datum		result = (Datum) 0;
+
+	fmgr_info(converter, &flinfo);
+	InitFunctionCallInfoData(*fcinfo, &flinfo, 1, collation, NULL, NULL);
+	fcinfo->args[0].value = value;
+	fcinfo->args[0].isnull = isnull;
+	/* fmgr leaves strictness checks to the caller, so do it here */
+	if (isnull && flinfo.fn_strict)
+		fcinfo->isnull = true;
+	else
+		result = FunctionCallInvoke(fcinfo);
+	if (res_isnull)
+		*res_isnull = fcinfo->isnull;
+	return result;
+}
+
/*
* Create a tuple from the specified row of the PGresult.
*
@@ -7127,6 +7235,7 @@ make_tuple_from_result_row(PGresult *res,
Relation rel,
AttInMetadata *attinmeta,
List *retrieved_attrs,
+ List *conv_list,
ForeignScanState *fsstate,
MemoryContext temp_context)
{
@@ -7209,6 +7318,20 @@ make_tuple_from_result_row(PGresult *res,
valstr,
attinmeta->attioparams[i - 1],
attinmeta->atttypmods[i - 1]);
+			if (conv_list != NIL)
+			{
+				Oid			converter = list_nth_oid(conv_list, i - 1);
+
+				if (converter != InvalidOid)
+				{
+					/* i is a 1-based attribute number; TupleDescAttr is 0-based */
+					Form_pg_attribute att = TupleDescAttr(tupdesc, i - 1);
+					bool		res_isnull;
+
+					values[i - 1] = call_converter(converter, att->attcollation, values[i - 1], nulls[i - 1], &res_isnull);
+					nulls[i - 1] = res_isnull;
+				}
+			}
}
else if (i == SelfItemPointerAttributeNumber)
{
@@ -7472,3 +7595,54 @@ get_batch_size_option(Relation rel)
return batch_size;
}
+
+/*
+ * build_conv_list
+ *		Collect partial-aggregate converters for an upper relation.
+ *
+ * Returns NIL for non-upper relations.  Otherwise the result has one OID per
+ * grouped_tlist entry: the aggregate's aggpartialconverterfn for initial-serial
+ * Aggrefs with INTERNAL transition type, and InvalidOid for everything else,
+ * which preserves the one-to-one mapping between tlist and conv_list members.
+ */
+static List *
+build_conv_list(RelOptInfo *foreignrel)
+{
+	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
+	List	   *converters = NIL;
+	ListCell   *cell;
+
+	if (!IS_UPPER_REL(foreignrel))
+		return NIL;
+
+	/* For UPPER_REL tlist matches grouped_tlist */
+	foreach(cell, fpinfo->grouped_tlist)
+	{
+		TargetEntry *tle = (TargetEntry *) lfirst(cell);
+		Aggref	   *aggref = (Aggref *) tle->expr;
+		Oid			conv_oid = InvalidOid;
+
+		/* aggref is only dereferenced once the node is known to be an Aggref */
+		if (IsA(tle->expr, Aggref) &&
+			aggref->aggsplit == AGGSPLIT_INITIAL_SERIAL &&
+			aggref->aggtranstype == INTERNALOID)
+		{
+			HeapTuple	aggtup;
+
+			aggtup = SearchSysCache1(AGGFNOID,
+									 ObjectIdGetDatum(aggref->aggfnoid));
+			if (!HeapTupleIsValid(aggtup))
+				elog(ERROR, "cache lookup failed for function %u",
+					 aggref->aggfnoid);
+
+			conv_oid = ((Form_pg_aggregate) GETSTRUCT(aggtup))->aggpartialconverterfn;
+			Assert(conv_oid != InvalidOid);
+			ReleaseSysCache(aggtup);
+		}
+
+		/* Appended even when invalid, to keep positions aligned with tlist */
+		converters = lappend_oid(converters, conv_oid);
+	}
+
+	return converters;
+}
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9eb673e3693..61296de6ca7 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2755,15 +2755,15 @@ RESET enable_partitionwise_join;
-- test partitionwise aggregates
-- ===================================================================
-CREATE TABLE pagg_tab (a int, b int, c text) PARTITION BY RANGE(a);
+CREATE TABLE pagg_tab (a int, b int, c text, d numeric) PARTITION BY RANGE(a);
CREATE TABLE pagg_tab_p1 (LIKE pagg_tab);
CREATE TABLE pagg_tab_p2 (LIKE pagg_tab);
CREATE TABLE pagg_tab_p3 (LIKE pagg_tab);
-INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
-INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
-INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000') FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
+INSERT INTO pagg_tab_p1 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 10;
+INSERT INTO pagg_tab_p2 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 20 and (i % 30) >= 10;
+INSERT INTO pagg_tab_p3 SELECT i % 30, i % 50, to_char(i/30, 'FM0000'), i % 40 FROM generate_series(1, 3000) i WHERE (i % 30) < 30 and (i % 30) >= 20;
-- Create foreign partitions
CREATE FOREIGN TABLE fpagg_tab_p1 PARTITION OF pagg_tab FOR VALUES FROM (0) TO (10) SERVER loopback OPTIONS (table_name 'pagg_tab_p1');
@@ -2797,6 +2797,25 @@ SELECT a, count(t1) FROM pagg_tab t1 GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
EXPLAIN (COSTS OFF)
SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+-- It's unsafe to push down having clause when there are partial aggregates
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 ORDER BY 1;
+
+-- Partial aggregates are fine to push down without having clause
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b ORDER BY 1;
+SELECT b, max(a), count(*) FROM pagg_tab GROUP BY b ORDER BY 1;
+
+-- Partial aggregates are fine to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(*), sum(d) FROM pagg_tab;
+SELECT max(a), count(*), sum(d) FROM pagg_tab;
+
+-- Shouldn't try to push down
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT max(a), count(distinct b) FROM pagg_tab;
+SELECT max(a), count(distinct b) FROM pagg_tab;
+
-- ===================================================================
-- access rights and superuser
-- ===================================================================
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c
index 0d0daa69b34..521794cfbea 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -60,6 +60,7 @@ AggregateCreate(const char *aggName,
List *aggcombinefnName,
List *aggserialfnName,
List *aggdeserialfnName,
+ List *aggpartialconverterfnName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
@@ -74,7 +75,8 @@ AggregateCreate(const char *aggName,
int32 aggmTransSpace,
const char *agginitval,
const char *aggminitval,
- char proparallel)
+ char proparallel,
+ bool partialPushdownSafe)
{
Relation aggdesc;
HeapTuple tup;
@@ -88,6 +90,7 @@ AggregateCreate(const char *aggName,
Oid combinefn = InvalidOid; /* can be omitted */
Oid serialfn = InvalidOid; /* can be omitted */
Oid deserialfn = InvalidOid; /* can be omitted */
+ Oid partialconverterfn = InvalidOid; /* can be omitted */
Oid mtransfn = InvalidOid; /* can be omitted */
Oid minvtransfn = InvalidOid; /* can be omitted */
Oid mfinalfn = InvalidOid; /* can be omitted */
@@ -569,6 +572,27 @@ AggregateCreate(const char *aggName,
format_type_be(finaltype))));
}
+	/*
+	 * Validate the partial converter, if present.  It takes the aggregate's
+	 * final result type as its only argument and must return bytea.
+	 */
+	if (aggpartialconverterfnName)
+	{
+		fnArgs[0] = finaltype;
+
+		partialconverterfn = lookup_agg_function(aggpartialconverterfnName, 1,
+												 fnArgs, InvalidOid,
+												 &rettype);
+
+		if (rettype != BYTEAOID)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("return type of partial converter function %s is not %s",
+							NameListToString(aggpartialconverterfnName),
+							format_type_be(BYTEAOID))));
+	}
+
+
/* handle sortop, if supplied */
if (aggsortopName)
{
@@ -664,6 +688,7 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggcombinefn - 1] = ObjectIdGetDatum(combinefn);
values[Anum_pg_aggregate_aggserialfn - 1] = ObjectIdGetDatum(serialfn);
values[Anum_pg_aggregate_aggdeserialfn - 1] = ObjectIdGetDatum(deserialfn);
+ values[Anum_pg_aggregate_aggpartialconverterfn - 1] = ObjectIdGetDatum(partialconverterfn);
values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn);
values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn);
values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
@@ -676,6 +701,7 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace);
values[Anum_pg_aggregate_aggmtranstype - 1] = ObjectIdGetDatum(aggmTransType);
values[Anum_pg_aggregate_aggmtransspace - 1] = Int32GetDatum(aggmTransSpace);
+ values[Anum_pg_aggregate_aggpartialpushdownsafe - 1] = BoolGetDatum(partialPushdownSafe);
if (agginitval)
values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval);
else
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index 010eca7340a..414fc09a263 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -69,11 +69,13 @@ DefineAggregate(ParseState *pstate,
List *combinefuncName = NIL;
List *serialfuncName = NIL;
List *deserialfuncName = NIL;
+ List *partialconverterfuncName = NIL;
List *mtransfuncName = NIL;
List *minvtransfuncName = NIL;
List *mfinalfuncName = NIL;
bool finalfuncExtraArgs = false;
bool mfinalfuncExtraArgs = false;
+ bool partialPushdownSafe = false;
char finalfuncModify = 0;
char mfinalfuncModify = 0;
List *sortoperatorName = NIL;
@@ -142,6 +144,8 @@ DefineAggregate(ParseState *pstate,
serialfuncName = defGetQualifiedName(defel);
else if (strcmp(defel->defname, "deserialfunc") == 0)
deserialfuncName = defGetQualifiedName(defel);
+ else if (strcmp(defel->defname, "partialconverterfunc") == 0)
+ partialconverterfuncName = defGetQualifiedName(defel);
else if (strcmp(defel->defname, "msfunc") == 0)
mtransfuncName = defGetQualifiedName(defel);
else if (strcmp(defel->defname, "minvfunc") == 0)
@@ -189,6 +193,8 @@ DefineAggregate(ParseState *pstate,
minitval = defGetString(defel);
else if (strcmp(defel->defname, "parallel") == 0)
parallel = defGetString(defel);
+ else if (strcmp(defel->defname, "partial_pushdown_safe") == 0)
+ partialPushdownSafe = defGetBoolean(defel);
else
ereport(WARNING,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -372,6 +378,18 @@ DefineAggregate(ParseState *pstate,
errmsg("must specify both or neither of serialization and deserialization functions")));
}
+ if (partialconverterfuncName)
+ {
+ /*
+ * Converter is only needed/allowed for transtype INTERNAL.
+ */
+ if (transTypeId != INTERNALOID)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("partial converters may be specified only when the aggregate transition data type is %s",
+ format_type_be(INTERNALOID))));
+ }
+
/*
* If a moving-aggregate transtype is specified, look that up. Same
* restrictions as for transtype.
@@ -457,6 +475,8 @@ DefineAggregate(ParseState *pstate,
combinefuncName, /* combine function name */
serialfuncName, /* serial function name */
deserialfuncName, /* deserial function name */
+ partialconverterfuncName, /* partial converter function
+ * name */
mtransfuncName, /* fwd trans function name */
minvtransfuncName, /* inv trans function name */
mfinalfuncName, /* final function name */
@@ -471,7 +491,8 @@ DefineAggregate(ParseState *pstate,
mtransSpace, /* transition space */
initval, /* initial condition */
minitval, /* initial condition */
- proparallel); /* parallel safe? */
+ proparallel, /* parallel safe? */
+ partialPushdownSafe); /* partial pushdown safe? */
}
/*
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 3208789f75e..6998093f082 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -5769,6 +5769,52 @@ int8_avg_deserialize(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(result);
}
+/*
+ * int8_sum_to_internal_serialize
+ *		Convert an int8 value to a serialized transition state (N, then sumX).
+ */
+Datum
+int8_sum_to_internal_serialize(PG_FUNCTION_ARGS)
+{
+	PolyNumAggState *state;
+	StringInfoData buf;
+	bytea	   *result;
+	NumericVar	tmp_var;
+
+	/*
+	 * Use the standard constructor instead of a bare palloc0(): without
+	 * HAVE_INT128, do_numeric_accum() needs state->agg_context to be valid.
+	 */
+	state = makePolyNumAggStateCurrentContext(false);
+
+	if (!PG_ARGISNULL(0))
+	{
+#ifdef HAVE_INT128
+		do_int128_accum(state, (int128) PG_GETARG_INT64(0));
+#else
+		do_numeric_accum(state, int64_to_numeric(PG_GETARG_INT64(0)));
+#endif
+	}
+
+	init_var(&tmp_var);
+	pq_begintypsend(&buf);
+
+	/* N */
+	pq_sendint64(&buf, state->N);
+
+	/* sumX */
+#ifdef HAVE_INT128
+	int128_to_numericvar(state->sumX, &tmp_var);
+#else
+	accum_sum_final(&state->sumX, &tmp_var);
+#endif
+	numericvar_serialize(&buf, &tmp_var);
+	result = pq_endtypsend(&buf);
+	free_var(&tmp_var);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
/*
* Inverse transition functions to go with the above.
*/
@@ -5994,6 +6040,56 @@ numeric_sum(PG_FUNCTION_ARGS)
PG_RETURN_NUMERIC(result);
}
+/*
+ * numeric_sum_to_internal_serialize
+ *		Build a transition state from one numeric value (nothing is
+ *		accumulated if the argument is NULL) and return that state in
+ *		serialized bytea form.
+ */
+Datum
+numeric_sum_to_internal_serialize(PG_FUNCTION_ARGS)
+{
+	NumericAggState *agg = makeNumericAggStateCurrentContext(false);
+	StringInfoData out;
+	NumericVar	sum;
+	bytea	   *packed;
+
+	/* Fold the argument into the fresh state unless it is NULL */
+	if (!PG_ARGISNULL(0))
+		do_numeric_accum(agg, PG_GETARG_NUMERIC(0));
+
+	/*
+	 * Materialize the running sum as a NumericVar up front; this does not
+	 * depend on the output buffer.
+	 */
+	init_var(&sum);
+	accum_sum_final(&agg->sumX, &sum);
+
+	/*
+	 * Emit the fields in a fixed order; the inline notes name each one.
+	 */
+	pq_begintypsend(&out);
+
+	/* N, then sumX */
+	pq_sendint64(&out, agg->N);
+	numericvar_serialize(&out, &sum);
+
+	/* maxScale and maxScaleCount */
+	pq_sendint32(&out, agg->maxScale);
+	pq_sendint64(&out, agg->maxScaleCount);
+
+	/* NaNcount, pInfcount, nInfcount */
+	pq_sendint64(&out, agg->NaNcount);
+	pq_sendint64(&out, agg->pInfcount);
+	pq_sendint64(&out, agg->nInfcount);
+
+	/* Close the message first, then release the temporary sum */
+	packed = pq_endtypsend(&out);
+	free_var(&sum);
+
+	PG_RETURN_BYTEA_P(packed);
+}
+
/*
* Workhorse routine for the standard deviance and variance
* aggregates. 'state' is aggregate's transition state.
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c5f231118bd..5ca18bc7a9a 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -13114,11 +13114,13 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
const char *aggcombinefn;
const char *aggserialfn;
const char *aggdeserialfn;
+ const char *aggpartialconverterfn;
const char *aggmtransfn;
const char *aggminvtransfn;
const char *aggmfinalfn;
bool aggfinalextra;
bool aggmfinalextra;
+ bool aggpartialpushdownsafe;
char aggfinalmodify;
char aggmfinalmodify;
const char *aggsortop;
@@ -13199,11 +13201,19 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
if (fout->remoteVersion >= 110000)
appendPQExpBufferStr(query,
"aggfinalmodify,\n"
- "aggmfinalmodify\n");
+ "aggmfinalmodify,\n");
else
appendPQExpBufferStr(query,
"'0' AS aggfinalmodify,\n"
- "'0' AS aggmfinalmodify\n");
+ "'0' AS aggmfinalmodify,\n");
+ if (fout->remoteVersion >= 150000)
+ appendPQExpBufferStr(query,
+ "aggpartialconverterfn,\n"
+ "aggpartialpushdownsafe\n");
+ else
+ appendPQExpBufferStr(query,
+ "'-' AS aggpartialconverterfn,\n"
+ "false as aggpartialpushdownsafe\n");
appendPQExpBufferStr(query,
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
@@ -13229,6 +13239,8 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
aggcombinefn = PQgetvalue(res, 0, PQfnumber(res, "aggcombinefn"));
aggserialfn = PQgetvalue(res, 0, PQfnumber(res, "aggserialfn"));
aggdeserialfn = PQgetvalue(res, 0, PQfnumber(res, "aggdeserialfn"));
+ aggpartialconverterfn = PQgetvalue(res, 0, PQfnumber(res, "aggpartialconverterfn"));
+ aggpartialpushdownsafe = (PQgetvalue(res, 0, PQfnumber(res, "aggpartialpushdownsafe"))[0] == 't');
aggmtransfn = PQgetvalue(res, 0, PQfnumber(res, "aggmtransfn"));
aggminvtransfn = PQgetvalue(res, 0, PQfnumber(res, "aggminvtransfn"));
aggmfinalfn = PQgetvalue(res, 0, PQfnumber(res, "aggmfinalfn"));
@@ -13318,6 +13330,10 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
if (strcmp(aggdeserialfn, "-") != 0)
appendPQExpBuffer(details, ",\n DESERIALFUNC = %s", aggdeserialfn);
+ if (strcmp(aggpartialconverterfn, "-") != 0)
+ appendPQExpBuffer(details, ",\n PARTIALCONVERTERFUNC = %s", aggpartialconverterfn);
+ if (aggpartialpushdownsafe)
+ appendPQExpBufferStr(details, ",\n PARTIAL_PUSHDOWN_SAFE");
if (strcmp(aggmtransfn, "-") != 0)
{
appendPQExpBuffer(details, ",\n MSFUNC = %s,\n MINVFUNC = %s,\n MSTYPE = %s",
diff --git a/src/include/catalog/pg_aggregate.dat b/src/include/catalog/pg_aggregate.dat
index 137f6eef695..627ded8d2b6 100644
--- a/src/include/catalog/pg_aggregate.dat
+++ b/src/include/catalog/pg_aggregate.dat
@@ -56,26 +56,27 @@
aggserialfn => 'int8_avg_serialize', aggdeserialfn => 'int8_avg_deserialize',
aggmtransfn => 'int8_avg_accum', aggminvtransfn => 'int8_avg_accum_inv',
aggmfinalfn => 'numeric_poly_sum', aggtranstype => 'internal',
- aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48' },
+ aggtransspace => '48', aggmtranstype => 'internal', aggmtransspace => '48',
+ aggpartialconverterfn => 'int8_sum_to_internal_serialize', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'sum(int4)', aggtransfn => 'int4_sum', aggcombinefn => 'int8pl',
aggmtransfn => 'int4_avg_accum', aggminvtransfn => 'int4_avg_accum_inv',
aggmfinalfn => 'int2int4_sum', aggtranstype => 'int8',
- aggmtranstype => '_int8', aggminitval => '{0,0}' },
+ aggmtranstype => '_int8', aggminitval => '{0,0}', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'sum(int2)', aggtransfn => 'int2_sum', aggcombinefn => 'int8pl',
aggmtransfn => 'int2_avg_accum', aggminvtransfn => 'int2_avg_accum_inv',
aggmfinalfn => 'int2int4_sum', aggtranstype => 'int8',
- aggmtranstype => '_int8', aggminitval => '{0,0}' },
+ aggmtranstype => '_int8', aggminitval => '{0,0}', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'sum(float4)', aggtransfn => 'float4pl',
- aggcombinefn => 'float4pl', aggtranstype => 'float4' },
+ aggcombinefn => 'float4pl', aggtranstype => 'float4', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'sum(float8)', aggtransfn => 'float8pl',
- aggcombinefn => 'float8pl', aggtranstype => 'float8' },
+ aggcombinefn => 'float8pl', aggtranstype => 'float8', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'sum(money)', aggtransfn => 'cash_pl', aggcombinefn => 'cash_pl',
aggmtransfn => 'cash_pl', aggminvtransfn => 'cash_mi',
- aggtranstype => 'money', aggmtranstype => 'money' },
+ aggtranstype => 'money', aggmtranstype => 'money', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'sum(interval)', aggtransfn => 'interval_pl',
aggcombinefn => 'interval_pl', aggmtransfn => 'interval_pl',
aggminvtransfn => 'interval_mi', aggtranstype => 'interval',
- aggmtranstype => 'interval' },
+ aggmtranstype => 'interval', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'sum(numeric)', aggtransfn => 'numeric_avg_accum',
aggfinalfn => 'numeric_sum', aggcombinefn => 'numeric_avg_combine',
aggserialfn => 'numeric_avg_serialize',
@@ -83,146 +84,149 @@
aggmtransfn => 'numeric_avg_accum', aggminvtransfn => 'numeric_accum_inv',
aggmfinalfn => 'numeric_sum', aggtranstype => 'internal',
aggtransspace => '128', aggmtranstype => 'internal',
- aggmtransspace => '128' },
+ aggmtransspace => '128', aggpartialconverterfn => 'numeric_sum_to_internal_serialize', aggpartialpushdownsafe => 't' },
# max
{ aggfnoid => 'max(int8)', aggtransfn => 'int8larger',
aggcombinefn => 'int8larger', aggsortop => '>(int8,int8)',
- aggtranstype => 'int8' },
+ aggtranstype => 'int8', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(int4)', aggtransfn => 'int4larger',
aggcombinefn => 'int4larger', aggsortop => '>(int4,int4)',
- aggtranstype => 'int4' },
+ aggtranstype => 'int4', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(int2)', aggtransfn => 'int2larger',
aggcombinefn => 'int2larger', aggsortop => '>(int2,int2)',
- aggtranstype => 'int2' },
+ aggtranstype => 'int2', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(oid)', aggtransfn => 'oidlarger',
aggcombinefn => 'oidlarger', aggsortop => '>(oid,oid)',
- aggtranstype => 'oid' },
+ aggtranstype => 'oid', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(float4)', aggtransfn => 'float4larger',
aggcombinefn => 'float4larger', aggsortop => '>(float4,float4)',
- aggtranstype => 'float4' },
+ aggtranstype => 'float4', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(float8)', aggtransfn => 'float8larger',
aggcombinefn => 'float8larger', aggsortop => '>(float8,float8)',
- aggtranstype => 'float8' },
+ aggtranstype => 'float8', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(date)', aggtransfn => 'date_larger',
aggcombinefn => 'date_larger', aggsortop => '>(date,date)',
- aggtranstype => 'date' },
+ aggtranstype => 'date', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(time)', aggtransfn => 'time_larger',
aggcombinefn => 'time_larger', aggsortop => '>(time,time)',
- aggtranstype => 'time' },
+ aggtranstype => 'time', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(timetz)', aggtransfn => 'timetz_larger',
aggcombinefn => 'timetz_larger', aggsortop => '>(timetz,timetz)',
- aggtranstype => 'timetz' },
+ aggtranstype => 'timetz', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(money)', aggtransfn => 'cashlarger',
aggcombinefn => 'cashlarger', aggsortop => '>(money,money)',
- aggtranstype => 'money' },
+ aggtranstype => 'money', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(timestamp)', aggtransfn => 'timestamp_larger',
aggcombinefn => 'timestamp_larger', aggsortop => '>(timestamp,timestamp)',
- aggtranstype => 'timestamp' },
+ aggtranstype => 'timestamp', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(timestamptz)', aggtransfn => 'timestamptz_larger',
aggcombinefn => 'timestamptz_larger',
- aggsortop => '>(timestamptz,timestamptz)', aggtranstype => 'timestamptz' },
+ aggsortop => '>(timestamptz,timestamptz)', aggtranstype => 'timestamptz', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(interval)', aggtransfn => 'interval_larger',
aggcombinefn => 'interval_larger', aggsortop => '>(interval,interval)',
- aggtranstype => 'interval' },
+ aggtranstype => 'interval', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(text)', aggtransfn => 'text_larger',
aggcombinefn => 'text_larger', aggsortop => '>(text,text)',
- aggtranstype => 'text' },
+ aggtranstype => 'text', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(numeric)', aggtransfn => 'numeric_larger',
aggcombinefn => 'numeric_larger', aggsortop => '>(numeric,numeric)',
- aggtranstype => 'numeric' },
+ aggtranstype => 'numeric', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(anyarray)', aggtransfn => 'array_larger',
aggcombinefn => 'array_larger', aggsortop => '>(anyarray,anyarray)',
- aggtranstype => 'anyarray' },
+ aggtranstype => 'anyarray', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(bpchar)', aggtransfn => 'bpchar_larger',
aggcombinefn => 'bpchar_larger', aggsortop => '>(bpchar,bpchar)',
- aggtranstype => 'bpchar' },
+ aggtranstype => 'bpchar', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(tid)', aggtransfn => 'tidlarger',
aggcombinefn => 'tidlarger', aggsortop => '>(tid,tid)',
- aggtranstype => 'tid' },
+ aggtranstype => 'tid', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(anyenum)', aggtransfn => 'enum_larger',
aggcombinefn => 'enum_larger', aggsortop => '>(anyenum,anyenum)',
- aggtranstype => 'anyenum' },
+ aggtranstype => 'anyenum', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(inet)', aggtransfn => 'network_larger',
aggcombinefn => 'network_larger', aggsortop => '>(inet,inet)',
- aggtranstype => 'inet' },
+ aggtranstype => 'inet', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'max(pg_lsn)', aggtransfn => 'pg_lsn_larger',
aggcombinefn => 'pg_lsn_larger', aggsortop => '>(pg_lsn,pg_lsn)',
- aggtranstype => 'pg_lsn' },
+ aggtranstype => 'pg_lsn', aggpartialpushdownsafe => 't' },
# min
{ aggfnoid => 'min(int8)', aggtransfn => 'int8smaller',
aggcombinefn => 'int8smaller', aggsortop => '<(int8,int8)',
- aggtranstype => 'int8' },
+ aggtranstype => 'int8', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(int4)', aggtransfn => 'int4smaller',
aggcombinefn => 'int4smaller', aggsortop => '<(int4,int4)',
- aggtranstype => 'int4' },
+ aggtranstype => 'int4', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(int2)', aggtransfn => 'int2smaller',
aggcombinefn => 'int2smaller', aggsortop => '<(int2,int2)',
- aggtranstype => 'int2' },
+ aggtranstype => 'int2', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(oid)', aggtransfn => 'oidsmaller',
aggcombinefn => 'oidsmaller', aggsortop => '<(oid,oid)',
- aggtranstype => 'oid' },
+ aggtranstype => 'oid', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(float4)', aggtransfn => 'float4smaller',
aggcombinefn => 'float4smaller', aggsortop => '<(float4,float4)',
- aggtranstype => 'float4' },
+ aggtranstype => 'float4', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(float8)', aggtransfn => 'float8smaller',
aggcombinefn => 'float8smaller', aggsortop => '<(float8,float8)',
- aggtranstype => 'float8' },
+ aggtranstype => 'float8', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(date)', aggtransfn => 'date_smaller',
aggcombinefn => 'date_smaller', aggsortop => '<(date,date)',
- aggtranstype => 'date' },
+ aggtranstype => 'date', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(time)', aggtransfn => 'time_smaller',
aggcombinefn => 'time_smaller', aggsortop => '<(time,time)',
- aggtranstype => 'time' },
+ aggtranstype => 'time', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(timetz)', aggtransfn => 'timetz_smaller',
aggcombinefn => 'timetz_smaller', aggsortop => '<(timetz,timetz)',
- aggtranstype => 'timetz' },
+ aggtranstype => 'timetz', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(money)', aggtransfn => 'cashsmaller',
aggcombinefn => 'cashsmaller', aggsortop => '<(money,money)',
- aggtranstype => 'money' },
+ aggtranstype => 'money', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(timestamp)', aggtransfn => 'timestamp_smaller',
aggcombinefn => 'timestamp_smaller', aggsortop => '<(timestamp,timestamp)',
- aggtranstype => 'timestamp' },
+ aggtranstype => 'timestamp', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(timestamptz)', aggtransfn => 'timestamptz_smaller',
aggcombinefn => 'timestamptz_smaller',
- aggsortop => '<(timestamptz,timestamptz)', aggtranstype => 'timestamptz' },
+ aggsortop => '<(timestamptz,timestamptz)', aggtranstype => 'timestamptz',
+ aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(interval)', aggtransfn => 'interval_smaller',
aggcombinefn => 'interval_smaller', aggsortop => '<(interval,interval)',
- aggtranstype => 'interval' },
+ aggtranstype => 'interval', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(text)', aggtransfn => 'text_smaller',
aggcombinefn => 'text_smaller', aggsortop => '<(text,text)',
- aggtranstype => 'text' },
+ aggtranstype => 'text', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(numeric)', aggtransfn => 'numeric_smaller',
aggcombinefn => 'numeric_smaller', aggsortop => '<(numeric,numeric)',
- aggtranstype => 'numeric' },
+ aggtranstype => 'numeric', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(anyarray)', aggtransfn => 'array_smaller',
aggcombinefn => 'array_smaller', aggsortop => '<(anyarray,anyarray)',
- aggtranstype => 'anyarray' },
+ aggtranstype => 'anyarray', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(bpchar)', aggtransfn => 'bpchar_smaller',
aggcombinefn => 'bpchar_smaller', aggsortop => '<(bpchar,bpchar)',
- aggtranstype => 'bpchar' },
+ aggtranstype => 'bpchar', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(tid)', aggtransfn => 'tidsmaller',
aggcombinefn => 'tidsmaller', aggsortop => '<(tid,tid)',
- aggtranstype => 'tid' },
+ aggtranstype => 'tid', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(anyenum)', aggtransfn => 'enum_smaller',
aggcombinefn => 'enum_smaller', aggsortop => '<(anyenum,anyenum)',
- aggtranstype => 'anyenum' },
+ aggtranstype => 'anyenum', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(inet)', aggtransfn => 'network_smaller',
aggcombinefn => 'network_smaller', aggsortop => '<(inet,inet)',
- aggtranstype => 'inet' },
+ aggtranstype => 'inet', aggpartialpushdownsafe => 't' },
{ aggfnoid => 'min(pg_lsn)', aggtransfn => 'pg_lsn_smaller',
aggcombinefn => 'pg_lsn_smaller', aggsortop => '<(pg_lsn,pg_lsn)',
- aggtranstype => 'pg_lsn' },
+ aggtranstype => 'pg_lsn' , aggpartialpushdownsafe => 't'},
# count
{ aggfnoid => 'count(any)', aggtransfn => 'int8inc_any',
aggcombinefn => 'int8pl', aggmtransfn => 'int8inc_any',
aggminvtransfn => 'int8dec_any', aggtranstype => 'int8',
- aggmtranstype => 'int8', agginitval => '0', aggminitval => '0' },
+ aggmtranstype => 'int8', agginitval => '0', aggminitval => '0',
+ aggpartialpushdownsafe => 't' },
{ aggfnoid => 'count()', aggtransfn => 'int8inc', aggcombinefn => 'int8pl',
aggmtransfn => 'int8inc', aggminvtransfn => 'int8dec', aggtranstype => 'int8',
- aggmtranstype => 'int8', agginitval => '0', aggminitval => '0' },
+ aggmtranstype => 'int8', agginitval => '0', aggminitval => '0',
+ aggpartialpushdownsafe => 't' },
# var_pop
{ aggfnoid => 'var_pop(int8)', aggtransfn => 'int8_accum',
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 593da9f76a8..3b8e541532d 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -55,6 +55,9 @@ CATALOG(pg_aggregate,2600,AggregateRelationId)
/* function to convert bytea to transtype (0 if none) */
regproc aggdeserialfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
+ /* function to convert aggregate result to bytea (0 if none) */
+ regproc aggpartialconverterfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
+
/* forward function for moving-aggregate mode (0 if none) */
regproc aggmtransfn BKI_DEFAULT(-) BKI_LOOKUP_OPT(pg_proc);
@@ -91,6 +94,9 @@ CATALOG(pg_aggregate,2600,AggregateRelationId)
/* estimated size of moving-agg state (0 for default est) */
int32 aggmtransspace BKI_DEFAULT(0);
+ /* true if partial aggregate is fine to push down */
+ bool aggpartialpushdownsafe BKI_DEFAULT(f);
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* initial value for transition state (can be NULL) */
@@ -161,6 +167,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
List *aggcombinefnName,
List *aggserialfnName,
List *aggdeserialfnName,
+ List *aggpartialconverterfnName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
@@ -175,6 +182,7 @@ extern ObjectAddress AggregateCreate(const char *aggName,
int32 aggmTransSpace,
const char *agginitval,
const char *aggminitval,
- char proparallel);
+ char proparallel,
+ bool partialPushdownSafe);
#endif /* PG_AGGREGATE_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d6bf1f3274b..ebf63e72443 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -4784,6 +4784,12 @@
{ oid => '2786', descr => 'aggregate serial function',
proname => 'int8_avg_serialize', prorettype => 'bytea',
proargtypes => 'internal', prosrc => 'int8_avg_serialize' },
+{ oid => '9630', descr => 'partial aggregate converter function',
+ proname => 'int8_sum_to_internal_serialize', prorettype => 'bytea',
+ proargtypes => 'int8', prosrc => 'int8_sum_to_internal_serialize' },
+{ oid => '9631', descr => 'partial aggregate converter function',
+ proname => 'numeric_sum_to_internal_serialize', prorettype => 'bytea',
+ proargtypes => 'numeric', prosrc => 'numeric_sum_to_internal_serialize' },
{ oid => '2787', descr => 'aggregate deserial function',
proname => 'int8_avg_deserialize', prorettype => 'internal',
proargtypes => 'bytea internal', prosrc => 'int8_avg_deserialize' },
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be3..864151bbca3 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -145,6 +145,7 @@ NOTICE: checking pg_aggregate {aggfinalfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggcombinefn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggserialfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggdeserialfn} => pg_proc {oid}
+NOTICE: checking pg_aggregate {aggpartialconverterfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggmtransfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggminvtransfn} => pg_proc {oid}
NOTICE: checking pg_aggregate {aggmfinalfn} => pg_proc {oid}
--
2.25.1