From 3840573ab36746b09a9f944c6db95013c824bc93 Mon Sep 17 00:00:00 2001
From: David Rowley <dgrowley@gmail.com>
Date: Thu, 17 Mar 2016 18:02:32 +1300
Subject: [PATCH] Allow aggregation to happen in parallel

This modifies the grouping planner to allow it to generate Paths for
parallel aggregation, when possible.
---
 src/backend/executor/execQual.c         |  19 +-
 src/backend/nodes/copyfuncs.c           |   2 +
 src/backend/nodes/equalfuncs.c          |   2 +
 src/backend/nodes/nodeFuncs.c           |   8 +-
 src/backend/nodes/outfuncs.c            |   2 +
 src/backend/nodes/readfuncs.c           |   2 +
 src/backend/optimizer/path/allpaths.c   |   2 +-
 src/backend/optimizer/path/costsize.c   |  12 +-
 src/backend/optimizer/plan/createplan.c |   4 +-
 src/backend/optimizer/plan/planner.c    | 461 +++++++++++++++++++++++++++-----
 src/backend/optimizer/plan/setrefs.c    | 253 +++++++++++++++++-
 src/backend/optimizer/prep/prepunion.c  |   4 +-
 src/backend/optimizer/util/clauses.c    |  88 ++++++
 src/backend/optimizer/util/pathnode.c   |  14 +-
 src/backend/optimizer/util/tlist.c      |  46 ++++
 src/include/nodes/primnodes.h           |  19 ++
 src/include/nodes/relation.h            |   2 +
 src/include/optimizer/clauses.h         |  20 ++
 src/include/optimizer/cost.h            |   2 +-
 src/include/optimizer/pathnode.h        |   7 +-
 src/include/optimizer/tlist.h           |   1 +
 21 files changed, 887 insertions(+), 83 deletions(-)

diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c
index 778b6c1..4029721 100644
--- a/src/backend/executor/execQual.c
+++ b/src/backend/executor/execQual.c
@@ -4510,20 +4510,25 @@ ExecInitExpr(Expr *node, PlanState *parent)
 		case T_Aggref:
 			{
 				AggrefExprState *astate = makeNode(AggrefExprState);
+				AggState   *aggstate = (AggState *) parent;
+				Aggref   *aggref = (Aggref *) node;
 
 				astate->xprstate.evalfunc = (ExprStateEvalFunc) ExecEvalAggref;
-				if (parent && IsA(parent, AggState))
+				if (!aggstate || !IsA(aggstate, AggState))
 				{
-					AggState   *aggstate = (AggState *) parent;
-
-					aggstate->aggs = lcons(astate, aggstate->aggs);
-					aggstate->numaggs++;
+					/* planner messed up */
+					elog(ERROR, "Aggref found in non-Agg plan node");
 				}
-				else
+				if (aggref->aggpartial == aggstate->finalizeAggs)
 				{
 					/* planner messed up */
-					elog(ERROR, "Aggref found in non-Agg plan node");
+					if (aggref->aggpartial)
+						elog(ERROR, "Partial type Aggref found in FinalizeAgg plan node");
+					else
+						elog(ERROR, "Non-Partial type Aggref found in Non-FinalizeAgg plan node");
 				}
+				aggstate->aggs = lcons(astate, aggstate->aggs);
+				aggstate->numaggs++;
 				state = (ExprState *) astate;
 			}
 			break;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df7c2fa..d502aef 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -1231,6 +1231,7 @@ _copyAggref(const Aggref *from)
 
 	COPY_SCALAR_FIELD(aggfnoid);
 	COPY_SCALAR_FIELD(aggtype);
+	COPY_SCALAR_FIELD(aggpartialtype);
 	COPY_SCALAR_FIELD(aggcollid);
 	COPY_SCALAR_FIELD(inputcollid);
 	COPY_NODE_FIELD(aggdirectargs);
@@ -1240,6 +1241,7 @@ _copyAggref(const Aggref *from)
 	COPY_NODE_FIELD(aggfilter);
 	COPY_SCALAR_FIELD(aggstar);
 	COPY_SCALAR_FIELD(aggvariadic);
