This is the rebased version on the current master(-0004), and
added resowner stuff (0005) and unlikely(0006).

At Tue, 18 Oct 2016 10:30:51 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20161018.103051.30820907.horiguchi.kyot...@lab.ntt.co.jp>
> > > - Errors in the executor can leak the WaitEventSet.  Probably we need
> > > to modify ResourceOwners to be able to own WaitEventSets.
> 
> WaitEventSet itself is not leaked but epoll-fd should be closed
> at failure. This seems doable with TRY-CATCHing in
> ExecAsyncEventLoop. (not yet)

Haha, that was careless of me. A wait event set can continue to live
after a timeout, and an error can happen at any point after that. I
added an entry for wait event sets to the resource owner machinery and
attach the ones created in ExecAsyncEventWait to
TopTransactionResourceOwner. Currently WaitLatchOrSocket doesn't
do so not to change the current behavior. WaitEventSet doesn't
have usable identifier for resowner.c so currently I use the
address(pointer value) for the purpose. The patch 0005 does that.

> I measured performance and had the following result.
> 
> t0  - SELECT sum(a) FROM <local single table>;
> pl  - SELECT sum(a) FROM <4 local children>;
> pf0 - SELECT sum(a) FROM <4 foreign children on single connection>;
> pf1 - SELECT sum(a) FROM <4 foreign children on dedicate connections>;
> 
> The result is written as "time<ms> (std dev <ms>)"
> 
> sync
>   t0: 3820.33 (  1.88)
>   pl: 1608.59 ( 12.06)
>  pf0: 7928.29 ( 46.58)
>  pf1: 8023.16 ( 26.43)
> 
> async
>   t0: 3806.31 (  4.49)    0.4% faster (should be error)
>   pl: 1629.17 (  0.29)    1.3% slower
>  pf0: 6447.07 ( 25.19)   18.7% faster
>  pf1: 1876.80 ( 47.13)   76.6% faster
> 
> t0 is not affected since the ExecProcNode stuff has gone.
> 
> pl is getting a bit slower. (almost the same to simple seqscan of
> the previous patch) This should be a misprediction penalty.

Using the likely() macro in ExecAppend seems to have shaken
off the degradation.

sync
  t0: 3919.49 (  5.95)
  pl: 1637.95 (  0.75)
 pf0: 8304.20 ( 43.94)
 pf1: 8222.09 ( 28.20)

async
  t0: 3885.84 ( 40.20)  0.86% faster (should be error but stable on my env..)
  pl: 1617.20 (  3.51)  1.26% faster (ditto)
 pf0: 6680.95 (478.72)  19.5% faster
 pf1: 1886.87 ( 36.25)  77.1% faster

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 25eba7e506228ab087e8b743efb039286a8251c4 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 12 Oct 2016 12:46:10 +0900
Subject: [PATCH 1/6] robert's 2nd framework

---
 contrib/postgres_fdw/postgres_fdw.c     |  49 ++++
 src/backend/executor/Makefile           |   4 +-
 src/backend/executor/README             |  43 +++
 src/backend/executor/execAmi.c          |   5 +
 src/backend/executor/execAsync.c        | 462 ++++++++++++++++++++++++++++++++
 src/backend/executor/nodeAppend.c       | 162 ++++++++++-
 src/backend/executor/nodeForeignscan.c  |  49 ++++
 src/backend/nodes/copyfuncs.c           |   1 +
 src/backend/nodes/outfuncs.c            |   1 +
 src/backend/nodes/readfuncs.c           |   1 +
 src/backend/optimizer/plan/createplan.c |  45 +++-
 src/include/executor/execAsync.h        |  29 ++
 src/include/executor/nodeAppend.h       |   3 +
 src/include/executor/nodeForeignscan.h  |   7 +
 src/include/foreign/fdwapi.h            |  15 ++
 src/include/nodes/execnodes.h           |  57 +++-
 src/include/nodes/plannodes.h           |   1 +
 17 files changed, 909 insertions(+), 25 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 906d6e6..c480945 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -19,6 +19,7 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
+#include "executor/execAsync.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -349,6 +350,14 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
 							 UpperRelationKind stage,
 							 RelOptInfo *input_rel,
 							 RelOptInfo *output_rel);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static void postgresForeignAsyncRequest(EState *estate,
+							PendingAsyncRequest *areq);
+static void postgresForeignAsyncConfigureWait(EState *estate,
+								  PendingAsyncRequest *areq,
+								  bool reinit);
+static void postgresForeignAsyncNotify(EState *estate,
+						   PendingAsyncRequest *areq);
 
 /*
  * Helper functions
@@ -468,6 +477,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	/* Support functions for upper relation push-down */
 	routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
+	/* Support functions for async execution */
+	routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+	routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
+	routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+	routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
+
 	PG_RETURN_POINTER(routine);
 }
 
@@ -4442,6 +4457,40 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
 	/* XXX Consider parameterized paths for the join relation */
 }
 
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+	return true;
+}
+
+/*
+ * XXX. Just for testing purposes, let's run everything through the async
+ * mechanism but return tuples synchronously.
+ */
+static void
+postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	TupleTableSlot *slot;
+
+	Assert(IsA(node, ForeignScanState));
+	slot = postgresIterateForeignScan(node);
+	ExecAsyncRequestDone(estate, areq, (Node *) slot);
+}
+
+static void
+postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
+								  bool reinit)
+{
+	elog(ERROR, "postgresForeignAsyncConfigureWait");
+}
+
+static void
+postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq)
+{
+	elog(ERROR, "postgresForeignAsyncNotify");
+}
+
 /*
  * Assess whether the aggregation, grouping and having operations can be pushed
  * down to the foreign server.  As a side effect, save information we obtain in
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 51edd4c..0675b01 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -12,8 +12,8 @@ subdir = src/backend/executor
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
-       execMain.o execParallel.o execProcnode.o execQual.o \
+OBJS = execAmi.o execAsync.o execCurrent.o execGrouping.o execIndexing.o \
+       execJunk.o execMain.o execParallel.o execProcnode.o execQual.o \
        execScan.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
        nodeBitmapAnd.o nodeBitmapOr.o \
diff --git a/src/backend/executor/README b/src/backend/executor/README
index f1d1e4c..1dee3db 100644
--- a/src/backend/executor/README
+++ b/src/backend/executor/README
@@ -199,3 +199,46 @@ query returning the same set of scan tuples multiple times.  Likewise,
 SRFs are disallowed in an UPDATE's targetlist.  There, they would have the
 effect of the same row being updated multiple times, which is not very
 useful --- and updates after the first would have no effect anyway.
+
+Asynchronous Execution
+----------------------
+
+In certain cases, it's desirable for a node to indicate that it cannot
+return any tuple immediately but may be able to do so at a later time.  This
+might be either because the node is waiting on an event external to the
+database system, such as a ForeignScan awaiting network I/O, or because
+the node is waiting for an event internal to the database system - e.g.
+one process involved in a parallel query may find that it cannot progress
+a certain parallel operation until some other process reaches a certain
+point in the computation.  A process which discovers this type of situation
+can always handle it simply by blocking, but this may waste time that could
+be spent executing some other part of the plan where progress could be
+made immediately.  This is particularly likely to occur when the plan
+contains an Append node.
+
+To use asynchronous execution, a node must first request a tuple from an
+async-capable child node using ExecAsyncRequest.  Next, it must execute
+the asynchronous event loop using ExecAsyncEventLoop; it can avoid giving
+up control indefinitely by passing a timeout to this function, even passing
+-1 to poll for events without blocking.  Eventually, when a node to which an
+asynchronous request has been made produces a tuple, the requesting node
+will receive a callback from the event loop via ExecAsyncResponse. Typically,
+the ExecAsyncResponse callback is the only one required for nodes that wish
+to request tuples asynchronously.
+
+On the other hand, nodes that wish to produce tuples asynchronously
+generally need to implement three methods:
+
+1. When an asynchronous request is made, the node's ExecAsyncRequest callback
+will be invoked; it should use ExecAsyncSetRequiredEvents to indicate the
+number of file descriptor events for which it wishes to wait and whether it
+wishes to receive a callback when the process latch is set. Alternatively,
+it can instead use ExecAsyncRequestDone if a result is available immediately.
+
+2. When the event loop wishes to wait or poll for file descriptor events and
+the process latch, the ExecAsyncConfigureWait callback is invoked to configure
+the file descriptor wait events for which the node wishes to wait.  This
+callback isn't needed if the node only cares about the process latch.
+
+3. When file descriptors or the process latch become ready, the node's
+ExecAsyncNotify callback is invoked.
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 2587ef7..9fcc4e4 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -464,11 +464,16 @@ ExecSupportsBackwardScan(Plan *node)
 			{
 				ListCell   *l;
 
+				/* With async, tuples may be interleaved, so can't back up. */
+				if (((Append *) node)->nasyncplans != 0)
+					return false;
+
 				foreach(l, ((Append *) node)->appendplans)
 				{
 					if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
 						return false;
 				}
