Hello,

I have given some consideration and trial to callbacks as a means
of asynchronous (early) execution.

> > Suppose we equip each EState with the ability to fire "callbacks".
> > Callbacks have the signature:
> > 
> > typedef bool (*ExecCallback)(PlanState *planstate, TupleTableSlot
> > *slot, void *context);
> > 
> > Executor nodes can register immediate callbacks to be run at the
> > earliest possible opportunity using a function like
> > ExecRegisterCallback(estate, callback, planstate, slot, context).
> > They can registered deferred callbacks that will be called when a file
> > descriptor becomes ready for I/O, or when the process latch is set,
> > using a call like ExecRegisterFileCallback(estate, fd, event,
> > callback, planstate, slot, context) or
> > ExecRegisterLatchCallback(estate, callback, planstate, slot, context).

I considered this. The immediate callbacks seem fine, but using a
latch or fds to signal tuple availability doesn't seem to fit
callbacks stored in the EState. Such callbacks are deferrable
until the parent requests a tuple, and events of that kind can be
handled at that time, as ExecGather does now. However, some kind
of synchronization/waiting mechanism such as a latch or select()
is needed anyway.

> > To execute callbacks, an executor node can call ExecFireCallbacks(),
> > which will fire immediate callbacks in order of registration, and wait
> > for the file descriptors for which callbacks have been registered and
> > for the process latch when no immediate callbacks remain but there are
> > still deferred callbacks.  It will return when (1) there are no
> > remaining immediate or deferred callbacks or (2) one of the callbacks
> > returns "true".
> 
> Excellent! I unconsciously excluded the case of callbacks because
> I supposed (without certain ground) all executor nodes can have a
> chance to win from this. Such callback is a good choice to do
> what Start*Node did in the lastest patch.

The previous patch added a large amount of clutter, namely the
async-execution machinery including extra code for an
ExecStartNode phase in the same manner as ExecProcNode and
ExecEndNode. Most of that additional code is useless for most
node types.

Callbacks are suitable for uncommon, fire-once-per-event
operations such as error handling. In this case, the operation is
asynchronous execution of a node and the event is "just before
ExecProcNode on the topmost node". The first attached patch
allows async-capable nodes to register callbacks during the Init
phase and executes them just before the Exec phase on the topmost
node. As a result, it greatly reduces the amount of additional
code. This matches my first impression of the word "callbacks".

The following operations yield LOG messages from the dummy
callback with this patch.

CREATE TABLE t1 (a int, b int);
INSERT INTO t1 (SELECT a, 1 FROM generate_series(0, 99) a);
CREATE TABLE t2 (a int, b int);
INSERT INTO t2 (SELECT a, 2 FROM generate_series(0, 99) a);
CREATE TABLE t3 (a int, b int);
INSERT INTO t3 (SELECT a, 3 FROM generate_series(0, 99) a);
SELECT * FROM t1 UNION ALL SELECT * FROM t2 UNION ALL SELECT * FROM t3;
===
LOG:  dummy_async_cb is called for 0x2783a98
LOG:  dummy_async_cb is called for 0x2784248
LOG:  dummy_async_cb is called for 0x2784ad0

What my previous patch could do is achievable with this first
patch, with a far smaller diff.

If this design is not bad, I'll do the postgres_fdw part.


Next is the discussion of asynchronous tuple fetching.

> > Then, suppose we add a function bool ExecStartAsync(PlanState *target,
> > ExecCallback callback, PlanState *cb_planstate, void *cb_context).
> > For non-async-aware plan nodes, this just returns false.  async-aware
> > plan nodes should initiate some work, register some callbacks, and
> > return.  The callback that get registered should arrange in turn to
> > register the callback passed as an argument when a tuple becomes
> > available, passing the planstate and context provided by
> > ExecStartAsync's caller, plus the TupleTableSlot containing the tuple.
> 
> Although I don't imagine clearly about the case of
> async-aware-nodes under non-aware-nodes, it seems to have a high
> affinity with (true) parallel execution framework.

ExecStartAsync is similar to the ExecStartNode of my old patch.
One of its most annoying aspects is that it needs to walk down to
its descendants, which in turn requires corresponding cluttered
additional code for every node type that can have children.