+	COPY_SCALAR_FIELD(aggpartial);
 	COPY_SCALAR_FIELD(aggkind);
 	COPY_SCALAR_FIELD(agglevelsup);
 	COPY_LOCATION_FIELD(location);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index b9c3959..bf29227 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -192,6 +192,7 @@ _equalAggref(const Aggref *a, const Aggref *b)
 {
 	COMPARE_SCALAR_FIELD(aggfnoid);
 	COMPARE_SCALAR_FIELD(aggtype);
+	COMPARE_SCALAR_FIELD(aggpartialtype);
 	COMPARE_SCALAR_FIELD(aggcollid);
 	COMPARE_SCALAR_FIELD(inputcollid);
 	COMPARE_NODE_FIELD(aggdirectargs);
@@ -201,6 +202,7 @@ _equalAggref(const Aggref *a, const Aggref *b)
 	COMPARE_NODE_FIELD(aggfilter);
 	COMPARE_SCALAR_FIELD(aggstar);
 	COMPARE_SCALAR_FIELD(aggvariadic);
+	COMPARE_SCALAR_FIELD(aggpartial);
 	COMPARE_SCALAR_FIELD(aggkind);
 	COMPARE_SCALAR_FIELD(agglevelsup);
 	COMPARE_LOCATION_FIELD(location);
diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c
index b4ea440..23a8ec8 100644
--- a/src/backend/nodes/nodeFuncs.c
+++ b/src/backend/nodes/nodeFuncs.c
@@ -57,7 +57,13 @@ exprType(const Node *expr)
 			type = ((const Param *) expr)->paramtype;
 			break;
 		case T_Aggref:
-			type = ((const Aggref *) expr)->aggtype;
+			{
+				const Aggref *aggref = (const Aggref *) expr;
+				if (aggref->aggpartial)
+					type = aggref->aggpartialtype;
+				else
+					type = aggref->aggtype;
+			}
 			break;
 		case T_GroupingFunc:
 			type = INT4OID;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 548a3b9..6e2a6e4 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1031,6 +1031,7 @@ _outAggref(StringInfo str, const Aggref *node)
 
 	WRITE_OID_FIELD(aggfnoid);
 	WRITE_OID_FIELD(aggtype);
+	WRITE_OID_FIELD(aggpartialtype);
 	WRITE_OID_FIELD(aggcollid);
 	WRITE_OID_FIELD(inputcollid);
 	WRITE_NODE_FIELD(aggdirectargs);
@@ -1040,6 +1041,7 @@ _outAggref(StringInfo str, const Aggref *node)
 	WRITE_NODE_FIELD(aggfilter);
 	WRITE_BOOL_FIELD(aggstar);
 	WRITE_BOOL_FIELD(aggvariadic);
+	WRITE_BOOL_FIELD(aggpartial);
 	WRITE_CHAR_FIELD(aggkind);
 	WRITE_UINT_FIELD(agglevelsup);
 	WRITE_LOCATION_FIELD(location);
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index a2c2243..61be6c5 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -552,6 +552,7 @@ _readAggref(void)
 
 	READ_OID_FIELD(aggfnoid);
 	READ_OID_FIELD(aggtype);
+	READ_OID_FIELD(aggpartialtype);
 	READ_OID_FIELD(aggcollid);
 	READ_OID_FIELD(inputcollid);
 	READ_NODE_FIELD(aggdirectargs);
@@ -561,6 +562,7 @@ _readAggref(void)
 	READ_NODE_FIELD(aggfilter);
 	READ_BOOL_FIELD(aggstar);
 	READ_BOOL_FIELD(aggvariadic);
+	READ_BOOL_FIELD(aggpartial);
 	READ_CHAR_FIELD(aggkind);
 	READ_UINT_FIELD(agglevelsup);
 	READ_LOCATION_FIELD(location);
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 4f60b85..fe05e28 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -1968,7 +1968,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
 	simple_gather_path = (Path *)
-		create_gather_path(root, rel, cheapest_partial_path, NULL);
+		create_gather_path(root, rel, cheapest_partial_path, NULL, NULL);
 	add_path(rel, simple_gather_path);
 }
 
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 943fcde..79d3064 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -350,16 +350,22 @@ cost_samplescan(Path *path, PlannerInfo *root,
  *
  * 'rel' is the relation to be operated upon
  * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
+ * 'rows' may be used to point to a row estimate; this can be used when no
+ * rel is available to retrieve row estimates from.  If non-NULL, this
+ * setting overrides both 'rel' and 'param_info'.
  */
 void
 cost_gather(GatherPath *path, PlannerInfo *root,
-			RelOptInfo *rel, ParamPathInfo *param_info)
+			RelOptInfo *rel, ParamPathInfo *param_info,
+			double *rows)
 {
 	Cost		startup_cost = 0;
 	Cost		run_cost = 0;
 
 	/* Mark the path with the correct row estimate */
-	if (param_info)
+	if (rows)
+		path->path.rows = *rows;
+	else if (param_info)
 		path->path.rows = param_info->ppi_rows;
 	else
 		path->path.rows = rel->rows;
@@ -1751,6 +1757,8 @@ cost_agg(Path *path, PlannerInfo *root,
 	{
 		/* must be AGG_HASHED */
 		startup_cost = input_total_cost;
+		if (!enable_hashagg)
+			startup_cost += disable_cost;
 		startup_cost += aggcosts->transCost.startup;
 		startup_cost += aggcosts->transCost.per_tuple * input_tuples;
 		startup_cost += (cpu_operator_cost * numGroupCols) * input_tuples;
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index e37bdfd..6953a60 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1572,8 +1572,8 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path)
 
 	plan = make_agg(tlist, quals,
 					best_path->aggstrategy,
-					false,
-					true,
+					best_path->combineStates,
+					best_path->finalizeAggs,
 					list_length(best_path->groupClause),
 					extract_grouping_cols(best_path->groupClause,
 										  subplan->targetlist),
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index fc0a2d8..92f6dcf 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -106,6 +106,11 @@ static double get_number_of_groups(PlannerInfo *root,
 					 double path_rows,
 					 List *rollup_lists,
 					 List *rollup_groupclauses);
+static void set_grouped_rel_consider_parallel(PlannerInfo *root,
+					 RelOptInfo *grouped_rel,
+					 PathTarget *target);
+static Size estimate_hashagg_tablesize(Path *path, AggClauseCosts *agg_costs,
+					 double dNumGroups);
 static RelOptInfo *create_grouping_paths(PlannerInfo *root,
 					  RelOptInfo *input_rel,
 					  PathTarget *target,
@@ -134,6 +139,8 @@ static RelOptInfo *create_ordered_paths(PlannerInfo *root,
 					 double limit_tuples);
 static PathTarget *make_group_input_target(PlannerInfo *root,
 						PathTarget *final_target);
+static PathTarget *make_partialgroup_input_target(PlannerInfo *root,
+												  PathTarget *final_target);
 static List *postprocess_setop_tlist(List *new_tlist, List *orig_tlist);
 static List *select_active_windows(PlannerInfo *root, WindowFuncLists *wflists);
 static PathTarget *make_window_input_target(PlannerInfo *root,
@@ -1741,6 +1748,19 @@ grouping_planner(PlannerInfo *root, bool inheritance_update,
 		}
 
 		/*
+		 * Likewise for any partial paths, although this case is simpler as
+		 * we don't track the cheapest path.
+		 */
+		foreach(lc, current_rel->partial_pathlist)
+		{
+			Path	   *subpath = (Path *) lfirst(lc);
+
+			Assert(subpath->param_info == NULL);
+			lfirst(lc) = apply_projection_to_path(root, current_rel,
+											subpath, scanjoin_target);
+		}
+
+		/*
 		 * Save the various upper-rel PathTargets we just computed into
 		 * root->upper_targets[].  The core code doesn't use this, but it
 		 * provides a convenient place for extensions to get at the info.  For
@@ -3134,6 +3154,71 @@ get_number_of_groups(PlannerInfo *root,
 }
 
 /*
+ * set_grouped_rel_consider_parallel
+ *	  Determine if this upper rel is safe to generate partial paths for.
+ */
+static void
+set_grouped_rel_consider_parallel(PlannerInfo *root, RelOptInfo *grouped_rel,
+								  PathTarget *target)
+{
+	Query	   *parse = root->parse;
+
+	Assert(grouped_rel->reloptkind == RELOPT_UPPER_REL);
+
+	/* we can do nothing in parallel if there are no aggregates or group by */
+	if (!parse->hasAggs && parse->groupClause == NIL)
+		return;
+
+	/* grouping sets are currently not supported by parallel aggregate */
+	if (parse->groupingSets)
+		return;
+
+	if (has_parallel_hazard((Node *) target->exprs, false) ||
+		has_parallel_hazard((Node *) parse->havingQual, false))
+		return;
+
+	/*
+	 * All that's left to check now is to make sure all aggregate functions
+	 * support partial mode.  If there are no aggregates then we can skip
+	 * that check.
+	 */
+	if (!parse->hasAggs)
+		grouped_rel->consider_parallel = true;
+	else if (aggregates_allow_partial((Node *) target->exprs) == PAT_ANY &&
+		aggregates_allow_partial(root->parse->havingQual) == PAT_ANY)
+		grouped_rel->consider_parallel = true;
+}
+
+/*
+ * estimate_hashagg_tablesize
+ *	  estimate the number of bytes that a hash aggregate hashtable will
+ *	  require based on the agg_costs, path width and dNumGroups.
+ *
+ * 'agg_costs' may be passed as NULL when no Aggregate size estimates are
+ * available or required.
+ */
+static Size
+estimate_hashagg_tablesize(Path *path, AggClauseCosts *agg_costs,
+						   double dNumGroups)
+{
+	Size		hashentrysize;
+
+	/* Estimate per-hash-entry space at tuple width... */
+	hashentrysize = MAXALIGN(path->pathtarget->width) +
+		MAXALIGN(SizeofMinimalTupleHeader);
+
+	if (agg_costs)
+	{
+		/* plus space for pass-by-ref transition values... */
+		hashentrysize += agg_costs->transitionSpace;
+		/* plus the per-hash-entry overhead */
+		hashentrysize += hash_agg_entry_size(agg_costs->numAggs);
+	}
+
+	return hashentrysize * dNumGroups;
+}
+
+/*
  * create_grouping_paths
  *
  * Build a new upperrel containing Paths for grouping and/or aggregation.
@@ -3162,10 +3247,14 @@ create_grouping_paths(PlannerInfo *root,
 {
 	Query	   *parse = root->parse;
 	Path	   *cheapest_path = input_rel->cheapest_total_path;
+	PathTarget *partial_group_target = NULL; /* only for parallel aggregate */
 	RelOptInfo *grouped_rel;
 	AggClauseCosts agg_costs;
+	Size		hashaggtablesize;
 	double		dNumGroups;
-	bool		allow_hash;
+	bool		can_hash;
+	bool		can_sort;
+
 	ListCell   *lc;
 
 	/* For now, do all work in the (GROUP_AGG, NULL) upperrel */
@@ -3259,12 +3348,131 @@ create_grouping_paths(PlannerInfo *root,
 									  rollup_groupclauses);
 
 	/*
-	 * Consider sort-based implementations of grouping, if possible.  (Note
-	 * that if groupClause is empty, grouping_is_sortable() is trivially true,
-	 * and all the pathkeys_contained_in() tests will succeed too, so that
-	 * we'll consider every surviving input path.)
+	 * Partial paths in the input rel could allow us to perform aggregation in
+	 * parallel; set_grouped_rel_consider_parallel() will determine whether
+	 * it's going to be safe to do so.
+	 */
+	if (input_rel->partial_pathlist != NIL)
+		set_grouped_rel_consider_parallel(root, grouped_rel, target);
+
+	/*
+	 * Determine if it's possible to perform sort-based implementations of
+	 * grouping.  (Note that if groupClause is empty, grouping_is_sortable()
+	 * is trivially true, and all the pathkeys_contained_in() tests will
+	 * succeed too, so that we'll consider every surviving input path.)
+	 */
+	can_sort = grouping_is_sortable(parse->groupClause);
+
+	/*
+	 * Determine if we should consider hash-based implementations of grouping.
+	 *
+	 * Hashed aggregation only applies if we're grouping.  We currently can't
+	 * hash if there are grouping sets, though.
+	 *
+	 * Executor doesn't support hashed aggregation with DISTINCT or ORDER BY
+	 * aggregates.  (Doing so would imply storing *all* the input values in
+	 * the hash table, and/or running many sorts in parallel, either of which
+	 * seems like a certain loser.)  We similarly don't support ordered-set
+	 * aggregates in hashed aggregation, but that case is also included in the
+	 * numOrderedAggs count.
+	 *
+	 * Note: grouping_is_hashable() is much more expensive to check than the
+	 * other gating conditions, so we want to do it last.
+	 */
+	can_hash = (parse->groupClause != NIL &&
+				parse->groupingSets == NIL &&
+				agg_costs.numOrderedAggs == 0 &&
+				grouping_is_hashable(parse->groupClause));
+
+	/*
+	 * As of now grouped_rel has no partial paths. In order for us to consider
+	 * performing grouping in parallel we'll generate some partial aggregate
+	 * paths here.
 	 */
-	if (grouping_is_sortable(parse->groupClause))
+	if (grouped_rel->consider_parallel)
+	{
+		Path	   *partial_aggregate_path;
+		double		dNumPartialGroups;
+
+		partial_aggregate_path = (Path *) linitial(input_rel->partial_pathlist);
+
+		/* Build a suitable PathTarget for partial aggregation */
+		partial_group_target = make_partialgroup_input_target(root, target);
+
+		/*
+		 * XXX does this need to be estimated for each partial path, or are
+		 * they all going to be the same anyway?
+		 */
+		dNumPartialGroups = get_number_of_groups(root,
+								clamp_row_est(partial_aggregate_path->rows),
+												 rollup_lists,
+												 rollup_groupclauses);
+
+		if (can_sort && (parse->hasAggs || parse->groupClause))
+		{
+			foreach(lc, input_rel->partial_pathlist)
+			{
+				Path	   *path = (Path *) lfirst(lc);
+				bool		is_sorted;
+
+				is_sorted = pathkeys_contained_in(root->group_pathkeys,
+												  path->pathkeys);
+				if (!is_sorted)
+					path = (Path *) create_sort_path(root,
+													 grouped_rel,
+													 path,
+													 root->group_pathkeys,
+													 -1.0);
+
+				if (parse->hasAggs)
+					add_partial_path(grouped_rel, (Path *)
+								create_agg_path(root, grouped_rel,
+												path,
+												partial_group_target,
+								parse->groupClause ? AGG_SORTED : AGG_PLAIN,
+												parse->groupClause,
+												NIL,
+												&agg_costs,
+												dNumPartialGroups,
+												false,
+												false));
+				else
+					add_partial_path(grouped_rel, (Path *)
+								create_group_path(root,
+												  grouped_rel,
+												  path,
+												  partial_group_target,
+												  parse->groupClause,
+												  NIL,
+												  dNumPartialGroups));
+			}
+		}
+
+		hashaggtablesize = estimate_hashagg_tablesize(partial_aggregate_path,
+													  &agg_costs,
+													  dNumPartialGroups);
+
+		/*
+		 * Generate a hashagg Path, if we can, but we'll skip this if the hash
+		 * table looks like it'll exceed work_mem.
+		 */
+		if (can_hash && hashaggtablesize < work_mem * 1024L)
+		{
+			add_partial_path(grouped_rel, (Path *)
+						create_agg_path(root, grouped_rel,
+										partial_aggregate_path,
+										partial_group_target,
+										AGG_HASHED,
+										parse->groupClause,
+										NIL,
+										&agg_costs,
+										dNumPartialGroups,
+										false,
+										false));
+		}
+	}
+
+	if (can_sort)
 	{
 		/*
 		 * Use any available suitably-sorted path as input, and also consider
@@ -3320,7 +3528,9 @@ create_grouping_paths(PlannerInfo *root,
 											 parse->groupClause,
 											 (List *) parse->havingQual,
 											 &agg_costs,
-											 dNumGroups));
+											 dNumGroups,
+											 false,
+											 true));
 				}
 				else if (parse->groupClause)
 				{
@@ -3344,69 +3554,106 @@ create_grouping_paths(PlannerInfo *root,
 				}
 			}
 		}
-	}
 
-	/*
-	 * Consider hash-based implementations of grouping, if possible.
-	 *
-	 * Hashed aggregation only applies if we're grouping.  We currently can't
-	 * hash if there are grouping sets, though.
-	 *
-	 * Executor doesn't support hashed aggregation with DISTINCT or ORDER BY
-	 * aggregates.  (Doing so would imply storing *all* the input values in
-	 * the hash table, and/or running many sorts in parallel, either of which
-	 * seems like a certain loser.)  We similarly don't support ordered-set
-	 * aggregates in hashed aggregation, but that case is also included in the
-	 * numOrderedAggs count.
-	 *
-	 * Note: grouping_is_hashable() is much more expensive to check than the
-	 * other gating conditions, so we want to do it last.
-	 */
-	allow_hash = (parse->groupClause != NIL &&
-				  parse->groupingSets == NIL &&
-				  agg_costs.numOrderedAggs == 0);
+		foreach(lc, grouped_rel->partial_pathlist)
+		{
+			Path	   *path = (Path *) lfirst(lc);
+			double		total_groups;
+
+			total_groups = path->parallel_degree * path->rows;
+
+			path = (Path *) create_gather_path(root, grouped_rel, path, NULL,
+											   &total_groups);
+			path = (Path *) create_sort_path(root,
+											 grouped_rel,
+											 path,
+											 root->group_pathkeys,
+											 -1.0);
+
+			if (parse->hasAggs)
+				add_path(grouped_rel, (Path *)
+							create_agg_path(root,
+											grouped_rel,
+											path,
+											target,
+								parse->groupClause ? AGG_SORTED : AGG_PLAIN,
+											parse->groupClause,
+											(List *) parse->havingQual,
+											&agg_costs,
+											total_groups,
+											true,
+											true));
+			else
+					add_path(grouped_rel, (Path *)
+							 create_group_path(root,
+											   grouped_rel,
+											   path,
+											   target,
+											   parse->groupClause,
+											   (List *) parse->havingQual,
+											   total_groups));
+
+		}
+	}
 
-	/* Consider reasons to disable hashing, but only if we can sort instead */
-	if (allow_hash && grouped_rel->pathlist != NIL)
+	if (can_hash)
 	{
-		if (!enable_hashagg)
-			allow_hash = false;
-		else
+		hashaggtablesize = estimate_hashagg_tablesize(cheapest_path,
+													  &agg_costs,
+													  dNumGroups);
+
+		/*
+		 * Generate HashAgg Paths provided the estimated hash table size is
+		 * not too big.  However, if no other Paths were generated above, then
+		 * we'll begrudgingly generate them so that we actually have some.
+		 */
+		if (hashaggtablesize < work_mem * 1024L ||
+			grouped_rel->pathlist == NIL)
 		{
 			/*
-			 * Don't hash if it doesn't look like the hashtable will fit into
-			 * work_mem.
+			 * We just need an Agg over the cheapest-total input path, since input
+			 * order won't matter.
 			 */
-			Size		hashentrysize;
-
-			/* Estimate per-hash-entry space at tuple width... */
-			hashentrysize = MAXALIGN(cheapest_path->pathtarget->width) +
-				MAXALIGN(SizeofMinimalTupleHeader);
-			/* plus space for pass-by-ref transition values... */
-			hashentrysize += agg_costs.transitionSpace;
-			/* plus the per-hash-entry overhead */
-			hashentrysize += hash_agg_entry_size(agg_costs.numAggs);
-
-			if (hashentrysize * dNumGroups > work_mem * 1024L)
-				allow_hash = false;
+			add_path(grouped_rel, (Path *)
+					 create_agg_path(root, grouped_rel,
+									 cheapest_path,
+									 target,
+									 AGG_HASHED,
+									 parse->groupClause,
+									 (List *) parse->havingQual,
+									 &agg_costs,
+									 dNumGroups,
+									 false,
+									 true));
 		}
-	}
 
-	if (allow_hash && grouping_is_hashable(parse->groupClause))
-	{
 		/*
-		 * We just need an Agg over the cheapest-total input path, since input
-		 * order won't matter.
+		 * Now generate complete HashAgg paths atop the cheapest partial
+		 * path.
 		 */
-		add_path(grouped_rel, (Path *)
-				 create_agg_path(root, grouped_rel,
-								 cheapest_path,
-								 target,
-								 AGG_HASHED,
-								 parse->groupClause,
-								 (List *) parse->havingQual,
-								 &agg_costs,
-								 dNumGroups));
+		if (grouped_rel->partial_pathlist)
+		{
+			Path   *path = (Path *) linitial(grouped_rel->partial_pathlist);
+			double	total_groups;
+
+			total_groups = path->parallel_degree * path->rows;
+
+			path = (Path *) create_gather_path(root, grouped_rel, path, NULL,
+											   &total_groups);
+
+			add_path(grouped_rel, (Path *)
+						create_agg_path(root,
+										grouped_rel,
+										path,
+										target,
+										AGG_HASHED,
+										parse->groupClause,
+										(List *) parse->havingQual,
+										&agg_costs,
+										total_groups,
+										true,
+										true));
+		}
 	}
 
 	/* Give a helpful error if we failed to find any implementation */
@@ -3735,7 +3982,9 @@ create_distinct_paths(PlannerInfo *root,
 								 parse->distinctClause,
 								 NIL,
 								 NULL,
-								 numDistinctRows));
+								 numDistinctRows,
+								 false,
+								 true));
 	}
 
 	/* Give a helpful error if we failed to find any implementation */
@@ -3915,6 +4164,96 @@ make_group_input_target(PlannerInfo *root, PathTarget *final_target)
 }
 
 /*
+ * make_partialgroup_input_target
+ *	  Generate appropriate PathTarget for input for Partial Aggregate nodes.
+ *
+ * Similar to make_group_input_target(), only we don't recurse into Aggrefs, as
+ * we need these to remain intact so that they can be found later in Combine
+ * Aggregate nodes during setrefs.  Vars will still be pulled out of
+ * non-Aggref nodes as these will still be required by the combine aggregate
+ * phase.
+ *
+ * We also convert any Aggrefs which we do find and put them into partial mode;
+ * this adjusts the Aggref's return type so that the partially calculated
+ * aggregate value can make its way up the execution tree to the Finalize
+ * Aggregate node.
+ */
+static PathTarget *
+make_partialgroup_input_target(PlannerInfo *root, PathTarget *final_target)
+{
+	Query	   *parse = root->parse;
+	PathTarget *input_target;
+	List	   *non_group_cols;
+	List	   *non_group_exprs;
+	int			i;
+	ListCell   *lc;
+
+	input_target = create_empty_pathtarget();
+	non_group_cols = NIL;
+
+	i = -1;
+	foreach(lc, final_target->exprs)
+	{
+		Expr	   *expr = (Expr *) lfirst(lc);
+
+		i++;
+
+		if (parse->groupClause)
+		{
+			Index sgref = final_target->sortgrouprefs[i];
+
+			if (sgref && get_sortgroupref_clause_noerr(sgref, parse->groupClause)
+					!= NULL)
+			{
+				/*
+				 * It's a grouping column, so add it to the input target as-is.
+				 */
+				add_column_to_pathtarget(input_target, expr, sgref);
+				continue;
+			}
+		}
+
+		/*
+		 * Non-grouping column, so just remember the expression for later
+		 * call to pull_var_clause.
+		 */
+		non_group_cols = lappend(non_group_cols, expr);
+	}
+
+	/*
+	 * If there's a HAVING clause, we'll need the Aggrefs it uses, too.
+	 */
+	if (parse->havingQual)
+		non_group_cols = lappend(non_group_cols, parse->havingQual);
+
+	/*
+	 * Pull out all the Vars mentioned in non-group cols (plus HAVING), and
+	 * add them to the input target if not already present.  (A Var used
+	 * directly as a GROUP BY item will be present already.)  Note this
+	 * includes Vars used in resjunk items, so we are covering the needs of
+	 * ORDER BY and window specifications.  Vars used within Aggrefs will be
+	 * ignored and the Aggrefs themselves will be added to the PathTarget.
+	 */
+	non_group_exprs = pull_var_clause((Node *) non_group_cols,
+									  PVC_INCLUDE_AGGREGATES |
+									  PVC_RECURSE_WINDOWFUNCS |
+									  PVC_INCLUDE_PLACEHOLDERS);
+
+	add_new_columns_to_pathtarget(input_target, non_group_exprs);
+
+	/* clean up cruft */
+	list_free(non_group_exprs);
+	list_free(non_group_cols);
+
+	/* Adjust Aggrefs to put them in partial mode. */
+	apply_partialaggref_adjustment(input_target);
+
+	/* XXX this causes some redundant cost calculation ... */
+	input_target = set_pathtarget_cost_width(root, input_target);
+	return input_target;
+}
+
+/*
  * postprocess_setop_tlist
  *	  Fix up targetlist returned by plan_set_operations().
  *
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index aa2c308..4ae1599 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -104,6 +104,8 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context);
 static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context);
 static void set_join_references(PlannerInfo *root, Join *join, int rtoffset);
 static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset);
+static void set_combineagg_references(PlannerInfo *root, Plan *plan,
+									  int rtoffset);
 static void set_dummy_tlist_references(Plan *plan, int rtoffset);
 static indexed_tlist *build_tlist_index(List *tlist);
 static Var *search_indexed_tlist_for_var(Var *var,
@@ -117,6 +119,8 @@ static Var *search_indexed_tlist_for_sortgroupref(Node *node,
 									  Index sortgroupref,
 									  indexed_tlist *itlist,
 									  Index newvarno);
+static Var *search_indexed_tlist_for_partial_aggref(Aggref *aggref,
+					   indexed_tlist *itlist, Index newvarno);
 static List *fix_join_expr(PlannerInfo *root,
 			  List *clauses,
 			  indexed_tlist *outer_itlist,
@@ -131,6 +135,13 @@ static Node *fix_upper_expr(PlannerInfo *root,
 			   int rtoffset);
 static Node *fix_upper_expr_mutator(Node *node,
 					   fix_upper_expr_context *context);
+static Node *fix_combine_agg_expr(PlannerInfo *root,
+								  Node *node,
+								  indexed_tlist *subplan_itlist,
+								  Index newvarno,
+								  int rtoffset);
+static Node *fix_combine_agg_expr_mutator(Node *node,
+										  fix_upper_expr_context *context);
 static List *set_returning_clause_references(PlannerInfo *root,
 								List *rlist,
 								Plan *topplan,
@@ -667,8 +678,16 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 			}
 			break;
 		case T_Agg:
-			set_upper_references(root, plan, rtoffset);
-			break;
+			{
+				Agg *aggplan = (Agg *) plan;
+
+				if (aggplan->combineStates)
+					set_combineagg_references(root, plan, rtoffset);
+				else
+					set_upper_references(root, plan, rtoffset);
+
+				break;
+			}
 		case T_Group:
 			set_upper_references(root, plan, rtoffset);
 			break;
@@ -1702,6 +1721,72 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset)
 }
 
 /*
+ * set_combineagg_references
+ *	  This does a similar job to set_upper_references(), but additionally it
+ *	  transforms Aggref nodes' args to suit the combine aggregate phase; this
+ *	  means that the Aggref->args are converted to reference the corresponding
+ *	  aggregate function in the subplan rather than simple Var(s), as would be
+ *	  the case for a non-combine aggregate node.
+ */
+static void
+set_combineagg_references(PlannerInfo *root, Plan *plan, int rtoffset)
+{
+	Plan	   *subplan = plan->lefttree;
+	indexed_tlist *subplan_itlist;
+	List	   *output_targetlist;
+	ListCell   *l;
+
+	Assert(IsA(plan, Agg));
+	Assert(((Agg *) plan)->combineStates);
+
+	subplan_itlist = build_tlist_index(subplan->targetlist);
+
+	output_targetlist = NIL;
+
+	foreach(l, plan->targetlist)
+	{
+		TargetEntry *tle = (TargetEntry *) lfirst(l);
+		Node	   *newexpr;
+
+		/* If it's a non-Var sort/group item, first try to match by sortref */
+		if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var))
+		{
+			newexpr = (Node *)
+				search_indexed_tlist_for_sortgroupref((Node *) tle->expr,
+													  tle->ressortgroupref,
+													  subplan_itlist,
+													  OUTER_VAR);
+			if (!newexpr)
+				newexpr = fix_combine_agg_expr(root,
+											   (Node *) tle->expr,
+											   subplan_itlist,
+											   OUTER_VAR,
+											   rtoffset);
+		}
+		else
+			newexpr = fix_combine_agg_expr(root,
+										   (Node *) tle->expr,
+										   subplan_itlist,
+										   OUTER_VAR,
+										   rtoffset);
+		tle = flatCopyTargetEntry(tle);
+		tle->expr = (Expr *) newexpr;
+		output_targetlist = lappend(output_targetlist, tle);
+	}
+
+	plan->targetlist = output_targetlist;
+
+	plan->qual = (List *)
+		fix_combine_agg_expr(root,
+							 (Node *) plan->qual,
+							 subplan_itlist,
+							 OUTER_VAR,
+							 rtoffset);
+
+	pfree(subplan_itlist);
+}
+
+/*
  * set_dummy_tlist_references
  *	  Replace the targetlist of an upper-level plan node with a simple
  *	  list of OUTER_VAR references to its child.
@@ -1968,6 +2053,71 @@ search_indexed_tlist_for_sortgroupref(Node *node,
 }
 
 /*
+ * Find the Var for the matching 'aggref' in 'itlist'
+ *
+ * Aggrefs for partial aggregates have their aggpartial setting adjusted to put
+ * them in partial mode. This means that a standard equal() comparison won't
+ * match when comparing an Aggref which is in partial mode with an Aggref which
+ * is not. Here we manually compare all of the fields apart from
+ * aggpartialtype, which is set only when putting the Aggref into partial mode,
+ * and aggpartial, which is the flag that determines if the Aggref is in
+ * partial mode or not.
+ */
+static Var *
+search_indexed_tlist_for_partial_aggref(Aggref *aggref, indexed_tlist *itlist,
+										Index newvarno)
+{
+	ListCell *lc;
+
+	foreach(lc, itlist->tlist)
+	{
+		TargetEntry *tle = (TargetEntry *) lfirst(lc);
+
+		if (IsA(tle->expr, Aggref))
+		{
+			Aggref	   *tlistaggref = (Aggref *) tle->expr;
+			Var		   *newvar;
+
+			if (aggref->aggfnoid != tlistaggref->aggfnoid)
+				continue;
+			if (aggref->aggtype != tlistaggref->aggtype)
+				continue;
+			/* ignore aggpartialtype */
+			if (aggref->aggcollid != tlistaggref->aggcollid)
+				continue;
+			if (aggref->inputcollid != tlistaggref->inputcollid)
+				continue;
+			if (!equal(aggref->aggdirectargs, tlistaggref->aggdirectargs))
+				continue;
+			if (!equal(aggref->args, tlistaggref->args))
+				continue;
+			if (!equal(aggref->aggorder, tlistaggref->aggorder))
+				continue;
+			if (!equal(aggref->aggdistinct, tlistaggref->aggdistinct))
+				continue;
+			if (!equal(aggref->aggfilter, tlistaggref->aggfilter))
+				continue;
+			if (aggref->aggstar != tlistaggref->aggstar)
+				continue;
+			if (aggref->aggvariadic != tlistaggref->aggvariadic)
+				continue;
+			/* ignore aggpartial */
+			if (aggref->aggkind != tlistaggref->aggkind)
+				continue;
+			if (aggref->agglevelsup != tlistaggref->agglevelsup)
+				continue;
+
+			newvar = makeVarFromTargetEntry(newvarno, tle);
+			newvar->varnoold = 0;	/* wasn't ever a plain Var */
+			newvar->varoattno = 0;
+
+			return newvar;
+		}
+	}
+	return NULL;
+}
+
+/*
  * fix_join_expr
  *	   Create a new set of targetlist entries or join qual clauses by
  *	   changing the varno/varattno values of variables in the clauses
@@ -2238,6 +2388,105 @@ fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context)
 }
 
 /*
+ * fix_combine_agg_expr
+ *	  Like fix_upper_expr() but additionally adjusts the Aggref->args of
+ *	  Aggrefs so that they reference the corresponding Aggref in the subplan.
+ */
+static Node *
+fix_combine_agg_expr(PlannerInfo *root,
+			   Node *node,
+			   indexed_tlist *subplan_itlist,
+			   Index newvarno,
+			   int rtoffset)
+{
+	fix_upper_expr_context context;
+
+	context.root = root;
+	context.subplan_itlist = subplan_itlist;
+	context.newvarno = newvarno;
+	context.rtoffset = rtoffset;
+	return fix_combine_agg_expr_mutator(node, &context);
+}
+
+static Node *
+fix_combine_agg_expr_mutator(Node *node, fix_upper_expr_context *context)
+{
+	Var		   *newvar;
+
+	if (node == NULL)
+		return NULL;
+	if (IsA(node, Var))
+	{
+		Var		   *var = (Var *) node;
+
+		newvar = search_indexed_tlist_for_var(var,
+											  context->subplan_itlist,
+											  context->newvarno,
+											  context->rtoffset);
+		if (!newvar)
+			elog(ERROR, "variable not found in subplan target list");
+		return (Node *) newvar;
+	}
+	if (IsA(node, PlaceHolderVar))
+	{
+		PlaceHolderVar *phv = (PlaceHolderVar *) node;
+
+		/* See if the PlaceHolderVar has bubbled up from a lower plan node */
+		if (context->subplan_itlist->has_ph_vars)
+		{
+			newvar = search_indexed_tlist_for_non_var((Node *) phv,
+													  context->subplan_itlist,
+													  context->newvarno);
+			if (newvar)
+				return (Node *) newvar;
+		}
+		/* If not supplied by input plan, evaluate the contained expr */
+		return fix_upper_expr_mutator((Node *) phv->phexpr, context);
+	}
+	if (IsA(node, Param))
+		return fix_param_node(context->root, (Param *) node);
+	if (IsA(node, Aggref))
+	{
+		Aggref		   *aggref = (Aggref *) node;
+
+		newvar = search_indexed_tlist_for_partial_aggref(aggref,
+													 context->subplan_itlist,
+														 context->newvarno);
+		if (newvar)
+		{
+			Aggref		   *newaggref;
+			TargetEntry	   *newtle;
+
+			/*
+			 * Now build a new TargetEntry for the Aggref's arguments which is
+			 * a single Var which references the corresponding AggRef in the
+			 * node below.
+			 */
+			newtle = makeTargetEntry((Expr *) newvar, 1, NULL, false);
+			newaggref = (Aggref *) copyObject(aggref);
+			newaggref->args = list_make1(newtle);
+
+			return (Node *) newaggref;
+		}
+		else
+			elog(ERROR, "Aggref not found in subplan target list");
+	}
+	/* Try matching more complex expressions too, if tlist has any */
+	if (context->subplan_itlist->has_non_vars)
+	{
+		newvar = search_indexed_tlist_for_non_var(node,
+												  context->subplan_itlist,
+												  context->newvarno);
+		if (newvar)
+			return (Node *) newvar;
+	}
+	fix_expr_common(context->root, node);
+	return expression_tree_mutator(node,
+								   fix_combine_agg_expr_mutator,
+								   (void *) context);
+}
+
+/*
  * set_returning_clause_references
  *		Perform setrefs.c's work on a RETURNING targetlist
  *
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 6ea3319..fb139af 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -859,7 +859,9 @@ make_union_unique(SetOperationStmt *op, Path *path, List *tlist,
 										groupList,
 										NIL,
 										NULL,
-										dNumGroups);
+										dNumGroups,
+										false,
+										true);
 	}
 	else
 	{
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index b692e18..925c340 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -52,6 +52,10 @@
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
+typedef struct
+{
+	PartialAggType allowedtype;
+} partial_agg_context;
 
 typedef struct
 {
@@ -93,6 +97,8 @@ typedef struct
 	bool		allow_restricted;
 } has_parallel_hazard_arg;
 
+static bool aggregates_allow_partial_walker(Node *node,
+											partial_agg_context *context);
 static bool contain_agg_clause_walker(Node *node, void *context);
 static bool count_agg_clauses_walker(Node *node,
 						 count_agg_clauses_context *context);
@@ -400,6 +406,88 @@ make_ands_implicit(Expr *clause)
  *****************************************************************************/
 
 /*
+ * aggregates_allow_partial
+ *		Recursively search for Aggref clauses and determine the maximum
+ *		level of partial aggregation which can be supported.
+ *
+ * Partial aggregation requires that each aggregate does not have a DISTINCT or
+ * ORDER BY clause, and that it also has a combine function set. Since partial
+ * aggregation requires that the aggregate state is not finalized before
+ * returning to the next node up in the plan tree, an aggregate with an
+ * INTERNAL state type can support at most PAT_INTERNAL_ONLY mode, meaning
+ * that partial aggregation is only supported within a single process; this
+ * is because the pointer to the INTERNAL state cannot be dereferenced by
+ * another process.
+ */
+PartialAggType
+aggregates_allow_partial(Node *clause)
+{
+	partial_agg_context context;
+
+	/* initially any type is okay, until we find Aggrefs which say otherwise */
+	context.allowedtype = PAT_ANY;
+
+	if (!aggregates_allow_partial_walker(clause, &context))
+		return context.allowedtype;
+	return context.allowedtype;
+}
+
+static bool
+aggregates_allow_partial_walker(Node *node, partial_agg_context *context)
+{
+	if (node == NULL)
+		return false;
+	if (IsA(node, Aggref))
+	{
+		Aggref	   *aggref = (Aggref *) node;
+		HeapTuple	aggTuple;
+		Form_pg_aggregate aggform;
+
+		Assert(aggref->agglevelsup == 0);
+
+		/*
+		 * We can't perform partial aggregation with Aggrefs containing a
+		 * DISTINCT or ORDER BY clause.
+		 */
+		if (aggref->aggdistinct || aggref->aggorder)
+		{
+			context->allowedtype = PAT_DISABLED;
+			return true;	/* abort search */
+		}
+		aggTuple = SearchSysCache1(AGGFNOID,
+								   ObjectIdGetDatum(aggref->aggfnoid));
+		if (!HeapTupleIsValid(aggTuple))
+			elog(ERROR, "cache lookup failed for aggregate %u",
+				 aggref->aggfnoid);
+		aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+
+		/*
+		 * If there is no combine function, then partial aggregation is not
+		 * possible.
+		 */
+		if (!OidIsValid(aggform->aggcombinefn))
+		{
+			ReleaseSysCache(aggTuple);
+			context->allowedtype = PAT_DISABLED;
+			return true;	/* abort search */
+		}
+
+		/*
+		 * If we find any aggs with an internal transtype then we must ensure
+		 * that pointers to aggregate states are not passed to other processes,
+		 * therefore we set the maximum allowed type to PAT_INTERNAL_ONLY.
+		 */
+		if (aggform->aggtranstype == INTERNALOID)
+			context->allowedtype = PAT_INTERNAL_ONLY;
+
+		ReleaseSysCache(aggTuple);
+		return false; /* continue searching */
+	}
+	return expression_tree_walker(node, aggregates_allow_partial_walker,
+								  (void *) context);
+}
+
+/*
  * contain_agg_clause
  *	  Recursively search for Aggref/GroupingFunc nodes within a clause.
  *
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index b8ea316..2a1b2a0 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1645,10 +1645,12 @@ translate_sub_tlist(List *tlist, int relid)
  * create_gather_path
  *	  Creates a path corresponding to a gather scan, returning the
  *	  pathnode.
+ *
+ * 'rows' may optionally be set to override row estimates from other sources.
  */
 GatherPath *
 create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
