From 757b41801c9297bd36a753fd3c1e665e38c6cbbd Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Tue, 17 Oct 2017 23:40:03 +1300
Subject: [PATCH] Add a GUC to control whether gather nodes run subplans in
 leaders.

Gather and Gather Merge nodes are responsible for collecting tuples from
worker processes, but they also run the subplan directly in the leader
process.  Add a new GUC, parallel_leader_participation, to enable or
disable this multiplexing.  If set to off, the leader process runs the
plan itself only as a fallback, in case no workers could be launched.
This GUC is initially intended for testing, but might prove useful to
end users.
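
For example, to push execution of the subplan entirely into worker
processes (an illustrative session only; tenk1 is the table used by the
regression tests below):

    SET parallel_leader_participation = off;
    SELECT count(*) FROM tenk1;
    -- The Gather node now only collects tuples from its workers; the
    -- leader runs the subplan itself only if no workers could be launched.

The cost model changes to match: with 2 workers, the parallel divisor
becomes 2 + (1.0 - 0.3 * 2) = 2.4 when the leader participates, and just
2.0 when it does not.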

Thomas Munro
Discussion: https://postgr.es/m/CAEepm%3D2U%2B%2BLp3bNTv2Bv_kkr5NE2pOyHhxU%3DG0YTa4ZhSYhHiw%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  24 ++++++
 src/backend/executor/nodeGather.c             |   8 +-
 src/backend/executor/nodeGatherMerge.c        |   6 +-
 src/backend/optimizer/path/costsize.c         |  12 ++-
 src/backend/optimizer/plan/planner.c          |   1 +
 src/backend/utils/misc/guc.c                  |  10 +++
 src/include/optimizer/planmain.h              |   1 +
 src/test/regress/expected/select_parallel.out | 113 ++++++++++++++++++++++++++
 src/test/regress/sql/select_parallel.sql      |  36 ++++++++
 9 files changed, 202 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index d360fc4d58a..cb550f0ecdd 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4265,6 +4265,30 @@ SELECT * FROM parent WHERE key = 2400;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-parallel-leader-participation" xreflabel="parallel_leader_participation">
+      <term>
+       <varname>parallel_leader_participation</varname> (<type>boolean</type>)
+       <indexterm>
+        <primary>
+         <varname>parallel_leader_participation</varname> configuration
+         parameter
+        </primary>
+       </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Allows the leader process to execute the query plan under
+        <literal>Gather</literal> and <literal>Gather Merge</literal> nodes
+        instead of waiting for worker processes.  The default is
+        <literal>on</literal>.  Setting this value to <literal>on</literal>
+        can cause the leader process to begin producing tuples sooner instead
+        of waiting for worker processes to start up, but might in some cases
+        also cause workers to become blocked because the leader is not
+        reading tuples from their queues fast enough.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-force-parallel-mode" xreflabel="force_parallel_mode">
       <term><varname>force_parallel_mode</varname> (<type>enum</type>)
       <indexterm>
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 639f4f5af88..0298c65d065 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -38,6 +38,7 @@
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
 #include "miscadmin.h"
+#include "optimizer/planmain.h"
 #include "pgstat.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -73,7 +74,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.ExecProcNode = ExecGather;
 
 	gatherstate->initialized = false;
-	gatherstate->need_to_scan_locally = !node->single_copy;
+	gatherstate->need_to_scan_locally =
+		!node->single_copy && parallel_leader_participation;
 	gatherstate->tuples_needed = -1;
 
 	/*
@@ -193,9 +195,9 @@ ExecGather(PlanState *pstate)
 			node->nextreader = 0;
 		}
 
-		/* Run plan locally if no workers or not single-copy. */
+		/* Run plan locally if no workers, or if enabled and not single-copy. */
 		node->need_to_scan_locally = (node->nreaders == 0)
-			|| !gather->single_copy;
+			|| (!gather->single_copy && parallel_leader_participation);
 		node->initialized = true;
 	}
 
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 5625b125210..7206ab91975 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -23,6 +23,7 @@
 #include "executor/tqueue.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
+#include "optimizer/planmain.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -233,8 +234,9 @@ ExecGatherMerge(PlanState *pstate)
 			}
 		}
 
-		/* always allow leader to participate */
-		node->need_to_scan_locally = true;
+		/* allow the leader to participate if enabled, or if there is no choice */
+		if (parallel_leader_participation || node->nreaders == 0)
+			node->need_to_scan_locally = true;
 		node->initialized = true;
 	}
 
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 2d2df60886a..d11bf19e30a 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -5137,7 +5137,6 @@ static double
 get_parallel_divisor(Path *path)
 {
 	double		parallel_divisor = path->parallel_workers;
-	double		leader_contribution;
 
 	/*
 	 * Early experience with parallel query suggests that when there is only
@@ -5150,9 +5149,14 @@ get_parallel_divisor(Path *path)
 	 * its time servicing each worker, and the remainder executing the
 	 * parallel plan.
 	 */
-	leader_contribution = 1.0 - (0.3 * path->parallel_workers);
-	if (leader_contribution > 0)
-		parallel_divisor += leader_contribution;
+	if (parallel_leader_participation)
+	{
+		double		leader_contribution;
+
+		leader_contribution = 1.0 - (0.3 * path->parallel_workers);
+		if (leader_contribution > 0)
+			parallel_divisor += leader_contribution;
+	}
 
 	return parallel_divisor;
 }
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 9b7a8fd82c4..58e6a49289e 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -61,6 +61,7 @@
 /* GUC parameters */
 double		cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION;
 int			force_parallel_mode = FORCE_PARALLEL_OFF;
+bool		parallel_leader_participation = true;
 
 /* Hook for plugins to get control in planner() */
 planner_hook_type planner_hook = NULL;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index c4c1afa084b..84dc5b44538 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1676,6 +1676,16 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"parallel_leader_participation", PGC_USERSET, QUERY_TUNING_OTHER,
+			gettext_noop("Controls whether Gather and Gather Merge also run subplans."),
+			gettext_noop("Should gather nodes also run subplans, or just gather tuples?")
+		},
+		&parallel_leader_participation,
+		true,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h
index f1d16cffab0..d6133228bdd 100644
--- a/src/include/optimizer/planmain.h
+++ b/src/include/optimizer/planmain.h
@@ -29,6 +29,7 @@ typedef enum
 #define DEFAULT_CURSOR_TUPLE_FRACTION 0.1
 extern double cursor_tuple_fraction;
 extern int	force_parallel_mode;
+extern bool parallel_leader_participation;
 
 /* query_planner callback to compute query_pathkeys */
 typedef void (*query_pathkeys_callback) (PlannerInfo *root, void *extra);
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index ac9ad0668d1..d53a2786140 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -34,6 +34,49 @@ select count(*) from a_star;
     50
 (1 row)
 
+-- test with leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+                       QUERY PLAN                        
+---------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (stringu1 = 'GRAAAA'::name)
+(6 rows)
+
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+ count 
+-------
+    15
+(1 row)
+
+-- test with leader participation disabled, but no workers available (so
+-- the leader will have to run the plan despite the setting)
+set max_parallel_workers = 0;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+                       QUERY PLAN                        
+---------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Seq Scan on tenk1
+                     Filter: (stringu1 = 'GRAAAA'::name)
+(6 rows)
+
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+ count 
+-------
+    15
+(1 row)
+
+reset max_parallel_workers;
+reset parallel_leader_participation;
 -- test that parallel_restricted function doesn't run in worker
 alter table tenk1 set (parallel_workers = 4);
 explain (verbose, costs off)
@@ -375,6 +418,49 @@ select count(*) from tenk1 group by twenty;
    500
 (20 rows)
 
+-- test gather merge with parallel leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+   select count(*) from tenk1 group by twenty;
+                     QUERY PLAN                     
+----------------------------------------------------
+ Finalize GroupAggregate
+   Group Key: twenty
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Partial GroupAggregate
+               Group Key: twenty
+               ->  Sort
+                     Sort Key: twenty
+                     ->  Parallel Seq Scan on tenk1
+(9 rows)
+
+select count(*) from tenk1 group by twenty;
+ count 
+-------
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+   500
+(20 rows)
+
+reset parallel_leader_participation;
 --test rescan behavior of gather merge
 set enable_material = false;
 explain (costs off)
@@ -465,6 +551,33 @@ select string4 from tenk1 order by string4 limit 5;
  AAAAxx
 (5 rows)
 
+-- gather merge test with 0 workers, with parallel leader
+-- participation disabled (the leader will have to run the plan
+-- despite the setting)
+set parallel_leader_participation = off;
+explain (costs off)
+   select string4 from tenk1 order by string4 limit 5;
+                  QUERY PLAN                  
+----------------------------------------------
+ Limit
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: string4
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+select string4 from tenk1 order by string4 limit 5;
+ string4 
+---------
+ AAAAxx
+ AAAAxx
+ AAAAxx
+ AAAAxx
+ AAAAxx
+(5 rows)
+
+reset parallel_leader_participation;
 reset max_parallel_workers;
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 495f0335dcc..6bc54ee5a48 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -19,6 +19,22 @@ explain (costs off)
   select count(*) from a_star;
 select count(*) from a_star;
 
+-- test with leader participation disabled
+set parallel_leader_participation = off;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+
+-- test with leader participation disabled, but no workers available (so
+-- the leader will have to run the plan despite the setting)
+set max_parallel_workers = 0;
+explain (costs off)
+  select count(*) from tenk1 where stringu1 = 'GRAAAA';
+select count(*) from tenk1 where stringu1 = 'GRAAAA';
+
+reset max_parallel_workers;
+reset parallel_leader_participation;
+
 -- test that parallel_restricted function doesn't run in worker
 alter table tenk1 set (parallel_workers = 4);
 explain (verbose, costs off)
@@ -144,6 +160,16 @@ explain (costs off)
 
 select count(*) from tenk1 group by twenty;
 
+-- test gather merge with parallel leader participation disabled
+set parallel_leader_participation = off;
+
+explain (costs off)
+   select count(*) from tenk1 group by twenty;
+
+select count(*) from tenk1 group by twenty;
+
+reset parallel_leader_participation;
+
 --test rescan behavior of gather merge
 set enable_material = false;
 
@@ -173,6 +199,16 @@ set max_parallel_workers = 0;
 explain (costs off)
    select string4 from tenk1 order by string4 limit 5;
 select string4 from tenk1 order by string4 limit 5;
+
+-- gather merge test with 0 workers, with parallel leader
+-- participation disabled (the leader will have to run the plan
+-- despite the setting)
+set parallel_leader_participation = off;
+explain (costs off)
+   select string4 from tenk1 order by string4 limit 5;
+select string4 from tenk1 order by string4 limit 5;
+
+reset parallel_leader_participation;
 reset max_parallel_workers;
 
 SAVEPOINT settings;
-- 
2.14.1