Instead, in the second patch, I modified ExecProcNode to return
an async status in the EState. It will be EXEC_READY or EXEC_EOT
(end of table / no more tuples?) for non-async-capable nodes, and
async-capable nodes can set it to EXEC_NOT_READY, which indicates
that there could be more tuples but none available yet.

Async-aware nodes such as Append can move on to the next child if
the current one returned EXEC_NOT_READY. If all non-EXEC_EOT
children returned EXEC_NOT_READY, Append should wait using some
signaling mechanism (it currently busy-waits instead). As an
example, the second patch modifies ExecAppend to handle this and
modifies ExecSeqScan to return EXEC_NOT_READY with a certain
probability as an emulation of asynchronous tuple fetching. The
UNION ALL query above then returns results interleaved among the
three tables.

> > So, in response to ExecStartAsync, if there's no tuple currently
> > available, postgres_fdw can send a query to the remote server and
> > request a callback when the fd becomes ready-ready.  It must save the
> > callback passed to ExecStartAsync inside the PlanState someplace so
> > that when a tuple becomes available it can register that callback.
> > 
> > ExecAppend can call ExecStartAsync on each of its subplans.  For any
> > subplan where ExecStartAsync returns false, ExecAppend will just
> > execute it normally, by calling ExecProcNode repeatedly until no more
> > tuples are returned.  But for async-capable subplans, it can call
> > ExecStartAsync on all of them, and then call ExecFireCallbacks.  The
> > tuple-ready callback it passes to its child plans will take the tuple
> > provided by the child plan and store it into the Append node's slot.
> > It will then return true if, and only if, ExecFireCallbacks is being
> > invoked from ExecAppend (which it can figure out via some kind of
> > signalling either through its own PlanState or centralized signalling
> > through the EState).  That way, if ExecAppend were itself invoked
> > asynchronously, its tuple-ready callback could simply populate a slot
> > appropriately register its invoker's tuple-ready callback.  Whether
> > called synchronously or asynchronously, each invocation of as
> > asynchronous append after the first would just need to again
> > ExecStartAsync on the child that last returned a tuple.
> 
> Thanks for the attentive explanation. My concern about this is
> that the latency by synchronizing one by one for every tuple
> between the producer and the consumer. My previous patch is not
> asynchronous on every tuple so it can give a pure gain without
> loss from tuple-wise synchronization. But it looks clean and I
> like it so I'll consider this.
> 
> > It seems pretty straightforward to fit Gather into this infrastructure.
>
> Yes.

If Gather's children became a regular node struct, with a name
like Worker(Node), instead of the non-Node structure they are
now, we could generalize the tuple-synchronization mechanism so
that it can be used by other nodes such as ForeignScan.
Append(ForeignScan, ForeignScan, ...) with async tuple passing
can spread the load over multiple foreign servers, so I suppose
it is preferable if no penalty exists.

> > It is unclear to me how useful this is beyond ForeignScan, Gather, and
> > Append.  MergeAppend's ordering constraint makes it less useful; we
> > can asynchronously kick off the request for the next tuple before
> > returning the previous one, but we're going to need to have that tuple
> > before we can return the next one.  But it could be done.  It could
> > potentially even be applied to seq scans or index scans using some set
> > of asynchronous I/O interfaces, but I don't see how it could be
> > applied to joins or aggregates, which typically can't really proceed
> > until they get the next tuple.  They could be plugged into this
> > interface easily enough but it would only help to the extent that it
> > enabled asynchrony elsewhere in the plan tree to be pulled up towards
> > the root.
> 
> This is mainly not an argument on "asynchronous execution/start"
> but "asynchronous tuple-passing". As I showed before, a merge
> join on asynchronous and parallel children running sort *can* win
> over a hash join (if planner foresees that). If asynchronous
> tuple-passing is not so effective like MergeAppend, we can
> simplly refrain from doing that. But cost modeling for it is a
> difficult problem.
> 
> > Thoughts?
> 
> I'll try the callback framework and in-process asynchronous
> tuple-passing (like select(2)). Please wait for a while.

Finally, the two attached patches became somewhat different from
Robert's suggestion and lack the synchronization feature.
However, if this direction is not so bad, I'll build the feature
on top of it.

Suggestions? Thoughts?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 87e5c9eb6f230b9682fe300bc1592cb9f4fcadb5 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 09:37:25 +0900
Subject: [PATCH 1/2] PoC: Async start callback for executor.