+
 				/* need not check tlist because Append doesn't evaluate it */
 				return true;
 			}
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000..5858bb5
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,462 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *	  Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "storage/latch.h"
+
+static bool ExecAsyncEventWait(EState *estate, long timeout);
+static void ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
+	bool reinit);
+static void ExecAsyncNotify(EState *estate, PendingAsyncRequest *areq);
+static void ExecAsyncResponse(EState *estate, PendingAsyncRequest *areq);
+
+#define EVENT_BUFFER_SIZE	16
+
+/*
+ * Asynchronously request a tuple from a designated async-aware node.
+ *
+ * requestor is the node that wants the tuple; requestee is the node from
+ * which it wants the tuple.  request_index is an arbitrary integer specified
+ * by the requestor which will be available at the time the requestor receives
+ * the tuple.  This is useful if the requestor has multiple children and
+ * needs an easy way to figure out which one is delivering a tuple.
+ */
+void
+ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
+				 PlanState *requestee)
+{
+	PendingAsyncRequest *areq = NULL;
+	int		i = estate->es_num_pending_async;
+
+	/*
+	 * If the number of pending asynchronous nodes exceeds the number of
+	 * available slots in the es_pending_async array, expand the array.
+	 * We start with 16 slots, and thereafter double the array size each
+	 * time we run out of slots.
+	 */
+	if (i >= estate->es_max_pending_async)
+	{
+		int	newmax;
+
+		newmax = estate->es_max_pending_async * 2;
+		if (estate->es_max_pending_async == 0)
+		{
+			newmax = 16;
+			estate->es_pending_async =
+				MemoryContextAllocZero(estate->es_query_cxt,
+								   newmax * sizeof(PendingAsyncRequest *));
+		}
+		else
+		{
+			int	newentries = newmax - estate->es_max_pending_async;
+
+			estate->es_pending_async =
+				repalloc(estate->es_pending_async,
+						 newmax * sizeof(PendingAsyncRequest *));
+			MemSet(&estate->es_pending_async[estate->es_max_pending_async],
+				   0, newentries * sizeof(PendingAsyncRequest *));
+		}
+		estate->es_max_pending_async = newmax;
+	}
+
+	/*
+	 * To avoid unnecessary palloc traffic, we reuse a previously-allocated
+	 * PendingAsyncRequest if there is one.  If not, we must allocate a new
+	 * one.
+	 */
+	if (estate->es_pending_async[i] == NULL)
+	{
+		areq = MemoryContextAllocZero(estate->es_query_cxt,
+									  sizeof(PendingAsyncRequest));
+		estate->es_pending_async[i] = areq;
+	}
+	else
+	{
+		areq = estate->es_pending_async[i];
+		MemSet(areq, 0, sizeof(PendingAsyncRequest));
+	}
+	areq->myindex = estate->es_num_pending_async++;
+
+	/* Initialize the new request. */
+	areq->requestor = requestor;
+	areq->request_index = request_index;
+	areq->requestee = requestee;
+
+	/* Give the requestee a chance to do whatever it wants. */
+	switch (nodeTag(requestee))
+	{
+		case T_ForeignScanState:
+			ExecAsyncForeignScanRequest(estate, areq);
+			break;
+		default:
+			/* If requestee doesn't support async, caller messed up. */
+			elog(ERROR, "unrecognized node type: %d",
+				(int) nodeTag(requestee));
+	}
+}
+
+/*
+ * Execute the main loop until the timeout expires or a result is delivered
+ * to the requestor.
+ *
+ * If the timeout is -1, there is no timeout; wait indefinitely until a
+ * result is ready for requestor.  If the timeout is 0, do not block, but
+ * poll for events and fire callbacks for as long as we can do so without
+ * blocking.  If timeout is greater than 0, block for at most the number
+ * of milliseconds indicated by the timeout.
+ *
+ * Returns true if a result was delivered to the requestor.  A return value
+ * of false indicates that the timeout was reached without delivering a
+ * result to the requestor.
+ */
+bool
+ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout)
+{
+	instr_time start_time;
+	long cur_timeout = timeout;
+	bool	requestor_done = false;
+
+	Assert(requestor != NULL);
+
+	/*
+	 * If we plan to wait - but not indefinitely - we need to record the
+	 * current time.
+	 */
+	if (timeout > 0)
+		INSTR_TIME_SET_CURRENT(start_time);
+
+	/* Main event loop: poll for events, deliver notifications. */
+	for (;;)
+	{
+		int		i;
+		bool	any_node_done = false;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Check for events, but don't block if there are notifications that
+		 * have not been delivered yet.
+		 */
+		if (estate->es_async_callback_pending > 0)
+			ExecAsyncEventWait(estate, 0);
+		else if (!ExecAsyncEventWait(estate, cur_timeout))
+			cur_timeout = 0;			/* Timeout was reached. */
+		else if (timeout > 0)
+		{
+			instr_time	cur_time;
+
+			/* Reduce the remaining timeout by the time already elapsed. */
+			INSTR_TIME_SET_CURRENT(cur_time);
+			INSTR_TIME_SUBTRACT(cur_time, start_time);
+			cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
+			if (cur_timeout < 0)
+				cur_timeout = 0;
+		}
+
+		/* Deliver notifications. */
+		for (i = 0; i < estate->es_num_pending_async; ++i)
+		{
+			PendingAsyncRequest *areq = estate->es_pending_async[i];
+
+			/* Skip it if no callback is pending. */
+			if (!areq->callback_pending)
+				continue;
+
+			/*
+			 * Mark it as no longer needing a callback.  We must do this
+			 * before dispatching the callback in case the callback resets
+			 * the flag.
+			 */
+			areq->callback_pending = false;
+			estate->es_async_callback_pending--;
+
+			/* Perform the actual callback; set done flags as appropriate. */
+			if (!areq->request_complete)
+				ExecAsyncNotify(estate, areq);
+			else
+			{
+				any_node_done = true;
+				if (requestor == areq->requestor)
+					requestor_done = true;
+				ExecAsyncResponse(estate, areq);
+			}
+		}
+
+		/* If any node completed, compact the array. */
+		if (any_node_done)
+		{
+			int		hidx = 0,
+					tidx;
+
+			/*
+			 * Swap all non-yet-completed items to the start of the array.
+			 * Keep them in the same order.
+			 */
+			for (tidx = 0; tidx < estate->es_num_pending_async; ++tidx)
+			{
+				PendingAsyncRequest *head;
+				PendingAsyncRequest *tail = estate->es_pending_async[tidx];
+
+				if (!tail->callback_pending && tail->request_complete)
+					continue;
+				head = estate->es_pending_async[hidx];
+				estate->es_pending_async[tidx] = head;
+				estate->es_pending_async[hidx] = tail;
+				++hidx;
+			}
+			estate->es_num_pending_async = hidx;
+		}
+
+		/*
+		 * We only consider exiting the loop when no notifications are
+		 * pending.  Otherwise, each call to this function might advance
+		 * the computation by only a very small amount; to the contrary,
+		 * we want to push it forward as far as possible.
+		 */
+		if (estate->es_async_callback_pending == 0)
+		{
+			/* If requestor is ready, exit. */
+			if (requestor_done)
+				return true;
+			/* If timeout was 0 or has expired, exit. */
+			if (cur_timeout == 0)
+				return false;
+		}
+	}
+}
+
+/*
+ * Wait or poll for events.  As with ExecAsyncEventLoop, a timeout of -1
+ * means wait forever, 0 means don't wait at all, and >0 means wait for the
+ * indicated number of milliseconds.
+ *
+ * Returns true if we found some events and false if we timed out.
+ */
+static bool
+ExecAsyncEventWait(EState *estate, long timeout)
+{
+	WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
+	int		noccurred;
+	int		i;
+	int		n;
+	bool	reinit = false;
+	bool	process_latch_set = false;
+
+	if (estate->es_wait_event_set == NULL)
+	{
+		/*
+		 * Allow for a few extra events without reinitializing.  It
+		 * doesn't seem worth the complexity of doing anything very
+		 * aggressive here, because plans that depend on massive numbers
+		 * of external FDs are likely to run afoul of kernel limits anyway.
+		 */
+		estate->es_allocated_fd_events = estate->es_total_fd_events + 16;
+		estate->es_wait_event_set =
+			CreateWaitEventSet(estate->es_query_cxt,
+							   estate->es_allocated_fd_events + 1);
+		AddWaitEventToSet(estate->es_wait_event_set,
+						  WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+		reinit = true;
+	}
+
+	/* Give each waiting node a chance to add or modify events. */
+	for (i = 0; i < estate->es_num_pending_async; ++i)
+	{
+		PendingAsyncRequest *areq = estate->es_pending_async[i];
+
+		if (areq->num_fd_events > 0)
+			ExecAsyncConfigureWait(estate, areq, reinit);
+	}
+
+	/* Wait for at least one event to occur. */
+	noccurred = WaitEventSetWait(estate->es_wait_event_set, timeout,
+								 occurred_event, EVENT_BUFFER_SIZE);
+	if (noccurred == 0)
+		return false;
+
+	/*
+	 * Loop over the occurred events and set the callback_pending flags
+	 * for the appropriate requests.  The waiting nodes should have
+	 * registered their wait events with user_data pointing back to the
+	 * PendingAsyncRequest, but the process latch needs special handling.
+	 */
+	for (n = 0; n < noccurred; ++n)
+	{
+		WaitEvent  *w = &occurred_event[n];
+
+		if ((w->events & WL_LATCH_SET) != 0)
+		{
+			process_latch_set = true;
+			continue;
+		}
+
+		if ((w->events & (WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE)) != 0)
+		{
+			PendingAsyncRequest *areq = w->user_data;
+
+			if (!areq->callback_pending)
+			{
+				Assert(!areq->request_complete);
+				areq->callback_pending = true;
+				estate->es_async_callback_pending++;
+			}
+		}
+	}
+
+	/*
+	 * If the process latch got set, we must schedule a callback for every
+	 * requestee that cares about it.
+	 */
+	if (process_latch_set)
+	{
+		for (i = 0; i < estate->es_num_pending_async; ++i)
+		{
+			PendingAsyncRequest *areq = estate->es_pending_async[i];
+
+			if (areq->wants_process_latch && !areq->callback_pending)
+			{
+				areq->callback_pending = true;
+				estate->es_async_callback_pending++;
+			}
+		}
+	}
+
+	return true;
+}
+
+/*
+ * Give the asynchronous node a chance to configure the file descriptor
+ * events for which it wishes to wait.  We expect the node-type specific
+ * callback to make one or more calls of the following form:
+ *
+ * AddWaitEventToSet(es->es_wait_event_set, events, fd, NULL, areq);
+ *
+ * The events should include only WL_SOCKET_READABLE or WL_SOCKET_WRITEABLE,
+ * and the number of calls should not exceed areq->num_fd_events (as
+ * previously set via ExecAsyncSetRequiredEvents).
+ */
+static void
+ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
+					   bool reinit)
+{
+	switch (nodeTag(areq->requestee))
+	{
+		case T_ForeignScanState:
+			ExecAsyncForeignScanConfigureWait(estate, areq, reinit);
+			break;
+		default:
+			elog(ERROR, "unrecognized node type: %d",
+				(int) nodeTag(areq->requestee));
+	}
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred.
+ */
+static void
+ExecAsyncNotify(EState *estate, PendingAsyncRequest *areq)
+{
+	switch (nodeTag(areq->requestee))
+	{
+		case T_ForeignScanState:
+			ExecAsyncForeignScanNotify(estate, areq);
+			break;
+		default:
+			elog(ERROR, "unrecognized node type: %d",
+				(int) nodeTag(areq->requestee));
+	}
+}
+
+/*
+ * Call the requestor back when an asynchronous node has produced a result.
+ */
+static void
+ExecAsyncResponse(EState *estate, PendingAsyncRequest *areq)
+{
+	switch (nodeTag(areq->requestor))
+	{
+		case T_AppendState:
+			ExecAsyncAppendResponse(estate, areq);
+			break;
+		default:
+			elog(ERROR, "unrecognized node type: %d",
+				(int) nodeTag(areq->requestor));
+	}
+}
+
+/*
+ * An executor node should call this function to signal that it needs to wait
+ * on one or more file descriptor events that can be registered on a
+ * WaitEventSet, and possibly also on the process latch.  num_fd_events
+ * should be the maximum number of file descriptor events that it will wish to
+ * register.  force_reset should be true if the node can't reuse the
+ * WaitEventSet it most recently initialized, for example because it needs to
+ * drop a wait event from the set.
+ */
+void
+ExecAsyncSetRequiredEvents(EState *estate, PendingAsyncRequest *areq,
+	int num_fd_events, bool wants_process_latch,
+	bool force_reset)
+{
+	estate->es_total_fd_events += num_fd_events - areq->num_fd_events;
+	areq->num_fd_events = num_fd_events;
+	areq->wants_process_latch = wants_process_latch;
+
+	if (force_reset && estate->es_wait_event_set != NULL)
+	{
+		FreeWaitEventSet(estate->es_wait_event_set);
+		estate->es_wait_event_set = NULL;
+	}
+}
+
+/*
+ * An async-capable node should call this function to deliver the tuple to
+ * the node which requested it.  The node can call this from its
+ * ExecAsyncRequest callback if the requested tuple is available immediately,
+ * or at a later time from its ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestDone(EState *estate, PendingAsyncRequest *areq, Node *result)
+{
+	/*
+	 * Since the request is complete, the requestee is no longer allowed
+	 * to wait for any events.  Note that this forces a rebuild of
+	 * es_wait_event_set every time a process that was previously waiting
+	 * stops doing so.  It might be possible to defer that decision until
+	 * we actually wait again, because it's quite possible that a new
+	 * request will be made of the same node before any wait actually
+	 * happens.  However, we have to balance the cost of rebuilding the
+	 * WaitEventSet against the additional overhead of tracking which nodes
+	 * need a callback to remove registered wait events.  It's not clear
+	 * that we would come out ahead, so use brute force for now.
+	 */
+	if (areq->num_fd_events > 0 || areq->wants_process_latch)
+		ExecAsyncSetRequiredEvents(estate, areq, 0, false, true);
+
+	/* Save result and mark request as complete. */
+	areq->result = result;
+	areq->request_complete = true;
+
+	/* Make sure this request is flagged for a callback. */
+	if (!areq->callback_pending)
+	{
+		areq->callback_pending = true;
+		estate->es_async_callback_pending++;
+	}
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index a26bd63..bb06569 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -58,6 +58,7 @@
 #include "postgres.h"
 
 #include "executor/execdebug.h"
+#include "executor/execAsync.h"
 #include "executor/nodeAppend.h"
 
 static bool exec_append_initialize_next(AppendState *appendstate);
@@ -79,16 +80,21 @@ exec_append_initialize_next(AppendState *appendstate)
 	/*
 	 * get information from the append node
 	 */
-	whichplan = appendstate->as_whichplan;
+	whichplan = appendstate->as_whichsyncplan;
 
-	if (whichplan < 0)
+	/*
+	 * This routine is only responsible for setting up for nodes being scanned
+	 * synchronously, so the first node we can scan is given by nasyncplans
+	 * and the last is given by as_nplans - 1.
+	 */
+	if (whichplan < appendstate->as_nasyncplans)
 	{
 		/*
 		 * if scanning in reverse, we start at the last scan in the list and
 		 * then proceed back to the first.. in any case we inform ExecAppend
 		 * that we are at the end of the line by returning FALSE
 		 */
-		appendstate->as_whichplan = 0;
+		appendstate->as_whichsyncplan = appendstate->as_nasyncplans;
 		return FALSE;
 	}
 	else if (whichplan >= appendstate->as_nplans)
@@ -96,7 +102,7 @@ exec_append_initialize_next(AppendState *appendstate)
 		/*
 		 * as above, end the scan if we go beyond the last scan in our list..
 		 */
-		appendstate->as_whichplan = appendstate->as_nplans - 1;
+		appendstate->as_whichsyncplan = appendstate->as_nplans - 1;
 		return FALSE;
 	}
 	else
@@ -142,6 +148,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.state = estate;
 	appendstate->appendplans = appendplanstates;
 	appendstate->as_nplans = nplans;
+	appendstate->as_nasyncplans = node->nasyncplans;
+	appendstate->as_syncdone = (node->nasyncplans == nplans);
+	appendstate->as_asyncresult = (TupleTableSlot **)
+		palloc0(node->nasyncplans * sizeof(TupleTableSlot *));
+
+	/* initially, all async requests need a request */
+	for (i = 0; i < appendstate->as_nasyncplans; ++i)
+		appendstate->as_needrequest =
+			bms_add_member(appendstate->as_needrequest, i);
 
 	/*
 	 * Miscellaneous initialization
@@ -176,9 +191,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.ps_ProjInfo = NULL;
 
 	/*
-	 * initialize to scan first subplan
+	 * initialize to scan first synchronous subplan
 	 */
-	appendstate->as_whichplan = 0;
+	appendstate->as_whichsyncplan = appendstate->as_nasyncplans;
 	exec_append_initialize_next(appendstate);
 
 	return appendstate;
@@ -193,15 +208,78 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
+	if (node->as_nasyncplans > 0)
+	{
+		EState *estate = node->ps.state;
+		int	i;
+
+		/*
+		 * If there are any asynchronously-generated results that have
+		 * not yet been returned, return one of them.
+		 */
+		if (node->as_nasyncresult > 0)
+		{
+			--node->as_nasyncresult;
+			return node->as_asyncresult[node->as_nasyncresult];
+		}
+
+		/*
+		 * If there are any nodes that need a new asynchronous request,
+		 * make all of them.
+		 */
+		while ((i = bms_first_member(node->as_needrequest)) >= 0)
+		{
+			ExecAsyncRequest(estate, &node->ps, i, node->appendplans[i]);
+			node->as_nasyncpending++;
+		}
+	}
+
 	for (;;)
 	{
 		PlanState  *subnode;
 		TupleTableSlot *result;
 
 		/*
-		 * figure out which subplan we are currently processing
+		 * if we have async requests outstanding, run the event loop
 		 */
-		subnode = node->appendplans[node->as_whichplan];
+		if (node->as_nasyncpending > 0)
+		{
+			long	timeout = node->as_syncdone ? -1 : 0;
+
+			for (;;)
+			{
+				if (node->as_nasyncpending == 0)
+				{
+					/*
+					 * If there is no asynchronous activity still pending
+					 * and the synchronous activity is also complete, we're
+					 * totally done scanning this node.  Otherwise, we're
+					 * done with the asynchronous stuff but must continue
+					 * scanning the synchronous children.
+					 */
+					if (node->as_syncdone)
+						return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+					break;
+				}
+				if (!ExecAsyncEventLoop(node->ps.state, &node->ps, timeout))
+				{
+					/* Timeout reached. */
+					break;
+				}
+				if (node->as_nasyncresult > 0)
+				{
+					/* Asynchronous subplan returned a tuple! */
+					--node->as_nasyncresult;
+					return node->as_asyncresult[node->as_nasyncresult];
+				}
+			}
+		}
+
+		/*
+		 * figure out which synchronous subplan we are currently processing
+		 */
+		Assert(!node->as_syncdone);
+		subnode = node->appendplans[node->as_whichsyncplan];
 
 		/*
 		 * get a tuple from the subplan
@@ -221,14 +299,21 @@ ExecAppend(AppendState *node)
 		/*
 		 * Go on to the "next" subplan in the appropriate direction. If no
 		 * more subplans, return the empty slot set up for us by
-		 * ExecInitAppend.
+		 * ExecInitAppend, unless there are async plans we have yet to finish.
 		 */
 		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
+			node->as_whichsyncplan++;
 		else
-			node->as_whichplan--;
+			node->as_whichsyncplan--;
 		if (!exec_append_initialize_next(node))
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		{
+			node->as_syncdone = true;
+			if (node->as_nasyncpending == 0)
+			{
+				Assert(bms_is_empty(node->as_needrequest));
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+			}
+		}
 
 		/* Else loop back and try to get a tuple from the new subplan */
 	}
