v4, with a test case and some more comment-polishing.  I think this
is committable.

                        regards, tom lane

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d..ad9eba6 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,17 +47,26 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED		UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
 /*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
+/*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
  * instrument_options: Same meaning here as in instrument.c.
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+					 int64 tuples_needed)
 {
 	ParallelExecutorInfo *pei;
 	ParallelContext *pcxt;
 	ExecParallelEstimateContext e;
 	ExecParallelInitializeDSMContext d;
+	FixedParallelExecutorState *fpes;
 	char	   *pstmt_data;
 	char	   *pstmt_space;
 	char	   *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * for the various things we need to store.
 	 */
 
+	/* Estimate space for fixed-size state. */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   sizeof(FixedParallelExecutorState));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for query text. */
 	query_len = strlen(estate->es_sourceText);
 	shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * asked for has been allocated or initialized yet, though, so do that.
 	 */
 
+	/* Store fixed-size state. */
+	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+	fpes->tuples_needed = tuples_needed;
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
 	/* Store query string */
 	query_string = shm_toc_allocate(pcxt->toc, query_len);
 	memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 
+	/* Get fixed-size state. */
+	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
 	receiver = ExecParallelGetReceiver(seg, toc);
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	queryDesc->planstate->state->es_query_dsa = area;
 	ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
-	/* Run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+	/* Pass down any tuple bound */
+	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+	/*
+	 * Run the plan.  If we specified a tuple bound, be careful not to demand
+	 * more tuples than that.
+	 */
+	ExecutorRun(queryDesc,
+				ForwardScanDirection,
+				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+				true);
 
 	/* Shut down the executor */
 	ExecutorFinish(queryDesc);
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 36d2914..c1aa506 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -757,3 +757,124 @@ ExecShutdownNode(PlanState *node)
 
 	return false;
 }
