Hi.

At Thu, 21 Jan 2016 19:09:19 +0900, Amit Langote 
<langote_amit...@lab.ntt.co.jp> wrote in <56a0ae4f.9000...@lab.ntt.co.jp>
> 
> Hi!
> 
> On 2016/01/21 18:26, Kyotaro HORIGUCHI wrote:
> >>> Then, suppose we add a function bool ExecStartAsync(PlanState *target,
> >>> ExecCallback callback, PlanState *cb_planstate, void *cb_context).
> >>> For non-async-aware plan nodes, this just returns false.  async-aware
> >>> plan nodes should initiate some work, register some callbacks, and
> >>> return.  The callback that get registered should arrange in turn to
> >>> register the callback passed as an argument when a tuple becomes
> >>> available, passing the planstate and context provided by
> >>> ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
> >>
> >> Although I don't imagine clearly about the case of
> >> async-aware-nodes under non-aware-nodes, it seems to have a high
> >> affinity with (true) parallel execution framework.
> > 
> > The ExecStartAsync is similar to ExecStartNode of my old
> > patch. One of the most annoying things of that is that it needs
> > to walk down to their descendents and in turn it needs garbageous
> > corresponding additional codes for all type of nodes which can
> > have children.
> 
> Unless I am missing something, I wonder if this is where
> planstate_tree_walker() introduced by commit 8dd401aa is useful. For
> example, I see that it's used in ExecShutdownNode() in a manner that looks
> interesting for this discussion.

Oh, that's part of the parallel execution stuff. Thanks for letting
me know about it. The walker approach also fits well for kicking
functions that are irrelevant to most node types. Only one (or two,
including ForeignScan) type of node is involved.

The attached patches have the same functionality but use
planstate_tree_walker instead of callbacks. This seems even
simpler than the callbacks.
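
To show the shape of the walker approach outside the executor, here is
a minimal standalone sketch. It is not code from the patches: ToyNode,
toy_tree_walker and toy_start_node are made-up stand-ins for PlanState,
planstate_tree_walker and ExecStartNode, and the tree has at most two
children per node. Only the node type that cares does anything; every
other type just lets the walker descend.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical node kinds, standing in for the executor's node tags. */
typedef enum ToyTag
{
	TOY_APPEND,
	TOY_SORT,
	TOY_SEQSCAN
} ToyTag;

typedef struct ToyNode
{
	ToyTag		tag;
	bool		early_start;	/* set at init time if async was requested */
	bool		started;		/* set once the early start has run */
	struct ToyNode *left;
	struct ToyNode *right;
} ToyNode;

typedef bool (*toy_walker) (ToyNode *node, void *context);

/* Visit the children of a node; stop as soon as the walker returns true. */
static bool
toy_tree_walker(ToyNode *node, toy_walker walker, void *context)
{
	assert(node != NULL);

	if (node->left && walker(node->left, context))
		return true;
	if (node->right && walker(node->right, context))
		return true;
	return false;
}

/*
 * Rough analogue of ExecStartNode: handle the node types that support
 * early start, and fall through to the generic walker for the rest.
 * The context is a counter of how many nodes were started.
 */
static bool
toy_start_node(ToyNode *node, void *context)
{
	int		   *nstarted = (int *) context;

	if (node == NULL)
		return false;

	if (node->tag == TOY_SEQSCAN)
	{
		if (node->early_start)
		{
			node->early_start = false;	/* start only once */
			node->started = true;
			(*nstarted)++;
		}
		return false;			/* leaf node, nothing below to visit */
	}

	return toy_tree_walker(node, toy_start_node, context);
}
```

Calling toy_start_node() once on the root before fetching any tuple
corresponds to the ExecStartNode(planstate) call added to ExecutePlan().
Intermediate node types need no code of their own, which is the point
of using the walker.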

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

From a7f0f1f9077b474dd212db1fb690413dd7c4ef79 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 09:37:25 +0900
Subject: [PATCH 1/2] PoC: Async start callback for executor using
 planstate_tree_walker

This patch allows async-capable nodes to start running before the
first call to ExecProcNode(). eflags has a new bit, EXEC_FLAG_ASYNC,
with which a node requests asynchronous execution of its children
during the ExecInit phase.

As an example, nodeSeqscan registers a dummy callback if requested, and
nodeAppend unconditionally requests async execution of its children. So
a plan Append(SeqScan, SeqScan) runs the callback and emits LOG messages.
---
 src/backend/executor/execMain.c        |   2 +
 src/backend/executor/execProcnode.c    |  24 +++++
 src/backend/executor/nodeAppend.c      |   2 +
 src/backend/executor/nodeGather.c      | 167 ++++++++++++++++++++-------------
 src/backend/executor/nodeMergeAppend.c |   3 +
 src/backend/executor/nodeNestloop.c    |  13 +++
 src/backend/executor/nodeSeqscan.c     |  16 ++++
 src/include/executor/executor.h        |   2 +
 src/include/executor/nodeGather.h      |   1 +
 src/include/executor/nodeSeqscan.h     |   1 +
 src/include/nodes/execnodes.h          |   2 +
 11 files changed, 167 insertions(+), 66 deletions(-)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 76f7297..32b7bc3 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1552,6 +1552,8 @@ ExecutePlan(EState *estate,
 	if (use_parallel_mode)
 		EnterParallelMode();
 
+	ExecStartNode(planstate);
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a31dbc9..2107ced 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,30 @@ ExecEndNode(PlanState *node)
 }
 
 /*
+ * ExecStartNode - execute registered early-startup callbacks
+ */
+bool
+ExecStartNode(PlanState *node)
+{
+	if (node == NULL)
+		return false;
+
+	switch (nodeTag(node))
+	{
+	case T_GatherState:
+		return ExecStartGather((GatherState *)node);
+		break;
+	case T_SeqScanState:
+		return ExecStartSeqScan((SeqScanState *)node);
+		break;
+	default:
+		break;
+	}
+
+	return planstate_tree_walker(node, ExecStartNode, NULL);
+}
+
+/*
  * ExecShutdownNode
  *
  * Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..d10364c 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -165,6 +165,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request async execution for children */
+		eflags |= EXEC_FLAG_ASYNC;
 		appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b..097f4bb 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,6 +46,88 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
+/* ----------------------------------------------------------------
+ *		ExecStartGather
+ *
+ *		A Gather node can benefit from asynchronous execution in most
+ *		cases because of its startup cost.
+ * ----------------------------------------------------------------
+ */
+bool
+ExecStartGather(GatherState *node)
+{
+	EState	   *estate = node->ps.state;
+	Gather	   *gather = (Gather *) node->ps.plan;
+	TupleTableSlot *fslot = node->funnel_slot;
+	int i;
+
+	/* Don't start if already started or inhibited by the upper node */
+	if (node->initialized || !node->early_start)
+		return false;
+
+	/*
+	 * Initialize the parallel context and workers on first execution. We do
+	 * this on first execution rather than during node initialization, as it
+	 * needs to allocate large dynamic segment, so it is better to do if it
+	 * is really needed.
+	 */
+
+	/*
+	 * Sometimes we might have to run without parallelism; but if
+	 * parallel mode is active then we can try to fire up some workers.
+	 */
+	if (gather->num_workers > 0 && IsInParallelMode())
+	{
+		ParallelContext *pcxt;
+		bool	got_any_worker = false;
+
+		/* Initialize the workers required to execute Gather node. */
+		if (!node->pei)
+			node->pei = ExecInitParallelPlan(node->ps.lefttree,
+											 estate,
+											 gather->num_workers);
+
+		/*
+		 * Register backend workers. We might not get as many as we
+		 * requested, or indeed any at all.
+		 */
+		pcxt = node->pei->pcxt;
+		LaunchParallelWorkers(pcxt);
+
+		/* Set up tuple queue readers to read the results. */
+		if (pcxt->nworkers > 0)
+		{
+			node->nreaders = 0;
+			node->reader =
+				palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+			for (i = 0; i < pcxt->nworkers; ++i)
+			{
+				if (pcxt->worker[i].bgwhandle == NULL)
+					continue;
+
+				shm_mq_set_handle(node->pei->tqueue[i],
+								  pcxt->worker[i].bgwhandle);
+				node->reader[node->nreaders++] =
+					CreateTupleQueueReader(node->pei->tqueue[i],
+										   fslot->tts_tupleDescriptor);
+				got_any_worker = true;
+			}
+		}
+
+		/* No workers?  Then never mind. */
+		if (!got_any_worker)
+			ExecShutdownGatherWorkers(node);
+	}
+
+	/* Run plan locally if no workers or not single-copy. */
+	node->need_to_scan_locally = (node->reader == NULL)
+		|| !gather->single_copy;
+
+	node->early_start = false;
+	node->initialized = true;
+	return false;
+}
 
 /* ----------------------------------------------------------------
  *		ExecInitGather
@@ -58,6 +140,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	Plan	   *outerNode;
 	bool		hasoid;
 	TupleDesc	tupDesc;
+	int			child_eflags;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -97,7 +180,12 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * now initialize outer plan
 	 */
 	outerNode = outerPlan(node);
-	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+	/*
+	 * This outer plan is executed in another process so don't start
+	 * asynchronously in this process
+	 */
+	child_eflags = eflags & ~EXEC_FLAG_ASYNC;
+	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
 
@@ -115,6 +203,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
 	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
 
+	/*
+	 * Request early start of this node if asked to. Launching backend
+	 * workers needs to allocate a large dynamic segment, so from that
+	 * aspect it is better to defer it until the first execution. The
+	 * decision to start early should take that cost into account, but we
+	 * ignore it for now.
+	 */
+	if (eflags & EXEC_FLAG_ASYNC)
+		gatherstate->early_start = true;
+
 	return gatherstate;
 }
 
@@ -128,77 +226,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecGather(GatherState *node)
 {
-	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
 	TupleTableSlot *slot;
 	TupleTableSlot *resultSlot;
 	ExprDoneCond isDone;
 	ExprContext *econtext;
 
-	/*
-	 * Initialize the parallel context and workers on first execution. We do
-	 * this on first execution rather than during node initialization, as it
-	 * needs to allocate large dynamic segment, so it is better to do if it
-	 * is really needed.
-	 */
+	/* Initialize workers if not done yet. */
 	if (!node->initialized)
-	{
-		EState	   *estate = node->ps.state;
-		Gather	   *gather = (Gather *) node->ps.plan;
-
-		/*
-		 * Sometimes we might have to run without parallelism; but if
-		 * parallel mode is active then we can try to fire up some workers.
-		 */
-		if (gather->num_workers > 0 && IsInParallelMode())
-		{
-			ParallelContext *pcxt;
-			bool	got_any_worker = false;
-
-			/* Initialize the workers required to execute Gather node. */
-			if (!node->pei)
-				node->pei = ExecInitParallelPlan(node->ps.lefttree,
-												 estate,
-												 gather->num_workers);
-
-			/*
-			 * Register backend workers. We might not get as many as we
-			 * requested, or indeed any at all.
-			 */
-			pcxt = node->pei->pcxt;
-			LaunchParallelWorkers(pcxt);
-
-			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers > 0)
-			{
-				node->nreaders = 0;
-				node->reader =
-					palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers; ++i)
-				{
-					if (pcxt->worker[i].bgwhandle == NULL)
-						continue;
-
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-					got_any_worker = true;
-				}
-			}
-
-			/* No workers?  Then never mind. */
-			if (!got_any_worker)
-				ExecShutdownGatherWorkers(node);
-		}
-
-		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
-			|| !gather->single_copy;
-		node->initialized = true;
-	}
+		ExecStartGather(node);
 
 	/*
 	 * Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..65ef13b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -112,6 +112,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request async execution for now */
+		eflags = eflags | EXEC_FLAG_ASYNC;
+
 		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..16c317c 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -340,11 +340,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 	 * inner child, because it will always be rescanned with fresh parameter
 	 * values.
 	 */
+
+	/*
+	 * Async execution of the outer plan is beneficial if this join itself
+	 * is requested to run asynchronously.
+	 */
 	outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);
 	if (node->nestParams == NIL)
 		eflags |= EXEC_FLAG_REWIND;
 	else
 		eflags &= ~EXEC_FLAG_REWIND;
+
+	/*
+	 * Async execution of the inner is inhibited if parameterized by the
+	 * outer
+	 */
+	if (list_length(node->nestParams) > 0)
+		eflags &= ~ EXEC_FLAG_ASYNC;
+
 	innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);
 
 	/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..3ee678d 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,18 @@ static TupleTableSlot *SeqNext(SeqScanState *node);
  * ----------------------------------------------------------------
  */
 
+bool
+ExecStartSeqScan(SeqScanState *node)
+{
+	if (node->early_start)
+	{
+		elog(LOG, "dummy_async_cb is called for %p", node);
+		node->early_start = false;
+	}
+
+	return false;
+}
+
 /* ----------------------------------------------------------------
  *		SeqNext
  *
@@ -214,6 +226,10 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 	ExecAssignResultTypeFromTL(&scanstate->ss.ps);
 	ExecAssignScanProjectionInfo(&scanstate->ss);
 
+	/* Do early start when requested */
+	if (eflags & EXEC_FLAG_ASYNC)
+		scanstate->early_start = true;
+
 	return scanstate;
 }
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 1a44085..3d13217 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@
 #define EXEC_FLAG_WITH_OIDS		0x0020	/* force OIDs in returned tuples */
 #define EXEC_FLAG_WITHOUT_OIDS	0x0040	/* force no OIDs in returned tuples */
 #define EXEC_FLAG_WITH_NO_DATA	0x0080	/* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC			0x0100	/* request asynchronous execution */
 
 
 /*
@@ -224,6 +225,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
 extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
+extern bool ExecStartNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
 
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
index f76d9be..0a48a03 100644
--- a/src/include/executor/nodeGather.h
+++ b/src/include/executor/nodeGather.h
@@ -18,6 +18,7 @@
 
 extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecGather(GatherState *node);
+extern bool ExecStartGather(GatherState *node);
 extern void ExecEndGather(GatherState *node);
 extern void ExecShutdownGather(GatherState *node);
 extern void ExecReScanGather(GatherState *node);
diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h
index f2e61ff..daf54ac 100644
--- a/src/include/executor/nodeSeqscan.h
+++ b/src/include/executor/nodeSeqscan.h
@@ -19,6 +19,7 @@
 
 extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecSeqScan(SeqScanState *node);
+extern bool ExecStartSeqScan(SeqScanState *node);
 extern void ExecEndSeqScan(SeqScanState *node);
 extern void ExecReScanSeqScan(SeqScanState *node);
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..4ffc2a8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1257,6 +1257,7 @@ typedef struct SeqScanState
 {
 	ScanState	ss;				/* its first field is NodeTag */
 	Size		pscan_len;		/* size of parallel heap scan descriptor */
+	bool		early_start;
 } SeqScanState;
 
 /* ----------------
@@ -1968,6 +1969,7 @@ typedef struct UniqueState
 typedef struct GatherState
 {
 	PlanState	ps;				/* its first field is NodeTag */
+	bool		early_start;
 	bool		initialized;
 	struct ParallelExecutorInfo *pei;
 	int			nreaders;
-- 
1.8.3.1

From ee2e01b8edfec09584822f94663e9bb2e03b7e95 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 16:47:31 +0900
Subject: [PATCH 2/2] PoC: Example implementation of asynchronous tuple passing

Aside from early node execution, tuples from multiple children of a
node can be received asynchronously. This patch makes ExecProcNode
report a third status, EXEC_NOT_READY, through the EState, in addition
to the two previously conveyed by the result (a tuple, or end of
tuples). It means that the node may have more tuples to return, but
none is available at the moment.

As an example, this patch also modifies nodeSeqscan to return
EXEC_NOT_READY with a certain probability, and nodeAppend to skip to
the next child when it is returned.

Conflicts:
	src/backend/executor/nodeGather.c
	src/backend/executor/nodeSeqscan.c
	src/include/nodes/execnodes.h
---
 src/backend/executor/execProcnode.c |  6 ++++
 src/backend/executor/nodeAppend.c   | 64 ++++++++++++++++++++++---------------
 src/backend/executor/nodeSeqscan.c  |  8 ++++-
 src/include/nodes/execnodes.h       | 11 +++++++
 4 files changed, 62 insertions(+), 27 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 2107ced..5398ca0 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStartNode(node->instrument);
 
+	node->state->exec_status = EXEC_READY;
+
 	switch (nodeTag(node))
 	{
 			/*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
+	if (TupIsNull(result) &&
+		node->state->exec_status == EXEC_READY)
+		node->state->exec_status = EXEC_EOT;
+
 	return result;
 }
 
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index d10364c..6ba13e9 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,6 +121,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 {
 	AppendState *appendstate = makeNode(AppendState);
 	PlanState **appendplanstates;
+	bool	   *stopped;
 	int			nplans;
 	int			i;
 	ListCell   *lc;
@@ -134,6 +135,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	nplans = list_length(node->appendplans);
 
 	appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
+	stopped = (bool *) palloc0(nplans * sizeof(bool));
 
 	/*
 	 * create new AppendState for our append node
@@ -141,6 +143,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.plan = (Plan *) node;
 	appendstate->ps.state = estate;
 	appendstate->appendplans = appendplanstates;
+	appendstate->stopped = stopped;
 	appendstate->as_nplans = nplans;
 
 	/*
@@ -195,45 +198,54 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
-	for (;;)
+	bool		all_eot = false;
+	EState	   *estate = node->ps.state;
+	TupleTableSlot *result;
+
+	/* XXX: This node currently works only for forward scans */
+	while (!all_eot)
 	{
 		PlanState  *subnode;
-		TupleTableSlot *result;
+		int i;
 
-		/*
-		 * figure out which subplan we are currently processing
-		 */
-		subnode = node->appendplans[node->as_whichplan];
+		all_eot = true;
+		/* Scan the children in registered order. */
+		for (i = node->as_whichplan ; i < node->as_nplans ; i++)
+		{
+			if (node->stopped[i])
+				continue;
 
-		/*
-		 * get a tuple from the subplan
-		 */
-		result = ExecProcNode(subnode);
+			subnode = node->appendplans[i];
+
+			result = ExecProcNode(subnode);
 
-		if (!TupIsNull(result))
-		{
 			/*
 			 * If the subplan gave us something then return it as-is. We do
 			 * NOT make use of the result slot that was set up in
 			 * ExecInitAppend; there's no need for it.
 			 */
-			return result;
+			switch (estate->exec_status)
+			{
+			case  EXEC_READY:
+				return result;
+
+			case  EXEC_NOT_READY:
+				all_eot = false;
+				break;
+
+			case EXEC_EOT:
+				node->stopped[i] = true;
+				break;
+
+			default:
+				elog(ERROR, "Unknown node status: %d", estate->exec_status);
+			}
 		}
 
-		/*
-		 * Go on to the "next" subplan in the appropriate direction. If no
-		 * more subplans, return the empty slot set up for us by
-		 * ExecInitAppend.
-		 */
-		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
-		else
-			node->as_whichplan--;
-		if (!exec_append_initialize_next(node))
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-		/* Else loop back and try to get a tuple from the new subplan */
+		/* XXX: some way of waiting for a new tuple is needed here */
 	}
+
+	return NULL;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 3ee678d..61916bf 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -136,6 +136,13 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSeqScan(SeqScanState *node)
 {
+	/* Make the caller wait with some probability */
+	if (random() < RAND_MAX / 10)
+	{
+		node->ss.ps.state->exec_status = EXEC_NOT_READY;
+		return NULL;
+	}
+
 	return ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) SeqNext,
 					(ExecScanRecheckMtd) SeqRecheck);
@@ -166,7 +173,6 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags)
 	ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
 }
 
-
 /* ----------------------------------------------------------------
  *		ExecInitSeqScan
  * ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4ffc2a8..45c6fba 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,14 @@ typedef struct ResultRelInfo
 	List	   *ri_onConflictSetWhere;
 } ResultRelInfo;
 
+/* Enum for async awareness */
+typedef enum NodeStatus
+{
+	EXEC_NOT_READY,
+	EXEC_READY,
+	EXEC_EOT
+} NodeStatus;
+
 /* ----------------
  *	  EState information
  *
@@ -419,6 +427,8 @@ typedef struct EState
 	HeapTuple  *es_epqTuple;	/* array of EPQ substitute tuples */
 	bool	   *es_epqTupleSet; /* true if EPQ tuple is provided */
 	bool	   *es_epqScanDone; /* true if EPQ tuple has been fetched */
+
+	NodeStatus	exec_status;
 } EState;
 
 
@@ -1147,6 +1157,7 @@ typedef struct AppendState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
+	bool	   *stopped;
 	int			as_nplans;
 	int			as_whichplan;
 } AppendState;
-- 
1.8.3.1

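The three-state protocol of the second patch can also be illustrated in
isolation. The following is only a sketch under made-up names (ToyStatus,
ToyChild, toy_child_next and toy_append_next are not part of the patch):
each child replays a scripted sequence of statuses instead of scanning a
relation, and the parent polls its children in order the way the patched
ExecAppend does, returning the first available value, skipping children
that are not ready, and retiring children that reached end of tuples.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins for EXEC_NOT_READY / EXEC_READY / EXEC_EOT. */
typedef enum ToyStatus
{
	TOY_NOT_READY,
	TOY_READY,
	TOY_EOT
} ToyStatus;

#define TOY_MAX_STEPS 8

/* A hypothetical child that replays a fixed script of statuses. */
typedef struct ToyChild
{
	ToyStatus	script[TOY_MAX_STEPS];
	int			nsteps;
	int			pos;
	int			value;			/* handed out on every TOY_READY step */
} ToyChild;

/* Advance one step of the script; past its end the child reports EOT. */
static ToyStatus
toy_child_next(ToyChild *child, int *out)
{
	ToyStatus	status;

	assert(child->nsteps <= TOY_MAX_STEPS);

	if (child->pos >= child->nsteps)
		return TOY_EOT;

	status = child->script[child->pos++];
	if (status == TOY_READY)
		*out = child->value;
	return status;
}

/*
 * Poll the children in order. Return TOY_READY with *out set as soon as
 * one child has a value, or TOY_EOT once every child has been retired.
 */
static ToyStatus
toy_append_next(ToyChild *children, bool *stopped, int nchildren, int *out)
{
	bool		all_eot = false;

	while (!all_eot)
	{
		int			i;

		all_eot = true;
		for (i = 0; i < nchildren; i++)
		{
			if (stopped[i])
				continue;

			switch (toy_child_next(&children[i], out))
			{
				case TOY_READY:
					return TOY_READY;
				case TOY_NOT_READY:
					all_eot = false;	/* must come back to this child */
					break;
				case TOY_EOT:
					stopped[i] = true;	/* never poll this child again */
					break;
			}
		}
		/* This sketch busy-loops; real code would wait for readiness. */
	}

	return TOY_EOT;
}
```

With two children scripted as {NOT_READY, READY, EOT} and
{READY, NOT_READY, READY}, the parent first returns the second child's
value, because the first child is not ready yet, and only reports EOT
after both scripts are exhausted.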