@@ -267,6 +352,16 @@ ExecReScanAppend(AppendState *node)
 {
 	int			i;
 
+	/*
+	 * XXX. Cancel outstanding asynchronous tuple requests here! (How?)
+	 */
+
+	/* Reset async state. */
+	for (i = 0; i < node->as_nasyncplans; ++i)
+		node->as_needrequest = bms_add_member(node->as_needrequest, i);
+	node->as_nasyncresult = 0;
+	node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
 	for (i = 0; i < node->as_nplans; i++)
 	{
 		PlanState  *subnode = node->appendplans[i];
@@ -285,6 +380,47 @@ ExecReScanAppend(AppendState *node)
 		if (subnode->chgParam == NULL)
 			ExecReScan(subnode);
 	}
-	node->as_whichplan = 0;
+	node->as_whichsyncplan = node->as_nasyncplans;
 	exec_append_initialize_next(node);
 }
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncAppendResponse
+ *
+ *		Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(EState *estate, PendingAsyncRequest *areq)
+{
+	AppendState *node = (AppendState *) areq->requestor;
+	TupleTableSlot *slot;
+
+	/* We shouldn't be called until the request is complete. */
+	Assert(areq->request_complete);
+
+	/* Our result slot shouldn't already be occupied. */
+	Assert(TupIsNull(node->ps.ps_ResultTupleSlot));
+
+	/* Result should be a TupleTableSlot or NULL. */
+	slot = (TupleTableSlot *) areq->result;
+	Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+	/* Request is no longer pending. */
+	Assert(node->as_nasyncpending > 0);
+	--node->as_nasyncpending;
+
+	/* If the result is NULL or an empty slot, there's nothing more to do. */
+	if (TupIsNull(slot))
+		return;
+
+	/* Save result so we can return it. */
+	Assert(node->as_nasyncresult < node->as_nasyncplans);
+	node->as_asyncresult[node->as_nasyncresult++] = slot;
+
+	/*
+	 * Mark the node that returned a result as ready for a new request.  We
+	 * don't launch another request here; it might complete synchronously.
+	 */
+	node->as_needrequest = bms_add_member(node->as_needrequest, areq->request_index);
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index d886aaf..85d436f 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -355,3 +355,52 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc)
 		fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate);
 	}
 }
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanRequest
+ *
+ *		Initiate an asynchronous request
+ *
+ *		Thin dispatcher: areq->requestee is the ForeignScanState the
+ *		request targets; the work is delegated to the FDW's
+ *		ForeignAsyncRequest callback, which must be provided for any
+ *		FDW whose paths are reported async-capable.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanRequest(EState *estate, PendingAsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncRequest != NULL);
+	fdwroutine->ForeignAsyncRequest(estate, areq);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanConfigureWait
+ *
+ *		In async mode, configure for a wait
+ *
+ *		Delegates to the FDW's ForeignAsyncConfigureWait callback so
+ *		the FDW can register its file-descriptor events; 'reinit' is
+ *		passed through to tell it whether the wait event set is being
+ *		rebuilt from scratch.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanConfigureWait(EState *estate,
+	PendingAsyncRequest *areq, bool reinit)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+	fdwroutine->ForeignAsyncConfigureWait(estate, areq, reinit);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanNotify
+ *
+ *		Event loop callback
+ *
+ *		Called when an event this node was waiting for has occurred;
+ *		delegates to the FDW's ForeignAsyncNotify callback.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanNotify(EState *estate, PendingAsyncRequest *areq)
+{
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	FdwRoutine *fdwroutine = node->fdwroutine;
+
+	Assert(fdwroutine->ForeignAsyncNotify != NULL);
+	fdwroutine->ForeignAsyncNotify(estate, areq);
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 71714bc..23b4e18 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -218,6 +218,7 @@ _copyAppend(const Append *from)
 	 * copy remainder of node
 	 */
 	COPY_NODE_FIELD(appendplans);
+	COPY_SCALAR_FIELD(nasyncplans);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index ae86954..dc5b938 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -359,6 +359,7 @@ _outAppend(StringInfo str, const Append *node)
 	_outPlanInfo(str, (const Plan *) node);
 
 	WRITE_NODE_FIELD(appendplans);
+	WRITE_INT_FIELD(nasyncplans);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 917e6c8..69453b5 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1519,6 +1519,7 @@ _readAppend(void)
 	ReadCommonPlan(&local_node->plan);
 
 	READ_NODE_FIELD(appendplans);
+	READ_INT_FIELD(nasyncplans);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index ad49674..7caa8d3 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -193,7 +193,7 @@ static CteScan *make_ctescan(List *qptlist, List *qpqual,
 			 Index scanrelid, int ctePlanId, int cteParam);
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
 				   Index scanrelid, int wtParam);
-static Append *make_append(List *appendplans, List *tlist);
+static Append *make_append(List *asyncplans, int nasyncplans, List *tlist);
 static RecursiveUnion *make_recursive_union(List *tlist,
 					 Plan *lefttree,
 					 Plan *righttree,
@@ -270,6 +270,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
 				 List *resultRelations, List *subplans,
 				 List *withCheckOptionLists, List *returningLists,
 				 List *rowMarks, OnConflictExpr *onconflict, int epqParam);
+static bool is_async_capable_path(Path *path);
 
 
 /*
@@ -955,8 +956,10 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 {
 	Append	   *plan;
 	List	   *tlist = build_path_tlist(root, &best_path->path);
-	List	   *subplans = NIL;
+	List	   *asyncplans = NIL;
+	List	   *syncplans = NIL;
 	ListCell   *subpaths;
+	int			nasyncplans = 0;
 
 	/*
 	 * The subpaths list could be empty, if every child was proven empty by
@@ -991,7 +994,14 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 		/* Must insist that all children return the same tlist */
 		subplan = create_plan_recurse(root, subpath, CP_EXACT_TLIST);
 
-		subplans = lappend(subplans, subplan);
+		/* Classify as async-capable or not */
+		if (is_async_capable_path(subpath))
+		{
+			asyncplans = lappend(asyncplans, subplan);
+			++nasyncplans;
+		}
+		else
+			syncplans = lappend(syncplans, subplan);
 	}
 
 	/*
@@ -1001,7 +1011,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 	 * parent-rel Vars it'll be asked to emit.
 	 */
 
-	plan = make_append(subplans, tlist);
+	plan = make_append(list_concat(asyncplans, syncplans), nasyncplans, tlist);
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -4941,7 +4951,7 @@ make_foreignscan(List *qptlist,
 }
 
 static Append *
-make_append(List *appendplans, List *tlist)
+make_append(List *appendplans, int nasyncplans, List *tlist)
 {
 	Append	   *node = makeNode(Append);
 	Plan	   *plan = &node->plan;
@@ -4951,6 +4961,7 @@ make_append(List *appendplans, List *tlist)
 	plan->lefttree = NULL;
 	plan->righttree = NULL;
 	node->appendplans = appendplans;
+	node->nasyncplans = nasyncplans;
 
 	return node;
 }
@@ -6225,3 +6236,27 @@ is_projection_capable_plan(Plan *plan)
 	}
 	return true;
 }
+
+/*
+ * is_projection_capable_path
+ *		Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+	switch (nodeTag(path))
+	{
+		case T_ForeignPath:
+			{
+				FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+				Assert(fdwroutine != NULL);
+				if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+					fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+					return true;
+			}
+		default:
+			break;
+	}
+	return false;
+}
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000..2abc32d
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,29 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ *		Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncRequest(EState *estate, PlanState *requestor,
+		int request_index, PlanState *requestee);
+extern bool ExecAsyncEventLoop(EState *estate, PlanState *requestor,
+				long timeout);
+
+extern void ExecAsyncSetRequiredEvents(EState *estate,
+	PendingAsyncRequest *areq, int num_fd_events,
+	bool wants_process_latch, bool force_reset);
+extern void ExecAsyncRequestDone(EState *estate,
+	PendingAsyncRequest *areq, Node *result);
+
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index 51c381e..81a079d 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -21,4 +21,7 @@ extern TupleTableSlot *ExecAppend(AppendState *node);
 extern void ExecEndAppend(AppendState *node);
 extern void ExecReScanAppend(AppendState *node);
 
+extern void ExecAsyncAppendResponse(EState *estate,
+	PendingAsyncRequest *areq);
+
 #endif   /* NODEAPPEND_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 0cdec4e..3e69ab0 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -29,4 +29,11 @@ extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
 								shm_toc *toc);
 
+extern void ExecAsyncForeignScanRequest(EState *estate,
+	PendingAsyncRequest *areq);
+extern void ExecAsyncForeignScanConfigureWait(EState *estate,
+	PendingAsyncRequest *areq, bool reinit);
+extern void ExecAsyncForeignScanNotify(EState *estate,
+	PendingAsyncRequest *areq);
+
 #endif   /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index e1b0d0d..88feb9a 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -155,6 +155,15 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 															 RelOptInfo *rel,
 														 RangeTblEntry *rte);
 
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef void (*ForeignAsyncRequest_function) (EState *estate,
+											PendingAsyncRequest *areq);
+typedef void (*ForeignAsyncConfigureWait_function) (EState *estate,
+											PendingAsyncRequest *areq,
+											bool reinit);
+typedef void (*ForeignAsyncNotify_function) (EState *estate,
+											PendingAsyncRequest *areq);
+
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
  * function.  It provides pointers to the callback functions needed by the
@@ -224,6 +233,12 @@ typedef struct FdwRoutine
 	EstimateDSMForeignScan_function EstimateDSMForeignScan;
 	InitializeDSMForeignScan_function InitializeDSMForeignScan;
 	InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+	/* Support functions for asynchronous execution */
+	IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+	ForeignAsyncRequest_function ForeignAsyncRequest;
+	ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+	ForeignAsyncNotify_function ForeignAsyncNotify;
 } FdwRoutine;
 
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index f6f73f3..b50b41c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -347,6 +347,25 @@ typedef struct ResultRelInfo
 } ResultRelInfo;
 
 /* ----------------
+ *	  PendingAsyncRequest
+ *
+ * State for an asynchronous tuple request.
+ * ----------------
+ */
+typedef struct PendingAsyncRequest
+{
+	int			myindex;			/* Own index in es_pending_async. */
+	struct PlanState *requestor;	/* Node that wants a tuple. */
+	struct PlanState *requestee;	/* Node from which a tuple is wanted. */
+	int			request_index;	/* Scratch space for requestor. */
+	int			num_fd_events;	/* Max number of FD events requestee needs. */
+	bool		wants_process_latch;	/* Requestee cares about MyLatch. */
+	bool		callback_pending;			/* Requestor callback still needed. */
+	bool		request_complete;		/* Request complete, result valid. */
+	Node	   *result;			/* Result (NULL if no more tuples). */
+} PendingAsyncRequest;
+
+/* ----------------
  *	  EState information
  *
  * Master working state for an Executor invocation
@@ -422,6 +441,31 @@ typedef struct EState
 	HeapTuple  *es_epqTuple;	/* array of EPQ substitute tuples */
 	bool	   *es_epqTupleSet; /* true if EPQ tuple is provided */
 	bool	   *es_epqScanDone; /* true if EPQ tuple has been fetched */
+
+	/*
+	 * Support for asynchronous execution.
+	 *
+	 * es_max_pending_async is the allocated size of es_pending_async, and
+	 * es_num_pending_async is the number of entries that are currently valid.
+	 * (Entries after that may point to storage that can be reused.)
+	 * es_async_callback_pending is the number of PendingAsyncRequests for
+	 * which callback_pending is true.
+	 *
+	 * es_total_fd_events is the total number of FD events needed by all
+	 * pending async nodes, and es_allocated_fd_events is the number any
+	 * current wait event set was allocated to handle.  es_wait_event_set, if
+	 * non-NULL, is a previously allocated event set that may be reusable by a
+	 * future wait provided that nothing's been removed and not too many more
+	 * events have been added.
+	 */
+	int			es_num_pending_async;
+	int			es_max_pending_async;
+	int			es_async_callback_pending;
+	PendingAsyncRequest **es_pending_async;
+
+	int			es_total_fd_events;
+	int			es_allocated_fd_events;
+	struct WaitEventSet *es_wait_event_set;
 } EState;
 
 
@@ -1147,17 +1191,20 @@ typedef struct ModifyTableState
 
 /* ----------------
  *	 AppendState information
- *
- *		nplans			how many plans are in the array
- *		whichplan		which plan is being executed (0 .. n-1)
  * ----------------
  */
 typedef struct AppendState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	PlanState **appendplans;	/* array of PlanStates for my inputs */