+
+/*
+ * ExecSetTupleBound
+ *
+ * Set a tuple bound for a planstate node.  This lets child plan nodes
+ * optimize based on the knowledge that the maximum number of tuples that
+ * their parent will demand is limited.  The tuple bound for a node may
+ * only be changed between scans (i.e., after node initialization or just
+ * before an ExecReScan call).
+ *
+ * Any negative tuples_needed value means "no limit", which should be the
+ * default assumption when this is not called at all for a particular node.
+ *
+ * Note: if this is called repeatedly on a plan tree, the exact same set
+ * of nodes must be updated with the new limit each time; be careful that
+ * only unchanging conditions are tested here.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+	/*
+	 * Since this function recurses, in principle we should check stack depth
+	 * here.  In practice, it's probably pointless since the earlier node
+	 * initialization tree traversal would surely have consumed more stack.
+	 */
+
+	if (IsA(child_node, SortState))
+	{
+		/*
+		 * If it is a Sort node, notify it that it can use bounded sort.
+		 *
+		 * Note: it is the responsibility of nodeSort.c to react properly to
+		 * changes of these parameters.  If we ever redesign this, it'd be a
+		 * good idea to integrate this signaling with the parameter-change
+		 * mechanism.
+		 */
+		SortState  *sortState = (SortState *) child_node;
+
+		if (tuples_needed < 0)
+		{
+			/* make sure flag gets reset if needed upon rescan */
+			sortState->bounded = false;
+		}
+		else
+		{
+			sortState->bounded = true;
+			sortState->bound = tuples_needed;
+		}
+	}
+	else if (IsA(child_node, MergeAppendState))
+	{
+		/*
+		 * If it is a MergeAppend, we can apply the bound to any nodes that
+		 * are children of the MergeAppend, since the MergeAppend surely need
+		 * read no more than that many tuples from any one input.
+		 */
+		MergeAppendState *maState = (MergeAppendState *) child_node;
+		int			i;
+
+		for (i = 0; i < maState->ms_nplans; i++)
+			ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+	}
+	else if (IsA(child_node, ResultState))
+	{
+		/*
+		 * Similarly, for a projecting Result, we can apply the bound to its
+		 * child node.
+		 *
+		 * If Result supported qual checking, we'd have to punt on seeing a
+		 * qual.  Note that having a resconstantqual is not a showstopper: if
+		 * that condition succeeds it affects nothing, while if it fails, no
+		 * rows will be demanded from the Result child anyway.
+		 */
+		if (outerPlanState(child_node))
+			ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+	else if (IsA(child_node, SubqueryScanState))
+	{
+		/*
+		 * We can also descend through SubqueryScan, but only if it has no
+		 * qual (otherwise it might discard rows).
+		 */
+		SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
+
+		if (subqueryState->ss.ps.qual == NULL)
+			ExecSetTupleBound(tuples_needed, subqueryState->subplan);
+	}
+	else if (IsA(child_node, GatherState))
+	{
+		/*
+		 * A Gather node can propagate the bound to its workers.  As with
+		 * MergeAppend, no one worker could possibly need to return more
+		 * tuples than the Gather itself needs to.
+		 *
+		 * Note: As with Sort, the Gather node is responsible for reacting
+		 * properly to changes to this parameter.
+		 */
+		GatherState *gstate = (GatherState *) child_node;
+
+		gstate->tuples_needed = tuples_needed;
+
+		/* Also pass down the bound to our own copy of the child plan */
+		ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+	else if (IsA(child_node, GatherMergeState))
+	{
+		/* Same comments as for Gather */
+		GatherMergeState *gstate = (GatherMergeState *) child_node;
+
+		gstate->tuples_needed = tuples_needed;
+
+		ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+
+	/*
+	 * In principle we could descend through any plan node type that is
+	 * certain not to discard or combine input rows; but on seeing a node that
+	 * can do that, we can't propagate the bound any further.  For the moment
+	 * it's unclear that any other cases are worth checking here.
+	 */
+}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index e8d94ee..a0f5a60 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.state = estate;
 	gatherstate->ps.ExecProcNode = ExecGather;
 	gatherstate->need_to_scan_locally = !node->single_copy;
+	gatherstate->tuples_needed = -1;
 
 	/*
 	 * Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
 												 estate,
-												 gather->num_workers);
+												 gather->num_workers,
+												 node->tuples_needed);
 
 			/*
 			 * Register backend workers. We might not get as many as we
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 64c6239..2526c58 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
 	gm_state->ps.plan = (Plan *) node;
 	gm_state->ps.state = estate;
 	gm_state->ps.ExecProcNode = ExecGatherMerge;
+	gm_state->tuples_needed = -1;
 
 	/*
 	 * Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
 												 estate,
-												 gm->num_workers);
+												 gm->num_workers,
+												 node->tuples_needed);
 
 			/* Try to launch workers. */
 			pcxt = node->pei->pcxt;
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index ceb6854..883f46c 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -27,7 +27,7 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);
 
 
 /* ----------------------------------------------------------------
@@ -297,92 +297,26 @@ recompute_limits(LimitState *node)
 	/* Set state-machine state */
 	node->lstate = LIMIT_RESCAN;
 
-	/* Notify child node about limit, if useful */
-	pass_down_bound(node, outerPlanState(node));
+	/*
+	 * Notify child node about limit.  Note: think not to "optimize" by
+	 * skipping ExecSetTupleBound if compute_tuples_needed returns < 0.  We
+	 * must update the child node anyway, in case this is a rescan and the
+	 * previous time we got a different result.
+	 */
+	ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }
 
 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  We can also pass down the bound through plan nodes