This patch allows async-capable nodes to register callbacks that run the
node before ExecProcNode(). eflags has a new bit, EXEC_FLAG_ASYNC, to
request asynchronous execution of children during the ExecInit phase.

As an example, nodeSeqscan registers a dummy callback if requested, and
nodeAppend unconditionally requests async execution of its children. So a
plan Append(SeqScan, SeqScan) runs the callbacks and yields LOG messages.
---
 src/backend/executor/execMain.c        |   2 +
 src/backend/executor/execProcnode.c    |   9 ++
 src/backend/executor/execUtils.c       |  39 ++++++++
 src/backend/executor/nodeAppend.c      |   2 +
 src/backend/executor/nodeGather.c      | 166 ++++++++++++++++++++-------------
 src/backend/executor/nodeMergeAppend.c |   3 +
 src/backend/executor/nodeNestloop.c    |  13 +++
 src/backend/executor/nodeSeqscan.c     |   9 ++
 src/include/executor/executor.h        |   2 +
 src/include/nodes/execnodes.h          |  23 ++++-
 src/include/nodes/plannodes.h          |   1 -
 11 files changed, 202 insertions(+), 67 deletions(-)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 76f7297..7fe188a 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1552,6 +1552,8 @@ ExecutePlan(EState *estate,
 	if (use_parallel_mode)
 		EnterParallelMode();
 
+	AsyncStartNode(planstate);
+
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
 	 */
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a31dbc9..df9e533 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -786,6 +786,15 @@ ExecEndNode(PlanState *node)
 }
 
 /*
+ * AsyncStartNode - execute registered early-startup callbacks
+ */
+void
+AsyncStartNode(PlanState *node)
+{
+	RunAsyncCallbacks(node->state->es_async_cb_list);
+}
+
+/*
  * ExecShutdownNode
  *
  * Give execution nodes a chance to stop asynchronous resource consumption
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index e937cf8..0627772 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -964,3 +964,42 @@ ShutdownExprContext(ExprContext *econtext, bool isCommit)
 
 	MemoryContextSwitchTo(oldcontext);
 }
+
+/*
+ * Register an asynchronous startup callback in the EState.
+ *
+ * The callbacks are executed from the head of the list, and this function
+ * appends them in registration order. This is unnecessary if they are
+ * truly asynchronous and independent, but the ordering is safer if some of
+ * them implicitly depend on execution order.
+ */
+void
+RegisterAsyncCallback(EState *estate, AsyncStartCallback func, PlanState *node,
+					  int eflags)
+{
+	AsyncStartListItem *elem = palloc(sizeof(AsyncStartListItem));
+	elem->cbfunc = func;
+	elem->node = node;
+
+	if (eflags & EXEC_FLAG_ASYNC)
+		estate->es_async_cb_list =
+			lappend(estate->es_async_cb_list, elem);
+}
+
+/*
+ * Run callbacks in the list
+ */
+void
+RunAsyncCallbacks(List *list)
+{
+	ListCell *lc;
+
+	foreach (lc, list)
+	{
+		AsyncStartListItem *cb = (AsyncStartListItem *) lfirst(lc);
+
+		cb->cbfunc(cb->node);
+	}
+
+	return;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..d10364c 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -165,6 +165,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request async execution for children */
+		eflags |= EXEC_FLAG_ASYNC;
 		appendplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 16c981b..3f9b8b0 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -45,7 +45,90 @@
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
+static bool StartGather(PlanState *psnode);
 
+/* ----------------------------------------------------------------
+ *		StartGather
+ *
+ *		Gather node can have an advantage from asynchronous execution in most
+ *		cases because of its startup cost.
+ *		----------------------------------------------------------------
+ */
+static bool
+StartGather(PlanState *psnode)
+{
+	GatherState   *node = (GatherState *)psnode;
+	EState	   *estate = node->ps.state;
+	Gather	   *gather = (Gather *) node->ps.plan;
+	TupleTableSlot *fslot = node->funnel_slot;
+	int i;
+
+	/* Don't start if already started or explicitly inhibited by the upper */
+	if (node->initialized)
+		return false;
+
+	/*
+	 * Initialize the parallel context and workers on first execution. We do
+	 * this on first execution rather than during node initialization, as it
+	 * needs to allocate large dynamic segment, so it is better to do if it
+	 * is really needed.
+	 */
+
+	/*
+	 * Sometimes we might have to run without parallelism; but if
+	 * parallel mode is active then we can try to fire up some workers.
+	 */
+	if (gather->num_workers > 0 && IsInParallelMode())
+	{
+		ParallelContext *pcxt;
+		bool	got_any_worker = false;
+
+		/* Initialize the workers required to execute Gather node. */
+		if (!node->pei)
+			node->pei = ExecInitParallelPlan(node->ps.lefttree,
+											 estate,
+											 gather->num_workers);
+
+		/*
+		 * Register backend workers. We might not get as many as we
+		 * requested, or indeed any at all.
+		 */
+		pcxt = node->pei->pcxt;
+		LaunchParallelWorkers(pcxt);
+
+		/* Set up tuple queue readers to read the results. */
+		if (pcxt->nworkers > 0)
+		{
+			node->nreaders = 0;
+			node->reader =
+				palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+			for (i = 0; i < pcxt->nworkers; ++i)
+			{
+				if (pcxt->worker[i].bgwhandle == NULL)
+					continue;
+
+				shm_mq_set_handle(node->pei->tqueue[i],
+								  pcxt->worker[i].bgwhandle);
+				node->reader[node->nreaders++] =
+					CreateTupleQueueReader(node->pei->tqueue[i],
+										   fslot->tts_tupleDescriptor);
+				got_any_worker = true;
+			}
+		}
+
+		/* No workers?  Then never mind. */
+		if (!got_any_worker)
+			ExecShutdownGatherWorkers(node);
+	}
+
+	/* Run plan locally if no workers or not single-copy. */
+	node->need_to_scan_locally = (node->reader == NULL)
+		|| !gather->single_copy;
+
+	node->initialized = true;
+	return true;
+}
 
 /* ----------------------------------------------------------------
  *		ExecInitGather
@@ -58,6 +141,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	Plan	   *outerNode;
 	bool		hasoid;
 	TupleDesc	tupDesc;
+	int			child_eflags;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -97,6 +181,11 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * now initialize outer plan
 	 */
 	outerNode = outerPlan(node);
+	/*
+	 * This outer plan is executed in another process so don't start
+	 * asynchronously in this process
+	 */
+	child_eflags = eflags & ~EXEC_FLAG_ASYNC;
 	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
@@ -115,6 +204,16 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
 	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
 
+	/*
+	 * Register the asynchronous execution callback for this node. Backend
+	 * workers need to allocate a large dynamic segment, so from that aspect
+	 * it is better to launch them at first execution. Whether to execute
+	 * asynchronously should be decided with that in mind, but we ignore the
+	 * aspect for now.
+	 */
+	RegisterAsyncCallback(estate, StartGather, (PlanState *)gatherstate,
+						  eflags);
+
 	return gatherstate;
 }
 