-	int			as_nplans;
-	int			as_whichplan;
+	int			as_nplans;		/* total # of children */
+	int			as_nasyncplans;	/* # of async-capable children */
+	int			as_whichsyncplan; /* which sync plan is being executed  */
+	bool		as_syncdone;	/* all synchronous plans done? */
+	Bitmapset  *as_needrequest;	/* async plans needing a new request */
+	TupleTableSlot **as_asyncresult;	/* unreturned results of async plans */
+	int			as_nasyncresult;	/* # of valid entries in as_asyncresult */
+	int			as_nasyncpending;	/* # of outstanding async requests */
 } AppendState;
 
 /* ----------------
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index e2fbc7d..327119b 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -208,6 +208,7 @@ typedef struct Append
 {
 	Plan		plan;
 	List	   *appendplans;
+	int			nasyncplans;	/* # of async plans, always at start of list */
 } Append;
 
 /* ----------------
-- 
2.9.2

>From 4493e6d2d43a5864e9d381cb69270246e0c6234c Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 12 Oct 2016 14:03:53 +0900
Subject: [PATCH 2/6] Fix some bugs.

---
 contrib/postgres_fdw/expected/postgres_fdw.out | 142 ++++++++++++-------------
 contrib/postgres_fdw/postgres_fdw.c            |   3 +-
 src/backend/executor/execAsync.c               |   4 +-
 src/backend/postmaster/pgstat.c                |   3 +
 src/include/pgstat.h                           |   3 +-
 5 files changed, 81 insertions(+), 74 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 88b696c..f9fd172 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6181,12 +6181,12 @@ INSERT INTO b(aa) VALUES('bbbbb');
 SELECT tableoid::regclass, * FROM a;
  tableoid |  aa   
 ----------+-------
- a        | aaa
- a        | aaaa
- a        | aaaaa
  b        | bbb
  b        | bbbb
  b        | bbbbb
+ a        | aaa
+ a        | aaaa
+ a        | aaaaa
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -6209,12 +6209,12 @@ UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- a        | aaa
- a        | zzzzzz
- a        | zzzzzz
  b        | bbb
  b        | bbbb
  b        | bbbbb
+ a        | aaa
+ a        | zzzzzz
+ a        | zzzzzz
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -6237,12 +6237,12 @@ UPDATE b SET aa = 'new';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- a        | aaa
- a        | zzzzzz
- a        | zzzzzz
  b        | new
  b        | new
  b        | new
+ a        | aaa
+ a        | zzzzzz
+ a        | zzzzzz
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -6265,12 +6265,12 @@ UPDATE a SET aa = 'newtoo';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- a        | newtoo
- a        | newtoo
- a        | newtoo
  b        | newtoo
  b        | newtoo
  b        | newtoo
+ a        | newtoo
+ a        | newtoo
+ a        | newtoo
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -6329,120 +6329,120 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 explain (verbose, costs off)
 select * from bar where f1 in (select f1 from foo) for update;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+                                                       QUERY PLAN                                                       
+------------------------------------------------------------------------------------------------------------------------
  LockRows
-   Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
+   Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
    ->  Hash Join
-         Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
-         Hash Cond: (bar.f1 = foo.f1)
+         Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
+         Hash Cond: (bar2.f1 = foo2.f1)
          ->  Append
-               ->  Seq Scan on public.bar
-                     Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
                ->  Foreign Scan on public.bar2
                      Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
                      Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
+               ->  Seq Scan on public.bar
+                     Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
          ->  Hash
-               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
                ->  HashAggregate
-                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-                     Group Key: foo.f1
+                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+                     Group Key: foo2.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
 (22 rows)
 
 select * from bar where f1 in (select f1 from foo) for update;
  f1 | f2 
 ----+----
-  1 | 11
-  2 | 22
   3 | 33
   4 | 44
+  1 | 11
+  2 | 22
 (4 rows)
 
 explain (verbose, costs off)
 select * from bar where f1 in (select f1 from foo) for share;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+                                                       QUERY PLAN                                                       
+------------------------------------------------------------------------------------------------------------------------
  LockRows
-   Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
+   Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
    ->  Hash Join
-         Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
-         Hash Cond: (bar.f1 = foo.f1)
+         Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
+         Hash Cond: (bar2.f1 = foo2.f1)
          ->  Append
-               ->  Seq Scan on public.bar
-                     Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
                ->  Foreign Scan on public.bar2
                      Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
                      Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
+               ->  Seq Scan on public.bar
+                     Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
          ->  Hash
-               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
                ->  HashAggregate
-                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-                     Group Key: foo.f1
+                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+                     Group Key: foo2.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
 (22 rows)
 
 select * from bar where f1 in (select f1 from foo) for share;
  f1 | f2 
 ----+----
-  1 | 11
-  2 | 22
   3 | 33
   4 | 44
+  1 | 11
+  2 | 22
 (4 rows)
 
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
-                                         QUERY PLAN                                          
----------------------------------------------------------------------------------------------
+                                               QUERY PLAN                                                
+---------------------------------------------------------------------------------------------------------
  Update on public.bar
    Update on public.bar
    Foreign Update on public.bar2
      Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1
    ->  Hash Join
-         Output: bar.f1, (bar.f2 + 100), bar.ctid, foo.ctid, foo.*, foo.tableoid
-         Hash Cond: (bar.f1 = foo.f1)
+         Output: bar.f1, (bar.f2 + 100), bar.ctid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
+         Hash Cond: (bar.f1 = foo2.f1)
          ->  Seq Scan on public.bar
                Output: bar.f1, bar.f2, bar.ctid
          ->  Hash
-               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
                ->  HashAggregate
-                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-                     Group Key: foo.f1
+                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+                     Group Key: foo2.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
    ->  Hash Join
-         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo.ctid, foo.*, foo.tableoid
-         Hash Cond: (bar2.f1 = foo.f1)
+         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
+         Hash Cond: (bar2.f1 = foo2.f1)
          ->  Foreign Scan on public.bar2
                Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
          ->  Hash
-               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
                ->  HashAggregate
-                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
-                     Group Key: foo.f1
+                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+                     Group Key: foo2.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.*, foo.tableoid, foo.f1
 (37 rows)
 
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
@@ -6470,26 +6470,26 @@ where bar.f1 = ss.f1;
    Foreign Update on public.bar2
      Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1
    ->  Hash Join
-         Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
-         Hash Cond: (foo.f1 = bar.f1)
+         Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo2.f1))
+         Hash Cond: (foo2.f1 = bar.f1)
          ->  Append
-               ->  Seq Scan on public.foo
-                     Output: ROW(foo.f1), foo.f1
                ->  Foreign Scan on public.foo2
                      Output: ROW(foo2.f1), foo2.f1
                      Remote SQL: SELECT f1 FROM public.loct1
-               ->  Seq Scan on public.foo foo_1
-                     Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
                ->  Foreign Scan on public.foo2 foo2_1
                      Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
                      Remote SQL: SELECT f1 FROM public.loct1
+               ->  Seq Scan on public.foo
+                     Output: ROW(foo.f1), foo.f1
+               ->  Seq Scan on public.foo foo_1
+                     Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
          ->  Hash
                Output: bar.f1, bar.f2, bar.ctid
                ->  Seq Scan on public.bar
                      Output: bar.f1, bar.f2, bar.ctid
    ->  Merge Join
-         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, (ROW(foo.f1))
-         Merge Cond: (bar2.f1 = foo.f1)
+         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, (ROW(foo2.f1))
+         Merge Cond: (bar2.f1 = foo2.f1)
          ->  Sort
                Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                Sort Key: bar2.f1
@@ -6497,19 +6497,19 @@ where bar.f1 = ss.f1;
                      Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                      Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
          ->  Sort
-               Output: (ROW(foo.f1)), foo.f1
-               Sort Key: foo.f1
+               Output: (ROW(foo2.f1)), foo2.f1
+               Sort Key: foo2.f1
                ->  Append
-                     ->  Seq Scan on public.foo
-                           Output: ROW(foo.f1), foo.f1
                      ->  Foreign Scan on public.foo2
                            Output: ROW(foo2.f1), foo2.f1
                            Remote SQL: SELECT f1 FROM public.loct1
-                     ->  Seq Scan on public.foo foo_1
-                           Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
                      ->  Foreign Scan on public.foo2 foo2_1
                            Output: ROW((foo2_1.f1 + 3)), (foo2_1.f1 + 3)
                            Remote SQL: SELECT f1 FROM public.loct1
+                     ->  Seq Scan on public.foo
+                           Output: ROW(foo.f1), foo.f1
+                     ->  Seq Scan on public.foo foo_1
+                           Output: ROW((foo_1.f1 + 3)), (foo_1.f1 + 3)
 (45 rows)
 
 update bar set f2 = f2 + 100
@@ -6676,8 +6676,8 @@ update bar set f2 = f2 + 100 returning *;
 update bar set f2 = f2 + 100 returning *;
  f1 | f2  
 ----+-----
-  1 | 311
   2 | 322
+  1 | 311
   6 | 266
   3 | 333
   4 | 344
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index c480945..e75f8a1 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -20,6 +20,7 @@
 #include "commands/explain.h"
 #include "commands/vacuum.h"
 #include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -4474,7 +4475,7 @@ postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
 	TupleTableSlot *slot;
 
 	Assert(IsA(node, ForeignScanState));
-	slot = postgresIterateForeignScan(node);
+	slot = ExecForeignScan(node);
 	ExecAsyncRequestDone(estate, areq, (Node *) slot);
 }
 
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index 5858bb5..e070c26 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -18,6 +18,7 @@
 #include "executor/nodeAppend.h"
 #include "executor/nodeForeignscan.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "storage/latch.h"
 
 static bool ExecAsyncEventWait(EState *estate, long timeout);
@@ -286,7 +287,8 @@ ExecAsyncEventWait(EState *estate, long timeout)
 
 	/* Wait for at least one event to occur. */
 	noccurred = WaitEventSetWait(estate->es_wait_event_set, timeout,
-								 occurred_event, EVENT_BUFFER_SIZE);
+								 occurred_event, EVENT_BUFFER_SIZE,
+								 WAIT_EVENT_ASYNC_WAIT);
 	if (noccurred == 0)
 		return false;
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 5112d6d..558bb8f 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3393,6 +3393,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_SYNC_REP:
 			event_name = "SyncRep";
 			break;
+		case WAIT_EVENT_ASYNC_WAIT:
+			event_name = "AsyncExecWait";
+			break;
 		/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 1c9bf13..40c6d08 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -785,7 +785,8 @@ typedef enum
 	WAIT_EVENT_MQ_SEND,
 	WAIT_EVENT_PARALLEL_FINISH,
 	WAIT_EVENT_SAFE_SNAPSHOT,
-	WAIT_EVENT_SYNC_REP
+	WAIT_EVENT_SYNC_REP,
+	WAIT_EVENT_ASYNC_WAIT
 } WaitEventIPC;
 
 /* ----------
-- 
2.9.2

>From 126ed476a6d41e5cfb54be387123ac3a8e9963d0 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 17 Oct 2016 15:54:32 +0900
Subject: [PATCH 3/6] Modify async execution infrastructure.

---
 contrib/postgres_fdw/expected/postgres_fdw.out |  68 ++++++++--------
 contrib/postgres_fdw/postgres_fdw.c            |   5 +-
 src/backend/executor/execAsync.c               | 105 ++++++++++++++-----------
 src/backend/executor/nodeAppend.c              |  50 ++++++------
 src/backend/executor/nodeForeignscan.c         |   4 +-
 src/backend/nodes/copyfuncs.c                  |   1 +
 src/backend/nodes/outfuncs.c                   |   1 +
 src/backend/nodes/readfuncs.c                  |   1 +
 src/backend/optimizer/plan/createplan.c        |  24 +++++-
 src/backend/utils/adt/ruleutils.c              |   6 +-
 src/include/executor/nodeForeignscan.h         |   2 +-
 src/include/foreign/fdwapi.h                   |   2 +-
 src/include/nodes/execnodes.h                  |  10 ++-
 src/include/nodes/plannodes.h                  |   1 +
 14 files changed, 167 insertions(+), 113 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index f9fd172..4b76e41 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6329,13 +6329,13 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 explain (verbose, costs off)
 select * from bar where f1 in (select f1 from foo) for update;
-                                                       QUERY PLAN                                                       
-------------------------------------------------------------------------------------------------------------------------
+                                          QUERY PLAN                                          
+----------------------------------------------------------------------------------------------
  LockRows
-   Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
+   Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
    ->  Hash Join
-         Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
-         Hash Cond: (bar2.f1 = foo2.f1)
+         Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
+         Hash Cond: (bar.f1 = foo.f1)
          ->  Append
                ->  Foreign Scan on public.bar2
                      Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
@@ -6343,10 +6343,10 @@ select * from bar where f1 in (select f1 from foo) for update;
                ->  Seq Scan on public.bar
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
          ->  Hash
-               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                ->  HashAggregate
-                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
-                     Group Key: foo2.f1
+                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                     Group Key: foo.f1
                      ->  Append
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
@@ -6366,13 +6366,13 @@ select * from bar where f1 in (select f1 from foo) for update;
 
 explain (verbose, costs off)
 select * from bar where f1 in (select f1 from foo) for share;
-                                                       QUERY PLAN                                                       
-------------------------------------------------------------------------------------------------------------------------
+                                          QUERY PLAN                                          
+----------------------------------------------------------------------------------------------
  LockRows
-   Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
+   Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
    ->  Hash Join
-         Output: bar2.f1, bar2.f2, bar2.ctid, ((bar2.*)::bar), bar2.tableoid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
-         Hash Cond: (bar2.f1 = foo2.f1)
+         Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid, foo.ctid, foo.*, foo.tableoid
+         Hash Cond: (bar.f1 = foo.f1)
          ->  Append
                ->  Foreign Scan on public.bar2
                      Output: bar2.f1, bar2.f2, bar2.ctid, bar2.*, bar2.tableoid
@@ -6380,10 +6380,10 @@ select * from bar where f1 in (select f1 from foo) for share;
                ->  Seq Scan on public.bar
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
          ->  Hash
-               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                ->  HashAggregate
-                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
-                     Group Key: foo2.f1
+                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                     Group Key: foo.f1
                      ->  Append
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
@@ -6404,22 +6404,22 @@ select * from bar where f1 in (select f1 from foo) for share;
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
-                                               QUERY PLAN                                                
----------------------------------------------------------------------------------------------------------
+                                         QUERY PLAN                                          
+---------------------------------------------------------------------------------------------
  Update on public.bar
    Update on public.bar
    Foreign Update on public.bar2
      Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1
    ->  Hash Join
-         Output: bar.f1, (bar.f2 + 100), bar.ctid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
-         Hash Cond: (bar.f1 = foo2.f1)
+         Output: bar.f1, (bar.f2 + 100), bar.ctid, foo.ctid, foo.*, foo.tableoid
+         Hash Cond: (bar.f1 = foo.f1)
          ->  Seq Scan on public.bar
                Output: bar.f1, bar.f2, bar.ctid
          ->  Hash
-               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                ->  HashAggregate
-                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
-                     Group Key: foo2.f1
+                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                     Group Key: foo.f1
                      ->  Append
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
@@ -6427,16 +6427,16 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                            ->  Seq Scan on public.foo
                                  Output: foo.ctid, foo.*, foo.tableoid, foo.f1
    ->  Hash Join
-         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo2.ctid, ((foo2.*)::foo), foo2.tableoid
-         Hash Cond: (bar2.f1 = foo2.f1)
+         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, foo.ctid, foo.*, foo.tableoid
+         Hash Cond: (bar2.f1 = foo.f1)
          ->  Foreign Scan on public.bar2
                Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
          ->  Hash
-               Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
+               Output: foo.ctid, foo.*, foo.tableoid, foo.f1
                ->  HashAggregate
-                     Output: foo2.ctid, ((foo2.*)::foo), foo2.tableoid, foo2.f1
-                     Group Key: foo2.f1
+                     Output: foo.ctid, foo.*, foo.tableoid, foo.f1
+                     Group Key: foo.f1
                      ->  Append
                            ->  Foreign Scan on public.foo2
                                  Output: foo2.ctid, foo2.*, foo2.tableoid, foo2.f1
@@ -6470,8 +6470,8 @@ where bar.f1 = ss.f1;
    Foreign Update on public.bar2
      Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1
    ->  Hash Join
-         Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo2.f1))
-         Hash Cond: (foo2.f1 = bar.f1)
+         Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
+         Hash Cond: (foo.f1 = bar.f1)
          ->  Append
                ->  Foreign Scan on public.foo2
                      Output: ROW(foo2.f1), foo2.f1
@@ -6488,8 +6488,8 @@ where bar.f1 = ss.f1;
                ->  Seq Scan on public.bar
                      Output: bar.f1, bar.f2, bar.ctid
    ->  Merge Join
-         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, (ROW(foo2.f1))
-         Merge Cond: (bar2.f1 = foo2.f1)
+         Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, (ROW(foo.f1))
+         Merge Cond: (bar2.f1 = foo.f1)
          ->  Sort
                Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                Sort Key: bar2.f1
@@ -6497,8 +6497,8 @@ where bar.f1 = ss.f1;
                      Output: bar2.f1, bar2.f2, bar2.f3, bar2.ctid
                      Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
          ->  Sort
-               Output: (ROW(foo2.f1)), foo2.f1
-               Sort Key: foo2.f1
+               Output: (ROW(foo.f1)), foo.f1
+               Sort Key: foo.f1
                ->  Append
                      ->  Foreign Scan on public.foo2
                            Output: ROW(foo2.f1), foo2.f1
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index e75f8a1..830212f 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -354,7 +354,7 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
 static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
 static void postgresForeignAsyncRequest(EState *estate,
 							PendingAsyncRequest *areq);
-static void postgresForeignAsyncConfigureWait(EState *estate,
+static bool postgresForeignAsyncConfigureWait(EState *estate,
 								  PendingAsyncRequest *areq,
 								  bool reinit);
 static void postgresForeignAsyncNotify(EState *estate,
@@ -4479,11 +4479,12 @@ postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
 	ExecAsyncRequestDone(estate, areq, (Node *) slot);
 }
 
-static void
+static bool
 postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
 								  bool reinit)
 {
 	elog(ERROR, "postgresForeignAsyncConfigureWait");
+	return false;
 }
 
 static void
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index e070c26..33496a9 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -22,7 +22,7 @@
 #include "storage/latch.h"
 
 static bool ExecAsyncEventWait(EState *estate, long timeout);
-static void ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
+static bool ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
 	bool reinit);
 static void ExecAsyncNotify(EState *estate, PendingAsyncRequest *areq);
 static void ExecAsyncResponse(EState *estate, PendingAsyncRequest *areq);
@@ -43,7 +43,7 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
 				 PlanState *requestee)
 {
 	PendingAsyncRequest *areq = NULL;
-	int		i = estate->es_num_pending_async;
+	int		nasync = estate->es_num_pending_async;
 
 	/*
 	 * If the number of pending asynchronous nodes exceeds the number of
@@ -51,7 +51,7 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
 	 * We start with 16 slots, and thereafter double the array size each
 	 * time we run out of slots.
 	 */
