On Mon, 2021-03-01 at 17:39 +0900, Amit Langote wrote:
> > I am not sure.
> > IMO, for a normal table, we use the following macro to get
> > parallel_workers:
> > ----------------------
> > /*
> >   * RelationGetParallelWorkers
> >   *              Returns the relation's parallel_workers reloption setting.
> >   *              Note multiple eval of argument!
> >   */
> > #define RelationGetParallelWorkers(relation, defaultpw) \
> >          ((relation)->rd_options ? \
> >           ((StdRdOptions *) (relation)->rd_options)->parallel_workers : 
> > (defaultpw))
> > ----------------------
> > Since we added the new struct "PartitionedTableRdOptions", it seems we
> > need to get parallel_workers in a different way.
> > Do we need a similar macro to get a partitioned table's parallel_workers?
> 
> Oh, you're right.  The parallel_workers setting of a relation is only
> accessed through this macro, even for partitioned tables, and I can
> see that it is actually wrong to access a partitioned table's
> parallel_workers through this macro as-is.  I hadn't tested that,
> so thanks for pointing it out.
> 
> > Like:
> > #define PartitionedTableGetParallelWorkers(relation, defaultpw) \
> > xxx
> > ((PartitionedTableRdOptions *) (relation)->rd_options)->parallel_workers :
> > (defaultpw))
> 
> I'm thinking it would be better to just modify the existing macro to
> check relkind to decide which struct pointer type to cast the value in
> rd_options to.

Here is an updated patch with this fix.

I added regression tests and adapted the documentation a bit.

I also added support for lowering the number of parallel workers.
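
For example (the table name here is made up), one can now do

    ALTER TABLE my_part_tbl SET (parallel_workers = 2);

and setting it to 0 keeps the planner from generating a parallel plan
for the table altogether, as the new regression tests demonstrate.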

Yours,
Laurenz Albe
From d48874a61ac698dc284b83b7e3b147a71158d09d Mon Sep 17 00:00:00 2001
From: Laurenz Albe <laurenz.a...@cybertec.at>
Date: Mon, 1 Mar 2021 16:06:49 +0100
Subject: [PATCH] Allow setting parallel_workers on partitioned tables.

Sometimes it's beneficial to do

ALTER TABLE my_part_tbl SET (parallel_workers = 32);

when the query planner plans too few workers to read from your
partitions, for example when you have a partitioned table where each
partition is a foreign table, which cannot itself have this setting.
This was shown to give a performance increase of 100% or more (in my
case, 700%) when used with cstore_fdw column store partitions.
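
A sketch of that setup (all object names are invented for illustration):

CREATE TABLE my_part_tbl (id bigint, payload text) PARTITION BY HASH (id);
CREATE FOREIGN TABLE my_part_tbl_p0 PARTITION OF my_part_tbl
    FOR VALUES WITH (MODULUS 4, REMAINDER 0) SERVER my_columnar_server;
-- ... likewise for the remaining partitions ...
ALTER TABLE my_part_tbl SET (parallel_workers = 32);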

This is also useful to lower the number of parallel workers, for
example when the executor is expected to prune most partitions.

Authors: Seamus Abshere, Amit Langote, Laurenz Albe
Discussion: https://postgr.es/m/95b1dd96-8634-4545-b1de-e2ac779beb44%40www.fastmail.com
---
 doc/src/sgml/ref/create_table.sgml            |  15 +-
 src/backend/access/common/reloptions.c        |  14 +-
 src/backend/optimizer/path/allpaths.c         | 155 ++++++++++--------
 src/include/utils/rel.h                       |  15 +-
 .../regress/expected/partition_aggregate.out  |  51 +++++-
 src/test/regress/sql/partition_aggregate.sql  |  17 +-
 6 files changed, 188 insertions(+), 79 deletions(-)

diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 3b2b227683..c8c14850c1 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -1337,8 +1337,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     If a table parameter value is set and the
     equivalent <literal>toast.</literal> parameter is not, the TOAST table
     will use the table's parameter value.
-    Specifying these parameters for partitioned tables is not supported,
-    but you may specify them for individual leaf partitions.
+    These parameters, with the exception of <literal>parallel_workers</literal>,
+    are not supported on partitioned tables, but you may specify them for
+    individual leaf partitions.
    </para>
 
    <variablelist>
@@ -1401,9 +1402,13 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
      <para>
       This sets the number of workers that should be used to assist a parallel
       scan of this table.  If not set, the system will determine a value based
-      on the relation size.  The actual number of workers chosen by the planner
-      or by utility statements that use parallel scans may be less, for example
-      due to the setting of <xref linkend="guc-max-worker-processes"/>.
+      on the relation size and the number of scanned partitions.
+      When set on a partitioned table, the specified number of workers will
+      work on distinct partitions, so the number of partitions affected by the
+      parallel operation should be taken into account.
+      The actual number of workers chosen by the planner or by utility
+      statements that use parallel scans may be less, for example due
+      to the setting of <xref linkend="guc-max-worker-processes"/>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index c687d3ee9e..f8443d2361 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -377,7 +377,7 @@ static relopt_int intRelOpts[] =
 		{
 			"parallel_workers",
 			"Number of parallel processes that can be used per executor node for this relation.",
-			RELOPT_KIND_HEAP,
+			RELOPT_KIND_HEAP | RELOPT_KIND_PARTITIONED,
 			ShareUpdateExclusiveLock
 		},
 		-1, 0, 1024
@@ -1962,12 +1962,18 @@ bytea *
 partitioned_table_reloptions(Datum reloptions, bool validate)
 {
 	/*
-	 * There are no options for partitioned tables yet, but this is able to do
-	 * some validation.
+	 * Currently the only setting known to be useful for partitioned tables
+	 * is parallel_workers.
 	 */
+	static const relopt_parse_elt tab[] = {
+		{"parallel_workers", RELOPT_TYPE_INT,
+		offsetof(PartitionedTableRdOptions, parallel_workers)},
+	};
+
 	return (bytea *) build_reloptions(reloptions, validate,
 									  RELOPT_KIND_PARTITIONED,
-									  0, NULL, 0);
+									  sizeof(PartitionedTableRdOptions),
+									  tab, lengthof(tab));
 }
 
 /*
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index d73ac562eb..e22cb6f33e 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -97,6 +97,9 @@ static void set_append_rel_size(PlannerInfo *root, RelOptInfo *rel,
 								Index rti, RangeTblEntry *rte);
 static void set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 									Index rti, RangeTblEntry *rte);
+static int compute_append_parallel_workers(RelOptInfo *rel, List *subpaths,
+										   int num_live_children,
+										   bool parallel_append);
 static void generate_orderedappend_paths(PlannerInfo *root, RelOptInfo *rel,
 										 List *live_childrels,
 										 List *all_child_pathkeys);
@@ -1268,6 +1271,65 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	add_paths_to_append_rel(root, rel, live_childrels);
 }
 
+/*
+ * compute_append_parallel_workers
+ * 		Computes the number of workers to assign to scan the subpaths appended
+ * 		by a given Append path
+ */
+static int
+compute_append_parallel_workers(RelOptInfo *rel, List *subpaths,
+								int num_live_children,
+								bool parallel_append)
+{
+	ListCell   *lc;
+	int			parallel_workers = 0;
+
+	/*
+	 * For partitioned rels, first see if there is a root-level setting for
+	 * parallel_workers.  But honor that setting only if a Parallel Append
+	 * plan is being considered.
+	 */
+	if (IS_PARTITIONED_REL(rel) &&
+		parallel_append &&
+		rel->rel_parallel_workers != -1)
+	{
+		parallel_workers = Min(rel->rel_parallel_workers,
+							   max_parallel_workers_per_gather);
+
+		/* an explicit setting overrides heuristics */
+		return parallel_workers;
+	}
+
+	/* Find the highest number of workers requested for any subpath. */
+	foreach(lc, subpaths)
+	{
+		Path	   *path = lfirst(lc);
+
+		parallel_workers = Max(parallel_workers, path->parallel_workers);
+	}
+	Assert(parallel_workers > 0 || subpaths == NIL);
+
+	/*
+	 * If the use of parallel append is permitted, always request at least
+	 * log2(# of children) workers.  We assume it can be useful to have
+	 * extra workers in this case because they will be spread out across
+	 * the children.  The precise formula is just a guess, but we don't
+	 * want to end up with a radically different answer for a table with N
+	 * partitions vs. an unpartitioned table with the same data, so the
+	 * use of some kind of log-scaling here seems to make some sense.
+	 */
+	if (parallel_append)
+	{
+		parallel_workers = Max(parallel_workers,
+							   fls(num_live_children));
+		parallel_workers = Min(parallel_workers,
+							   max_parallel_workers_per_gather);
+	}
+	Assert(parallel_workers > 0);
+
+	return parallel_workers;
+}
+
 
 /*
  * add_paths_to_append_rel
@@ -1464,50 +1526,28 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	if (partial_subpaths_valid && partial_subpaths != NIL)
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
+		int			parallel_workers =
+			compute_append_parallel_workers(rel, partial_subpaths,
+											list_length(live_childrels),
+											enable_parallel_append);
 
-		/* Find the highest number of workers requested for any subpath. */
-		foreach(lc, partial_subpaths)
+		if (parallel_workers > 0)
 		{
-			Path	   *path = lfirst(lc);
+			/* Generate a partial append path. */
+			appendpath = create_append_path(root, rel, NIL, partial_subpaths,
+											NIL, NULL, parallel_workers,
+											enable_parallel_append,
+											-1);
 
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
-		}
-		Assert(parallel_workers > 0);
+			/*
+			 * Make sure any subsequent partial paths use the same row count
+			 * estimate.
+			 */
+			partial_rows = appendpath->path.rows;
 
-		/*
-		 * If the use of parallel append is permitted, always request at least
-		 * log2(# of children) workers.  We assume it can be useful to have
-		 * extra workers in this case because they will be spread out across
-		 * the children.  The precise formula is just a guess, but we don't
-		 * want to end up with a radically different answer for a table with N
-		 * partitions vs. an unpartitioned table with the same data, so the
-		 * use of some kind of log-scaling here seems to make some sense.
-		 */
-		if (enable_parallel_append)
-		{
-			parallel_workers = Max(parallel_workers,
-								   fls(list_length(live_childrels)));
-			parallel_workers = Min(parallel_workers,
-								   max_parallel_workers_per_gather);
+			/* Add the path */
+			add_partial_path(rel, (Path *) appendpath);
 		}
-		Assert(parallel_workers > 0);
-
-		/* Generate a partial append path. */
-		appendpath = create_append_path(root, rel, NIL, partial_subpaths,
-										NIL, NULL, parallel_workers,
-										enable_parallel_append,
-										-1);
-
-		/*
-		 * Make sure any subsequent partial paths use the same row count
-		 * estimate.
-		 */
-		partial_rows = appendpath->path.rows;
-
-		/* Add the path. */
-		add_partial_path(rel, (Path *) appendpath);
 	}
 
 	/*
@@ -1519,36 +1559,19 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	if (pa_subpaths_valid && pa_nonpartial_subpaths != NIL)
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
+		int			parallel_workers =
+			compute_append_parallel_workers(rel, pa_partial_subpaths,
+											list_length(live_childrels),
+											true);
 
-		/*
-		 * Find the highest number of workers requested for any partial
-		 * subpath.
-		 */
-		foreach(lc, pa_partial_subpaths)
+		if (parallel_workers > 0)
 		{
-			Path	   *path = lfirst(lc);
-
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
+			appendpath = create_append_path(root, rel, pa_nonpartial_subpaths,
+											pa_partial_subpaths,
+											NIL, NULL, parallel_workers, true,
+											partial_rows);
+			add_partial_path(rel, (Path *) appendpath);
 		}
-
-		/*
-		 * Same formula here as above.  It's even more important in this
-		 * instance because the non-partial paths won't contribute anything to
-		 * the planned number of parallel workers.
-		 */
-		parallel_workers = Max(parallel_workers,
-							   fls(list_length(live_childrels)));
-		parallel_workers = Min(parallel_workers,
-							   max_parallel_workers_per_gather);
-		Assert(parallel_workers > 0);
-
-		appendpath = create_append_path(root, rel, pa_nonpartial_subpaths,
-										pa_partial_subpaths,
-										NIL, NULL, parallel_workers, true,
-										partial_rows);
-		add_partial_path(rel, (Path *) appendpath);
 	}
 
 	/*
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 10b63982c0..efa7b714cb 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -308,6 +308,16 @@ typedef struct StdRdOptions
 	bool		vacuum_truncate;	/* enables vacuum to truncate a relation */
 } StdRdOptions;
 
+/*
+ * PartitionedTableRdOptions
+ * 		Contents of rd_options for partitioned tables
+ */
+typedef struct PartitionedTableRdOptions
+{
+	int32		vl_len_;		/* varlena header (do not touch directly!) */
+	int			parallel_workers;	/* max number of parallel workers */
+} PartitionedTableRdOptions;
+
 #define HEAP_MIN_FILLFACTOR			10
 #define HEAP_DEFAULT_FILLFACTOR		100
 
@@ -359,7 +369,10 @@ typedef struct StdRdOptions
  */
 #define RelationGetParallelWorkers(relation, defaultpw) \
 	((relation)->rd_options ? \
-	 ((StdRdOptions *) (relation)->rd_options)->parallel_workers : (defaultpw))
+	 ((relation)->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? \
+	  ((PartitionedTableRdOptions *) (relation)->rd_options)->parallel_workers : \
+	  ((StdRdOptions *) (relation)->rd_options)->parallel_workers \
+	 ) : (defaultpw))
 
 /* ViewOptions->check_option values */
 typedef enum ViewOptCheckOption
diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out
index dfa4b036b5..611dfc87ff 100644
--- a/src/test/regress/expected/partition_aggregate.out
+++ b/src/test/regress/expected/partition_aggregate.out
@@ -1166,9 +1166,58 @@ SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 O
  29 | 4500 |   500
 (12 rows)
 
--- Parallelism within partitionwise aggregates
+-- Override "parallel_workers" for a partitioned table
 SET min_parallel_table_scan_size TO '8kB';
 SET parallel_setup_cost TO 0;
+SET max_parallel_workers_per_gather TO 8;
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather
+   Workers Planned: 4
+   ->  Parallel Append
+         ->  Parallel Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+               Filter: (b = 42)
+(9 rows)
+
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 6);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Gather
+   Workers Planned: 6
+   ->  Parallel Append
+         ->  Parallel Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+               Filter: (b = 42)
+(9 rows)
+
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 0);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Append
+   ->  Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+         Filter: (b = 42)
+   ->  Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+         Filter: (b = 42)
+   ->  Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+         Filter: (b = 42)
+(7 rows)
+
+ALTER TABLE pagg_tab_ml RESET (parallel_workers);
+-- Parallelism within partitionwise aggregates
+SET max_parallel_workers_per_gather TO 2;
 -- Full aggregation at level 1 as GROUP BY clause matches with PARTITION KEY
 -- for level 1 only. For subpartitions, GROUP BY clause does not match with
 -- PARTITION KEY, thus we will have a partial aggregation for them.
diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql
index c17294b15b..c058e2e181 100644
--- a/src/test/regress/sql/partition_aggregate.sql
+++ b/src/test/regress/sql/partition_aggregate.sql
@@ -253,10 +253,23 @@ EXPLAIN (COSTS OFF)
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 ORDER BY 1, 2, 3;
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 ORDER BY 1, 2, 3;
 
--- Parallelism within partitionwise aggregates
-
+-- Override "parallel_workers" for a partitioned table
 SET min_parallel_table_scan_size TO '8kB';
 SET parallel_setup_cost TO 0;
+SET max_parallel_workers_per_gather TO 8;
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 6);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 0);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml RESET (parallel_workers);
+
+-- Parallelism within partitionwise aggregates
+
+SET max_parallel_workers_per_gather TO 2;
 
 -- Full aggregation at level 1 as GROUP BY clause matches with PARTITION KEY
 -- for level 1 only. For subpartitions, GROUP BY clause does not match with
-- 
2.26.2