@@ -128,77 +227,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecGather(GatherState *node)
 {
-	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
 	TupleTableSlot *slot;
 	TupleTableSlot *resultSlot;
 	ExprDoneCond isDone;
 	ExprContext *econtext;
 
-	/*
-	 * Initialize the parallel context and workers on first execution. We do
-	 * this on first execution rather than during node initialization, as it
-	 * needs to allocate large dynamic segment, so it is better to do if it
-	 * is really needed.
-	 */
+	/* Initialize workers if not yet. */
 	if (!node->initialized)
-	{
-		EState	   *estate = node->ps.state;
-		Gather	   *gather = (Gather *) node->ps.plan;
-
-		/*
-		 * Sometimes we might have to run without parallelism; but if
-		 * parallel mode is active then we can try to fire up some workers.
-		 */
-		if (gather->num_workers > 0 && IsInParallelMode())
-		{
-			ParallelContext *pcxt;
-			bool	got_any_worker = false;
-
-			/* Initialize the workers required to execute Gather node. */
-			if (!node->pei)
-				node->pei = ExecInitParallelPlan(node->ps.lefttree,
-												 estate,
-												 gather->num_workers);
-
-			/*
-			 * Register backend workers. We might not get as many as we
-			 * requested, or indeed any at all.
-			 */
-			pcxt = node->pei->pcxt;
-			LaunchParallelWorkers(pcxt);
-
-			/* Set up tuple queue readers to read the results. */
-			if (pcxt->nworkers > 0)
-			{
-				node->nreaders = 0;
-				node->reader =
-					palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers; ++i)
-				{
-					if (pcxt->worker[i].bgwhandle == NULL)
-						continue;
-
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-					got_any_worker = true;
-				}
-			}
-
-			/* No workers?  Then never mind. */
-			if (!got_any_worker)
-				ExecShutdownGatherWorkers(node);
-		}
-
-		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
-			|| !gather->single_copy;
-		node->initialized = true;
-	}
+		StartGather((PlanState *)node);
 
 	/*
 	 * Check to see if we're still projecting out tuples from a previous scan
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..65ef13b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -112,6 +112,9 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	{
 		Plan	   *initNode = (Plan *) lfirst(lc);
 
+		/* always request async execution for now */
+		eflags = eflags | EXEC_FLAG_ASYNC;
+
 		mergeplanstates[i] = ExecInitNode(initNode, estate, eflags);
 		i++;
 	}