-	if (i >= estate->es_max_pending_async)
+	if (nasync >= estate->es_max_pending_async)
 	{
 		int	newmax;
 
@@ -81,25 +81,28 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
 	 * PendingAsyncRequest if there is one.  If not, we must allocate a new
 	 * one.
 	 */
-	if (estate->es_pending_async[i] == NULL)
+	if (estate->es_pending_async[nasync] == NULL)
 	{
 		areq = MemoryContextAllocZero(estate->es_query_cxt,
 									  sizeof(PendingAsyncRequest));
-		estate->es_pending_async[i] = areq;
+		estate->es_pending_async[nasync] = areq;
 	}
 	else
 	{
-		areq = estate->es_pending_async[i];
+		areq = estate->es_pending_async[nasync];
 		MemSet(areq, 0, sizeof(PendingAsyncRequest));
 	}
-	areq->myindex = estate->es_num_pending_async++;
+	areq->myindex = estate->es_num_pending_async;
 
 	/* Initialize the new request. */
 	areq->requestor = requestor;
 	areq->request_index = request_index;
 	areq->requestee = requestee;
 
-	/* Give the requestee a chance to do whatever it wants. */
+	/*
+	 * Give the requestee a chance to do whatever it wants.  The request
+	 * callback sets areq->state to ASYNC_COMPLETE when a result is
+	 * immediately available.
+	 */
 	switch (nodeTag(requestee))
 	{
 		case T_ForeignScanState:
@@ -110,6 +113,20 @@ ExecAsyncRequest(EState *estate, PlanState *requestor, int request_index,
 			elog(ERROR, "unrecognized node type: %d",
 				(int) nodeTag(requestee));
 	}
+
+	/*
+	 * If a result is available, complete it immediately.
+	 */
+	if (areq->state == ASYNC_COMPLETE)
+	{
+		Assert(areq->result == NULL || IsA(areq->result, TupleTableSlot));
+		ExecAsyncResponse(estate, areq);
+
+		return;
+	}
+
+	/* No result available now, make this node pending */
+	estate->es_num_pending_async++;
 }
 
 /*
@@ -175,22 +192,19 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout)
 		{
 			PendingAsyncRequest *areq = estate->es_pending_async[i];
 
-			/* Skip it if no callback is pending. */
-			if (!areq->callback_pending)
-				continue;
-
-			/*
-			 * Mark it as no longer needing a callback.  We must do this
-			 * before dispatching the callback in case the callback resets
-			 * the flag.
-			 */
-			areq->callback_pending = false;
-			estate->es_async_callback_pending--;
-
-			/* Perform the actual callback; set request_done if appropraite. */
-			if (!areq->request_complete)
+			/* Skip it if not pending. */
+			if (areq->state == ASYNC_CALLBACK_PENDING)
+			{
+				/*
+				 * Mark it as no longer needing a callback.  We must do this
+				 * before dispatching the callback in case the callback resets
+				 * the flag.
+				 */
+				estate->es_async_callback_pending--;
 				ExecAsyncNotify(estate, areq);
-			else
+			}
+
+			if (areq->state == ASYNC_COMPLETE)
 			{
 				any_node_done = true;
 				if (requestor == areq->requestor)
@@ -214,7 +228,7 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout)
 				PendingAsyncRequest *head;
 				PendingAsyncRequest *tail = estate->es_pending_async[tidx];
 
-				if (!tail->callback_pending && tail->request_complete)
+				if (tail->state == ASYNC_COMPLETE)
 					continue;
 				head = estate->es_pending_async[hidx];
 				estate->es_pending_async[tidx] = head;
@@ -247,7 +261,8 @@ ExecAsyncEventLoop(EState *estate, PlanState *requestor, long timeout)
  * means wait forever, 0 means don't wait at all, and >0 means wait for the
  * indicated number of milliseconds.
  *
- * Returns true if we found some events and false if we timed out.
+ * Returns true if we found some events and false if we timed out or there is
+ * no event to wait for.  The latter occurs when all pending requests are
+ * completed before we would have to wait.
 */
 static bool
 ExecAsyncEventWait(EState *estate, long timeout)
@@ -258,6 +273,7 @@ ExecAsyncEventWait(EState *estate, long timeout)
 	int		n;
 	bool	reinit = false;
 	bool	process_latch_set = false;
+	bool	added = false;
 
 	if (estate->es_wait_event_set == NULL)
 	{
@@ -282,13 +298,16 @@ ExecAsyncEventWait(EState *estate, long timeout)
 		PendingAsyncRequest *areq = estate->es_pending_async[i];
 
 		if (areq->num_fd_events > 0)
-			ExecAsyncConfigureWait(estate, areq, reinit);
+			added |= ExecAsyncConfigureWait(estate, areq, reinit);
 	}
 
+	Assert(added);
+
 	/* Wait for at least one event to occur. */
 	noccurred = WaitEventSetWait(estate->es_wait_event_set, timeout,
 								 occurred_event, EVENT_BUFFER_SIZE,
 								 WAIT_EVENT_ASYNC_WAIT);
+
 	if (noccurred == 0)
 		return false;
 
@@ -312,12 +331,10 @@ ExecAsyncEventWait(EState *estate, long timeout)
 		{
 			PendingAsyncRequest *areq = w->user_data;
 
-			if (!areq->callback_pending)
-			{
-				Assert(!areq->request_complete);
-				areq->callback_pending = true;
-				estate->es_async_callback_pending++;
-			}
+			Assert(areq->state == ASYNC_WAITING);
+
+			areq->state = ASYNC_CALLBACK_PENDING;
+			estate->es_async_callback_pending++;
 		}
 	}
 
@@ -333,8 +350,8 @@ ExecAsyncEventWait(EState *estate, long timeout)
 
 			if (areq->wants_process_latch)
 			{
-				Assert(!areq->request_complete);
-				areq->callback_pending = true;
+				Assert(areq->state == ASYNC_WAITING);
+				areq->state = ASYNC_CALLBACK_PENDING;
 			}
 		}
 	}
@@ -352,15 +369,19 @@ ExecAsyncEventWait(EState *estate, long timeout)
  * The events should include only WL_SOCKET_READABLE or WL_SOCKET_WRITEABLE,
  * and the number of calls should not exceed areq->num_fd_events (as
  * prevously set via ExecAsyncSetRequiredEvents).
+ *
+ * An individual request may omit registering an event, but it is the
+ * responsibility of the node driver to register at least one event per
+ * requestor.
  */
