 contrib/postgres_fdw/postgres_fdw.c    | 46 ++++++++++++++++++++++-
 src/backend/executor/nodeLimit.c       | 68 ++--------------------------------
 src/backend/executor/nodeMergeAppend.c |  6 ++-
 src/backend/executor/nodeResult.c      | 25 +++++++++++++
 src/backend/executor/nodeSort.c        | 10 +++++
 src/include/nodes/execnodes.h          |  3 ++
 6 files changed, 90 insertions(+), 68 deletions(-)

diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index fbe6929..721a631 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -2428,6 +2428,17 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
 	if (es->verbose)
 	{
 		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
+
+		/* LIMIT clause is attached on execution time */
+		if (node->ss.ps.ps_numTuples > 0)
+		{
+			StringInfoData	buf;
+
+			initStringInfo(&buf);
+			appendStringInfo(&buf, "%s LIMIT %lu",
+							 sql, node->ss.ps.ps_numTuples);
+			sql = buf.data;
+		}
 		ExplainPropertyText("Remote SQL", sql, es);
 	}
 }
@@ -2553,6 +2564,13 @@ estimate_path_cost_size(PlannerInfo *root,
 		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
 								remote_conds, pathkeys, &retrieved_attrs,
 								NULL);
+		/*
+		 * Attach LIMIT if this path is top-level and constant value is
+		 * specified.
+		 */
+		if (root->limit_tuples >= 0.0 &&
+			bms_equal(root->all_baserels, foreignrel->relids))
+			appendStringInfo(&sql, " LIMIT %.0f", root->limit_tuples);
 
 		/* Get the remote estimate */
 		conn = GetConnection(fpinfo->user, false);