diff --git a/src/backend/executor/nodeNestloop.c b/src/backend/executor/nodeNestloop.c
index 555fa09..16c317c 100644
--- a/src/backend/executor/nodeNestloop.c
+++ b/src/backend/executor/nodeNestloop.c
@@ -340,11 +340,24 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
 	 * inner child, because it will always be rescanned with fresh parameter
 	 * values.
 	 */
+
+	/*
+	 * Async execution of the outer plan is beneficial if this join is
+	 * requested as async.
+	 */
 	outerPlanState(nlstate) = ExecInitNode(outerPlan(node), estate, eflags);
 	if (node->nestParams == NIL)
 		eflags |= EXEC_FLAG_REWIND;
 	else
 		eflags &= ~EXEC_FLAG_REWIND;
+
+	/*
+	 * Async execution of the inner is inhibited if parameterized by the
+	 * outer
+	 */
+	if (list_length(node->nestParams) > 0)
+		eflags &= ~ EXEC_FLAG_ASYNC;
+
 	innerPlanState(nlstate) = ExecInitNode(innerPlan(node), estate, eflags);
 
 	/*
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f12921d..2ae598d 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -39,6 +39,12 @@ static TupleTableSlot *SeqNext(SeqScanState *node);
  * ----------------------------------------------------------------
  */
 
+static void
+dummy_async_cb(PlanState *ps)
+{
+	elog(LOG, "dummy_async_cb is called for %p", ps);
+}
+
 /* ----------------------------------------------------------------
  *		SeqNext
  *
@@ -214,6 +220,9 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 	ExecAssignResultTypeFromTL(&scanstate->ss.ps);
 	ExecAssignScanProjectionInfo(&scanstate->ss);
 
+	/*  Register dummy async callback if requested */
+	RegisterAsyncCallback(estate, dummy_async_cb, scanstate, eflags);
+
 	return scanstate;
 }
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 1a44085..b1a17eb 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -62,6 +62,7 @@
 #define EXEC_FLAG_WITH_OIDS		0x0020	/* force OIDs in returned tuples */
 #define EXEC_FLAG_WITHOUT_OIDS	0x0040	/* force no OIDs in returned tuples */
 #define EXEC_FLAG_WITH_NO_DATA	0x0080	/* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC			0x0100	/* request asynchronous execution */
 
 
 /*
@@ -225,6 +226,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
+extern void AsyncStartNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
 
 /*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..1e8936c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -343,6 +343,17 @@ typedef struct ResultRelInfo
 	List	   *ri_onConflictSetWhere;
 } ResultRelInfo;
 
+/* ---------------
+ *  Struct and enum for async-execution
+ */
+typedef struct PlanState PlanState;
+typedef void (*AsyncStartCallback)(PlanState *node);
+typedef struct AsyncStartListItem
+{
+	AsyncStartCallback		cbfunc;	/* the callback function  */
+	PlanState			   *node;	/* parameter to give the callback */
+} AsyncStartListItem;
+
 /* ----------------
  *	  EState information
  *
@@ -419,9 +430,19 @@ typedef struct EState
 	HeapTuple  *es_epqTuple;	/* array of EPQ substitute tuples */
 	bool	   *es_epqTupleSet; /* true if EPQ tuple is provided */
 	bool	   *es_epqScanDone; /* true if EPQ tuple has been fetched */
-} EState;
 