-static void
+static bool
 ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
 					   bool reinit)
 {
 	switch (nodeTag(areq->requestee))
 	{
 		case T_ForeignScanState:
-			ExecAsyncForeignScanConfigureWait(estate, areq, reinit);
+			return ExecAsyncForeignScanConfigureWait(estate, areq, reinit);
 			break;
 		default:
 			elog(ERROR, "unrecognized node type: %d",
@@ -419,6 +440,7 @@ ExecAsyncSetRequiredEvents(EState *estate, PendingAsyncRequest *areq,
 	estate->es_total_fd_events += num_fd_events - areq->num_fd_events;
 	areq->num_fd_events = num_fd_events;
 	areq->wants_process_latch = wants_process_latch;
+	areq->state = ASYNC_WAITING;
 
 	if (force_reset && estate->es_wait_event_set != NULL)
 	{
@@ -448,17 +470,12 @@ ExecAsyncRequestDone(EState *estate, PendingAsyncRequest *areq, Node *result)
 	 * need a callback to remove registered wait events.  It's not clear
 	 * that we would come out ahead, so use brute force for now.
 	 */
+	Assert(areq->state == ASYNC_IDLE || areq->state == ASYNC_CALLBACK_PENDING);
+
 	if (areq->num_fd_events > 0 || areq->wants_process_latch)
 		ExecAsyncSetRequiredEvents(estate, areq, 0, false, true);
 
 	/* Save result and mark request as complete. */
 	areq->result = result;
-	areq->request_complete = true;
-
-	/* Make sure this request is flagged for a callback. */
-	if (!areq->callback_pending)
-	{
-		areq->callback_pending = true;
-		estate->es_async_callback_pending++;
-	}
+	areq->state = ASYNC_COMPLETE;
 }
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index bb06569..c234f1f 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -229,9 +229,15 @@ ExecAppend(AppendState *node)
 		 */
 		while ((i = bms_first_member(node->as_needrequest)) >= 0)
 		{
-			ExecAsyncRequest(estate, &node->ps, i, node->appendplans[i]);
 			node->as_nasyncpending++;
+
+			ExecAsyncRequest(estate, &node->ps, i, node->appendplans[i]);
+			/* If this request immediately gives a result, take it. */
+			if (node->as_nasyncresult > 0)
+				return node->as_asyncresult[--node->as_nasyncresult];
 		}
+		if (node->as_nasyncpending == 0 && node->as_syncdone)
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 	}
 
 	for (;;)
@@ -246,32 +252,32 @@ ExecAppend(AppendState *node)
 		{
 			long	timeout = node->as_syncdone ? -1 : 0;
 
-			for (;;)
+			while (node->as_nasyncpending > 0)
 			{
-				if (node->as_nasyncpending == 0)
-				{
-					/*
-					 * If there is no asynchronous activity still pending
-					 * and the synchronous activity is also complete, we're
-					 * totally done scanning this node.  Otherwise, we're
-					 * done with the asynchronous stuff but must continue
-					 * scanning the synchronous children.
-					 */
-					if (node->as_syncdone)
-						return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-					break;
-				}
-				if (!ExecAsyncEventLoop(node->ps.state, &node->ps, timeout))
-				{
-					/* Timeout reached. */
-					break;
-				}
-				if (node->as_nasyncresult > 0)
+				if (ExecAsyncEventLoop(node->ps.state, &node->ps, timeout) &&
+					node->as_nasyncresult > 0)
 				{
 					/* Asynchronous subplan returned a tuple! */
 					--node->as_nasyncresult;
 					return node->as_asyncresult[node->as_nasyncresult];
 				}
+
+				/* Timeout reached.  Fall through to the sync nodes, if any. */
+				if (!node->as_syncdone)
+					break;
+			}
+
+			/*
+			 * If there is no asynchronous activity still pending and the
+			 * synchronous activity is also complete, we're totally done
+			 * scanning this node.  Otherwise, we're done with the
+			 * asynchronous stuff but must continue scanning the synchronous
+			 * children.
+			 */
+			if (node->as_syncdone)
+			{
+				Assert(node->as_nasyncpending == 0);
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 			}
 		}
 
@@ -397,7 +403,7 @@ ExecAsyncAppendResponse(EState *estate, PendingAsyncRequest *areq)
 	TupleTableSlot *slot;
 
 	/* We shouldn't be called until the request is complete. */
-	Assert(areq->request_complete);
+	Assert(areq->state == ASYNC_COMPLETE);
 
 	/* Our result slot shouldn't already be occupied. */
 	Assert(TupIsNull(node->ps.ps_ResultTupleSlot));
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 85d436f..d3567bb 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -378,7 +378,7 @@ ExecAsyncForeignScanRequest(EState *estate, PendingAsyncRequest *areq)
  *		In async mode, configure for a wait
  * ----------------------------------------------------------------
  */
-void
+bool
 ExecAsyncForeignScanConfigureWait(EState *estate,
 	PendingAsyncRequest *areq, bool reinit)
 {
@@ -386,7 +386,7 @@ ExecAsyncForeignScanConfigureWait(EState *estate,
 	FdwRoutine *fdwroutine = node->fdwroutine;
 
 	Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
-	fdwroutine->ForeignAsyncConfigureWait(estate, areq, reinit);
+	return fdwroutine->ForeignAsyncConfigureWait(estate, areq, reinit);
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 23b4e18..72d8cd6 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -219,6 +219,7 @@ _copyAppend(const Append *from)
 	 */
 	COPY_NODE_FIELD(appendplans);
 	COPY_SCALAR_FIELD(nasyncplans);
+	COPY_SCALAR_FIELD(referent);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index dc5b938..1ebdc48 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -360,6 +360,7 @@ _outAppend(StringInfo str, const Append *node)
 
 	WRITE_NODE_FIELD(appendplans);
 	WRITE_INT_FIELD(nasyncplans);
+	WRITE_INT_FIELD(referent);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 69453b5..8443a62 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1520,6 +1520,7 @@ _readAppend(void)
 
 	READ_NODE_FIELD(appendplans);
 	READ_INT_FIELD(nasyncplans);
+	READ_INT_FIELD(referent);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 7caa8d3..ff1d663 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -193,7 +193,8 @@ static CteScan *make_ctescan(List *qptlist, List *qpqual,
 			 Index scanrelid, int ctePlanId, int cteParam);
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
 				   Index scanrelid, int wtParam);
-static Append *make_append(List *asyncplans, int nasyncplans, List *tlist);
+static Append *make_append(List *asyncplans, int nasyncplans,
+						   int referent, List *tlist);
 static RecursiveUnion *make_recursive_union(List *tlist,
 					 Plan *lefttree,
 					 Plan *righttree,
@@ -960,6 +961,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 	List	   *syncplans = NIL;
 	ListCell   *subpaths;
 	int			nasyncplans = 0;
+	bool		first = true;
+	bool		referent_is_sync = true;
 
 	/*
 	 * The subpaths list could be empty, if every child was proven empty by
@@ -985,7 +988,14 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 		return plan;
 	}
 
-	/* Build the plan for each child */
+	/*
+	 * Build the plan for each child.
+	 *
+	 * The first child in an inheritance set is the representative used when
+	 * explaining tlist entries (see set_deparse_planstate).  We should keep
+	 * the first child in best_path->subpaths at the head of the subplan list
+	 * for that reason.
+	 */
 	foreach(subpaths, best_path->subpaths)
 	{
 		Path	   *subpath = (Path *) lfirst(subpaths);
@@ -999,9 +1009,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 		{
 			asyncplans = lappend(asyncplans, subplan);
 			++nasyncplans;
+			if (first)
+				referent_is_sync = false;
 		}
 		else
 			syncplans = lappend(syncplans, subplan);
+
+		first = false;
 	}
 
 	/*
@@ -1011,7 +1025,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
 	 * parent-rel Vars it'll be asked to emit.
 	 */
 
-	plan = make_append(list_concat(asyncplans, syncplans), nasyncplans, tlist);
+	plan = make_append(list_concat(asyncplans, syncplans), nasyncplans,
+					   referent_is_sync ? nasyncplans : 0, tlist);
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -4951,7 +4966,7 @@ make_foreignscan(List *qptlist,
 }
 
 static Append *
-make_append(List *appendplans, int nasyncplans, List *tlist)
+make_append(List *appendplans, int nasyncplans,	int referent, List *tlist)
 {
 	Append	   *node = makeNode(Append);
 	Plan	   *plan = &node->plan;
@@ -4962,6 +4977,7 @@ make_append(List *appendplans, int nasyncplans, List *tlist)
 	plan->righttree = NULL;
 	node->appendplans = appendplans;
 	node->nasyncplans = nasyncplans;
+	node->referent = referent;
 
 	return node;
 }
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 8a81d7a..de0e96c 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4056,7 +4056,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
 	 * lists containing references to non-target relations.
 	 */
 	if (IsA(ps, AppendState))
-		dpns->outer_planstate = ((AppendState *) ps)->appendplans[0];
+	{
+		int idx = ((Append*)(((AppendState *) ps)->ps.plan))->referent;
+		dpns->outer_planstate =
+			((AppendState *) ps)->appendplans[idx];
+	}
 	else if (IsA(ps, MergeAppendState))
 		dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0];
 	else if (IsA(ps, ModifyTableState))
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 3e69ab0..47a3920 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -31,7 +31,7 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
 
 extern void ExecAsyncForeignScanRequest(EState *estate,
 	PendingAsyncRequest *areq);
-extern void ExecAsyncForeignScanConfigureWait(EState *estate,
+extern bool ExecAsyncForeignScanConfigureWait(EState *estate,
 	PendingAsyncRequest *areq, bool reinit);
 extern void ExecAsyncForeignScanNotify(EState *estate,
 	PendingAsyncRequest *areq);
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 88feb9a..65517fd 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -158,7 +158,7 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
 typedef void (*ForeignAsyncRequest_function) (EState *estate,
 											PendingAsyncRequest *areq);
-typedef void (*ForeignAsyncConfigureWait_function) (EState *estate,
+typedef bool (*ForeignAsyncConfigureWait_function) (EState *estate,
 											PendingAsyncRequest *areq,
 											bool reinit);
 typedef void (*ForeignAsyncNotify_function) (EState *estate,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b50b41c..0c6af86 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -352,6 +352,13 @@ typedef struct ResultRelInfo
  * State for an asynchronous tuple request.
  * ----------------
  */
+typedef enum AsyncRequestState
+{
+	ASYNC_IDLE,
+	ASYNC_WAITING,
+	ASYNC_CALLBACK_PENDING,
+	ASYNC_COMPLETE
+} AsyncRequestState;
 typedef struct PendingAsyncRequest
 {
 	int			myindex;			/* Index in es_pending_async. */
@@ -360,8 +367,7 @@ typedef struct PendingAsyncRequest
 	int			request_index;	/* Scratch space for requestor. */
 	int			num_fd_events;	/* Max number of FD events requestee needs. */
 	bool		wants_process_latch;	/* Requestee cares about MyLatch. */
-	bool		callback_pending;			/* Callback is needed. */
-	bool		request_complete;		/* Request complete, result valid. */
+	AsyncRequestState state;
 	Node	   *result;			/* Result (NULL if no more tuples). */
 } PendingAsyncRequest;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 327119b..1df6693 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -209,6 +209,7 @@ typedef struct Append
 	Plan		plan;
 	List	   *appendplans;
 	int			nasyncplans;	/* # of async plans, always at start of list */
+	int			referent; 		/* index of inheritance tree referent */
 } Append;
 
 /* ----------------
-- 
2.9.2

>From 62d27e1420de596dbd6a3ecdae1dc1d0a51116cf Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 17 Oct 2016 16:00:56 +0900
Subject: [PATCH 4/6] Make postgres_fdw async-capable

---
 contrib/postgres_fdw/connection.c              |  79 ++--
 contrib/postgres_fdw/expected/postgres_fdw.out |  64 ++--
 contrib/postgres_fdw/postgres_fdw.c            | 483 +++++++++++++++++++++----
 contrib/postgres_fdw/postgres_fdw.h            |   2 +
 contrib/postgres_fdw/sql/postgres_fdw.sql      |   4 +-
 src/backend/executor/execProcnode.c            |   9 +
 src/include/foreign/fdwapi.h                   |   2 +
 7 files changed, 510 insertions(+), 133 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index bcdddc2..ebc9417 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -49,6 +49,7 @@ typedef struct ConnCacheEntry
 								 * one level of subxact open, etc */
 	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
 	bool		have_error;		/* have any subxacts aborted in this xact? */
+	void		*storage;		/* connection specific storage */
 } ConnCacheEntry;
 
 /*
@@ -64,6 +65,7 @@ static unsigned int prep_stmt_number = 0;
 static bool xact_got_connection = false;
 
 /* prototypes of private functions */
+static ConnCacheEntry *get_connection_entry(Oid umid);
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void check_conn_params(const char **keywords, const char **values);
 static void configure_remote_session(PGconn *conn);
@@ -75,26 +77,12 @@ static void pgfdw_subxact_callback(SubXactEvent event,
 					   SubTransactionId parentSubid,
 					   void *arg);
 
-
 /*
- * Get a PGconn which can be used to execute queries on the remote PostgreSQL
- * server with the user's authorization.  A new connection is established
- * if we don't already have a suitable one, and a transaction is opened at
- * the right subtransaction nesting depth if we didn't do that already.
- *
- * will_prep_stmt must be true if caller intends to create any prepared
- * statements.  Since those don't go away automatically at transaction end
- * (not even on error), we need this flag to cue manual cleanup.
- *
- * XXX Note that caching connections theoretically requires a mechanism to
- * detect change of FDW objects to invalidate already established connections.
- * We could manage that by watching for invalidation events on the relevant
- * syscaches.  For the moment, though, it's not clear that this would really
- * be useful and not mere pedantry.  We could not flush any active connections
- * mid-transaction anyway.
+ * Common function to acquire or create a connection cache entry.
  */
-PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+
+static ConnCacheEntry *
+get_connection_entry(Oid umid)
 {
 	bool		found;
 	ConnCacheEntry *entry;
@@ -122,11 +110,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
 	}
 
-	/* Set flag that we did GetConnection during the current transaction */
-	xact_got_connection = true;
-
 	/* Create hash key for the entry.  Assume no pad bytes in key struct */
-	key = user->umid;
+	key = umid;
 
 	/*
 	 * Find or create cached entry for requested connection.
@@ -139,8 +124,39 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 		entry->xact_depth = 0;
 		entry->have_prep_stmt = false;
 		entry->have_error = false;
+		entry->storage = NULL;
 	}
 
+	return entry;
+}
+
+/*
+ * Get a PGconn which can be used to execute queries on the remote PostgreSQL
+ * server with the user's authorization.  A new connection is established
+ * if we don't already have a suitable one, and a transaction is opened at
+ * the right subtransaction nesting depth if we didn't do that already.
+ *
+ * will_prep_stmt must be true if caller intends to create any prepared
+ * statements.  Since those don't go away automatically at transaction end
+ * (not even on error), we need this flag to cue manual cleanup.
+ *
+ * XXX Note that caching connections theoretically requires a mechanism to
+ * detect change of FDW objects to invalidate already established connections.
+ * We could manage that by watching for invalidation events on the relevant
+ * syscaches.  For the moment, though, it's not clear that this would really
+ * be useful and not mere pedantry.  We could not flush any active connections
+ * mid-transaction anyway.
+ */
+PGconn *
+GetConnection(UserMapping *user, bool will_prep_stmt)
+{
+	ConnCacheEntry *entry;
+
+	/* Set flag that we did GetConnection during the current transaction */
+	xact_got_connection = true;
+
+	entry = get_connection_entry(user->umid);
+
 	/*
 	 * We don't check the health of cached connection here, because it would
 	 * require some overhead.  Broken connection will be detected when the
@@ -177,6 +193,25 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 }
 
 /*
+ * Returns the connection specific storage for this user.  It is allocated
+ * with initsize bytes and zeroed if it does not exist yet.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+	ConnCacheEntry *entry;
+
+	entry = get_connection_entry(user->umid);
+	if (entry->storage == NULL)
+	{
+		entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+		memset(entry->storage, 0, initsize);
+	}
+
+	return entry->storage;
+}
+
+/*
  * Connect to remote server using specified server and user mapping properties.
  */
 static PGconn *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 4b76e41..ca69074 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6181,12 +6181,12 @@ INSERT INTO b(aa) VALUES('bbbbb');
 SELECT tableoid::regclass, * FROM a;
  tableoid |  aa   
 ----------+-------
- b        | bbb
- b        | bbbb
- b        | bbbbb
  a        | aaa
  a        | aaaa
  a        | aaaaa
+ b        | bbb
+ b        | bbbb
+ b        | bbbbb
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -6209,12 +6209,12 @@ UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- b        | bbb
- b        | bbbb
- b        | bbbbb
  a        | aaa
  a        | zzzzzz
  a        | zzzzzz
+ b        | bbb
+ b        | bbbb
+ b        | bbbbb
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -6237,12 +6237,12 @@ UPDATE b SET aa = 'new';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- b        | new
- b        | new
- b        | new
  a        | aaa
  a        | zzzzzz
  a        | zzzzzz
+ b        | new
+ b        | new
+ b        | new
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -6265,12 +6265,12 @@ UPDATE a SET aa = 'newtoo';
 SELECT tableoid::regclass, * FROM a;
  tableoid |   aa   
 ----------+--------
- b        | newtoo
- b        | newtoo
- b        | newtoo
  a        | newtoo
  a        | newtoo
  a        | newtoo
+ b        | newtoo
+ b        | newtoo
+ b        | newtoo
 (6 rows)
 
 SELECT tableoid::regclass, * FROM b;
@@ -6358,9 +6358,9 @@ select * from bar where f1 in (select f1 from foo) for update;
 select * from bar where f1 in (select f1 from foo) for update;
  f1 | f2 
 ----+----
+  1 | 11
   3 | 33
   4 | 44
-  1 | 11
   2 | 22
 (4 rows)
 
@@ -6395,9 +6395,9 @@ select * from bar where f1 in (select f1 from foo) for share;
 select * from bar where f1 in (select f1 from foo) for share;
  f1 | f2 
 ----+----
+  1 | 11
   3 | 33
   4 | 44
-  1 | 11
   2 | 22
 (4 rows)
 
@@ -6660,27 +6660,33 @@ delete from foo where f1 < 5 returning *;
 (5 rows)
 
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
- Update on public.bar
-   Output: bar.f1, bar.f2
-   Update on public.bar
-   Foreign Update on public.bar2
-   ->  Seq Scan on public.bar
-         Output: bar.f1, (bar.f2 + 100), bar.ctid
-   ->  Foreign Update on public.bar2
-         Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+                                      QUERY PLAN                                      
+--------------------------------------------------------------------------------------
+ Sort
+   Output: u.f1, u.f2
+   Sort Key: u.f1
+   CTE u
+     ->  Update on public.bar
+           Output: bar.f1, bar.f2
+           Update on public.bar
+           Foreign Update on public.bar2
+           ->  Seq Scan on public.bar
+                 Output: bar.f1, (bar.f2 + 100), bar.ctid
+           ->  Foreign Update on public.bar2
+                 Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+   ->  CTE Scan on u
+         Output: u.f1, u.f2
+(14 rows)
 
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
  f1 | f2  
 ----+-----
-  2 | 322
   1 | 311
-  6 | 266
+  2 | 322
   3 | 333
   4 | 344
+  6 | 266
   7 | 277
 (6 rows)
 
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 830212f..9244e51 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -35,6 +35,7 @@
 #include "optimizer/var.h"
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
+#include "pgstat.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
@@ -54,6 +55,9 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Retrieve PgFdwScanState struct from ForeignScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -123,10 +127,27 @@ enum FdwDirectModifyPrivateIndex
 };
 
 /*
+ * Connection private area structure.
+ */
+typedef struct PgFdwConnspecate
+{
+	ForeignScanState *current_owner;	/* The node currently running a query
+										 * on this connection */
+} PgFdwConnspecate;
+
+/* Execution state base type */
+typedef struct PgFdwState
+{
+	PGconn	   *conn;			/* connection for the scan */
+	PgFdwConnspecate *connspec;	/* connection private memory */
+} PgFdwState;
+
+/*
  * Execution state of a foreign scan using postgres_fdw.
  */
 typedef struct PgFdwScanState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table. NULL
 								 * for a foreign join scan. */
 	TupleDesc	tupdesc;		/* tuple descriptor of scan */
@@ -137,7 +158,7 @@ typedef struct PgFdwScanState
 	List	   *retrieved_attrs;	/* list of retrieved attribute numbers */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
+	bool		result_ready;
 	unsigned int cursor_number; /* quasi-unique ID for my cursor */
 	bool		cursor_exists;	/* have we created the cursor? */
 	int			numParams;		/* number of parameters passed to query */
@@ -153,6 +174,13 @@ typedef struct PgFdwScanState
 	/* batch-level state, for optimizing rewinds and avoiding useless fetch */
 	int			fetch_ct_2;		/* Min(# of fetches done, 2) */
 	bool		eof_reached;	/* true if last fetch reached EOF */
+	bool		run_async;		/* true if run asynchronously */
+	bool		async_waiting;	/* true if requesting the parent to wait */
+	ForeignScanState *waiter;	/* Next node to run a query among nodes
+								 * sharing the same connection */
+	ForeignScanState *last_waiter;	/* A waiting node at the end of a waiting
+									 * list. Maintained only by the current
+									 * owner of the connection */
 
 	/* working memory contexts */
 	MemoryContext batch_cxt;	/* context holding current batch of tuples */
@@ -166,11 +194,11 @@ typedef struct PgFdwScanState
  */
 typedef struct PgFdwModifyState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
 	/* for remote query execution */
-	PGconn	   *conn;			/* connection for the scan */
 	char	   *p_name;			/* name of prepared statement, if created */
 
 	/* extracted fdw_private data */
@@ -193,6 +221,7 @@ typedef struct PgFdwModifyState
  */
 typedef struct PgFdwDirectModifyState
 {
+	PgFdwState	s;				/* common structure */
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 
@@ -291,6 +320,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
 static void postgresAddForeignUpdateTargets(Query *parsetree,
 								RangeTblEntry *target_rte,
 								Relation target_relation);
@@ -355,8 +385,8 @@ static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
 static void postgresForeignAsyncRequest(EState *estate,
 							PendingAsyncRequest *areq);
 static bool postgresForeignAsyncConfigureWait(EState *estate,
-								  PendingAsyncRequest *areq,
-								  bool reinit);
+						    PendingAsyncRequest *areq,
+						    bool reinit);
 static void postgresForeignAsyncNotify(EState *estate,
 						   PendingAsyncRequest *areq);
 
@@ -379,7 +409,10 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 						  EquivalenceClass *ec, EquivalenceMember *em,
 						  void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn);
+static void absorb_current_result(ForeignScanState *node);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
@@ -444,6 +477,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->IterateForeignScan = postgresIterateForeignScan;
 	routine->ReScanForeignScan = postgresReScanForeignScan;
 	routine->EndForeignScan = postgresEndForeignScan;
+	routine->ShutdownForeignScan = postgresShutdownForeignScan;
 
 	/* Functions for updating foreign tables */
 	routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -1337,12 +1371,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	fsstate->conn = GetConnection(user, false);
+	fsstate->s.conn = GetConnection(user, false);
+	fsstate->s.connspec = (PgFdwConnspecate *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate));
+	fsstate->s.connspec->current_owner = NULL;
+	fsstate->waiter = NULL;
+	fsstate->last_waiter = node;
 
 	/* Assign a unique ID for my cursor */
-	fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+	fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
 	fsstate->cursor_exists = false;
 
+	/* Initialize async execution status */
+	fsstate->run_async = false;
+	fsstate->async_waiting = false;
+
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
@@ -1398,32 +1441,126 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 static TupleTableSlot *
 postgresIterateForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
 	/*
-	 * If this is the first call after Begin or ReScan, we need to create the
-	 * cursor on the remote side.
-	 */
-	if (!fsstate->cursor_exists)
-		create_cursor(node);
-
-	/*
 	 * Get some more tuples, if we've run out.
 	 */
 	if (fsstate->next_tuple >= fsstate->num_tuples)
 	{
-		/* No point in another fetch if we already detected EOF, though. */
-		if (!fsstate->eof_reached)
-			fetch_more_data(node);
-		/* If we didn't get any tuples, must be end of data. */
+		ForeignScanState *next_conn_owner = node;
+
+		/* This node has sent a query on this connection */
+		if (fsstate->s.connspec->current_owner == node)
+		{
+			/* Check if the result is available */
+			if (PQisBusy(fsstate->s.conn))
+			{
+				int rc = WaitLatchOrSocket(NULL,
+										   WL_SOCKET_READABLE | WL_TIMEOUT,
+										   PQsocket(fsstate->s.conn), 0,
+										   WAIT_EVENT_ASYNC_WAIT);
+				if (fsstate->run_async && !(rc & WL_SOCKET_READABLE))
+				{
+					/*
+					 * This node is not ready yet. Tell the caller to wait.
+					 */
+					fsstate->result_ready = false;
+					return ExecClearTuple(slot);
+				}
+			}
+
+			Assert(fsstate->async_waiting);
+			fsstate->async_waiting = false;
+			fetch_received_data(node);
+
+			/*
+			 * If someone is waiting for this node on the same connection, let the
+			 * first waiter be the next owner of this connection.
+			 */
+			if (fsstate->waiter)
+			{
+				PgFdwScanState *next_owner_state;
+
+				next_conn_owner = fsstate->waiter;
+				next_owner_state = GetPgFdwScanState(next_conn_owner);
+				fsstate->waiter = NULL;
+
+				/*
+				 * Only the current owner is responsible for maintaining the
+				 * shortcut to the last waiter.
+				 */
+				next_owner_state->last_waiter = fsstate->last_waiter;
+
+				/*
+				 * For simplicity, last_waiter points to the node itself when
+				 * no one is waiting for it.
+				 */
+				fsstate->last_waiter = node;
+			}
+		}
+		else if (fsstate->s.connspec->current_owner)
+		{
+			/*
+			 * Someone else is holding this connection. Add myself to the tail
+			 * of the waiters' list then return not-ready.  To avoid scanning
+			 * through the waiters' list, the current owner is to maintain the
+			 * shortcut to the last waiter.
+			 */
+			PgFdwScanState *conn_owner_state =
+				GetPgFdwScanState(fsstate->s.connspec->current_owner);
+			ForeignScanState *last_waiter = conn_owner_state->last_waiter;
+			PgFdwScanState *last_waiter_state = GetPgFdwScanState(last_waiter);
+
+			last_waiter_state->waiter = node;
+			conn_owner_state->last_waiter = node;
+
+			/* Register the node to the async-waiting node list */
+			Assert(!GetPgFdwScanState(node)->async_waiting);
+
+			GetPgFdwScanState(node)->async_waiting = true;
+
+			fsstate->result_ready = fsstate->eof_reached;
+			return ExecClearTuple(slot);
+		}
+
+		/*
+		 * Send the next request for the next owner of this connection if
+		 * needed.
+		 */
+
+		if (!GetPgFdwScanState(next_conn_owner)->eof_reached)
+		{
+			PgFdwScanState *next_owner_state =
+				GetPgFdwScanState(next_conn_owner);
+
+			request_more_data(next_conn_owner);
+
+			/* Register the node to the async-waiting node list */
+			if (!next_owner_state->async_waiting)
+				next_owner_state->async_waiting = true;
+
+			if (!next_owner_state->run_async)
+				fetch_received_data(next_conn_owner);
+		}
+
+
+		/*
+		 * If we haven't received a result for the given node this time,
+		 * return with no tuple to give way to other nodes.
+		 */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
+		{
+			fsstate->result_ready = fsstate->eof_reached;
 			return ExecClearTuple(slot);
+		}
 	}
 
 	/*
 	 * Return the next tuple.
 	 */
+	fsstate->result_ready = true;
 	ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
 				   slot,
 				   InvalidBuffer,
@@ -1439,7 +1576,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 static void
 postgresReScanForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	char		sql[64];
 	PGresult   *res;
 
@@ -1447,6 +1584,9 @@ postgresReScanForeignScan(ForeignScanState *node)
 	if (!fsstate->cursor_exists)
 		return;
 
+	/* Absorb the remaining result */
+	absorb_current_result(node);
+
 	/*
 	 * If any internal parameters affecting this node have changed, we'd
 	 * better destroy and recreate the cursor.  Otherwise, rewinding it should
@@ -1475,9 +1615,9 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_exec_query(fsstate->conn, sql);
+	res = pgfdw_exec_query(fsstate->s.conn, sql);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+		pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
 	PQclear(res);
 
 	/* Now force a fresh FETCH. */
@@ -1495,7 +1635,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 static void
 postgresEndForeignScan(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 
 	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
 	if (fsstate == NULL)
@@ -1503,16 +1643,32 @@ postgresEndForeignScan(ForeignScanState *node)
 
 	/* Close the cursor if open, to prevent accumulation of cursors */
 	if (fsstate->cursor_exists)
-		close_cursor(fsstate->conn, fsstate->cursor_number);
+		close_cursor(fsstate->s.conn, fsstate->cursor_number);
 
 	/* Release remote connection */
-	ReleaseConnection(fsstate->conn);
-	fsstate->conn = NULL;
+	ReleaseConnection(fsstate->s.conn);
+	fsstate->s.conn = NULL;
 
 	/* MemoryContexts will be deleted automatically. */
 }
 
 /*
+ * postgresShutdownForeignScan
+ *		Remove asynchrony stuff and clean up garbage on the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+	if (plan->operation != CMD_SELECT)
+		return;
+
+	/* Absorb the remaining result */
+	absorb_current_result(node);
+}
+
+/*
  * postgresAddForeignUpdateTargets
  *		Add resjunk column(s) needed for update/delete on a foreign table
  */
@@ -1714,7 +1870,9 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 	user = GetUserMapping(userid, table->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
-	fmstate->conn = GetConnection(user, true);
+	fmstate->s.conn = GetConnection(user, true);
+	fmstate->s.connspec = (PgFdwConnspecate *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate));
 	fmstate->p_name = NULL;		/* prepared statement not made yet */
 
 	/* Deconstruct fdw_private data. */
@@ -1793,6 +1951,8 @@ postgresExecForeignInsert(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1803,14 +1963,14 @@ postgresExecForeignInsert(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1818,10 +1978,10 @@ postgresExecForeignInsert(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -1859,6 +2019,8 @@ postgresExecForeignUpdate(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1879,14 +2041,14 @@ postgresExecForeignUpdate(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1894,10 +2056,10 @@ postgresExecForeignUpdate(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -1935,6 +2097,8 @@ postgresExecForeignDelete(EState *estate,
 	PGresult   *res;
 	int			n_rows;
 
+	vacate_connection((PgFdwState *)fmstate);
+
 	/* Set up the prepared statement on the remote server, if we didn't yet */
 	if (!fmstate->p_name)
 		prepare_foreign_modify(fmstate);
@@ -1955,14 +2119,14 @@ postgresExecForeignDelete(EState *estate,
 	/*
 	 * Execute the prepared statement.
 	 */
-	if (!PQsendQueryPrepared(fmstate->conn,
+	if (!PQsendQueryPrepared(fmstate->s.conn,
 							 fmstate->p_name,
 							 fmstate->p_nums,
 							 p_values,
 							 NULL,
 							 NULL,
 							 0))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -1970,10 +2134,10 @@ postgresExecForeignDelete(EState *estate,
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) !=
 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
 	/* Check number of rows affected, and fetch RETURNING tuple if any */
 	if (fmstate->has_returning)
@@ -2020,16 +2184,16 @@ postgresEndForeignModify(EState *estate,
 		 * We don't use a PG_TRY block here, so be careful not to throw error
 		 * without releasing the PGresult.
 		 */
-		res = pgfdw_exec_query(fmstate->conn, sql);
+		res = pgfdw_exec_query(fmstate->s.conn, sql);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+			pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
 		PQclear(res);
 		fmstate->p_name = NULL;
 	}
 
 	/* Release remote connection */
-	ReleaseConnection(fmstate->conn);
-	fmstate->conn = NULL;
+	ReleaseConnection(fmstate->s.conn);
+	fmstate->s.conn = NULL;
 }
 
 /*
@@ -2309,7 +2473,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 	 * Get connection to the foreign server.  Connection manager will
 	 * establish new connection if necessary.
 	 */
-	dmstate->conn = GetConnection(user, false);
+	dmstate->s.conn = GetConnection(user, false);
+	dmstate->s.connspec = (PgFdwConnspecate *)
+		GetConnectionSpecificStorage(user, sizeof(PgFdwConnspecate));
 
 	/* Initialize state variable */
 	dmstate->num_tuples = -1;	/* -1 means not set yet */
@@ -2362,7 +2528,10 @@ postgresIterateDirectModify(ForeignScanState *node)
 	 * If this is the first call after Begin, execute the statement.
 	 */
 	if (dmstate->num_tuples == -1)
+	{
+		vacate_connection((PgFdwState *)dmstate);
 		execute_dml_stmt(node);
+	}
 
 	/*
 	 * If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2409,8 +2578,8 @@ postgresEndDirectModify(ForeignScanState *node)
 		PQclear(dmstate->result);
 
 	/* Release remote connection */
-	ReleaseConnection(dmstate->conn);
-	dmstate->conn = NULL;
+	ReleaseConnection(dmstate->s.conn);
+	dmstate->s.conn = NULL;
 
 	/* MemoryContext will be deleted automatically. */
 }
@@ -2529,6 +2698,7 @@ estimate_path_cost_size(PlannerInfo *root,
 		List	   *local_param_join_conds;
 		StringInfoData sql;
 		PGconn	   *conn;
+		PgFdwConnspecate *connspec;
 		Selectivity local_sel;
 		QualCost	local_cost;
 		List	   *fdw_scan_tlist = NIL;
@@ -2572,6 +2742,16 @@ estimate_path_cost_size(PlannerInfo *root,
 
 		/* Get the remote estimate */
 		conn = GetConnection(fpinfo->user, false);
+		connspec = GetConnectionSpecificStorage(fpinfo->user,
+												sizeof(PgFdwConnspecate));
+		if (connspec)
+		{
+			PgFdwState tmpstate;
+			tmpstate.conn = conn;
+			tmpstate.connspec = connspec;
+			vacate_connection(&tmpstate);
+		}
+
 		get_remote_estimate(sql.data, conn, &rows, &width,
 							&startup_cost, &total_cost);
 		ReleaseConnection(conn);
@@ -2926,11 +3106,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void
 create_cursor(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
-	PGconn	   *conn = fsstate->conn;
+	PGconn	   *conn = fsstate->s.conn;
 	StringInfoData buf;
 	PGresult   *res;
 
@@ -2996,47 +3176,96 @@ create_cursor(ForeignScanState *node)
  * Fetch some more rows from the node's cursor.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
+{
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+	PGconn	   *conn = fsstate->s.conn;
+	char		sql[64];
+
+	/* The connection should be vacant */
+	Assert(fsstate->s.connspec->current_owner == NULL);
+
+	/*
+	 * If this is the first call after Begin or ReScan, we need to create the
+	 * cursor on the remote side.
+	 */
+	if (!fsstate->cursor_exists)
+		create_cursor(node);
+
+	snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+			 fsstate->fetch_size, fsstate->cursor_number);
+
+	if (!PQsendQuery(conn, sql))
+		pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+	fsstate->s.connspec->current_owner = node;
+}
+
+/*
+ * Fetch some more rows from the node's cursor.
+ */
+static void
+fetch_received_data(ForeignScanState *node)
 {
-	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
 	PGresult   *volatile res = NULL;
 	MemoryContext oldcontext;
 
+	/* I should be the current connection owner */
+	Assert(fsstate->s.connspec->current_owner == node);
+
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
-	 * batch.
+	 * batch if no tuple is remaining
 	 */
-	fsstate->tuples = NULL;
-	MemoryContextReset(fsstate->batch_cxt);
+	if (fsstate->next_tuple >= fsstate->num_tuples)
+	{
+		fsstate->tuples = NULL;
+		fsstate->num_tuples = 0;
+		MemoryContextReset(fsstate->batch_cxt);
+	}
+	else if (fsstate->next_tuple > 0)
+	{
+		/* move the remaining tuples to the beginning of the store */
+		int n = 0;
+
+		while(fsstate->next_tuple < fsstate->num_tuples)
+			fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+		fsstate->num_tuples = n;
+	}
+
 	oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
 	/* PGresult must be released before leaving this function. */
 	PG_TRY();
 	{
-		PGconn	   *conn = fsstate->conn;
+		PGconn	   *conn = fsstate->s.conn;
 		char		sql[64];
-		int			numrows;
+		int			addrows;
+		size_t		newsize;
 		int			i;
 
 		snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
 				 fsstate->fetch_size, fsstate->cursor_number);
 
-		res = pgfdw_exec_query(conn, sql);
+		res = pgfdw_get_result(conn, sql);
 		/* On error, report the original query, not the FETCH. */
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
 		/* Convert the data into HeapTuples */
-		numrows = PQntuples(res);
-		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-		fsstate->num_tuples = numrows;
-		fsstate->next_tuple = 0;
+		addrows = PQntuples(res);
+		newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+		if (fsstate->tuples)
+			fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+		else
+			fsstate->tuples = (HeapTuple *) palloc(newsize);
 
-		for (i = 0; i < numrows; i++)
+		for (i = 0; i < addrows; i++)
 		{
 			Assert(IsA(node->ss.ps.plan, ForeignScan));
 
-			fsstate->tuples[i] =
+			fsstate->tuples[fsstate->num_tuples + i] =
 				make_tuple_from_result_row(res, i,
 										   fsstate->rel,
 										   fsstate->attinmeta,
@@ -3046,27 +3275,82 @@ fetch_more_data(ForeignScanState *node)
 		}
 
 		/* Update fetch_ct_2 */
-		if (fsstate->fetch_ct_2 < 2)
+		if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
 			fsstate->fetch_ct_2++;
 
+		fsstate->next_tuple = 0;
+		fsstate->num_tuples += addrows;
+
 		/* Must be EOF if we didn't get as many tuples as we asked for. */
-		fsstate->eof_reached = (numrows < fsstate->fetch_size);
+		fsstate->eof_reached = (addrows < fsstate->fetch_size);
 
 		PQclear(res);
 		res = NULL;
 	}
 	PG_CATCH();
 	{
+		fsstate->s.connspec->current_owner = NULL;
 		if (res)
 			PQclear(res);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 
+	fsstate->s.connspec->current_owner = NULL;
+
 	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
+ * Vacate a connection so that this node can send the next query
+ */
+static void
+vacate_connection(PgFdwState *fdwstate)
+{
+	PgFdwConnspecate *connspec = fdwstate->connspec;
+	ForeignScanState *owner;
+
+	if (connspec == NULL || connspec->current_owner == NULL)
+		return;
+
+	/*
+	 * let the current connection owner read the result for the running query
+	 */
+	owner = connspec->current_owner;
+	fetch_received_data(owner);
+
+	/* Clear the waiting list */
+	while (owner)
+	{
+		PgFdwScanState *fsstate = GetPgFdwScanState(owner);
+
+		fsstate->last_waiter = NULL;
+		owner = fsstate->waiter;
+		fsstate->waiter = NULL;
+	}
+}
+
+/*
+ * Absorb the result of the current query.
+ */
+static void
+absorb_current_result(ForeignScanState *node)
+{
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+	ForeignScanState *owner = fsstate->s.connspec->current_owner;
+
+	if (owner)
+	{
+		PgFdwScanState *target_state = GetPgFdwScanState(owner);
+		PGconn *conn = target_state->s.conn;
+
+		while(PQisBusy(conn))
+			PQclear(PQgetResult(conn));
+		fsstate->s.connspec->current_owner = NULL;
+		fsstate->async_waiting = false;
+	}
+}
+/*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
  *
@@ -3150,7 +3434,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 
 	/* Construct name we'll use for the prepared statement. */
 	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
-			 GetPrepStmtNumber(fmstate->conn));
+			 GetPrepStmtNumber(fmstate->s.conn));
 	p_name = pstrdup(prep_name);
 
 	/*
@@ -3160,12 +3444,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * the prepared statements we use in this module are simple enough that
 	 * the remote server will make the right choices.
 	 */
-	if (!PQsendPrepare(fmstate->conn,
+	if (!PQsendPrepare(fmstate->s.conn,
 					   p_name,
 					   fmstate->query,
 					   0,
 					   NULL))
-		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+		pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -3173,9 +3457,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+	res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+		pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 	PQclear(res);
 
 	/* This action shows that the prepare has been done. */
@@ -3306,9 +3590,9 @@ execute_dml_stmt(ForeignScanState *node)
 	 * the desired result.  This allows us to avoid assuming that the remote
 	 * server has the same OIDs we do for the parameters' types.
 	 */
-	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+	if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
 						   NULL, values, NULL, NULL, 0))
-		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+		pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
 
 	/*
 	 * Get the result, and check for success.
@@ -3316,10 +3600,10 @@ execute_dml_stmt(ForeignScanState *node)
 	 * We don't use a PG_TRY block here, so be careful not to throw error
 	 * without releasing the PGresult.
 	 */
-	dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+	dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
 	if (PQresultStatus(dmstate->result) !=
 		(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-		pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+		pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
 						   dmstate->query);
 
 	/* Get the number of rows affected. */
@@ -4465,8 +4749,10 @@ postgresIsForeignPathAsyncCapable(ForeignPath *path)
 }
 
 /*
- * XXX. Just for testing purposes, let's run everything through the async
- * mechanism but return tuples synchronously.
+ * Accept an async request. Notify the caller if the next tuple is immediately
+ * available. ExecForeignScan does additional work to finish the returned
+ * tuple, so call it instead of postgresIterateForeignScan to acquire a tuple
+ * in the expected shape.
  */
 static void
 postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
@@ -4475,22 +4761,59 @@ postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
 	TupleTableSlot *slot;
 
 	Assert(IsA(node, ForeignScanState));
+	GetPgFdwScanState(node)->run_async = true;
 	slot = ExecForeignScan(node);
-	ExecAsyncRequestDone(estate, areq, (Node *) slot);
+	if (GetPgFdwScanState(node)->result_ready)
+		ExecAsyncRequestDone(estate, areq, (Node *) slot);
+	else
+		ExecAsyncSetRequiredEvents(estate, areq, 1, false, false);
 }
 
+/*
+ * Configure a wait event.
+ *
+ * Add a wait event only when this node is the connection owner. Otherwise
+ * another node on this connection is the owner.
+ */
 static bool
 postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
-								  bool reinit)
+						   bool reinit)
 {
-	elog(ERROR, "postgresForeignAsyncConfigureWait");
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+	/* If the caller didn't reinit, this event is already in event set */
+	if (!reinit)
+		return true;
+
+	if (fsstate->s.connspec->current_owner == node)
+	{
+		AddWaitEventToSet(estate->es_wait_event_set,
+						  WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+						  NULL, areq);
+		return true;
+	}
+
 	return false;
 }
 
+/*
+ * Process a notification from the async mechanism. ExecForeignScan does
+ * additional work to complete the returned tuple, so call it instead of
+ * postgresIterateForeignScan to acquire a completed tuple.
+ */
 static void
 postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq)
 {
-	elog(ERROR, "postgresForeignAsyncNotify");
+	ForeignScanState *node = (ForeignScanState *) areq->requestee;
+	TupleTableSlot *slot;
+
+	Assert(IsA(node, ForeignScanState));
+	slot = ExecForeignScan(node);
+	Assert(GetPgFdwScanState(node)->result_ready);
+
+	ExecAsyncRequestDone(estate, areq, (Node *) slot);
 }
 
 /*
@@ -4850,7 +5173,7 @@ make_tuple_from_result_row(PGresult *res,
 		PgFdwScanState *fdw_sstate;
 
 		Assert(fsstate);
-		fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
+		fdw_sstate = GetPgFdwScanState(fsstate);
 		tupdesc = fdw_sstate->tupdesc;
 	}
 
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index f8c255e..1800977 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -79,6 +79,7 @@ typedef struct PgFdwRelationInfo
 	UserMapping *user;			/* only set in use_remote_estimate mode */
 
 	int			fetch_size;		/* fetch size for this remote table */
+	bool		allow_prefetch;	/* true to allow overlapped fetching  */
 
 	/*
 	 * Name of the relation while EXPLAINing ForeignScan. It is used for join
@@ -103,6 +104,7 @@ extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index bb9d41a..d4b5fad 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1552,8 +1552,8 @@ explain (verbose, costs off)
 delete from foo where f1 < 5 returning *;
 delete from foo where f1 < 5 returning *;
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
 
 drop table foo cascade;
 drop table bar cascade;
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 554244f..f864abe 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -114,6 +114,7 @@
 #include "executor/nodeValuesscan.h"
 #include "executor/nodeWindowAgg.h"
 #include "executor/nodeWorktablescan.h"
+#include "foreign/fdwapi.h"
 #include "nodes/nodeFuncs.h"
 #include "miscadmin.h"
 
@@ -806,6 +807,14 @@ ExecShutdownNode(PlanState *node)
 		case T_GatherState:
 			ExecShutdownGather((GatherState *) node);
 			break;
+		case T_ForeignScanState:
+		{
+			ForeignScanState *fsstate = (ForeignScanState *)node;
+			FdwRoutine *fdwroutine = fsstate->fdwroutine;
+			if (fdwroutine->ShutdownForeignScan)
+				fdwroutine->ShutdownForeignScan((ForeignScanState *) node);
+		}
+		break;
 		default:
 			break;
 	}
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 65517fd..e40db0e 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -163,6 +163,7 @@ typedef bool (*ForeignAsyncConfigureWait_function) (EState *estate,
 											bool reinit);
 typedef void (*ForeignAsyncNotify_function) (EState *estate,
 											PendingAsyncRequest *areq);
+typedef void (*ShutdownForeignScan_function) (ForeignScanState *node);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -239,6 +240,7 @@ typedef struct FdwRoutine
 	ForeignAsyncRequest_function ForeignAsyncRequest;
 	ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
 	ForeignAsyncNotify_function ForeignAsyncNotify;
+	ShutdownForeignScan_function ShutdownForeignScan;
 } FdwRoutine;
 
 
-- 
2.9.2

>From 233e2e5125cdea90fa10fc05dd5ff1885f09cff2 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 25 Oct 2016 18:01:56 +0900
Subject: [PATCH 5/6] Use resource owner to prevent wait event set from leaking

Wait event sets created for async execution can live for some
iterations so it leaks in the case of errors during the
iterations. This commit uses resource owner to prevent such leaks.
---
 src/backend/executor/execAsync.c      | 28 ++++++++++++++--
 src/backend/storage/ipc/latch.c       | 19 ++++++++++-
 src/backend/utils/resowner/resowner.c | 63 +++++++++++++++++++++++++++++++++++
 src/include/utils/resowner_private.h  |  8 +++++
 4 files changed, 114 insertions(+), 4 deletions(-)

diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index 33496a9..40e3f67 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -20,6 +20,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/latch.h"
+#include "utils/resowner_private.h"
 
 static bool ExecAsyncEventWait(EState *estate, long timeout);
 static bool ExecAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
@@ -277,6 +278,8 @@ ExecAsyncEventWait(EState *estate, long timeout)
 
 	if (estate->es_wait_event_set == NULL)
 	{
+		ResourceOwner savedOwner;
+
 		/*
 		 * Allow for a few extra events without reinitializing.  It
 		 * doesn't seem worth the complexity of doing anything very
@@ -284,9 +287,28 @@ ExecAsyncEventWait(EState *estate, long timeout)
 		 * of external FDs are likely to run afoul of kernel limits anyway.
 		 */
 		estate->es_allocated_fd_events = estate->es_total_fd_events + 16;
-		estate->es_wait_event_set =
-			CreateWaitEventSet(estate->es_query_cxt,
-							   estate->es_allocated_fd_events + 1);
+
+		/*
+		 * The wait event set created here should be released in case of
+		 * error.
+		 */
+		savedOwner = CurrentResourceOwner;
+		CurrentResourceOwner = TopTransactionResourceOwner;
+
+		PG_TRY();
+		{
+			estate->es_wait_event_set =
+				CreateWaitEventSet(estate->es_query_cxt,
+								   estate->es_allocated_fd_events + 1);
+		}
+		PG_CATCH();
+		{
+			CurrentResourceOwner = savedOwner;
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+
+		CurrentResourceOwner = savedOwner;
 		AddWaitEventToSet(estate->es_wait_event_set,
 						  WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
 		reinit = true;
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 8488f94..b8bcae9 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -62,6 +62,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/resowner_private.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -90,6 +91,7 @@ struct WaitEventSet
 	int			nevents;		/* number of registered events */
 	int			nevents_space;	/* maximum number of events in this set */
 
+	ResourceOwner	resowner;	/* Resource owner */
 	/*
 	 * Array, of nevents_space length, storing the definition of events this
 	 * set is waiting for.
@@ -324,7 +326,13 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	int			ret = 0;
 	int			rc;
 	WaitEvent	event;
-	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+	WaitEventSet *set;
+	ResourceOwner savedOwner = CurrentResourceOwner;
+
+	/* This function doesn't need resowner for event set */
+	CurrentResourceOwner = NULL;
+	set = CreateWaitEventSet(CurrentMemoryContext, 3);
+	CurrentResourceOwner = savedOwner;
 
 	if (wakeEvents & WL_TIMEOUT)
 		Assert(timeout >= 0);
@@ -488,6 +496,9 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	char	   *data;
 	Size		sz = 0;
 
+	if (CurrentResourceOwner)
+		ResourceOwnerEnlargeWESs(CurrentResourceOwner);
+
 	/*
 	 * Use MAXALIGN size/alignment to guarantee that later uses of memory are
 	 * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -547,6 +558,9 @@ CreateWaitEventSet(MemoryContext context, int nevents)
 	StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
 #endif
 
+	set->resowner = CurrentResourceOwner;
+	if (CurrentResourceOwner)
+		ResourceOwnerRememberWES(set->resowner, set);
 	return set;
 }
 
@@ -582,6 +596,9 @@ FreeWaitEventSet(WaitEventSet *set)
 	}
 #endif
 
+	if (set->resowner != NULL)
+		ResourceOwnerForgetWES(set->resowner, set);
+
 	pfree(set);
 }
 
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 07075ce..272e460 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -124,6 +124,7 @@ typedef struct ResourceOwnerData
 	ResourceArray snapshotarr;	/* snapshot references */
 	ResourceArray filearr;		/* open temporary files */
 	ResourceArray dsmarr;		/* dynamic shmem segments */
+	ResourceArray wesarr;		/* wait event sets */
 
 	/* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
 	int			nlocks;			/* number of owned locks */
@@ -169,6 +170,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
 static void PrintSnapshotLeakWarning(Snapshot snapshot);
 static void PrintFileLeakWarning(File file);
 static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
 
 
 /*****************************************************************************
@@ -437,6 +439,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
 	ResourceArrayInit(&(owner->snapshotarr), PointerGetDatum(NULL));
 	ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
 	ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
+	ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
 
 	return owner;
 }
@@ -552,6 +555,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 				PrintDSMLeakWarning(res);
 			dsm_detach(res);
 		}
+
+		/* Ditto for wait event sets */
+		while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+		{
+			WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+			if (isCommit)
+				PrintWESLeakWarning(event);
+			FreeWaitEventSet(event);
+		}
 	}
 	else if (phase == RESOURCE_RELEASE_LOCKS)
 	{
@@ -702,6 +715,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	Assert(owner->snapshotarr.nitems == 0);
 	Assert(owner->filearr.nitems == 0);
 	Assert(owner->dsmarr.nitems == 0);
+	Assert(owner->wesarr.nitems == 0);
 	Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
 
 	/*
@@ -728,6 +742,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	ResourceArrayFree(&(owner->snapshotarr));
 	ResourceArrayFree(&(owner->filearr));
 	ResourceArrayFree(&(owner->dsmarr));
+	ResourceArrayFree(&(owner->wesarr));
 
 	pfree(owner);
 }
@@ -1270,3 +1285,51 @@ PrintDSMLeakWarning(dsm_segment *seg)
 	elog(WARNING, "dynamic shared memory leak: segment %u still referenced",
 		 dsm_segment_handle(seg));
 }
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+	ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+	ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+	/* XXXX: There's no property to identify a wait event set */
+	if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+		elog(ERROR, "wait event set %p is not owned by resource owner %s",
+			 events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+	/* XXXX: There's no property to identify a wait event set */
+	elog(WARNING, "wait event set leak: %p still referenced",
+		 events);
+}
+
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index fd32090..6087257e7 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
@@ -88,4 +89,11 @@ extern void ResourceOwnerRememberDSM(ResourceOwner owner,
 extern void ResourceOwnerForgetDSM(ResourceOwner owner,
 					   dsm_segment *);
 
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+						 WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+					   WaitEventSet *);
+
 #endif   /* RESOWNER_PRIVATE_H */
-- 
2.9.2

>From 11749cc592ac8369fcc9fbfb362ddd2a6f2f0a90 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 25 Oct 2016 18:05:30 +0900
Subject: [PATCH 6/6] Apply unlikely to suggest synchronous route of
 ExecAppend.

ExecAppend seems to get slowed down by the penalty of misprediction of
branches related to async execution. Apply unlikely to them to prevent
such a penalty on the existing route. Asynchronous execution already
has a lot of additional code, so this doesn't add significant
degradation.
---
 src/backend/executor/nodeAppend.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index c234f1f..e82547d 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -208,7 +208,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecAppend(AppendState *node)
 {
-	if (node->as_nasyncplans > 0)
+	if (unlikely(node->as_nasyncplans > 0))
 	{
 		EState *estate = node->ps.state;
 		int	i;
@@ -248,7 +248,7 @@ ExecAppend(AppendState *node)
 		/*
 		 * if we have async requests outstanding, run the event loop
 		 */
-		if (node->as_nasyncpending > 0)
+		if (unlikely(node->as_nasyncpending > 0))
 		{
 			long	timeout = node->as_syncdone ? -1 : 0;
 
-- 
2.9.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to