@@ -2627,6 +2645,13 @@ estimate_path_cost_size(PlannerInfo *root,
 
 			/* Estimate of number of rows in cross product */
 			nrows = fpinfo_i->rows * fpinfo_o->rows;
+			if (root->limit_tuples >= 0.0 &&
+				bms_equal(root->all_baserels, foreignrel->relids))
+			{
+				nrows = Min(nrows, root->limit_tuples);
+				rows = Min(rows, root->limit_tuples);
+			}
+
 			/* Clamp retrieved rows estimate to at most size of cross product */
 			retrieved_rows = Min(retrieved_rows, nrows);
 
@@ -2724,6 +2749,10 @@ estimate_path_cost_size(PlannerInfo *root,
 			/*
 			 * Number of rows expected from foreign server will be same as
 			 * that of number of groups.
+			 *
+			 * MEMO: root->limit_tuples is not attached when query contains
+			 * grouping-clause or aggregate functions. So, we don's adjust
+			 * rows even if LIMIT <const> is supplied.
 			 */
 			rows = retrieved_rows = numGroups;
 
@@ -2754,8 +2783,17 @@ estimate_path_cost_size(PlannerInfo *root,
 		}
 		else
 		{
+			double		nrows = foreignrel->tuples;
+
+			if (root->limit_tuples >= 0.0 &&
+				bms_equal(root->all_baserels, foreignrel->relids))
+			{
+				nrows = Min(nrows, root->limit_tuples);
+				rows = Min(rows, root->limit_tuples);
+			}
+
 			/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
-			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
+			retrieved_rows = Min(retrieved_rows, nrows);
 
 			/*
 			 * Cost as though this were a seqscan, which is pessimistic.  We
@@ -2768,7 +2806,7 @@ estimate_path_cost_size(PlannerInfo *root,
 
 			startup_cost += foreignrel->baserestrictcost.startup;
 			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
-			run_cost += cpu_per_tuple * foreignrel->tuples;
+			run_cost += cpu_per_tuple * nrows;
 		}
 
 		/*
@@ -2942,6 +2980,10 @@ create_cursor(ForeignScanState *node)
 	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
 					 fsstate->cursor_number, fsstate->query);
 
+	/* Append LIMIT if numTuples were passed-down */
+	if (node->ss.ps.ps_numTuples > 0)
+		appendStringInfo(&buf, " LIMIT %ld", node->ss.ps.ps_numTuples);
+
 	/*
 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
 	 * to infer types for all parameters.  Since we explicitly cast every
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index faf32e1..df3fe01 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -26,7 +26,6 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
 
 
 /* ----------------------------------------------------------------
@@ -232,6 +231,7 @@ static void
 recompute_limits(LimitState *node)
 {
 	ExprContext *econtext = node->ps.ps_ExprContext;
+	uint64		tuples_needed;
 	Datum		val;
 	bool		isNull;
 
@@ -296,70 +296,8 @@ recompute_limits(LimitState *node)
 	node->lstate = LIMIT_RESCAN;
 
 	/* Notify child node about limit, if useful */
-	pass_down_bound(node, outerPlanState(node));
-}
-
-/*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  Also, if our input is a MergeAppend, we can apply the
- * same bound to any Sorts that are direct children of the MergeAppend,
- * since the MergeAppend surely need read no more than that many tuples from
- * any one input.  We also have to be prepared to look through a Result,
- * since the planner might stick one atop MergeAppend for projection purposes.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
- */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
-{
-	if (IsA(child_node, SortState))
-	{
-		SortState  *sortState = (SortState *) child_node;
-		int64		tuples_needed = node->count + node->offset;
-
-		/* negative test checks for overflow in sum */
-		if (node->noCount || tuples_needed < 0)
-		{
-			/* make sure flag gets reset if needed upon rescan */
-			sortState->bounded = false;
-		}
-		else
-		{
-			sortState->bounded = true;
-			sortState->bound = tuples_needed;
-		}
-	}
-	else if (IsA(child_node, MergeAppendState))
-	{
-		MergeAppendState *maState = (MergeAppendState *) child_node;
-		int			i;
-
-		for (i = 0; i < maState->ms_nplans; i++)
-			pass_down_bound(node, maState->mergeplans[i]);
-	}
-	else if (IsA(child_node, ResultState))
-	{
-		/*
-		 * An extra consideration here is that if the Result is projecting a
-		 * targetlist that contains any SRFs, we can't assume that every input
-		 * tuple generates an output tuple, so a Sort underneath might need to
-		 * return more than N tuples to satisfy LIMIT N. So we cannot use
-		 * bounded sort.
-		 *
-		 * If Result supported qual checking, we'd have to punt on seeing a
-		 * qual, too.  Note that having a resconstantqual is not a
-		 * showstopper: if that fails we're not getting any rows at all.
-		 */
-		if (outerPlanState(child_node) &&
-			!expression_returns_set((Node *) child_node->plan->targetlist))
-			pass_down_bound(node, outerPlanState(child_node));
-	}
+	tuples_needed = (node->noCount ? 0 : node->count + node->offset);
+	outerPlanState(node)->ps_numTuples = tuples_needed;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..91707c6 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -174,10 +174,14 @@ ExecMergeAppend(MergeAppendState *node)
 		/*
 		 * First time through: pull the first tuple from each subplan, and set
 		 * up the heap.
+		 * Also, pass down the required number of tuples
 		 */
 		for (i = 0; i < node->ms_nplans; i++)
 		{
-			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+			PlanState  *pstate = node->mergeplans[i];
+
+			pstate->ps_numTuples = node->ps.ps_numTuples;
+			node->ms_slots[i] = ExecProcNode(pstate);
 			if (!TupIsNull(node->ms_slots[i]))
 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
 		}
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index 4007b76..9e83eb2 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -47,6 +47,7 @@
 
 #include "executor/executor.h"
 #include "executor/nodeResult.h"
+#include "nodes/nodeFuncs.h"
 #include "utils/memutils.h"
 
 
@@ -75,6 +76,28 @@ ExecResult(ResultState *node)
 	econtext = node->ps.ps_ExprContext;
 
 	/*
+	 * Pass down the number of required tuples by the upper node
+	 *
+	 * An extra consideration here is that if the Result is projecting a
+	 * targetlist that contains any SRFs, we can't assume that every input
+	 * tuple generates an output tuple, so a Sort underneath might need to
+	 * return more than N tuples to satisfy LIMIT N. So we cannot use
+	 * bounded sort.
+	 *
+	 * If Result supported qual checking, we'd have to punt on seeing a
+	 * qual, too.  Note that having a resconstantqual is not a
+	 * showstopper: if that fails we're not getting any rows at all.
+	 */
+	if (!node->rs_started)
+	{
+		if (outerPlanState(node) &&
+			!expression_returns_set((Node *) node->ps.plan->targetlist))
+			outerPlanState(node)->ps_numTuples = node->ps.ps_numTuples;
+
+		node->rs_started = true;
+	}
+
+	/*
 	 * check constant qualifications like (2 > 1), if not already done
 	 */
 	if (node->rs_checkqual)
@@ -218,6 +241,7 @@ ExecInitResult(Result *node, EState *estate, int eflags)
 	resstate->ps.plan = (Plan *) node;
 	resstate->ps.state = estate;
 
+	resstate->rs_started = false;
 	resstate->rs_done = false;
 	resstate->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
 
@@ -294,6 +318,7 @@ ExecEndResult(ResultState *node)
 void
 ExecReScanResult(ResultState *node)
 {
+	node->rs_started = false;
 	node->rs_done = false;
 	node->ps.ps_TupFromTlist = false;
 	node->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a34dcc5..6ec7abb 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -66,6 +66,16 @@ ExecSort(SortState *node)
 
 		SO1_printf("ExecSort: %s\n",
 				   "sorting subplan");
+		/*
+		 * Check bounds according to the required number of tuples
+		 */
+		if (node->ss.ps.ps_numTuples == 0)
+			node->bounded = false;
+		else
+		{
+			node->bounded = true;
+			node->bound = node->ss.ps.ps_numTuples;
+		}
 
 		/*
 		 * Want to scan subplan in the forward direction while creating the
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index f6f73f3..3b84959 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1066,6 +1066,8 @@ typedef struct PlanState
 	ProjectionInfo *ps_ProjInfo;	/* info for doing tuple projection */
 	bool		ps_TupFromTlist;/* state flag for processing set-valued
 								 * functions in targetlist */
+	uint64		ps_numTuples;	/* number of tuples required by the upper
+								 * node if any. 0 means all the tuples */
 } PlanState;
 
 /* ----------------
@@ -1114,6 +1116,7 @@ typedef struct ResultState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	ExprState  *resconstantqual;
+	bool		rs_started;		/* are we already called? */
 	bool		rs_done;		/* are we done? */
 	bool		rs_checkqual;	/* do we need to check the qual? */
 } ResultState;
