On Wed, 2021-03-03 at 17:58 +0900, Amit Langote wrote:
> For example, with the attached PoC patch:

I have incorporated your PoC patch and added a regression test.

I didn't test it thoroughly though.
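
If anyone wants to give it a quick spin, something along these lines
should show the effect (made-up table names, not the regression test
from the patch):

  SET parallel_setup_cost = 0;
  SET min_parallel_table_scan_size = 0;
  SET max_parallel_workers_per_gather = 8;
  CREATE TABLE pw_test (id integer) PARTITION BY RANGE (id);
  CREATE TABLE pw_test_1 PARTITION OF pw_test FOR VALUES FROM (0) TO (1000);
  CREATE TABLE pw_test_2 PARTITION OF pw_test FOR VALUES FROM (1000) TO (2000);
  INSERT INTO pw_test SELECT i FROM generate_series(0, 1999) AS i;
  ALTER TABLE pw_test SET (parallel_workers = 4);
  EXPLAIN (COSTS OFF) SELECT count(*) FROM pw_test;

"Workers Planned" for the Gather should then follow the setting.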

Yours,
Laurenz Albe
From 34f7da98b34bc1dbf7daca9e2ca6055c80d77f43 Mon Sep 17 00:00:00 2001
From: Laurenz Albe <laurenz.a...@cybertec.at>
Date: Fri, 5 Mar 2021 14:43:34 +0100
Subject: [PATCH] Allow setting parallel_workers on partitioned tables.

Sometimes it's beneficial to do

ALTER TABLE my_part_tbl SET (parallel_workers = 32);

when the query planner plans too few workers to read from your
partitions, for example if each partition is a foreign table, which
cannot have this setting itself.  This was shown to give a performance
increase of more than 100% when used with cstore_fdw column store
partitions.

This is also useful to lower the number of parallel workers, for
example when the executor is expected to prune most partitions.
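
For example, with

ALTER TABLE my_part_tbl SET (parallel_workers = 0);

the planner won't use a Parallel Append for this table at all (the new
regression test demonstrates this case).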

The setting also affects parallel appends for partitionwise aggregates
on a partitioned table, but not partitionwise joins.
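
With partitionwise aggregation enabled, the added regression test shows
this for

SELECT a, count(*) FROM pagg_tab_ml GROUP BY a;

where the Parallel Append on top of the per-partition aggregation is
planned with the configured number of workers.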

Authors: Seamus Abshere, Amit Langote, Laurenz Albe
Discussion: https://postgr.es/m/95b1dd96-8634-4545-b1de-e2ac779beb44%40www.fastmail.com
---
 doc/src/sgml/ref/create_table.sgml            |  15 +-
 src/backend/access/common/reloptions.c        |  14 +-
 src/backend/optimizer/path/allpaths.c         | 157 ++++++++++--------
 src/backend/optimizer/plan/planner.c          |  19 +++
 src/include/utils/rel.h                       |  15 +-
 .../regress/expected/partition_aggregate.out  |  73 ++++++++-
 src/test/regress/sql/partition_aggregate.sql  |  19 ++-
 7 files changed, 233 insertions(+), 79 deletions(-)

diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 3b2b227683..c8c14850c1 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -1337,8 +1337,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     If a table parameter value is set and the
     equivalent <literal>toast.</literal> parameter is not, the TOAST table
     will use the table's parameter value.
-    Specifying these parameters for partitioned tables is not supported,
-    but you may specify them for individual leaf partitions.
+    These parameters, with the exception of <literal>parallel_workers</literal>,
+    are not supported on partitioned tables, but you may specify them for
+    individual leaf partitions.
    </para>
 
    <variablelist>
@@ -1401,9 +1402,13 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
      <para>
       This sets the number of workers that should be used to assist a parallel
       scan of this table.  If not set, the system will determine a value based
-      on the relation size.  The actual number of workers chosen by the planner
-      or by utility statements that use parallel scans may be less, for example
-      due to the setting of <xref linkend="guc-max-worker-processes"/>.
+      on the relation size and the number of scanned partitions.
+      When set on a partitioned table, the specified number of workers will
+      be distributed across distinct partitions, so the number of partitions
+      affected by the parallel operation should be taken into account.
+      The actual number of workers chosen by the planner or by utility
+      statements that use parallel scans may be less, for example due
+      to the setting of <xref linkend="guc-max-worker-processes"/>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index c687d3ee9e..f8443d2361 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -377,7 +377,7 @@ static relopt_int intRelOpts[] =
 		{
 			"parallel_workers",
 			"Number of parallel processes that can be used per executor node for this relation.",
-			RELOPT_KIND_HEAP,
+			RELOPT_KIND_HEAP | RELOPT_KIND_PARTITIONED,
 			ShareUpdateExclusiveLock
 		},
 		-1, 0, 1024
@@ -1962,12 +1962,18 @@ bytea *
 partitioned_table_reloptions(Datum reloptions, bool validate)
 {
 	/*
-	 * There are no options for partitioned tables yet, but this is able to do
-	 * some validation.
+	 * Currently the only setting known to be useful for partitioned tables
+	 * is parallel_workers.
 	 */
+	static const relopt_parse_elt tab[] = {
+		{"parallel_workers", RELOPT_TYPE_INT,
+		offsetof(PartitionedTableRdOptions, parallel_workers)},
+	};
+
 	return (bytea *) build_reloptions(reloptions, validate,
 									  RELOPT_KIND_PARTITIONED,
-									  0, NULL, 0);
+									  sizeof(PartitionedTableRdOptions),
+									  tab, lengthof(tab));
 }
 
 /*
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index d73ac562eb..e22cb6f33e 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -97,6 +97,9 @@ static void set_append_rel_size(PlannerInfo *root, RelOptInfo *rel,
 								Index rti, RangeTblEntry *rte);
 static void set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 									Index rti, RangeTblEntry *rte);
+static int compute_append_parallel_workers(RelOptInfo *rel, List *subpaths,
+										   int num_live_children,
+										   bool parallel_append);
 static void generate_orderedappend_paths(PlannerInfo *root, RelOptInfo *rel,
 										 List *live_childrels,
 										 List *all_child_pathkeys);
@@ -1268,6 +1271,67 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	add_paths_to_append_rel(root, rel, live_childrels);
 }
 
+/*
+ * compute_append_parallel_workers
+ * 		Computes the number of workers to assign to scan the subpaths appended
+ * 		by a given Append path
+ */
+static int
+compute_append_parallel_workers(RelOptInfo *rel, List *subpaths,
+								int num_live_children,
+								bool parallel_append)
+{
+	ListCell   *lc;
+	int			parallel_workers = 0;
+
+	/*
+	 * For partitioned rels, first see if there is a root-level setting
+	 * for parallel_workers.  But only honor it if a Parallel Append
+	 * plan is being considered.
+	 */
+	if (IS_PARTITIONED_REL(rel) &&
+		parallel_append &&
+		rel->rel_parallel_workers != -1)
+	{
+		parallel_workers = Min(rel->rel_parallel_workers,
+							   max_parallel_workers_per_gather);
+
+		/* An explicit setting overrides the heuristics. */
+		return parallel_workers;
+	}
+
+	/* Find the highest number of workers requested for any subpath. */
+	foreach(lc, subpaths)
+	{
+		Path	   *path = lfirst(lc);
+
+		parallel_workers = Max(parallel_workers, path->parallel_workers);
+	}
+	Assert(parallel_workers > 0 || subpaths == NIL);
+
+	/*
+	 * If the use of parallel append is permitted, always request at least
+	 * log2(# of children) workers.  We assume it can be useful to have
+	 * extra workers in this case because they will be spread out across
+	 * the children.  The precise formula is just a guess, but we don't
+	 * want to end up with a radically different answer for a table with N
+	 * partitions vs. an unpartitioned table with the same data, so the
+	 * use of some kind of log-scaling here seems to make some sense.
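+	 * For example, a table with 8 live children will get at least
+	 * fls(8) = 4 workers.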
+	 */
+	if (parallel_append)
+	{
+		parallel_workers = Max(parallel_workers,
+							   fls(num_live_children));
+		parallel_workers = Min(parallel_workers,
+							   max_parallel_workers_per_gather);
+	}
+	Assert(parallel_workers > 0);
+
+	return parallel_workers;
+}
+
 
 /*
  * add_paths_to_append_rel
@@ -1464,50 +1528,28 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	if (partial_subpaths_valid && partial_subpaths != NIL)
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
+		int			parallel_workers =
+			compute_append_parallel_workers(rel, partial_subpaths,
+											list_length(live_childrels),
+											enable_parallel_append);
 
-		/* Find the highest number of workers requested for any subpath. */
-		foreach(lc, partial_subpaths)
+		if (parallel_workers > 0)
 		{
-			Path	   *path = lfirst(lc);
+			/* Generate a partial append path. */
+			appendpath = create_append_path(root, rel, NIL, partial_subpaths,
+											NIL, NULL, parallel_workers,
+											enable_parallel_append,
+											-1);
 
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
-		}
-		Assert(parallel_workers > 0);
+			/*
+			 * Make sure any subsequent partial paths use the same row count
+			 * estimate.
+			 */
+			partial_rows = appendpath->path.rows;
 
-		/*
-		 * If the use of parallel append is permitted, always request at least
-		 * log2(# of children) workers.  We assume it can be useful to have
-		 * extra workers in this case because they will be spread out across
-		 * the children.  The precise formula is just a guess, but we don't
-		 * want to end up with a radically different answer for a table with N
-		 * partitions vs. an unpartitioned table with the same data, so the
-		 * use of some kind of log-scaling here seems to make some sense.
-		 */
-		if (enable_parallel_append)
-		{
-			parallel_workers = Max(parallel_workers,
-								   fls(list_length(live_childrels)));
-			parallel_workers = Min(parallel_workers,
-								   max_parallel_workers_per_gather);
+			/* Add the path. */
+			add_partial_path(rel, (Path *) appendpath);
 		}
-		Assert(parallel_workers > 0);
-
-		/* Generate a partial append path. */
-		appendpath = create_append_path(root, rel, NIL, partial_subpaths,
-										NIL, NULL, parallel_workers,
-										enable_parallel_append,
-										-1);
-
-		/*
-		 * Make sure any subsequent partial paths use the same row count
-		 * estimate.
-		 */
-		partial_rows = appendpath->path.rows;
-
-		/* Add the path. */
-		add_partial_path(rel, (Path *) appendpath);
 	}
 
 	/*
@@ -1519,36 +1561,19 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	if (pa_subpaths_valid && pa_nonpartial_subpaths != NIL)
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
+		int			parallel_workers =
+			compute_append_parallel_workers(rel, pa_partial_subpaths,
+											list_length(live_childrels),
+											true);
 
-		/*
-		 * Find the highest number of workers requested for any partial
-		 * subpath.
-		 */
-		foreach(lc, pa_partial_subpaths)
+		if (parallel_workers > 0)
 		{
-			Path	   *path = lfirst(lc);
-
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
+			appendpath = create_append_path(root, rel, pa_nonpartial_subpaths,
+											pa_partial_subpaths,
+											NIL, NULL, parallel_workers, true,
+											partial_rows);
+			add_partial_path(rel, (Path *) appendpath);
 		}
-
-		/*
-		 * Same formula here as above.  It's even more important in this
-		 * instance because the non-partial paths won't contribute anything to
-		 * the planned number of parallel workers.
-		 */
-		parallel_workers = Max(parallel_workers,
-							   fls(list_length(live_childrels)));
-		parallel_workers = Min(parallel_workers,
-							   max_parallel_workers_per_gather);
-		Assert(parallel_workers > 0);
-
-		appendpath = create_append_path(root, rel, pa_nonpartial_subpaths,
-										pa_partial_subpaths,
-										NIL, NULL, parallel_workers, true,
-										partial_rows);
-		add_partial_path(rel, (Path *) appendpath);
 	}
 
 	/*
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 545b56bcaf..ebb99a71d2 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -3938,6 +3938,9 @@ make_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 	grouped_rel->useridiscurrent = input_rel->useridiscurrent;
 	grouped_rel->fdwroutine = input_rel->fdwroutine;
 
+	/* Copy parallel_workers. */
+	grouped_rel->rel_parallel_workers = input_rel->rel_parallel_workers;
+
 	return grouped_rel;
 }
 
@@ -7605,6 +7608,20 @@ create_partitionwise_grouping_paths(PlannerInfo *root,
 	Assert(patype != PARTITIONWISE_AGGREGATE_PARTIAL ||
 		   partially_grouped_rel != NULL);
 
+	/* Like the input rel, make the fully grouped rel appear partitioned too. */
+	Assert(IS_PARTITIONED_REL(input_rel));
+	if (patype == PARTITIONWISE_AGGREGATE_FULL)
+	{
+		grouped_rel->part_scheme = input_rel->part_scheme;
+		grouped_rel->partexprs = input_rel->partexprs;
+		grouped_rel->nullable_partexprs = input_rel->nullable_partexprs;
+		grouped_rel->boundinfo = input_rel->boundinfo;
+		grouped_rel->nparts = nparts;
+		Assert(grouped_rel->part_rels == NULL);
+		grouped_rel->part_rels =
+			(RelOptInfo **) palloc0(sizeof(RelOptInfo *) * nparts);
+	}
+
 	/* Add paths for partitionwise aggregation/grouping. */
 	for (cnt_parts = 0; cnt_parts < nparts; cnt_parts++)
 	{
@@ -7680,6 +7697,8 @@ create_partitionwise_grouping_paths(PlannerInfo *root,
 			set_cheapest(child_grouped_rel);
 			grouped_live_children = lappend(grouped_live_children,
 											child_grouped_rel);
+			Assert(grouped_rel->part_rels[cnt_parts] == NULL);
+			grouped_rel->part_rels[cnt_parts] = child_grouped_rel;
 		}
 
 		pfree(appinfos);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 10b63982c0..efa7b714cb 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -308,6 +308,16 @@ typedef struct StdRdOptions
 	bool		vacuum_truncate;	/* enables vacuum to truncate a relation */
 } StdRdOptions;
 
+/*
+ * PartitionedTableRdOptions
+ * 		Contents of rd_options for partitioned tables
+ */
+typedef struct PartitionedTableRdOptions
+{
+	int32		vl_len_;		/* varlena header (do not touch directly!) */
+	int			parallel_workers;	/* max number of parallel workers */
+} PartitionedTableRdOptions;
+
 #define HEAP_MIN_FILLFACTOR			10
 #define HEAP_DEFAULT_FILLFACTOR		100
 
@@ -359,7 +369,10 @@ typedef struct StdRdOptions
  */
 #define RelationGetParallelWorkers(relation, defaultpw) \
 	((relation)->rd_options ? \
-	 ((StdRdOptions *) (relation)->rd_options)->parallel_workers : (defaultpw))
+	 ((relation)->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? \
+	  ((PartitionedTableRdOptions *) (relation)->rd_options)->parallel_workers : \
+	  ((StdRdOptions *) (relation)->rd_options)->parallel_workers \
+	 ) : (defaultpw))
 
 /* ViewOptions->check_option values */
 typedef enum ViewOptCheckOption
diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out
index dfa4b036b5..2dbdf39957 100644
--- a/src/test/regress/expected/partition_aggregate.out
+++ b/src/test/regress/expected/partition_aggregate.out
@@ -1166,9 +1166,80 @@ SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 O
  29 | 4500 |   500
 (12 rows)
 
--- Parallelism within partitionwise aggregates
+-- Override "parallel_workers" for a partitioned table
 SET min_parallel_table_scan_size TO '8kB';
 SET parallel_setup_cost TO 0;
+SET max_parallel_workers_per_gather TO 8;
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather
+   Workers Planned: 4
+   ->  Parallel Append
+         ->  Parallel Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+               Filter: (b = 42)
+(9 rows)
+
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 6);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather
+   Workers Planned: 6
+   ->  Parallel Append
+         ->  Parallel Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+               Filter: (b = 42)
+(9 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT a, count(*) FROM pagg_tab_ml GROUP BY a;
+                             QUERY PLAN                              
+---------------------------------------------------------------------
+ Gather
+   Workers Planned: 6
+   ->  Parallel Append
+         ->  HashAggregate
+               Group Key: pagg_tab_ml_5.a
+               ->  Append
+                     ->  Seq Scan on pagg_tab_ml_p3_s1 pagg_tab_ml_5
+                     ->  Seq Scan on pagg_tab_ml_p3_s2 pagg_tab_ml_6
+         ->  HashAggregate
+               Group Key: pagg_tab_ml.a
+               ->  Seq Scan on pagg_tab_ml_p1 pagg_tab_ml
+         ->  HashAggregate
+               Group Key: pagg_tab_ml_2.a
+               ->  Append
+                     ->  Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+                     ->  Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+(16 rows)
+
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 0);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Append
+   ->  Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+         Filter: (b = 42)
+   ->  Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+         Filter: (b = 42)
+   ->  Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+         Filter: (b = 42)
+(7 rows)
+
+ALTER TABLE pagg_tab_ml RESET (parallel_workers);
+-- Parallelism within partitionwise aggregates
+SET max_parallel_workers_per_gather TO 2;
 -- Full aggregation at level 1 as GROUP BY clause matches with PARTITION KEY
 -- for level 1 only. For subpartitions, GROUP BY clause does not match with
 -- PARTITION KEY, thus we will have a partial aggregation for them.
diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql
index c17294b15b..1269a86792 100644
--- a/src/test/regress/sql/partition_aggregate.sql
+++ b/src/test/regress/sql/partition_aggregate.sql
@@ -253,10 +253,25 @@ EXPLAIN (COSTS OFF)
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 ORDER BY 1, 2, 3;
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 ORDER BY 1, 2, 3;
 
--- Parallelism within partitionwise aggregates
-
+-- Override "parallel_workers" for a partitioned table
 SET min_parallel_table_scan_size TO '8kB';
 SET parallel_setup_cost TO 0;
+SET max_parallel_workers_per_gather TO 8;
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 6);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+EXPLAIN (COSTS OFF)
+SELECT a, count(*) FROM pagg_tab_ml GROUP BY a;
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 0);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml RESET (parallel_workers);
+
+-- Parallelism within partitionwise aggregates
+
+SET max_parallel_workers_per_gather TO 2;
 
 -- Full aggregation at level 1 as GROUP BY clause matches with PARTITION KEY
 -- for level 1 only. For subpartitions, GROUP BY clause does not match with
-- 
2.26.2