+	/*
+	 * Early-start callback list. These functions are executed just before
+	 * ExecProcNode of the top-node.
+	 */
+	List	*es_async_cb_list;
+	List	*es_private_async_cb_list;
+} EState;
 
+/* in execUtils.c */
+void RegisterAsyncCallback(EState *estate, AsyncStartCallback func,
+						   PlanState *node, int eflags);
+void RunAsyncCallbacks(List *list);
 /*
  * ExecRowMark -
  *	   runtime representation of FOR [KEY] UPDATE/SHARE clauses
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index e823c83..cbd58cb 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -79,7 +79,6 @@ typedef struct PlannedStmt
 #define exec_subplan_get_plan(plannedstmt, subplan) \
 	((Plan *) list_nth((plannedstmt)->subplans, (subplan)->plan_id - 1))
 
-
 /* ----------------
  *		Plan node
  *
-- 
1.8.3.1

From 8a65bfc57897d7be07d9bb3506550c50cf99b957 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Jan 2016 16:47:31 +0900
Subject: [PATCH 2/2] PoC: Example implement of asynchronous tuple passing

Aside from early node execution, tuples from multiple children of a
node can be received asynchronously. This patch makes ExecProcNode
return a third status, EXEC_NOT_READY, via the EState, in addition to
the result returned as before. It means that the node may have more
tuples to return but none available for the moment.

As an example, this patch also modifies nodeSeqscan to return
EXEC_NOT_READY with a certain probability, and nodeAppend to skip to
the next child when it is returned.
---
 src/backend/executor/execProcnode.c |  6 ++++
 src/backend/executor/nodeAppend.c   | 64 ++++++++++++++++++++++---------------
 src/backend/executor/nodeGather.c   | 10 +++---
 src/backend/executor/nodeSeqscan.c  | 11 +++++--
 src/include/nodes/execnodes.h       | 13 ++++++++
 5 files changed, 71 insertions(+), 33 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index df9e533..febc41a 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -383,6 +383,8 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStartNode(node->instrument);
 
+	node->state->exec_status = EXEC_READY;
+
 	switch (nodeTag(node))
 	{
 			/*
@@ -540,6 +542,10 @@ ExecProcNode(PlanState *node)
 	if (node->instrument)
 		InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
+	if (TupIsNull(result) &&
+		node->state->exec_status == EXEC_READY)
+		node->state->exec_status = EXEC_EOT;
+
 	return result;
 }
 
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index d10364c..6ba13e9 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -121,6 +121,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 {
 	AppendState *appendstate = makeNode(AppendState);
 	PlanState **appendplanstates;
+	bool	   *stopped;
 	int			nplans;
 	int			i;
 	ListCell   *lc;
@@ -134,6 +135,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	nplans = list_length(node->appendplans);
 
 	appendplanstates = (PlanState **) palloc0(nplans * sizeof(PlanState *));
+	stopped = (bool *) palloc0(nplans * sizeof(bool));
 
 	/*
 	 * create new AppendState for our append node
@@ -141,6 +143,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.plan = (Plan *) node;
 	appendstate->ps.state = estate;
 	appendstate->appendplans = appendplanstates;
+	appendstate->stopped = stopped;
 	appendstate->as_nplans = nplans;
 
 	/*
@@ -195,45 +198,54 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
-	for (;;)
+	bool		all_eot = false;
+	EState	   *estate = node->ps.state;
+	TupleTableSlot *result;
+
+	/*!!!! This node currently works only for monotonic-forwarding scan */
+	while (!all_eot)
 	{
 		PlanState  *subnode;
-		TupleTableSlot *result;
+		int i;
 
-		/*
-		 * figure out which subplan we are currently processing
-		 */
-		subnode = node->appendplans[node->as_whichplan];
+		all_eot = true;
+		/* Scan the children in registered order. */
+		for (i = node->as_whichplan ; i < node->as_nplans ; i++)
+		{
+			if (node->stopped[i])
+				continue;
 
-		/*
-		 * get a tuple from the subplan
-		 */
-		result = ExecProcNode(subnode);
+			subnode = node->appendplans[i];
+
+			result = ExecProcNode(subnode);
 
-		if (!TupIsNull(result))
-		{
 			/*
 			 * If the subplan gave us something then return it as-is. We do
 			 * NOT make use of the result slot that was set up in
 			 * ExecInitAppend; there's no need for it.
 			 */
-			return result;
+			switch (estate->exec_status)
+			{
+			case  EXEC_READY:
+				return result;
+
+			case  EXEC_NOT_READY:
+				all_eot = false;
+				break;
+
+			case EXEC_EOT:
+				node->stopped[i] = true;
+				break;
+
+			default:
+				elog(ERROR, "Unknown node status: %d", estate->exec_status);
+			}				
 		}
 
-		/*
-		 * Go on to the "next" subplan in the appropriate direction. If no
-		 * more subplans, return the empty slot set up for us by
-		 * ExecInitAppend.
-		 */
-		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
-		else
-			node->as_whichplan--;
-		if (!exec_append_initialize_next(node))
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-		/* Else loop back and try to get a tuple from the new subplan */
+		/* XXXXX: some waiting measure is needed to wait for new tuples */
 	}
+
+	return NULL;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 3f9b8b0..1b990b4 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -45,7 +45,7 @@
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
-static bool StartGather(PlanState *psnode);
+static void StartGather(PlanState *psnode);
 
 /* ----------------------------------------------------------------
  *		StartGather
@@ -54,7 +54,7 @@ static bool StartGather(PlanState *psnode);
  *		cases because of its startup cost.
  *		----------------------------------------------------------------
  */
-static bool
+static void
 StartGather(PlanState *psnode)
 {
 	GatherState   *node = (GatherState *)psnode;
@@ -65,7 +65,7 @@ StartGather(PlanState *psnode)
 
 	/* Don't start if already started or explicitly inhibited by the upper */
 	if (node->initialized)
-		return false;
+		return;
 
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
@@ -127,7 +127,7 @@ StartGather(PlanState *psnode)
 		|| !gather->single_copy;
 
 	node->initialized = true;
-	return true;
+	return;
 }
 
 /* ----------------------------------------------------------------
@@ -186,7 +186,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	 * asynchronously in this process
 	 */
 	child_eflags = eflags & ~EXEC_FLAG_ASYNC;
-	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
+	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, child_eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
 
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 2ae598d..f345d8c 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -130,6 +130,13 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSeqScan(SeqScanState *node)
 {
+	/* Make the caller wait with some probability */
+	if (random() < RAND_MAX / 10)
+	{
+		node->ss.ps.state->exec_status = EXEC_NOT_READY;
+		return NULL;
+	}
+
 	return ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) SeqNext,
 					(ExecScanRecheckMtd) SeqRecheck);