-				   Relids required_outer)
+				   Relids required_outer, double *rows)
 {
 	GatherPath *pathnode = makeNode(GatherPath);
 
@@ -1674,7 +1676,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		pathnode->single_copy = true;
 	}
 
-	cost_gather(pathnode, root, rel, pathnode->path.param_info);
+	cost_gather(pathnode, root, rel, pathnode->path.param_info, rows);
 
 	return pathnode;
 }
@@ -2387,6 +2389,8 @@ create_upper_unique_path(PlannerInfo *root,
  * 'qual' is the HAVING quals if any
  * 'aggcosts' contains cost info about the aggregate functions to be computed
  * 'numGroups' is the estimated number of groups (1 if not grouping)
+ * 'combineStates' is set to true if the Agg node should combine agg states
+ * 'finalizeAggs' is set to false if the Agg node should not call the finalfn
  */
 AggPath *
 create_agg_path(PlannerInfo *root,
@@ -2397,7 +2401,9 @@ create_agg_path(PlannerInfo *root,
 				List *groupClause,
 				List *qual,
 				const AggClauseCosts *aggcosts,
-				double numGroups)
+				double numGroups,
+				bool combineStates,
+				bool finalizeAggs)
 {
 	AggPath    *pathnode = makeNode(AggPath);
 
@@ -2420,6 +2426,8 @@ create_agg_path(PlannerInfo *root,
 	pathnode->numGroups = numGroups;
 	pathnode->groupClause = groupClause;
 	pathnode->qual = qual;
+	pathnode->finalizeAggs = finalizeAggs;
+	pathnode->combineStates = combineStates;
 
 	cost_agg(&pathnode->path, root,
 			 aggstrategy, aggcosts,
diff --git a/src/backend/optimizer/util/tlist.c b/src/backend/optimizer/util/tlist.c
index b297d87..e650fa4 100644
--- a/src/backend/optimizer/util/tlist.c
+++ b/src/backend/optimizer/util/tlist.c
@@ -14,9 +14,12 @@
  */
 #include "postgres.h"
 
+#include "access/htup_details.h"
+#include "catalog/pg_aggregate.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/tlist.h"
+#include "utils/syscache.h"
 
 
 /*****************************************************************************
@@ -748,3 +751,46 @@ apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target)
 		i++;
 	}
 }
+
+/*
+ * apply_partialaggref_adjustment
+ *	  Convert PathTarget to be suitable for a partial aggregate node. We simply
+ *	  adjust any Aggref nodes found in the target, setting aggpartial to
+ *	  TRUE. Here we also apply the aggpartialtype to the Aggref. This allows
+ *	  exprType() to return the partial type rather than the agg type.
+ *
+ * Note: We expect 'target' to be a flat target list and not have Aggrefs buried
+ * within other expressions.
+ */
+void
+apply_partialaggref_adjustment(PathTarget *target)
+{
+	ListCell *lc;
+
+	foreach(lc, target->exprs)
+	{
+		Aggref *aggref = (Aggref *) lfirst(lc);
+
+		if (IsA(aggref, Aggref))
+		{
+			HeapTuple	aggTuple;
+			Form_pg_aggregate aggform;
+			Aggref	   *newaggref;
+
+			aggTuple = SearchSysCache1(AGGFNOID,
+									   ObjectIdGetDatum(aggref->aggfnoid));
+			if (!HeapTupleIsValid(aggTuple))
+				elog(ERROR, "cache lookup failed for aggregate %u",
+					 aggref->aggfnoid);
+			aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
+
+			newaggref = (Aggref *) copyObject(aggref);
+			newaggref->aggpartialtype = aggform->aggtranstype;
+			newaggref->aggpartial = true;
+
+			lfirst(lc) = newaggref;
+
+			ReleaseSysCache(aggTuple);
+		}
+	}
+}
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index f942378..947fca6 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -255,12 +255,30 @@ typedef struct Param
  * DISTINCT is not supported in this case, so aggdistinct will be NIL.
  * The direct arguments appear in aggdirectargs (as a list of plain
  * expressions, not TargetEntry nodes).
+ *
+ * An Aggref can operate in one of two modes. Normally an aggregate function's
+ * value is calculated with a single executor Agg node; however, there are
+ * times, such as parallel aggregation, when we want to calculate the aggregate
+ * value in multiple phases. This requires at least a Partial Aggregate phase,
+ * where normal aggregation takes place but the aggregate's final function is
+ * not called, then later a Finalize Aggregate phase, where previously
+ * aggregated states are combined and the final function is called. No settings
+ * in Aggref determine this behaviour; the only thing that is required in
+ * Aggref to allow this behaviour is the ability to determine the data
+ * type which this Aggref will produce. The 'aggpartial' field is used to
+ * determine which of the two data types the Aggref will produce, either
+ * 'aggtype' or 'aggpartialtype', the latter of which is only set upon changing
+ * the Aggref into partial mode.
+ *
+ * Note: If you are adding fields here you may also need to add a comparison
+ * in search_indexed_tlist_for_partial_aggref()
  */
 typedef struct Aggref
 {
 	Expr		xpr;
 	Oid			aggfnoid;		/* pg_proc Oid of the aggregate */
 	Oid			aggtype;		/* type Oid of result of the aggregate */
+	Oid			aggpartialtype;	/* return type if aggpartial is true */
 	Oid			aggcollid;		/* OID of collation of result */
 	Oid			inputcollid;	/* OID of collation that function should use */
 	List	   *aggdirectargs;	/* direct arguments, if an ordered-set agg */
@@ -271,6 +289,7 @@ typedef struct Aggref
 	bool		aggstar;		/* TRUE if argument list was really '*' */
 	bool		aggvariadic;	/* true if variadic arguments have been
 								 * combined into an array last argument */
+	bool		aggpartial;		/* TRUE if Agg value should not be finalized */
 	char		aggkind;		/* aggregate kind (see pg_aggregate.h) */
 	Index		agglevelsup;	/* > 0 if agg belongs to outer query */
 	int			location;		/* token location, or -1 if unknown */
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 5032696..ee7007a 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1309,6 +1309,8 @@ typedef struct AggPath
 	double		numGroups;		/* estimated number of groups in input */
 	List	   *groupClause;	/* a list of SortGroupClause's */
 	List	   *qual;			/* quals (HAVING quals), if any */
+	bool		combineStates;	/* input is partially aggregated agg states */
+	bool		finalizeAggs;	/* should the executor call the finalfn? */
 } AggPath;
 
 /*
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index 3b3fd0f..c467f84 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -27,6 +27,25 @@ typedef struct
 	List	  **windowFuncs;	/* lists of WindowFuncs for each winref */
 } WindowFuncLists;
 
+/*
+ * PartialAggType
+ *	PartialAggType stores whether partial aggregation is allowed and
+ *	which context it is allowed in. We require three states here as there are
+ *	two different contexts in which partial aggregation is safe. For aggregates
+ *	which have an 'stype' of INTERNAL, within a single backend process it is
+ *	okay to pass a pointer to the aggregate state, as the memory to which the
+ *	pointer points will belong to the same process. In cases where the
+ *	aggregate state must be passed between different processes, for example
+ *	during parallel aggregation, passing the pointer is not okay due to the
+ *	fact that the memory being referenced won't be accessible from another
+ *	process.
+ */
+typedef enum
+{
+	PAT_ANY = 0,		/* Any type of partial aggregation is okay. */
+	PAT_INTERNAL_ONLY,	/* Some aggregates support only internal mode. */
+	PAT_DISABLED		/* Some aggregates don't support partial mode at all */
+} PartialAggType;
 
 extern Expr *make_opclause(Oid opno, Oid opresulttype, bool opretset,
 			  Expr *leftop, Expr *rightop,
@@ -47,6 +66,7 @@ extern Node *make_and_qual(Node *qual1, Node *qual2);
 extern Expr *make_ands_explicit(List *andclauses);
 extern List *make_ands_implicit(Expr *clause);
 
+extern PartialAggType aggregates_allow_partial(Node *clause);
 extern bool contain_agg_clause(Node *clause);
 extern void count_agg_clauses(PlannerInfo *root, Node *clause,
 				  AggClauseCosts *costs);
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index fea2bb7..d4adca6 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -150,7 +150,7 @@ extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path,
 					SpecialJoinInfo *sjinfo,
 					SemiAntiJoinFactors *semifactors);
 extern void cost_gather(GatherPath *path, PlannerInfo *root,
-			RelOptInfo *baserel, ParamPathInfo *param_info);
+			RelOptInfo *baserel, ParamPathInfo *param_info, double *rows);
 extern void cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan);
 extern void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root);
 extern void cost_qual_eval_node(QualCost *cost, Node *qual, PlannerInfo *root);
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index d1eb22f..4337e2c 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -74,7 +74,8 @@ extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath);
 extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel,
 				   Path *subpath, SpecialJoinInfo *sjinfo);
 extern GatherPath *create_gather_path(PlannerInfo *root,
-				   RelOptInfo *rel, Path *subpath, Relids required_outer);
+				   RelOptInfo *rel, Path *subpath, Relids required_outer,
+				   double *rows);
 extern SubqueryScanPath *create_subqueryscan_path(PlannerInfo *root,
 						 RelOptInfo *rel, Path *subpath,
 						 List *pathkeys, Relids required_outer);
@@ -168,7 +169,9 @@ extern AggPath *create_agg_path(PlannerInfo *root,
 				List *groupClause,
 				List *qual,
 				const AggClauseCosts *aggcosts,
-				double numGroups);
+				double numGroups,
+				bool combineStates,
+				bool finalizeAggs);
 extern GroupingSetsPath *create_groupingsets_path(PlannerInfo *root,
 						 RelOptInfo *rel,
 						 Path *subpath,
diff --git a/src/include/optimizer/tlist.h b/src/include/optimizer/tlist.h
index 0d745a0..de58db1 100644
--- a/src/include/optimizer/tlist.h
+++ b/src/include/optimizer/tlist.h
@@ -61,6 +61,7 @@ extern void add_column_to_pathtarget(PathTarget *target,
 extern void add_new_column_to_pathtarget(PathTarget *target, Expr *expr);
 extern void add_new_columns_to_pathtarget(PathTarget *target, List *exprs);
 extern void apply_pathtarget_labeling_to_tlist(List *tlist, PathTarget *target);
+extern void apply_partialaggref_adjustment(PathTarget *target);
 
 /* Convenience macro to get a PathTarget with valid cost/width fields */
 #define create_pathtarget(root, tlist) \
-- 
1.9.5.msysgit.1