- * that cannot remove or combine input rows; for example, if our input is a
- * MergeAppend, we can apply the same bound to any Sorts that are direct
- * children of the MergeAppend, since the MergeAppend surely need not read
- * more than that many tuples from any one input.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
  */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
 {
-	/*
-	 * Since this function recurses, in principle we should check stack depth
-	 * here.  In practice, it's probably pointless since the earlier node
-	 * initialization tree traversal would surely have consumed more stack.
-	 */
-
-	if (IsA(child_node, SortState))
-	{
-		SortState  *sortState = (SortState *) child_node;
-		int64		tuples_needed = node->count + node->offset;
-
-		/* negative test checks for overflow in sum */
-		if (node->noCount || tuples_needed < 0)
-		{
-			/* make sure flag gets reset if needed upon rescan */
-			sortState->bounded = false;
-		}
-		else
-		{
-			sortState->bounded = true;
-			sortState->bound = tuples_needed;
-		}
-	}
-	else if (IsA(child_node, MergeAppendState))
-	{
-		/* Pass down the bound through MergeAppend */
-		MergeAppendState *maState = (MergeAppendState *) child_node;
-		int			i;
-
-		for (i = 0; i < maState->ms_nplans; i++)
-			pass_down_bound(node, maState->mergeplans[i]);
-	}
-	else if (IsA(child_node, ResultState))
-	{
-		/*
-		 * We also have to be prepared to look through a Result, since the
-		 * planner might stick one atop MergeAppend for projection purposes.
-		 *
-		 * If Result supported qual checking, we'd have to punt on seeing a
-		 * qual.  Note that having a resconstantqual is not a showstopper: if
-		 * that fails we're not getting any rows at all.
-		 */
-		if (outerPlanState(child_node))
-			pass_down_bound(node, outerPlanState(child_node));
-	}
-	else if (IsA(child_node, SubqueryScanState))
-	{
-		/*
-		 * We can also look through SubqueryScan, but only if it has no qual
-		 * (otherwise it might discard rows).
-		 */
-		SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
-
-		if (subqueryState->ss.ps.qual == NULL)
-			pass_down_bound(node, subqueryState->subplan);
-	}
-
-	/*
-	 * In principle we could look through any plan node type that is certain
-	 * not to discard or combine input rows.  In practice, there are not many
-	 * node types that the planner might put between Sort and Limit, so trying
-	 * to be very general is not worth the trouble.
-	 */
+	if (node->noCount)
+		return -1;
+	/* Note: if this overflows, we'll return a negative value, which is OK */
+	return node->count + node->offset;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index bd0a87f..79b8867 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-					 EState *estate, int nworkers);
+					 EState *estate, int nworkers, int64 tuples_needed);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index eacbea3..f48a603 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
 /* ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3272c4b..15a8426 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1919,6 +1919,7 @@ typedef struct GatherState
 	struct TupleQueueReader **reader;
 	TupleTableSlot *funnel_slot;
 	bool		need_to_scan_locally;
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
 } GatherState;
 
 /* ----------------
@@ -1944,6 +1945,7 @@ typedef struct GatherMergeState
 	struct binaryheap *gm_heap; /* binary heap of slot indices */
 	bool		gm_initialized; /* gather merge initilized ? */
 	bool		need_to_scan_locally;
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
 	int			gm_nkeys;
 	SortSupport gm_sortkeys;	/* array of length ms_nkeys */
 	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* tuple buffer per reader */
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 084f0f0..ccad18e 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -300,6 +300,29 @@ select count(*) from tenk1 group by twenty;
    500
 (20 rows)
 
+reset enable_hashagg;
+-- gather merge test with a LIMIT
+explain (costs off)
+  select fivethous from tenk1 order by fivethous limit 4;
+                  QUERY PLAN                  
+----------------------------------------------
+ Limit
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: fivethous
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+select fivethous from tenk1 order by fivethous limit 4;
+ fivethous 
+-----------
+         0
+         0
+         1
+         1
+(4 rows)
+
 -- gather merge test with 0 worker
 set max_parallel_workers = 0;
 explain (costs off)
@@ -325,7 +348,6 @@ select string4 from tenk1 order by string4 limit 5;
 (5 rows)
 
 reset max_parallel_workers;
-reset enable_hashagg;
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
 explain (costs off)
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 58c3f59..c0debdd 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -118,13 +118,20 @@ explain (costs off)
 
 select count(*) from tenk1 group by twenty;
 
+reset enable_hashagg;
+
+-- gather merge test with a LIMIT
+explain (costs off)
+  select fivethous from tenk1 order by fivethous limit 4;
+
+select fivethous from tenk1 order by fivethous limit 4;
+
 -- gather merge test with 0 worker
 set max_parallel_workers = 0;
 explain (costs off)
    select string4 from tenk1 order by string4 limit 5;
 select string4 from tenk1 order by string4 limit 5;
 reset max_parallel_workers;
-reset enable_hashagg;
 
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to