@@ -160,7 +167,6 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags)
 	ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
 }
 
-
 /* ----------------------------------------------------------------
  *		ExecInitSeqScan
  * ----------------------------------------------------------------
@@ -221,7 +227,8 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
 	ExecAssignScanProjectionInfo(&scanstate->ss);
 
 	/*  Register dummy async callback if requested */
-	RegisterAsyncCallback(estate, dummy_async_cb, scanstate, eflags);
+	RegisterAsyncCallback(estate, dummy_async_cb,
+						  (PlanState *)scanstate, eflags);
 
 	return scanstate;
 }
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1e8936c..714178a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -354,6 +354,15 @@ typedef struct AsyncStartListItem
 	PlanState			   *node;	/* parameter to give the callback */
 } AsyncStartListItem;
 
+/* Enum for the return of AsyncExecNode */
+typedef enum NodeStatus
+{
+	EXEC_NOT_READY,
+	EXEC_READY,
+	EXEC_EOT
+} NodeStatus;
+
+
 /* ----------------
  *	  EState information
  *
@@ -437,6 +446,8 @@ typedef struct EState
 	 */
 	List	*es_async_cb_list;
 	List	*es_private_async_cb_list;
+
+	NodeStatus exec_status;
 } EState;
 
 /* in execUtils.c */
@@ -1078,6 +1089,7 @@ typedef struct PlanState
 	ProjectionInfo *ps_ProjInfo;	/* info for doing tuple projection */
 	bool		ps_TupFromTlist;/* state flag for processing set-valued
 								 * functions in targetlist */
+	bool		ps_async_tuple;	/* tuple is passed semi-asynchronously */
 } PlanState;
 
 /* ----------------
@@ -1168,6 +1180,7 @@ typedef struct AppendState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
+	bool	   *stopped;
 	int			as_nplans;
 	int			as_whichplan;
 } AppendState;
-- 
1.8.3.1
