Hi Alexander,

I took a quick look at the patch. Some things I fixed myself in the attached patch v.21. Here is the summary:

Typo in compare_fractional_path_costs() should be fixed as a separate patch.
Remove unused function estimate_pathkeys_groups.
Extra MemoryContextReset before tuplesort_end() shouldn't be a big deal, so we don't have to add a parameter to tuplesort_free().
Add comment to maincontext declaration.
Fix typo in INITIAL_MEMTUPSIZE.
Remove trailing whitespace.

Some other things I found:

In tuplesort_reset:
if (state->memtupsize < INITIAL_MEMTUPSIZE)
    <reallocate memtuples to INITIAL_MEMTUPSIZE>
I'd add a comment explaining when and why we have to do this. Maybe also add comments to the other allocations of memtuples in tuplesort_begin() and mergeruns(), explaining why it is reallocated and why in maincontext.


In tuplesort_updatemax:
    /* XXX */
    if (spaceUsedOnDisk > state->maxSpaceOnDisk ||
        (spaceUsedOnDisk == state->maxSpaceOnDisk && spaceUsed > state->maxSpace))
XXX. Also comparing bools with '>' looks confusing to me.
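
Something like the following is what I have in mind. It is only a sketch: the helper name and its parameters are made up for illustration and are not in the patch. It keeps the same ordering (disk usage always outranks memory usage, otherwise the amounts are compared) but avoids applying '>' to bools.

```c
#include <stdbool.h>

/*
 * Hypothetical helper, not part of the patch: returns true when the new
 * measurement should replace the stored maximum.
 */
static bool
space_usage_exceeds(bool newOnDisk, long newSpace,
					bool maxOnDisk, long maxSpace)
{
	/* Different storage kinds: on-disk usage always wins. */
	if (newOnDisk != maxOnDisk)
		return newOnDisk;

	/* Same storage kind: compare the amounts. */
	return newSpace > maxSpace;
}
```

With that, the condition in tuplesort_updatemax would become a single call, and the ordering rule is stated once in the helper's comments.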


We should add a comment on top of tuplesort.c, explaining that we now have a faster way to sort multiple batches of data using the same sort conditions.
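
To illustrate the usage pattern such a comment could describe, here is a standalone sketch. It is not the tuplesort API: plain qsort() stands in for the sort, and the Pair type and function names are hypothetical. The input is presorted by x, and each batch of equal x is sorted by y with the same comparator.

```c
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical tuple type for the illustration. */
typedef struct Pair
{
	int			x;
	int			y;
} Pair;

/* Comparator shared by all batches: order by y only. */
static int
cmp_y(const void *a, const void *b)
{
	int			ya = ((const Pair *) a)->y;
	int			yb = ((const Pair *) b)->y;

	return (ya > yb) - (ya < yb);
}

/*
 * Given tuples presorted by x, produce (x, y) order by sorting each run
 * of equal x separately, reusing the same sort conditions every time.
 */
static void
sort_presorted_batches(Pair *tuples, size_t n)
{
	size_t		start = 0;

	while (start < n)
	{
		size_t		end = start + 1;

		/* Find the end of the current batch (same x as its first tuple). */
		while (end < n && tuples[end].x == tuples[start].x)
			end++;

		qsort(tuples + start, end - start, sizeof(Pair), cmp_y);
		start = end;
	}
}
```

In the patch itself, the per-batch step corresponds to calling tuplesort_reset() and feeding the next group into the same Tuplesortstate rather than creating a new one.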


The name 'main context' sounds somewhat vague. Maybe 'top context'? Not sure.


In ExecSupportsBackwardScan:
        case T_IncrementalSort:
            return false;
This separate case looks useless; I'd either add a comment explaining why it can't scan backwards, or just return false by default.



That's all I have for today; tomorrow I'll continue with reviewing the planner part of the patch.

--
Alexander Kuzmenkov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2d6e387d63..d11777cb90 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -1999,28 +1999,62 @@ SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2
  119
 (10 rows)
 
--- CROSS JOIN, not pushed down
+-- CROSS JOIN, not pushed down.  For this query, essential optimization is top-N
+-- sort.  But it can't be processed at remote side, because we never do LIMIT
+-- push down.  Assuming that sorting is not worth it to push down, CROSS JOIN
+-- is also not pushed down in order to transfer less tuples over network.
 EXPLAIN (VERBOSE, COSTS OFF)
-SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
-                             QUERY PLAN                              
----------------------------------------------------------------------
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+                            QUERY PLAN                            
+------------------------------------------------------------------
  Limit
-   Output: t1.c1, t2.c1
+   Output: t1.c3, t2.c3
    ->  Sort
-         Output: t1.c1, t2.c1
-         Sort Key: t1.c1, t2.c1
+         Output: t1.c3, t2.c3
+         Sort Key: t1.c3, t2.c3
          ->  Nested Loop
-               Output: t1.c1, t2.c1
+               Output: t1.c3, t2.c3
                ->  Foreign Scan on public.ft1 t1
-                     Output: t1.c1
-                     Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
+                     Output: t1.c3
+                     Remote SQL: SELECT c3 FROM "S 1"."T 1"
                ->  Materialize
-                     Output: t2.c1
+                     Output: t2.c3
                      ->  Foreign Scan on public.ft2 t2
-                           Output: t2.c1
-                           Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
+                           Output: t2.c3
+                           Remote SQL: SELECT c3 FROM "S 1"."T 1"
 (15 rows)
 
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+  c3   |  c3   
+-------+-------
+ 00001 | 00101
+ 00001 | 00102
+ 00001 | 00103
+ 00001 | 00104
+ 00001 | 00105
+ 00001 | 00106
+ 00001 | 00107
+ 00001 | 00108
+ 00001 | 00109
+ 00001 | 00110
+(10 rows)
+
+-- CROSS JOIN, pushed down.  Unlike previous query, remote side is able to
+-- return tuples in given order without full sort, but using index scan and
+-- incremental sort.  This is much cheaper than full sort on local side, even
+-- despite we don't know LIMIT on remote side.
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+                                                                            QUERY PLAN                                                                             
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: t1.c1, t2.c1
+   ->  Foreign Scan
+         Output: t1.c1, t2.c1
+         Relations: (public.ft1 t1) INNER JOIN (public.ft2 t2)
+         Remote SQL: SELECT r1."C 1", r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) ORDER BY r1."C 1" ASC NULLS LAST, r2."C 1" ASC NULLS LAST
+(6 rows)
+
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
  c1 | c1  
 ----+-----
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 4d2e43c9f0..729086ee29 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -514,7 +514,17 @@ SELECT t1.c1 FROM ft1 t1 WHERE EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c1)
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
 SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
--- CROSS JOIN, not pushed down
+-- CROSS JOIN, not pushed down.  For this query, essential optimization is top-N
+-- sort.  But it can't be processed at remote side, because we never do LIMIT
+-- push down.  Assuming that sorting is not worth it to push down, CROSS JOIN
+-- is also not pushed down in order to transfer less tuples over network.
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+SELECT t1.c3, t2.c3 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c3, t2.c3 OFFSET 100 LIMIT 10;
+-- CROSS JOIN, pushed down.  Unlike previous query, remote side is able to
+-- return tuples in given order without full sort, but using index scan and
+-- incremental sort.  This is much cheaper than full sort on local side, even
+-- despite we don't know LIMIT on remote side.
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e7d408824e..f2da888056 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3692,6 +3692,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-incrementalsort" xreflabel="enable_incrementalsort">
+      <term><varname>enable_incrementalsort</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_incrementalsort</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of incremental sort
+        steps. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-indexscan" xreflabel="enable_indexscan">
       <term><varname>enable_indexscan</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 8a58672a94..edd71ae133 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -81,6 +81,8 @@ static void show_upper_qual(List *qual, const char *qlabel,
 				ExplainState *es);
 static void show_sort_keys(SortState *sortstate, List *ancestors,
 			   ExplainState *es);
+static void show_incremental_sort_keys(IncrementalSortState *incrsortstate,
+					   List *ancestors, ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 					   ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
@@ -94,7 +96,7 @@ static void show_grouping_set_keys(PlanState *planstate,
 static void show_group_keys(GroupState *gstate, List *ancestors,
 				ExplainState *es);
 static void show_sort_group_keys(PlanState *planstate, const char *qlabel,
-					 int nkeys, AttrNumber *keycols,
+					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
 					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
 					 List *ancestors, ExplainState *es);
 static void show_sortorder_options(StringInfo buf, Node *sortexpr,
@@ -102,6 +104,8 @@ static void show_sortorder_options(StringInfo buf, Node *sortexpr,
 static void show_tablesample(TableSampleClause *tsc, PlanState *planstate,
 				 List *ancestors, ExplainState *es);
 static void show_sort_info(SortState *sortstate, ExplainState *es);
+static void show_incremental_sort_info(IncrementalSortState *incrsortstate,
+									   ExplainState *es);
 static void show_hash_info(HashState *hashstate, ExplainState *es);
 static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 					ExplainState *es);
@@ -1064,6 +1068,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Sort:
 			pname = sname = "Sort";
 			break;
+		case T_IncrementalSort:
+			pname = sname = "Incremental Sort";
+			break;
 		case T_Group:
 			pname = sname = "Group";
 			break;
@@ -1674,6 +1681,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_sort_keys(castNode(SortState, planstate), ancestors, es);
 			show_sort_info(castNode(SortState, planstate), es);
 			break;
+		case T_IncrementalSort:
+			show_incremental_sort_keys(castNode(IncrementalSortState, planstate),
+									   ancestors, es);
+			show_incremental_sort_info(castNode(IncrementalSortState, planstate),
+									   es);
+			break;
 		case T_MergeAppend:
 			show_merge_append_keys(castNode(MergeAppendState, planstate),
 								   ancestors, es);
@@ -2001,15 +2014,38 @@ static void
 show_sort_keys(SortState *sortstate, List *ancestors, ExplainState *es)
 {
 	Sort	   *plan = (Sort *) sortstate->ss.ps.plan;
+	int			presortedCols;
+
+	if (IsA(plan, IncrementalSort))
+		presortedCols = ((IncrementalSort *) plan)->presortedCols;
+	else
+		presortedCols = 0;
 
 	show_sort_group_keys((PlanState *) sortstate, "Sort Key",
-						 plan->numCols, plan->sortColIdx,
+						 plan->numCols, presortedCols, plan->sortColIdx,
 						 plan->sortOperators, plan->collations,
 						 plan->nullsFirst,
 						 ancestors, es);
 }
 
 /*
+ * Show the sort keys for an IncrementalSort node.
+ */
+static void
+show_incremental_sort_keys(IncrementalSortState *incrsortstate,
+						   List *ancestors, ExplainState *es)
+{
+	IncrementalSort *plan = (IncrementalSort *) incrsortstate->ss.ps.plan;
+
+	show_sort_group_keys((PlanState *) incrsortstate, "Sort Key",
+						 plan->sort.numCols, plan->presortedCols,
+						 plan->sort.sortColIdx,
+						 plan->sort.sortOperators, plan->sort.collations,
+						 plan->sort.nullsFirst,
+						 ancestors, es);
+}
+
+/*
  * Likewise, for a MergeAppend node.
  */
 static void
@@ -2019,7 +2055,7 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 	MergeAppend *plan = (MergeAppend *) mstate->ps.plan;
 
 	show_sort_group_keys((PlanState *) mstate, "Sort Key",
-						 plan->numCols, plan->sortColIdx,
+						 plan->numCols, 0, plan->sortColIdx,
 						 plan->sortOperators, plan->collations,
 						 plan->nullsFirst,
 						 ancestors, es);
@@ -2043,7 +2079,7 @@ show_agg_keys(AggState *astate, List *ancestors,
 			show_grouping_sets(outerPlanState(astate), plan, ancestors, es);
 		else
 			show_sort_group_keys(outerPlanState(astate), "Group Key",
-								 plan->numCols, plan->grpColIdx,
+								 plan->numCols, 0, plan->grpColIdx,
 								 NULL, NULL, NULL,
 								 ancestors, es);
 
@@ -2112,7 +2148,7 @@ show_grouping_set_keys(PlanState *planstate,
 	if (sortnode)
 	{
 		show_sort_group_keys(planstate, "Sort Key",
-							 sortnode->numCols, sortnode->sortColIdx,
+							 sortnode->numCols, 0, sortnode->sortColIdx,
 							 sortnode->sortOperators, sortnode->collations,
 							 sortnode->nullsFirst,
 							 ancestors, es);
@@ -2169,7 +2205,7 @@ show_group_keys(GroupState *gstate, List *ancestors,
 	/* The key columns refer to the tlist of the child plan */
 	ancestors = lcons(gstate, ancestors);
 	show_sort_group_keys(outerPlanState(gstate), "Group Key",
-						 plan->numCols, plan->grpColIdx,
+						 plan->numCols, 0, plan->grpColIdx,
 						 NULL, NULL, NULL,
 						 ancestors, es);
 	ancestors = list_delete_first(ancestors);
@@ -2182,13 +2218,14 @@ show_group_keys(GroupState *gstate, List *ancestors,
  */
 static void
 show_sort_group_keys(PlanState *planstate, const char *qlabel,
-					 int nkeys, AttrNumber *keycols,
+					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
 					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
 					 List *ancestors, ExplainState *es)
 {
 	Plan	   *plan = planstate->plan;
 	List	   *context;
 	List	   *result = NIL;
+	List	   *resultPresorted = NIL;
 	StringInfoData sortkeybuf;
 	bool		useprefix;
 	int			keyno;
@@ -2228,9 +2265,13 @@ show_sort_group_keys(PlanState *planstate, const char *qlabel,
 								   nullsFirst[keyno]);
 		/* Emit one property-list item per sort key */
 		result = lappend(result, pstrdup(sortkeybuf.data));
+		if (keyno < nPresortedKeys)
+			resultPresorted = lappend(resultPresorted, exprstr);
 	}
 
 	ExplainPropertyList(qlabel, result, es);
+	if (nPresortedKeys > 0)
+		ExplainPropertyList("Presorted Key", resultPresorted, es);
 }
 
 /*
@@ -2439,6 +2480,95 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 }
 
 /*
+ * If it's EXPLAIN ANALYZE, show tuplesort stats for an incremental sort node
+ */
+static void
+show_incremental_sort_info(IncrementalSortState *incrsortstate,
+						   ExplainState *es)
+{
+	if (es->analyze && incrsortstate->sort_Done &&
+		incrsortstate->tuplesortstate != NULL)
+	{
+		Tuplesortstate *state = (Tuplesortstate *) incrsortstate->tuplesortstate;
+		TuplesortInstrumentation stats;
+		const char *sortMethod;
+		const char *spaceType;
+		long		spaceUsed;
+
+		tuplesort_get_stats(state, &stats);
+		sortMethod = tuplesort_method_name(stats.sortMethod);
+		spaceType = tuplesort_space_type_name(stats.spaceType);
+		spaceUsed = stats.spaceUsed;
+
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Sort Method: %s  %s: %ldkB\n",
+							 sortMethod, spaceType, spaceUsed);
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Sort Groups: %ld\n",
+							 incrsortstate->groupsCount);
+		}
+		else
+		{
+			ExplainPropertyText("Sort Method", sortMethod, es);
+			ExplainPropertyInteger("Sort Space Used", "kB", spaceUsed, es);
+			ExplainPropertyText("Sort Space Type", spaceType, es);
+			ExplainPropertyInteger("Sort Groups", NULL,
+								   incrsortstate->groupsCount, es);
+		}
+	}
+
+	if (incrsortstate->shared_info != NULL)
+	{
+		int			n;
+		bool		opened_group = false;
+
+		for (n = 0; n < incrsortstate->shared_info->num_workers; n++)
+		{
+			TuplesortInstrumentation *sinstrument;
+			const char *sortMethod;
+			const char *spaceType;
+			long		spaceUsed;
+			int64		groupsCount;
+
+			sinstrument = &incrsortstate->shared_info->sinfo[n].sinstrument;
+			groupsCount = incrsortstate->shared_info->sinfo[n].groupsCount;
+			if (sinstrument->sortMethod == SORT_TYPE_STILL_IN_PROGRESS)
+				continue;		/* ignore any unfilled slots */
+			sortMethod = tuplesort_method_name(sinstrument->sortMethod);
+			spaceType = tuplesort_space_type_name(sinstrument->spaceType);
+			spaceUsed = sinstrument->spaceUsed;
+
+			if (es->format == EXPLAIN_FORMAT_TEXT)
+			{
+				appendStringInfoSpaces(es->str, es->indent * 2);
+				appendStringInfo(es->str,
+								 "Worker %d:  Sort Method: %s  %s: %ldkB  Groups: %ld\n",
+								 n, sortMethod, spaceType, spaceUsed, groupsCount);
+			}
+			else
+			{
+				if (!opened_group)
+				{
+					ExplainOpenGroup("Workers", "Workers", false, es);
+					opened_group = true;
+				}
+				ExplainOpenGroup("Worker", NULL, true, es);
+				ExplainPropertyInteger("Worker Number", NULL, n, es);
+				ExplainPropertyText("Sort Method", sortMethod, es);
+				ExplainPropertyInteger("Sort Space Used", "kB", spaceUsed, es);
+				ExplainPropertyText("Sort Space Type", spaceType, es);
+				ExplainPropertyInteger("Sort Groups", NULL, groupsCount, es);
+				ExplainCloseGroup("Worker", NULL, true, es);
+			}
+		}
+		if (opened_group)
+			ExplainCloseGroup("Workers", "Workers", false, es);
+	}
+}
+
+/*
  * Show information on hash buckets/batches.
  */
 static void
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index cc09895fa5..572aca05fb 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -24,8 +24,8 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
        nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
        nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
        nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
-       nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
-       nodeValuesscan.o \
+       nodeSamplescan.o nodeSeqscan.o nodeSetOp.o \
+       nodeSort.o nodeIncrementalSort.o nodeUnique.o nodeValuesscan.o \
        nodeCtescan.o nodeNamedtuplestorescan.o nodeWorktablescan.o \
        nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \
        nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 9e78421978..34e05330ea 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -31,6 +31,7 @@
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeLimit.h"
@@ -253,6 +254,10 @@ ExecReScan(PlanState *node)
 			ExecReScanSort((SortState *) node);
 			break;
 
+		case T_IncrementalSortState:
+			ExecReScanIncrementalSort((IncrementalSortState *) node);
+			break;
+
 		case T_GroupState:
 			ExecReScanGroup((GroupState *) node);
 			break;
@@ -525,8 +530,12 @@ ExecSupportsBackwardScan(Plan *node)
 		case T_CteScan:
 		case T_Material:
 		case T_Sort:
+			/* these don't evaluate tlist */
 			return true;
 
+		case T_IncrementalSort:
+			return false;
+
 		case T_LockRows:
 		case T_Limit:
 			return ExecSupportsBackwardScan(outerPlan(node));
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 52f1a96db5..fc3910502b 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -32,6 +32,7 @@
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeSeqscan.h"
@@ -281,6 +282,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
+			break;
 
 		default:
 			break;
@@ -494,6 +499,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
+			break;
 
 		default:
 			break;
@@ -918,6 +927,7 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 			break;
 		case T_HashState:
 		case T_SortState:
+		case T_IncrementalSortState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
 
@@ -978,6 +988,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 		case T_SortState:
 			ExecSortRetrieveInstrumentation((SortState *) planstate);
 			break;
+		case T_IncrementalSortState:
+			ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
+			break;
 		case T_HashState:
 			ExecHashRetrieveInstrumentation((HashState *) planstate);
 			break;
@@ -1227,6 +1240,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
+												pwcxt);
+			break;
 
 		default:
 			break;
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 43a27a9af2..17163448a3 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -88,6 +88,7 @@
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeLimit.h"
@@ -314,6 +315,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 												estate, eflags);
 			break;
 
+		case T_IncrementalSort:
+			result = (PlanState *) ExecInitIncrementalSort((IncrementalSort *) node,
+														   estate, eflags);
+			break;
+
 		case T_Group:
 			result = (PlanState *) ExecInitGroup((Group *) node,
 												 estate, eflags);
@@ -695,6 +701,10 @@ ExecEndNode(PlanState *node)
 			ExecEndSort((SortState *) node);
 			break;
 
+		case T_IncrementalSortState:
+			ExecEndIncrementalSort((IncrementalSortState *) node);
+			break;
+
 		case T_GroupState:
 			ExecEndGroup((GroupState *) node);
 			break;
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 1b1334006f..77013909a8 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -373,7 +373,7 @@ initialize_phase(AggState *aggstate, int newphase)
 												  sortnode->collations,
 												  sortnode->nullsFirst,
 												  work_mem,
-												  NULL, false);
+												  NULL, false, false);
 	}
 
 	aggstate->current_phase = newphase;
@@ -460,7 +460,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 									 pertrans->sortOperators,
 									 pertrans->sortCollations,
 									 pertrans->sortNullsFirst,
-									 work_mem, NULL, false);
+									 work_mem, NULL, false, false);
 	}
 
 	/*
diff --git a/src/backend/executor/nodeIncrementalSort.c b/src/backend/executor/nodeIncrementalSort.c
new file mode 100644
index 0000000000..1f5e41f95a
--- /dev/null
+++ b/src/backend/executor/nodeIncrementalSort.c
@@ -0,0 +1,631 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncrementalSort.c
+ *	  Routines to handle incremental sorting of relations.
+ *
+ * DESCRIPTION
+ *
+ *		Incremental sort is a specially optimized kind of multikey sort used
+ *		when the input is already presorted by a prefix of the required key
+ *		list.  Thus, when it's required to sort by (key1, key2 ... keyN) and
+ *		the input is already sorted by (key1, key2 ... keyM), M < N, we sort
+ *		groups where values of (key1, key2 ... keyM) are equal.
+ *
+ *		Consider the following example.  We have input tuples consisting of
+ *		two integers (x, y) already presorted by x, while it's required to
+ *		sort them by x and y.  Let the input tuples be the following.
+ *
+ *		(1, 5)
+ *		(1, 2)
+ *		(2, 10)
+ *		(2, 1)
+ *		(2, 5)
+ *		(3, 3)
+ *		(3, 7)
+ *
+ *		The incremental sort algorithm would individually sort by y the
+ *		following groups, which have equal x:
+ *			(1, 5) (1, 2)
+ *			(2, 10) (2, 1) (2, 5)
+ *			(3, 3) (3, 7)
+ *
+ *		After sorting these groups and putting them all together, we would get
+ *		the following tuple set, which is actually sorted by x and y.
+ *
+ *		(1, 2)
+ *		(1, 5)
+ *		(2, 1)
+ *		(2, 5)
+ *		(2, 10)
+ *		(3, 3)
+ *		(3, 7)
+ *
+ *		Incremental sort is faster than full sort on large datasets.  But
+ *		the biggest benefit of incremental sort comes with queries that have
+ *		LIMIT, because incremental sort can return the first tuples without
+ *		reading the whole input dataset.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodeIncrementalSort.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/execdebug.h"
+#include "executor/nodeIncrementalSort.h"
+#include "miscadmin.h"
+#include "utils/lsyscache.h"
+#include "utils/tuplesort.h"
+
+/*
+ * Prepare information for presortedKeys comparison.
+ */
+static void
+preparePresortedCols(IncrementalSortState *node)
+{
+	IncrementalSort	   *plannode = (IncrementalSort *) node->ss.ps.plan;
+	int					presortedCols,
+						i;
+
+	Assert(IsA(plannode, IncrementalSort));
+	presortedCols = plannode->presortedCols;
+
+	node->presortedKeys = (PresortedKeyData *) palloc(presortedCols *
+													sizeof(PresortedKeyData));
+
+	for (i = 0; i < presortedCols; i++)
+	{
+		Oid					equalityOp,
+							equalityFunc;
+		PresortedKeyData   *key;
+
+		key = &node->presortedKeys[i];
+		key->attno = plannode->sort.sortColIdx[i];
+
+		equalityOp = get_equality_op_for_ordering_op(
+										plannode->sort.sortOperators[i], NULL);
+		if (!OidIsValid(equalityOp))
+			elog(ERROR, "missing equality operator for ordering operator %u",
+					plannode->sort.sortOperators[i]);
+
+		equalityFunc = get_opcode(equalityOp);
+		if (!OidIsValid(equalityFunc))
+			elog(ERROR, "missing function for operator %u", equalityOp);
+
+		/* Lookup the comparison function */
+		fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
+
+		/* We can initialize the callinfo just once and re-use it */
+		InitFunctionCallInfoData(key->fcinfo, &key->flinfo, 2,
+								plannode->sort.collations[i], NULL, NULL);
+		key->fcinfo.argnull[0] = false;
+		key->fcinfo.argnull[1] = false;
+	}
+}
+
+/*
+ * Check if first "presortedCols" sort values are equal.
+ */
+static bool
+cmpSortPresortedCols(IncrementalSortState *node, TupleTableSlot *a,
+															TupleTableSlot *b)
+{
+	int n, i;
+
+	Assert(IsA(node->ss.ps.plan, IncrementalSort));
+
+	n = ((IncrementalSort *) node->ss.ps.plan)->presortedCols;
+
+	for (i = n - 1; i >= 0; i--)
+	{
+		Datum				datumA,
+							datumB,
+							result;
+		bool				isnullA,
+							isnullB;
+		AttrNumber			attno = node->presortedKeys[i].attno;
+		PresortedKeyData   *key;
+
+		datumA = slot_getattr(a, attno, &isnullA);
+		datumB = slot_getattr(b, attno, &isnullB);
+
+		/* Special case for NULL-vs-NULL, else use standard comparison */
+		if (isnullA || isnullB)
+		{
+			if (isnullA == isnullB)
+				continue;
+			else
+				return false;
+		}
+
+		key = &node->presortedKeys[i];
+
+		key->fcinfo.arg[0] = datumA;
+		key->fcinfo.arg[1] = datumB;
+
+		/* just for paranoia's sake, we reset isnull each time */
+		key->fcinfo.isnull = false;
+
+		result = FunctionCallInvoke(&key->fcinfo);
+
+		/* Check for null result, since caller is clearly not expecting one */
+		if (key->fcinfo.isnull)
+			elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
+
+		if (!DatumGetBool(result))
+			return false;
+	}
+	return true;
+}
+
+/*
+ * Copying of tuples to the node->grpPivotSlot introduces some overhead.  It's
+ * especially notable when groups contain one or a few tuples.  In order
+ * to cope with this problem we don't copy the pivot tuple before the group
+ * contains at least MIN_GROUP_SIZE tuples.  Surely, it might reduce efficiency
+ * of incremental sort, but it reduces the probability of regression.
+ */
+#define MIN_GROUP_SIZE 32
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSort
+ *
+ *		Assuming that outer subtree returns tuple presorted by some prefix
+ *		of target sort columns, performs incremental sort.  It fetches
+ *		groups of tuples where prefix sort columns are equal and sorts them
+ *		using tuplesort.  This approach avoids sorting the whole dataset.
+ *		Besides taking less memory and being faster, it allows us to
+ *		start returning tuples before fetching the full dataset from the
+ *		outer subtree.
+ *
+ *		Conditions:
+ *		  -- none.
+ *
+ *		Initial States:
+ *		  -- the outer child is prepared to return the first tuple.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+ExecIncrementalSort(PlanState *pstate)
+{
+	IncrementalSortState *node = castNode(IncrementalSortState, pstate);
+	EState			   *estate;
+	ScanDirection		dir;
+	Tuplesortstate	   *tuplesortstate;
+	TupleTableSlot	   *slot;
+	IncrementalSort	   *plannode = (IncrementalSort *) node->ss.ps.plan;
+	PlanState		   *outerNode;
+	TupleDesc			tupDesc;
+	int64				nTuples = 0;
+
+	/*
+	 * get state info from node
+	 */
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "entering routine");
+
+	estate = node->ss.ps.state;
+	dir = estate->es_direction;
+	tuplesortstate = (Tuplesortstate *) node->tuplesortstate;
+
+	/*
+	 * Return next tuple from sorted set if any.
+	 */
+	if (node->sort_Done)
+	{
+		slot = node->ss.ps.ps_ResultTupleSlot;
+		if (tuplesort_gettupleslot(tuplesortstate,
+									  ScanDirectionIsForward(dir),
+									  false, slot, NULL) || node->finished)
+			return slot;
+	}
+
+	/*
+	 * Otherwise, read the next group of tuples from the outer plan and pass
+	 * them to tuplesort.c.  Subsequent calls just fetch tuples from tuplesort.
+	 */
+
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "sorting subplan");
+
+	/*
+	 * Want to scan subplan in the forward direction while creating the
+	 * sorted data.
+	 */
+	estate->es_direction = ForwardScanDirection;
+
+	/*
+	 * Initialize tuplesort module.
+	 */
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "calling tuplesort_begin");
+
+	outerNode = outerPlanState(node);
+	tupDesc = ExecGetResultType(outerNode);
+
+	if (node->tuplesortstate == NULL)
+	{
+		/*
+		 * We are going to process the first group of presorted data.
+		 * Initialize support structures for cmpSortPresortedCols - already
+		 * sorted columns.
+		 */
+		preparePresortedCols(node);
+
+		/*
+		 * Pass all the columns to tuplesort.  We pass to tuplesort groups
+		 * of at least MIN_GROUP_SIZE size.  Thus, these groups don't
+		 * necessarily have equal values of the first column.  We are unlikely
+		 * to have huge groups with incremental sort.  Therefore usage of
+		 * abbreviated keys would likely be a waste of time.
+		 */
+		tuplesortstate = tuplesort_begin_heap(
+									tupDesc,
+									plannode->sort.numCols,
+									plannode->sort.sortColIdx,
+									plannode->sort.sortOperators,
+									plannode->sort.collations,
+									plannode->sort.nullsFirst,
+									work_mem,
+									NULL,
+									false,
+									true);
+		node->tuplesortstate = (void *) tuplesortstate;
+	}
+	else
+	{
+		/* Next group of presorted data */
+		tuplesort_reset((Tuplesortstate *) node->tuplesortstate);
+	}
+	node->groupsCount++;
+
+	/* Calculate remaining bound for bounded sort */
+	if (node->bounded)
+		tuplesort_set_bound(tuplesortstate, node->bound - node->bound_Done);
+
+	/* Put saved tuple to tuplesort if any */
+	if (!TupIsNull(node->grpPivotSlot))
+	{
+		tuplesort_puttupleslot(tuplesortstate, node->grpPivotSlot);
+		ExecClearTuple(node->grpPivotSlot);
+		nTuples++;
+	}
+
+	/*
+	 * Put the next group of tuples, in which the presortedCols sort values
+	 * are equal, into the tuplesort.
+	 */
+	for (;;)
+	{
+		slot = ExecProcNode(outerNode);
+
+		if (TupIsNull(slot))
+		{
+			node->finished = true;
+			break;
+		}
+
+		/* Put next group of presorted data to the tuplesort */
+		if (nTuples < MIN_GROUP_SIZE)
+		{
+			tuplesort_puttupleslot(tuplesortstate, slot);
+
+			/* Save last tuple in minimal group */
+			if (nTuples == MIN_GROUP_SIZE - 1)
+				ExecCopySlot(node->grpPivotSlot, slot);
+			nTuples++;
+		}
+		else
+		{
+			/* Iterate while presorted cols are the same as in saved tuple */
+			if (cmpSortPresortedCols(node, node->grpPivotSlot, slot))
+			{
+				tuplesort_puttupleslot(tuplesortstate, slot);
+				nTuples++;
+			}
+			else
+			{
+				ExecCopySlot(node->grpPivotSlot, slot);
+				break;
+			}
+		}
+	}
+
+	/*
+	 * Complete the sort.
+	 */
+	tuplesort_performsort(tuplesortstate);
+
+	/*
+	 * restore to user specified direction
+	 */
+	estate->es_direction = dir;
+
+	/*
+	 * finally set the sorted flag to true
+	 */
+	node->sort_Done = true;
+	node->bounded_Done = node->bounded;
+	if (node->shared_info && node->am_worker)
+	{
+		TuplesortInstrumentation *si;
+
+		Assert(IsParallelWorker());
+		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+		si = &node->shared_info->sinfo[ParallelWorkerNumber].sinstrument;
+		tuplesort_get_stats(tuplesortstate, si);
+		node->shared_info->sinfo[ParallelWorkerNumber].groupsCount =
+															node->groupsCount;
+	}
+
+	/*
+	 * Adjust bound_Done with number of tuples we've actually sorted.
+	 */
+	if (node->bounded)
+	{
+		if (node->finished)
+			node->bound_Done = node->bound;
+		else
+			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
+	}
+
+	SO1_printf("ExecIncrementalSort: %s\n", "sorting done");
+
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "retrieving tuple from tuplesort");
+
+	/*
+	 * Get the first or next tuple from tuplesort. Returns NULL if no more
+	 * tuples.
+	 */
+	slot = node->ss.ps.ps_ResultTupleSlot;
+	(void) tuplesort_gettupleslot(tuplesortstate,
+								  ScanDirectionIsForward(dir),
+								  false, slot, NULL);
+	return slot;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecInitIncrementalSort
+ *
+ *		Creates the run-time state information for the sort node
+ *		produced by the planner and initializes its outer subtree.
+ * ----------------------------------------------------------------
+ */
+IncrementalSortState *
+ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
+{
+	IncrementalSortState   *incrsortstate;
+
+	SO1_printf("ExecInitIncrementalSort: %s\n",
+			   "initializing sort node");
+
+	/*
+	 * Incremental sort can't be used with either EXEC_FLAG_REWIND,
+	 * EXEC_FLAG_BACKWARD or EXEC_FLAG_MARK, because we hold only current
+	 * bucket in tuplesortstate.
+	 */
+	Assert((eflags & (EXEC_FLAG_REWIND |
+					  EXEC_FLAG_BACKWARD |
+					  EXEC_FLAG_MARK)) == 0);
+
+	/*
+	 * create state structure
+	 */
+	incrsortstate = makeNode(IncrementalSortState);
+	incrsortstate->ss.ps.plan = (Plan *) node;
+	incrsortstate->ss.ps.state = estate;
+	incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
+
+	incrsortstate->bounded = false;
+	incrsortstate->sort_Done = false;
+	incrsortstate->finished = false;
+	incrsortstate->tuplesortstate = NULL;
+	incrsortstate->grpPivotSlot = NULL;
+	incrsortstate->bound_Done = 0;
+	incrsortstate->groupsCount = 0;
+	incrsortstate->presortedKeys = NULL;
+
+	/*
+	 * Miscellaneous initialization
+	 *
+	 * Sort nodes don't initialize their ExprContexts because they never call
+	 * ExecQual or ExecProject.
+	 */
+
+	/*
+	 * initialize child nodes
+	 *
+	 * We shield the child node from the need to support REWIND, BACKWARD, or
+	 * MARK/RESTORE.
+	 */
+	eflags &= ~(EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK);
+
+	outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
+
+	/*
+	 * Initialize scan slot and type.
+	 */
+	ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss);
+
+	/*
+	 * Initialize return slot and type. No need to initialize projection info because
+	 * this node doesn't do projections.
+	 */
+	ExecInitResultTupleSlotTL(estate, &incrsortstate->ss.ps);
+	incrsortstate->ss.ps.ps_ProjInfo = NULL;
+
+	/* make standalone slot to store previous tuple from outer node */
+	incrsortstate->grpPivotSlot = MakeSingleTupleTableSlot(
+							ExecGetResultType(outerPlanState(incrsortstate)));
+
+	SO1_printf("ExecInitIncrementalSort: %s\n",
+			   "sort node initialized");
+
+	return incrsortstate;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecEndIncrementalSort(node)
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndIncrementalSort(IncrementalSortState *node)
+{
+	SO1_printf("ExecEndIncrementalSort: %s\n",
+			   "shutting down sort node");
+
+	/*
+	 * clean out the tuple table
+	 */
+	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	/* must drop standalone tuple slot from outer node */
+	ExecDropSingleTupleTableSlot(node->grpPivotSlot);
+
+	/*
+	 * Release tuplesort resources
+	 */
+	if (node->tuplesortstate != NULL)
+		tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+	node->tuplesortstate = NULL;
+
+	/*
+	 * shut down the subplan
+	 */
+	ExecEndNode(outerPlanState(node));
+
+	SO1_printf("ExecEndIncrementalSort: %s\n",
+			   "sort node shutdown");
+}
+
+void
+ExecReScanIncrementalSort(IncrementalSortState *node)
+{
+	PlanState  *outerPlan = outerPlanState(node);
+
+	/*
+	 * If we haven't sorted yet, just return. If outerplan's chgParam is not
+	 * NULL then it will be re-scanned by ExecProcNode, else no reason to
+	 * re-scan it at all.
+	 */
+	if (!node->sort_Done)
+		return;
+
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+
+	/*
+	 * Incremental sort holds only the current group of tuples in
+	 * tuplesortstate, so the sorted output can never be rewound.  We always
+	 * forget previous sort results; we have to re-read the subplan and
+	 * re-sort, regardless of whether the subnode or the bounded-sort
+	 * parameters changed.
+	 */
+	node->sort_Done = false;
+	tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+	node->tuplesortstate = NULL;
+	node->bound_Done = 0;
+
+	/*
+	 * if chgParam of subnode is not null then plan will be re-scanned by
+	 * first ExecProcNode.
+	 */
+	if (outerPlan->chgParam == NULL)
+		ExecReScan(outerPlan);
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortEstimate
+ *
+ *		Estimate space required to propagate sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
+	size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortInitializeDSM
+ *
+ *		Initialize DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ pcxt->nworkers * sizeof(IncrementalSortInfo);
+	node->shared_info = shm_toc_allocate(pcxt->toc, size);
+	/* ensure any unfilled slots will contain zeroes */
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortInitializeWorker
+ *
+ *		Attach worker to DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
+{
+	node->shared_info =
+		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+	node->am_worker = true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortRetrieveInstrumentation
+ *
+ *		Transfer sort statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
+{
+	Size		size;
+	SharedIncrementalSortInfo *si;
+
+	if (node->shared_info == NULL)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ node->shared_info->num_workers * sizeof(IncrementalSortInfo);
+	si = palloc(size);
+	memcpy(si, node->shared_info, size);
+	node->shared_info = si;
+}
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 73f16c9aba..457e774b3d 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -93,7 +93,9 @@ ExecSort(PlanState *pstate)
 											  plannode->collations,
 											  plannode->nullsFirst,
 											  work_mem,
-											  NULL, node->randomAccess);
+											  NULL,
+											  node->randomAccess,
+											  false);
 		if (node->bounded)
 			tuplesort_set_bound(tuplesortstate, node->bound);
 		node->tuplesortstate = (void *) tuplesortstate;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index c7293a60d7..b93a7a1d43 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -922,6 +922,24 @@ _copyMaterial(const Material *from)
 
 
 /*
+ * CopySortFields
+ *
+ *		This function copies the fields of the Sort node.  It is used by
+ *		all the copy functions for classes which inherit from Sort.
+ */
+static void
+CopySortFields(const Sort *from, Sort *newnode)
+{
+	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+	COPY_SCALAR_FIELD(numCols);
+	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
+	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
+	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
+	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+}
+
+/*
  * _copySort
  */
 static Sort *
@@ -932,13 +950,29 @@ _copySort(const Sort *from)
 	/*
 	 * copy node superclass fields
 	 */
-	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+	CopySortFields(from, newnode);
 
-	COPY_SCALAR_FIELD(numCols);
-	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
-	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
-	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
-	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+	return newnode;
+}
+
+
+/*
+ * _copyIncrementalSort
+ */
+static IncrementalSort *
+_copyIncrementalSort(const IncrementalSort *from)
+{
+	IncrementalSort	   *newnode = makeNode(IncrementalSort);
+
+	/*
+	 * copy node superclass fields
+	 */
+	CopySortFields((const Sort *) from, (Sort *) newnode);
+
+	/*
+	 * copy remainder of node
+	 */
+	COPY_SCALAR_FIELD(presortedCols);
 
 	return newnode;
 }
@@ -4834,6 +4868,9 @@ copyObjectImpl(const void *from)
 		case T_Sort:
 			retval = _copySort(from);
 			break;
+		case T_IncrementalSort:
+			retval = _copyIncrementalSort(from);
+			break;
 		case T_Group:
 			retval = _copyGroup(from);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f61ae03ac5..9d9c90e2be 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -877,12 +877,10 @@ _outMaterial(StringInfo str, const Material *node)
 }
 
 static void
-_outSort(StringInfo str, const Sort *node)
+_outSortInfo(StringInfo str, const Sort *node)
 {
 	int			i;
 
-	WRITE_NODE_TYPE("SORT");
-
 	_outPlanInfo(str, (const Plan *) node);
 
 	WRITE_INT_FIELD(numCols);
@@ -905,6 +903,24 @@ _outSort(StringInfo str, const Sort *node)
 }
 
 static void
+_outSort(StringInfo str, const Sort *node)
+{
+	WRITE_NODE_TYPE("SORT");
+
+	_outSortInfo(str, node);
+}
+
+static void
+_outIncrementalSort(StringInfo str, const IncrementalSort *node)
+{
+	WRITE_NODE_TYPE("INCREMENTALSORT");
+
+	_outSortInfo(str, (const Sort *) node);
+
+	WRITE_INT_FIELD(presortedCols);
+}
+
+static void
 _outUnique(StringInfo str, const Unique *node)
 {
 	int			i;
@@ -3756,6 +3772,9 @@ outNode(StringInfo str, const void *obj)
 			case T_Sort:
 				_outSort(str, obj);
 				break;
+			case T_IncrementalSort:
+				_outIncrementalSort(str, obj);
+				break;
 			case T_Unique:
 				_outUnique(str, obj);
 				break;
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index fd4586e73d..338bf8b835 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2067,12 +2067,13 @@ _readMaterial(void)
 }
 
 /*
- * _readSort
+ * ReadCommonSort
+ *	Assign the basic stuff of all nodes that inherit from Sort
  */
-static Sort *
-_readSort(void)
+static void
+ReadCommonSort(Sort *local_node)
 {
-	READ_LOCALS(Sort);
+	READ_TEMP_LOCALS();
 
 	ReadCommonPlan(&local_node->plan);
 
@@ -2081,6 +2082,32 @@ _readSort(void)
 	READ_OID_ARRAY(sortOperators, local_node->numCols);
 	READ_OID_ARRAY(collations, local_node->numCols);
 	READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+}
+
+/*
+ * _readSort
+ */
+static Sort *
+_readSort(void)
+{
+	READ_LOCALS_NO_FIELDS(Sort);
+
+	ReadCommonSort(local_node);
+
+	READ_DONE();
+}
+
+/*
+ * _readIncrementalSort
+ */
+static IncrementalSort *
+_readIncrementalSort(void)
+{
+	READ_LOCALS(IncrementalSort);
+
+	ReadCommonSort(&local_node->sort);
+
+	READ_INT_FIELD(presortedCols);
 
 	READ_DONE();
 }
@@ -2648,6 +2675,8 @@ parseNodeString(void)
 		return_value = _readMaterial();
 	else if (MATCH("SORT", 4))
 		return_value = _readSort();
+	else if (MATCH("INCREMENTALSORT", 15))
+		return_value = _readIncrementalSort();
 	else if (MATCH("GROUP", 5))
 		return_value = _readGroup();
 	else if (MATCH("AGG", 3))
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 43f4e75748..c28aa4affb 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3655,6 +3655,10 @@ print_path(PlannerInfo *root, Path *path, int indent)
 			ptype = "Sort";
 			subpath = ((SortPath *) path)->subpath;
 			break;
+		case T_IncrementalSortPath:
+			ptype = "IncrementalSort";
+			subpath = ((SortPath *) path)->subpath;
+			break;
 		case T_GroupPath:
 			ptype = "Group";
 			subpath = ((GroupPath *) path)->subpath;
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 47729de896..e8cfdd81fd 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -128,6 +128,7 @@ bool		enable_indexonlyscan = true;
 bool		enable_bitmapscan = true;
 bool		enable_tidscan = true;
 bool		enable_sort = true;
+bool		enable_incrementalsort = true;
 bool		enable_hashagg = true;
 bool		enable_nestloop = true;
 bool		enable_material = true;
@@ -1615,6 +1616,13 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  *	  Determines and returns the cost of sorting a relation, including
  *	  the cost of reading the input data.
  *
+ * The sort can be either a full sort of the relation or an incremental sort
+ * when the data is already presorted by some of the required pathkeys.  In
+ * the latter case we estimate the number of groups the presorted pathkeys
+ * divide the source data into, then estimate the cost of sorting one group,
+ * assuming the data is divided into groups uniformly.  Also, if LIMIT is
+ * specified, we have to pull from the source and sort only some groups.
+ *
  * If the total volume of data to sort is less than sort_mem, we will do
  * an in-memory sort, which requires no I/O and about t*log2(t) tuple
  * comparisons for t tuples.
@@ -1641,7 +1649,9 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  * work that has to be done to prepare the inputs to the comparison operators.
  *
  * 'pathkeys' is a list of sort keys
- * 'input_cost' is the total cost for reading the input data
+ * 'presorted_keys' is the number of pathkeys already presorted in the given path
+ * 'input_startup_cost' is the startup cost for reading the input data
+ * 'input_total_cost' is the total cost for reading the input data
  * 'tuples' is the number of tuples in the relation
  * 'width' is the average tuple width in bytes
  * 'comparison_cost' is the extra cost per comparison, if any
@@ -1657,19 +1667,28 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  */
 void
 cost_sort(Path *path, PlannerInfo *root,
-		  List *pathkeys, Cost input_cost, double tuples, int width,
-		  Cost comparison_cost, int sort_mem,
+		  List *pathkeys, int presorted_keys,
+		  Cost input_startup_cost, Cost input_total_cost,
+		  double tuples, int width, Cost comparison_cost, int sort_mem,
 		  double limit_tuples)
 {
-	Cost		startup_cost = input_cost;
-	Cost		run_cost = 0;
+	Cost		startup_cost = input_startup_cost;
+	Cost		run_cost = 0,
+				rest_cost,
+				group_cost,
+				input_run_cost = input_total_cost - input_startup_cost;
 	double		input_bytes = relation_byte_size(tuples, width);
 	double		output_bytes;
 	double		output_tuples;
+	double		num_groups,
+				group_input_bytes,
+				group_tuples;
 	long		sort_mem_bytes = sort_mem * 1024L;
 
 	if (!enable_sort)
 		startup_cost += disable_cost;
+	if (!enable_incrementalsort)
+		presorted_keys = 0;
 
 	path->rows = tuples;
 
@@ -1695,13 +1714,56 @@ cost_sort(Path *path, PlannerInfo *root,
 		output_bytes = input_bytes;
 	}
 
-	if (output_bytes > sort_mem_bytes)
+	/*
+	 * Estimate the number of groups the presorted keys divide the dataset into.
+	 */
+	if (presorted_keys > 0)
+	{
+		List	   *presortedExprs = NIL;
+		ListCell   *l;
+		int			i = 0;
+
+		/* Extract presorted keys as list of expressions */
+		foreach(l, pathkeys)
+		{
+			PathKey *key = (PathKey *)lfirst(l);
+			EquivalenceMember *member = (EquivalenceMember *)
+										linitial(key->pk_eclass->ec_members);
+
+			presortedExprs = lappend(presortedExprs, member->em_expr);
+
+			i++;
+			if (i >= presorted_keys)
+				break;
+		}
+
+		/* Estimate number of groups with equal presorted keys */
+		num_groups = estimate_num_groups(root, presortedExprs, tuples, NULL);
+
+		/*
+		 * Estimate average cost of sorting of one group where presorted keys
+		 * are equal.  Incremental sort is sensitive to distribution of tuples
+		 * to the groups, where we're relying on quite rough assumptions.  Thus,
+		 * we're pessimistic about incremental sort performance and increase
+		 * its average group size by half.
+		 */
+		group_input_bytes = 1.5 * input_bytes / num_groups;
+		group_tuples = 1.5 * tuples / num_groups;
+	}
+	else
+	{
+		num_groups = 1.0;
+		group_input_bytes = input_bytes;
+		group_tuples = tuples;
+	}
+
+	if (output_bytes > sort_mem_bytes && group_input_bytes > sort_mem_bytes)
 	{
 		/*
 		 * We'll have to use a disk-based sort of all the tuples
 		 */
-		double		npages = ceil(input_bytes / BLCKSZ);
-		double		nruns = input_bytes / sort_mem_bytes;
+		double		npages = ceil(group_input_bytes / BLCKSZ);
+		double		nruns = group_input_bytes / sort_mem_bytes;
 		double		mergeorder = tuplesort_merge_order(sort_mem_bytes);
 		double		log_runs;
 		double		npageaccesses;
@@ -1711,7 +1773,7 @@ cost_sort(Path *path, PlannerInfo *root,
 		 *
 		 * Assume about N log2 N comparisons
 		 */
-		startup_cost += comparison_cost * tuples * LOG2(tuples);
+		group_cost = comparison_cost * group_tuples * LOG2(group_tuples);
 
 		/* Disk costs */
 
@@ -1722,10 +1784,10 @@ cost_sort(Path *path, PlannerInfo *root,
 			log_runs = 1.0;
 		npageaccesses = 2.0 * npages * log_runs;
 		/* Assume 3/4ths of accesses are sequential, 1/4th are not */
-		startup_cost += npageaccesses *
+		group_cost += npageaccesses *
 			(seq_page_cost * 0.75 + random_page_cost * 0.25);
 	}
-	else if (tuples > 2 * output_tuples || input_bytes > sort_mem_bytes)
+	else if (group_tuples > 2 * output_tuples || group_input_bytes > sort_mem_bytes)
 	{
 		/*
 		 * We'll use a bounded heap-sort keeping just K tuples in memory, for
@@ -1733,14 +1795,33 @@ cost_sort(Path *path, PlannerInfo *root,
 		 * factor is a bit higher than for quicksort.  Tweak it so that the
 		 * cost curve is continuous at the crossover point.
 		 */
-		startup_cost += comparison_cost * tuples * LOG2(2.0 * output_tuples);
+		group_cost = comparison_cost * group_tuples * LOG2(2.0 * output_tuples);
 	}
 	else
 	{
-		/* We'll use plain quicksort on all the input tuples */
-		startup_cost += comparison_cost * tuples * LOG2(tuples);
+		/*
+		 * We'll use plain quicksort on all the input tuples.  If it appears
+		 * that we expect less than two tuples per sort group then assume
+		 * logarithmic part of estimate to be 1.
+		 */
+		if (group_tuples >= 2.0)
+			group_cost = comparison_cost * group_tuples * LOG2(group_tuples);
+		else
+			group_cost = comparison_cost * group_tuples;
 	}
 
+	/* Add per group cost of fetching tuples from input */
+	group_cost += input_run_cost / num_groups;
+
+	/*
+	 * We have to sort the first group to start output from the node.
+	 * Sorting the rest of the groups is required to return all other tuples.
+	 */
+	startup_cost += group_cost;
+	rest_cost = (num_groups * (output_tuples / tuples) - 1.0) * group_cost;
+	if (rest_cost > 0.0)
+		run_cost += rest_cost;
+
 	/*
 	 * Also charge a small amount (arbitrarily set equal to operator cost) per
 	 * extracted tuple.  We don't charge cpu_tuple_cost because a Sort node
@@ -1751,6 +1832,20 @@ cost_sort(Path *path, PlannerInfo *root,
 	 */
 	run_cost += cpu_operator_cost * tuples;
 
+	/* Extra costs of incremental sort */
+	if (presorted_keys > 0)
+	{
+		/*
+		 * In incremental sort case we also have to cost the detection of
+		 * sort groups.  This turns out to be one extra copy and comparison
+		 * per tuple.
+		 */
+		run_cost += (cpu_tuple_cost + comparison_cost) * tuples;
+
+		/* Cost of per group tuplesort reset */
+		run_cost += 2.0 * cpu_tuple_cost * num_groups;
+	}
+
 	path->startup_cost = startup_cost;
 	path->total_cost = startup_cost + run_cost;
 }
@@ -2728,6 +2823,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 		cost_sort(&sort_path,
 				  root,
 				  outersortkeys,
+				  pathkeys_common(outer_path->pathkeys, outersortkeys),
+				  outer_path->startup_cost,
 				  outer_path->total_cost,
 				  outer_path_rows,
 				  outer_path->pathtarget->width,
@@ -2754,6 +2851,8 @@ initial_cost_mergejoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 		cost_sort(&sort_path,
 				  root,
 				  innersortkeys,
+				  pathkeys_common(inner_path->pathkeys, innersortkeys),
+				  inner_path->startup_cost,
 				  inner_path->total_cost,
 				  inner_path_rows,
 				  inner_path->pathtarget->width,
@@ -2990,18 +3089,17 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path,
 	 * inner path is to be used directly (without sorting) and it doesn't
 	 * support mark/restore.
 	 *
-	 * Since the inner side must be ordered, and only Sorts and IndexScans can
-	 * create order to begin with, and they both support mark/restore, you
-	 * might think there's no problem --- but you'd be wrong.  Nestloop and
-	 * merge joins can *preserve* the order of their inputs, so they can be
-	 * selected as the input of a mergejoin, and they don't support
-	 * mark/restore at present.
+	 * Sorts and IndexScans support mark/restore, but IncrementalSorts don't.
+	 * Also Nestloop and merge joins can *preserve* the order of their inputs,
+	 * so they can be selected as the input of a mergejoin, and they don't
+	 * support mark/restore at present.
 	 *
 	 * We don't test the value of enable_material here, because
 	 * materialization is required for correctness in this case, and turning
 	 * it off does not entitle us to deliver an invalid plan.
 	 */
-	else if (innersortkeys == NIL &&
+	else if ((innersortkeys == NIL ||
+			  pathkeys_common(innersortkeys, inner_path->pathkeys) > 0) &&
 			 !ExecSupportsMarkRestore(inner_path))
 		path->materialize_inner = true;
 
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index 6d1cc3b8a0..57fe52dc98 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -22,10 +22,12 @@
 #include "nodes/nodeFuncs.h"
 #include "nodes/plannodes.h"
 #include "optimizer/clauses.h"
+#include "optimizer/cost.h"
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/tlist.h"
 #include "utils/lsyscache.h"
+#include "utils/selfuncs.h"
 
 
 static bool pathkey_is_redundant(PathKey *new_pathkey, List *pathkeys);
@@ -327,6 +329,51 @@ pathkeys_contained_in(List *keys1, List *keys2)
 	return false;
 }
 
+
+/*
+ * pathkeys_common_contained_in
+ *    Same as pathkeys_contained_in, but also sets length of longest
+ *    common prefix of keys1 and keys2.
+ */
+bool
+pathkeys_common_contained_in(List *keys1, List *keys2, int *n_common)
+{
+	int			n = 0;
+	ListCell   *key1,
+			   *key2;
+
+	forboth(key1, keys1, key2, keys2)
+	{
+		PathKey    *pathkey1 = (PathKey *) lfirst(key1);
+		PathKey    *pathkey2 = (PathKey *) lfirst(key2);
+
+		if (pathkey1 != pathkey2)
+		{
+			*n_common = n;
+			return false;
+		}
+		n++;
+	}
+
+	*n_common = n;
+	return (key1 == NULL);
+}
+
+
+/*
+ * pathkeys_common
+ *    Returns length of longest common prefix of keys1 and keys2.
+ */
+int
+pathkeys_common(List *keys1, List *keys2)
+{
+	int		n;
+
+	(void) pathkeys_common_contained_in(keys1, keys2, &n);
+	return n;
+}
+
+
 /*
  * get_cheapest_path_for_pathkeys
  *	  Find the cheapest path (according to the specified criterion) that
@@ -1580,26 +1627,45 @@ right_merge_direction(PlannerInfo *root, PathKey *pathkey)
  *		Count the number of pathkeys that are useful for meeting the
  *		query's requested output ordering.
  *
- * Unlike merge pathkeys, this is an all-or-nothing affair: it does us
- * no good to order by just the first key(s) of the requested ordering.
- * So the result is always either 0 or list_length(root->query_pathkeys).
+ * Returns the number of leading pathkeys that match the given argument.
+ * The others can be satisfied by incremental sort.
  */
-static int
-pathkeys_useful_for_ordering(PlannerInfo *root, List *pathkeys)
+int
+pathkeys_useful_for_ordering(List *query_pathkeys, List *pathkeys)
 {
-	if (root->query_pathkeys == NIL)
+	int	n_common_pathkeys;
+
+	if (query_pathkeys == NIL)
 		return 0;				/* no special ordering requested */
 
 	if (pathkeys == NIL)
 		return 0;				/* unordered path */
 
-	if (pathkeys_contained_in(root->query_pathkeys, pathkeys))
+	if (pathkeys_common_contained_in(query_pathkeys, pathkeys, &n_common_pathkeys))
 	{
-		/* It's useful ... or at least the first N keys are */
-		return list_length(root->query_pathkeys);
+		/* Full match of pathkeys: always useful */
+		return n_common_pathkeys;
+	}
+	else
+	{
+		if (enable_incrementalsort)
+		{
+			/*
+			 * Return the number of path keys in common, or 0 if there are none.
+			 * Any leading common pathkeys could be useful for ordering because
+			 * we can use the incremental sort.
+			 */
+			return n_common_pathkeys;
+		}
+		else
+		{
+			/*
+			 * When incremental sort is disabled, pathkeys are useful only when
+			 * they do contain all the query pathkeys.
+			 */
+			return 0;
+		}
 	}
-
-	return 0;					/* path ordering not useful */
 }
 
 /*
@@ -1615,7 +1681,7 @@ truncate_useless_pathkeys(PlannerInfo *root,
 	int			nuseful2;
 
 	nuseful = pathkeys_useful_for_merging(root, rel, pathkeys);
-	nuseful2 = pathkeys_useful_for_ordering(root, pathkeys);
+	nuseful2 = pathkeys_useful_for_ordering(root->query_pathkeys, pathkeys);
 	if (nuseful2 > nuseful)
 		nuseful = nuseful2;
 
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 8b4f031d96..e047e7736b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -236,7 +236,7 @@ static MergeJoin *make_mergejoin(List *tlist,
 			   Plan *lefttree, Plan *righttree,
 			   JoinType jointype, bool inner_unique,
 			   bool skip_mark_restore);
-static Sort *make_sort(Plan *lefttree, int numCols,
+static Sort *make_sort(Plan *lefttree, int numCols, int presortedCols,
 		  AttrNumber *sortColIdx, Oid *sortOperators,
 		  Oid *collations, bool *nullsFirst);
 static Plan *prepare_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
@@ -252,10 +252,11 @@ static EquivalenceMember *find_ec_member_for_tle(EquivalenceClass *ec,
 					   TargetEntry *tle,
 					   Relids relids);
 static Sort *make_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
-						Relids relids);
+						Relids relids, int presortedCols);
 static Sort *make_sort_from_groupcols(List *groupcls,
 						 AttrNumber *grpColIdx,
-						 Plan *lefttree);
+						 Plan *lefttree,
+						 int presortedCols);
 static Material *make_material(Plan *lefttree);
 static WindowAgg *make_windowagg(List *tlist, Index winref,
 			   int partNumCols, AttrNumber *partColIdx, Oid *partOperators,
@@ -443,6 +444,7 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
 											   (GatherPath *) best_path);
 			break;
 		case T_Sort:
+		case T_IncrementalSort:
 			plan = (Plan *) create_sort_plan(root,
 											 (SortPath *) best_path,
 											 flags);
@@ -1128,6 +1130,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
 		Oid		   *sortOperators;
 		Oid		   *collations;
 		bool	   *nullsFirst;
+		int			n_common_pathkeys;
 
 		/* Build the child plan */
 		/* Must insist that all children return the same tlist */
@@ -1162,9 +1165,11 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path)
 					  numsortkeys * sizeof(bool)) == 0);
 
 		/* Now, insert a Sort node if subplan isn't sufficiently ordered */
-		if (!pathkeys_contained_in(pathkeys, subpath->pathkeys))
+		if (!pathkeys_common_contained_in(pathkeys, subpath->pathkeys,
+										  &n_common_pathkeys))
 		{
 			Sort	   *sort = make_sort(subplan, numsortkeys,
+										 n_common_pathkeys,
 										 sortColIdx, sortOperators,
 										 collations, nullsFirst);
 
@@ -1514,6 +1519,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
 	Plan	   *subplan;
 	List	   *pathkeys = best_path->path.pathkeys;
 	List	   *tlist = build_path_tlist(root, &best_path->path);
+	int			n_common_pathkeys;
 
 	/* As with Gather, it's best to project away columns in the workers. */
 	subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
@@ -1543,12 +1549,16 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
 
 
 	/* Now, insert a Sort node if subplan isn't sufficiently ordered */
-	if (!pathkeys_contained_in(pathkeys, best_path->subpath->pathkeys))
+	if (!pathkeys_common_contained_in(pathkeys, best_path->subpath->pathkeys,
+									  &n_common_pathkeys))
+	{
 		subplan = (Plan *) make_sort(subplan, gm_plan->numCols,
+									 n_common_pathkeys,
 									 gm_plan->sortColIdx,
 									 gm_plan->sortOperators,
 									 gm_plan->collations,
 									 gm_plan->nullsFirst);
+	}
 
 	/* Now insert the subplan under GatherMerge. */
 	gm_plan->plan.lefttree = subplan;
@@ -1661,6 +1671,7 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 {
 	Sort	   *plan;
 	Plan	   *subplan;
+	int			n_common_pathkeys;
 
 	/*
 	 * We don't want any excess columns in the sorted tuples, so request a
@@ -1670,6 +1681,11 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 	subplan = create_plan_recurse(root, best_path->subpath,
 								  flags | CP_SMALL_TLIST);
 
+	if (IsA(best_path, IncrementalSortPath))
+		n_common_pathkeys = ((IncrementalSortPath *) best_path)->presortedCols;
+	else
+		n_common_pathkeys = 0;
+
 	/*
 	 * make_sort_from_pathkeys() indirectly calls find_ec_member_for_tle(),
 	 * which will ignore any child EC members that don't belong to the given
@@ -1678,7 +1694,8 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 	 */
 	plan = make_sort_from_pathkeys(subplan, best_path->path.pathkeys,
 								   IS_OTHER_REL(best_path->subpath->parent) ?
-								   best_path->path.parent->relids : NULL);
+								   best_path->path.parent->relids : NULL,
+								   n_common_pathkeys);
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
@@ -1922,7 +1939,8 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 				sort_plan = (Plan *)
 					make_sort_from_groupcols(rollup->groupClause,
 											 new_grpColIdx,
-											 subplan);
+											 subplan,
+											 0);
 			}
 
 			if (!rollup->is_hashed)
@@ -3870,10 +3888,15 @@ create_mergejoin_plan(PlannerInfo *root,
 	 */
 	if (best_path->outersortkeys)
 	{
+		Sort	   *sort;
+		int			n_common_pathkeys;
 		Relids		outer_relids = outer_path->parent->relids;
-		Sort	   *sort = make_sort_from_pathkeys(outer_plan,
-												   best_path->outersortkeys,
-												   outer_relids);
+
+		n_common_pathkeys = pathkeys_common(best_path->outersortkeys,
+									best_path->jpath.outerjoinpath->pathkeys);
+
+		sort = make_sort_from_pathkeys(outer_plan, best_path->outersortkeys,
+									   outer_relids, n_common_pathkeys);
 
 		label_sort_with_costsize(root, sort, -1.0);
 		outer_plan = (Plan *) sort;
@@ -3884,10 +3907,15 @@ create_mergejoin_plan(PlannerInfo *root,
 
 	if (best_path->innersortkeys)
 	{
+		Sort	   *sort;
+		int			n_common_pathkeys;
 		Relids		inner_relids = inner_path->parent->relids;
-		Sort	   *sort = make_sort_from_pathkeys(inner_plan,
-												   best_path->innersortkeys,
-												   inner_relids);
+
+		n_common_pathkeys = pathkeys_common(best_path->innersortkeys,
+									best_path->jpath.innerjoinpath->pathkeys);
+
+		sort = make_sort_from_pathkeys(inner_plan, best_path->innersortkeys,
+									   inner_relids, n_common_pathkeys);
 
 		label_sort_with_costsize(root, sort, -1.0);
 		inner_plan = (Plan *) sort;
@@ -4942,8 +4970,13 @@ label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
 {
 	Plan	   *lefttree = plan->plan.lefttree;
 	Path		sort_path;		/* dummy for result of cost_sort */
+	int			presorted_cols = 0;
+
+	if (IsA(plan, IncrementalSort))
+		presorted_cols = ((IncrementalSort *) plan)->presortedCols;
 
-	cost_sort(&sort_path, root, NIL,
+	cost_sort(&sort_path, root, NIL, presorted_cols,
+			  lefttree->startup_cost,
 			  lefttree->total_cost,
 			  lefttree->plan_rows,
 			  lefttree->plan_width,
@@ -5534,13 +5567,31 @@ make_mergejoin(List *tlist,
  * nullsFirst arrays already.
  */
 static Sort *
-make_sort(Plan *lefttree, int numCols,
+make_sort(Plan *lefttree, int numCols, int presortedCols,
 		  AttrNumber *sortColIdx, Oid *sortOperators,
 		  Oid *collations, bool *nullsFirst)
 {
-	Sort	   *node = makeNode(Sort);
-	Plan	   *plan = &node->plan;
+	Sort	   *node;
+	Plan	   *plan;
+
+	/* Always use regular sort node when enable_incrementalsort = false */
+	if (!enable_incrementalsort)
+		presortedCols = 0;
+
+	if (presortedCols == 0)
+	{
+		node = makeNode(Sort);
+	}
+	else
+	{
+		IncrementalSort    *incrementalSort;
+
+		incrementalSort = makeNode(IncrementalSort);
+		node = &incrementalSort->sort;
+		incrementalSort->presortedCols = presortedCols;
+	}
 
+	plan = &node->plan;
 	plan->targetlist = lefttree->targetlist;
 	plan->qual = NIL;
 	plan->lefttree = lefttree;
@@ -5873,9 +5924,11 @@ find_ec_member_for_tle(EquivalenceClass *ec,
  *	  'lefttree' is the node which yields input tuples
  *	  'pathkeys' is the list of pathkeys by which the result is to be sorted
  *	  'relids' is the set of relations required by prepare_sort_from_pathkeys()
+ *	  'presortedCols' is the number of presorted columns in input tuples
  */
 static Sort *
-make_sort_from_pathkeys(Plan *lefttree, List *pathkeys, Relids relids)
+make_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
+						Relids relids, int presortedCols)
 {
 	int			numsortkeys;
 	AttrNumber *sortColIdx;
@@ -5895,7 +5948,7 @@ make_sort_from_pathkeys(Plan *lefttree, List *pathkeys, Relids relids)
 										  &nullsFirst);
 
 	/* Now build the Sort node */
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, presortedCols,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -5938,7 +5991,7 @@ make_sort_from_sortclauses(List *sortcls, Plan *lefttree)
 		numsortkeys++;
 	}
 
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, 0,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -5959,7 +6012,8 @@ make_sort_from_sortclauses(List *sortcls, Plan *lefttree)
 static Sort *
 make_sort_from_groupcols(List *groupcls,
 						 AttrNumber *grpColIdx,
-						 Plan *lefttree)
+						 Plan *lefttree,
+						 int presortedCols)
 {
 	List	   *sub_tlist = lefttree->targetlist;
 	ListCell   *l;
@@ -5992,7 +6046,7 @@ make_sort_from_groupcols(List *groupcls,
 		numsortkeys++;
 	}
 
-	return make_sort(lefttree, numsortkeys,
+	return make_sort(lefttree, numsortkeys, presortedCols,
 					 sortColIdx, sortOperators,
 					 collations, nullsFirst);
 }
@@ -6657,6 +6711,7 @@ is_projection_capable_plan(Plan *plan)
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 		case T_LockRows:
diff --git a/src/backend/optimizer/plan/planagg.c b/src/backend/optimizer/plan/planagg.c
index 95cbffbd69..308f60beac 100644
--- a/src/backend/optimizer/plan/planagg.c
+++ b/src/backend/optimizer/plan/planagg.c
@@ -44,6 +44,7 @@
 #include "parser/parse_clause.h"
 #include "rewrite/rewriteManip.h"
 #include "utils/lsyscache.h"
+#include "utils/selfuncs.h"
 #include "utils/syscache.h"
 
 
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index a19f5d0c02..ce5b1cc76e 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -4828,13 +4828,13 @@ create_ordered_paths(PlannerInfo *root,
 	foreach(lc, input_rel->pathlist)
 	{
 		Path	   *path = (Path *) lfirst(lc);
-		bool		is_sorted;
+		int			n_useful_pathkeys;
 
-		is_sorted = pathkeys_contained_in(root->sort_pathkeys,
-										  path->pathkeys);
-		if (path == cheapest_input_path || is_sorted)
+		n_useful_pathkeys = pathkeys_useful_for_ordering(root->sort_pathkeys,
+														 path->pathkeys);
+		if (path == cheapest_input_path || n_useful_pathkeys > 0)
 		{
-			if (!is_sorted)
+			if (n_useful_pathkeys < list_length(root->sort_pathkeys))
 			{
 				/* An explicit sort here can take advantage of LIMIT */
 				path = (Path *) create_sort_path(root,
@@ -5966,8 +5966,9 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid)
 
 	/* Estimate the cost of seq scan + sort */
 	seqScanPath = create_seqscan_path(root, rel, NULL, 0);
-	cost_sort(&seqScanAndSortPath, root, NIL,
-			  seqScanPath->total_cost, rel->tuples, rel->reltarget->width,
+	cost_sort(&seqScanAndSortPath, root, NIL, 0,
+			  seqScanPath->startup_cost, seqScanPath->total_cost,
+			  rel->tuples, rel->reltarget->width,
 			  comparisonCost, maintenance_work_mem, -1.0);
 
 	/* Estimate the cost of index scan */
@@ -6205,14 +6206,14 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 		foreach(lc, input_rel->pathlist)
 		{
 			Path	   *path = (Path *) lfirst(lc);
-			bool		is_sorted;
+			int			n_useful_pathkeys;
 
-			is_sorted = pathkeys_contained_in(root->group_pathkeys,
-											  path->pathkeys);
-			if (path == cheapest_path || is_sorted)
+			n_useful_pathkeys = pathkeys_useful_for_ordering(
+									root->group_pathkeys, path->pathkeys);
+			if (path == cheapest_path || n_useful_pathkeys > 0)
 			{
 				/* Sort the cheapest-total path if it isn't already sorted */
-				if (!is_sorted)
+				if (n_useful_pathkeys < list_length(root->group_pathkeys))
 					path = (Path *) create_sort_path(root,
 													 grouped_rel,
 													 path,
@@ -6275,12 +6276,18 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel,
 			foreach(lc, partially_grouped_rel->pathlist)
 			{
 				Path	   *path = (Path *) lfirst(lc);
+				int			n_useful_pathkeys;
 
 				/*
 				 * Insert a Sort node, if required.  But there's no point in
-				 * sorting anything but the cheapest path.
+				 * non-incrementally sorting anything but the cheapest path.
 				 */
-				if (!pathkeys_contained_in(root->group_pathkeys, path->pathkeys))
+				n_useful_pathkeys = pathkeys_useful_for_ordering(
+										root->group_pathkeys, path->pathkeys);
+				if (n_useful_pathkeys == 0 &&
+					path != partially_grouped_rel->cheapest_total_path)
+					continue;
+				if (n_useful_pathkeys < list_length(root->group_pathkeys))
 				{
 					if (path != partially_grouped_rel->cheapest_total_path)
 						continue;
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 69dd327f0c..08a9545634 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -642,6 +642,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index 83008d7661..313cad266f 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2795,6 +2795,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 		case T_Group:
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 6e510f9d94..9fca84fde1 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -1110,7 +1110,8 @@ choose_hashed_setop(PlannerInfo *root, List *groupClauses,
 	sorted_p.startup_cost = input_path->startup_cost;
 	sorted_p.total_cost = input_path->total_cost;
 	/* XXX cost_sort doesn't actually look at pathkeys, so just pass NIL */
-	cost_sort(&sorted_p, root, NIL, sorted_p.total_cost,
+	cost_sort(&sorted_p, root, NIL, 0,
+			  sorted_p.startup_cost, sorted_p.total_cost,
 			  input_path->rows, input_path->pathtarget->width,
 			  0.0, work_mem, -1.0);
 	cost_group(&sorted_p, root, numGroupCols, dNumGroups,
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 22133fcf12..2202e97ee4 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1362,12 +1362,14 @@ create_merge_append_path(PlannerInfo *root,
 	foreach(l, subpaths)
 	{
 		Path	   *subpath = (Path *) lfirst(l);
+		int			n_common_pathkeys;
 
 		pathnode->path.rows += subpath->rows;
 		pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
 			subpath->parallel_safe;
 
-		if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
+		if (pathkeys_common_contained_in(pathkeys, subpath->pathkeys,
+										 &n_common_pathkeys))
 		{
 			/* Subpath is adequately ordered, we won't need to sort it */
 			input_startup_cost += subpath->startup_cost;
@@ -1381,6 +1383,8 @@ create_merge_append_path(PlannerInfo *root,
 			cost_sort(&sort_path,
 					  root,
 					  pathkeys,
+					  n_common_pathkeys,
+					  subpath->startup_cost,
 					  subpath->total_cost,
 					  subpath->parent->tuples,
 					  subpath->pathtarget->width,
@@ -1628,7 +1632,8 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		/*
 		 * Estimate cost for sort+unique implementation
 		 */
-		cost_sort(&sort_path, root, NIL,
+		cost_sort(&sort_path, root, NIL, 0,
+				  subpath->startup_cost,
 				  subpath->total_cost,
 				  rel->rows,
 				  subpath->pathtarget->width,
@@ -1721,6 +1726,7 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	GatherMergePath *pathnode = makeNode(GatherMergePath);
 	Cost		input_startup_cost = 0;
 	Cost		input_total_cost = 0;
+	int			n_common_pathkeys;
 
 	Assert(subpath->parallel_safe);
 	Assert(pathkeys);
@@ -1737,7 +1743,7 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 	pathnode->path.pathtarget = target ? target : rel->reltarget;
 	pathnode->path.rows += subpath->rows;
 
-	if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
+	if (pathkeys_common_contained_in(pathkeys, subpath->pathkeys, &n_common_pathkeys))
 	{
 		/* Subpath is adequately ordered, we won't need to sort it */
 		input_startup_cost += subpath->startup_cost;
@@ -1751,6 +1757,8 @@ create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
 		cost_sort(&sort_path,
 				  root,
 				  pathkeys,
+				  n_common_pathkeys,
+				  subpath->startup_cost,
 				  subpath->total_cost,
 				  subpath->rows,
 				  subpath->pathtarget->width,
@@ -2610,9 +2618,35 @@ create_sort_path(PlannerInfo *root,
 				 List *pathkeys,
 				 double limit_tuples)
 {
-	SortPath   *pathnode = makeNode(SortPath);
+	SortPath   *pathnode;
+	int			n_common_pathkeys;
+
+	/*
+	 * Use incremental sort when it's enabled and there are common pathkeys;
+	 * use regular sort otherwise.
+	 */
+	if (enable_incrementalsort)
+		n_common_pathkeys = pathkeys_common(subpath->pathkeys, pathkeys);
+	else
+		n_common_pathkeys = 0;
+
+	if (n_common_pathkeys == 0)
+	{
+		pathnode = makeNode(SortPath);
+		pathnode->path.pathtype = T_Sort;
+	}
+	else
+	{
+		IncrementalSortPath   *incpathnode;
+
+		incpathnode = makeNode(IncrementalSortPath);
+		pathnode = &incpathnode->spath;
+		pathnode->path.pathtype = T_IncrementalSort;
+		incpathnode->presortedCols = n_common_pathkeys;
+	}
+
+	Assert(n_common_pathkeys < list_length(pathkeys));
 
-	pathnode->path.pathtype = T_Sort;
 	pathnode->path.parent = rel;
 	/* Sort doesn't project, so use source path's pathtarget */
 	pathnode->path.pathtarget = subpath->pathtarget;
@@ -2626,7 +2660,9 @@ create_sort_path(PlannerInfo *root,
 
 	pathnode->subpath = subpath;
 
-	cost_sort(&pathnode->path, root, pathkeys,
+	cost_sort(&pathnode->path, root,
+			  pathkeys, n_common_pathkeys,
+			  subpath->startup_cost,
 			  subpath->total_cost,
 			  subpath->rows,
 			  subpath->pathtarget->width,
@@ -2938,7 +2974,8 @@ create_groupingsets_path(PlannerInfo *root,
 			else
 			{
 				/* Account for cost of sort, but don't charge input cost again */
-				cost_sort(&sort_path, root, NIL,
+				cost_sort(&sort_path, root, NIL, 0,
+						  0.0,
 						  0.0,
 						  subpath->rows,
 						  subpath->pathtarget->width,
diff --git a/src/backend/utils/adt/orderedsetaggs.c b/src/backend/utils/adt/orderedsetaggs.c
index ed36851fdd..a6e14af9b8 100644
--- a/src/backend/utils/adt/orderedsetaggs.c
+++ b/src/backend/utils/adt/orderedsetaggs.c
@@ -295,7 +295,8 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
 												   qstate->sortNullsFirsts,
 												   work_mem,
 												   NULL,
-												   qstate->rescan_needed);
+												   qstate->rescan_needed,
+												   false);
 	else
 		osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
 													qstate->sortOperator,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4ffc8451ca..c8aa384a74 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -861,6 +861,15 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
+		{"enable_incrementalsort", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of incremental sort steps."),
+			NULL
+		},
+		&enable_incrementalsort,
+		true,
+		NULL, NULL, NULL
+	},
+	{
 		{"enable_hashagg", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of hashed aggregation plans."),
 			NULL
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index e433faad86..83665e0fb2 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -125,6 +125,9 @@
 #define PARALLEL_SORT(state)	((state)->shared == NULL ? 0 : \
 								 (state)->worker >= 0 ? 1 : 2)
 
+#define INITIAL_MEMTUPSIZE Max(1024, \
+	ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
+
 /* GUC variables */
 #ifdef TRACE_SORT
 bool		trace_sort = false;
@@ -243,6 +246,14 @@ struct Tuplesortstate
 	int64		allowedMem;		/* total memory allowed, in bytes */
 	int			maxTapes;		/* number of tapes (Knuth's T) */
 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
+	int64		maxSpace;		/* maximum space used by any single group's
+								   sort, either in memory or on disk */
+	bool		maxSpaceOnDisk;	/* true when maxSpace measures on-disk
+								   space, false when it measures in-memory
+								   space */
+	TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
+	MemoryContext maincontext;	/* memory context for tuple sort metadata
+								   that persists across multiple batches */
 	MemoryContext sortcontext;	/* memory context holding most sort data */
 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
@@ -647,6 +658,9 @@ static void worker_freeze_result_tape(Tuplesortstate *state);
 static void worker_nomergeruns(Tuplesortstate *state);
 static void leader_takeover_tapes(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
+static void tuplesort_free(Tuplesortstate *state);
+static void tuplesort_updatemax(Tuplesortstate *state);
+
 
 /*
  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
@@ -682,6 +696,7 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 					   bool randomAccess)
 {
 	Tuplesortstate *state;
+	MemoryContext maincontext;
 	MemoryContext sortcontext;
 	MemoryContext tuplecontext;
 	MemoryContext oldcontext;
@@ -691,14 +706,22 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 		elog(ERROR, "random access disallowed under parallel sort");
 
 	/*
-	 * Create a working memory context for this sort operation. All data
-	 * needed by the sort will live inside this context.
+	 * Memory context surviving tuplesort_reset.  This memory context holds
+	 * data which is useful to keep while sorting multiple similar batches.
 	 */
-	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
+	maincontext = AllocSetContextCreate(CurrentMemoryContext,
 										"TupleSort main",
 										ALLOCSET_DEFAULT_SIZES);
 
 	/*
+	 * Create a working memory context for one sort operation.  The content of
+	 * this context is deleted by tuplesort_reset.
+	 */
+	sortcontext = AllocSetContextCreate(maincontext,
+										"TupleSort sort",
+										ALLOCSET_DEFAULT_SIZES);
+
+	/*
 	 * Caller tuple (e.g. IndexTuple) memory context.
 	 *
 	 * A dedicated child context used exclusively for caller passed tuples
@@ -715,7 +738,7 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 	 * Make the Tuplesortstate within the per-sort context.  This way, we
 	 * don't need a separate pfree() operation for it at shutdown.
 	 */
-	oldcontext = MemoryContextSwitchTo(sortcontext);
+	oldcontext = MemoryContextSwitchTo(maincontext);
 
 	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
 
@@ -740,6 +763,7 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 	state->availMem = state->allowedMem;
 	state->sortcontext = sortcontext;
 	state->tuplecontext = tuplecontext;
+	state->maincontext = maincontext;
 	state->tapeset = NULL;
 
 	state->memtupcount = 0;
@@ -748,9 +772,7 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
 	 * see comments in grow_memtuples().
 	 */
-	state->memtupsize = Max(1024,
-							ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
-
+	state->memtupsize = INITIAL_MEMTUPSIZE;
 	state->growmemtuples = true;
 	state->slabAllocatorUsed = false;
 	state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
@@ -807,14 +829,15 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
 					 Oid *sortOperators, Oid *sortCollations,
 					 bool *nullsFirstFlags,
-					 int workMem, SortCoordinate coordinate, bool randomAccess)
+					 int workMem, SortCoordinate coordinate,
+					 bool randomAccess, bool skipAbbrev)
 {
 	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
 												   randomAccess);
 	MemoryContext oldcontext;
 	int			i;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 	AssertArg(nkeys > 0);
 
@@ -857,7 +880,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 		sortKey->ssup_nulls_first = nullsFirstFlags[i];
 		sortKey->ssup_attno = attNums[i];
 		/* Convey if abbreviation optimization is applicable in principle */
-		sortKey->abbreviate = (i == 0);
+		sortKey->abbreviate = (i == 0) && !skipAbbrev;
 
 		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
 	}
@@ -890,7 +913,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 
 	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -985,7 +1008,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	MemoryContext oldcontext;
 	int			i;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -1064,7 +1087,7 @@ tuplesort_begin_index_hash(Relation heapRel,
 												   randomAccess);
 	MemoryContext oldcontext;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -1107,7 +1130,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	int16		typlen;
 	bool		typbyval;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -1224,16 +1247,12 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
 }
 
 /*
- * tuplesort_end
- *
- *	Release resources and clean up.
+ * tuplesort_free
  *
- * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
- * pointing to garbage.  Be careful not to attempt to use or free such
- * pointers afterwards!
+ *	Internal routine for freeing resources of tuplesort.
  */
-void
-tuplesort_end(Tuplesortstate *state)
+static void
+tuplesort_free(Tuplesortstate *state)
 {
 	/* context swap probably not needed, but let's be safe */
 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
@@ -1294,7 +1313,104 @@ tuplesort_end(Tuplesortstate *state)
 	 * Free the per-sort memory context, thereby releasing all working memory,
 	 * including the Tuplesortstate struct itself.
 	 */
-	MemoryContextDelete(state->sortcontext);
+	MemoryContextReset(state->sortcontext);
+}
+
+/*
+ * tuplesort_end
+ *
+ *	Release resources and clean up.
+ *
+ * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
+ * pointing to garbage.  Be careful not to attempt to use or free such
+ * pointers afterwards!
+ */
+void
+tuplesort_end(Tuplesortstate *state)
+{
+	tuplesort_free(state);
+	MemoryContextDelete(state->maincontext);
+}
+
+/*
+ * tuplesort_updatemax
+ *
+ *	Update maximum resource usage statistics.
+ */
+static void
+tuplesort_updatemax(Tuplesortstate *state)
+{
+	int64	spaceUsed;
+	bool	spaceUsedOnDisk;
+
+	/*
+	 * Note: it might seem we should provide both memory and disk usage for a
+	 * disk-based sort.  However, the current code doesn't track memory space
+	 * accurately once we have begun to return tuples to the caller (since we
+	 * don't account for pfree's the caller is expected to do), so we cannot
+	 * rely on availMem in a disk sort.  This does not seem worth the overhead
+	 * to fix.  Is it worth creating an API for the memory context code to
+	 * tell us how much is actually used in sortcontext?
+	 */
+	if (state->tapeset)
+	{
+		spaceUsedOnDisk = true;
+		spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
+	}
+	else
+	{
+		spaceUsedOnDisk = false;
+		spaceUsed = state->allowedMem - state->availMem;
+	}
+
+	/* Disk usage outranks memory usage; within one kind, keep the larger */
+	if (spaceUsedOnDisk > state->maxSpaceOnDisk ||
+		(spaceUsedOnDisk == state->maxSpaceOnDisk && spaceUsed > state->maxSpace))
+	{
+		state->maxSpace = spaceUsed;
+		state->maxSpaceOnDisk = spaceUsedOnDisk;
+		state->maxSpaceStatus = state->status;
+	}
+}
+
+/*
+ * tuplesort_reset
+ *
+ *	Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
+ *	meta-information in.  After tuplesort_reset, tuplesort is ready to start
+ *	a new sort.  This avoids recreating the tuplesort (and saves resources)
+ *	when sorting multiple small batches.
+ */
+void
+tuplesort_reset(Tuplesortstate *state)
+{
+	tuplesort_updatemax(state);
+	tuplesort_free(state);
+
+	state->status = TSS_INITIAL;
+	state->memtupcount = 0;
+	state->boundUsed = false;
+	state->tapeset = NULL;
+	state->currentRun = 0;
+	state->result_tape = -1;
+	state->bounded = false;
+	state->availMem = state->allowedMem;
+	state->lastReturnedTuple = NULL;
+	state->slabAllocatorUsed = false;
+	state->slabMemoryBegin = NULL;
+	state->slabMemoryEnd = NULL;
+	state->slabFreeHead = NULL;
+	state->growmemtuples = true;
+
+	if (state->memtupsize < INITIAL_MEMTUPSIZE)
+	{
+		if (state->memtuples)
+			pfree(state->memtuples);
+		state->memtuples = (SortTuple *) palloc(INITIAL_MEMTUPSIZE * sizeof(SortTuple));
+		state->memtupsize = INITIAL_MEMTUPSIZE;
+	}
+
+	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 }
 
 /*
@@ -2591,8 +2707,7 @@ mergeruns(Tuplesortstate *state)
 	 * Reset tuple memory.  We've freed all the tuples that we previously
 	 * allocated.  We will use the slab allocator from now on.
 	 */
-	MemoryContextDelete(state->tuplecontext);
-	state->tuplecontext = NULL;
+	MemoryContextResetOnly(state->tuplecontext);
 
 	/*
 	 * We no longer need a large memtuples array.  (We will allocate a smaller
@@ -2642,7 +2757,8 @@ mergeruns(Tuplesortstate *state)
 	 * from each input tape.
 	 */
 	state->memtupsize = numInputTapes;
-	state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+	state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
+										numInputTapes * sizeof(SortTuple));
 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 
 	/*
@@ -3139,18 +3255,15 @@ tuplesort_get_stats(Tuplesortstate *state,
 	 * to fix.  Is it worth creating an API for the memory context code to
 	 * tell us how much is actually used in sortcontext?
 	 */
-	if (state->tapeset)
-	{
+	tuplesort_updatemax(state);
+
+	if (state->maxSpaceOnDisk)
 		stats->spaceType = SORT_SPACE_TYPE_DISK;
-		stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
-	}
 	else
-	{
 		stats->spaceType = SORT_SPACE_TYPE_MEMORY;
-		stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
-	}
+	stats->spaceUsed = (state->maxSpace + 1023) / 1024;
 
-	switch (state->status)
+	switch (state->maxSpaceStatus)
 	{
 		case TSS_SORTEDINMEM:
 			if (state->boundUsed)
diff --git a/src/include/executor/nodeIncrementalSort.h b/src/include/executor/nodeIncrementalSort.h
new file mode 100644
index 0000000000..90d7a81711
--- /dev/null
+++ b/src/include/executor/nodeIncrementalSort.h
@@ -0,0 +1,30 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncrementalSort.h
+ *
+ *
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeIncrementalSort.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEINCREMENTALSORT_H
+#define NODEINCREMENTALSORT_H
+
+#include "access/parallel.h"
+#include "nodes/execnodes.h"
+
+extern IncrementalSortState *ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags);
+extern void ExecEndIncrementalSort(IncrementalSortState *node);
+extern void ExecReScanIncrementalSort(IncrementalSortState *node);
+
+/* parallel instrumentation support */
+extern void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pcxt);
+extern void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node);
+
+#endif   /* NODEINCREMENTALSORT_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6070a42b6f..bf379f7f20 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1807,6 +1807,20 @@ typedef struct MaterialState
 	Tuplestorestate *tuplestorestate;
 } MaterialState;
 
+
+/* ----------------
+ *	 When sorting by multiple keys, the input dataset may already be
+ *	 presorted by some prefix of these keys.  We call them "presorted keys".
+ *	 PresortedKeyData represents information about one such key.
+ * ----------------
+ */
+typedef struct PresortedKeyData
+{
+	FmgrInfo				flinfo;	/* comparison function info */
+	FunctionCallInfoData	fcinfo;	/* comparison function call info */
+	OffsetNumber			attno;	/* attribute number in tuple */
+} PresortedKeyData;
+
 /* ----------------
  *	 Shared memory container for per-worker sort information
  * ----------------
@@ -1835,6 +1849,45 @@ typedef struct SortState
 	SharedSortInfo *shared_info;	/* one entry per worker */
 } SortState;
 
+/* ----------------
+ *	 Shared memory container for per-worker incremental sort information
+ * ----------------
+ */
+typedef struct IncrementalSortInfo
+{
+	TuplesortInstrumentation	sinstrument;
+	int64						groupsCount;
+} IncrementalSortInfo;
+
+typedef struct SharedIncrementalSortInfo
+{
+	int							num_workers;
+	IncrementalSortInfo			sinfo[FLEXIBLE_ARRAY_MEMBER];
+} SharedIncrementalSortInfo;
+
+/* ----------------
+ *	 IncrementalSortState information
+ * ----------------
+ */
+typedef struct IncrementalSortState
+{
+	ScanState	ss;				/* its first field is NodeTag */
+	bool		bounded;		/* is the result set bounded? */
+	int64		bound;			/* if bounded, how many tuples are needed */
+	bool		sort_Done;		/* sort completed yet? */
+	bool		finished;		/* is fetching tuples from the outer
+								   node finished? */
+	bool		bounded_Done;	/* value of bounded we did the sort with */
+	int64		bound_Done;		/* value of bound we did the sort with */
+	void	   *tuplesortstate; /* private state of tuplesort.c */
+	PresortedKeyData *presortedKeys;	/* keys the dataset is presorted by */
+	int64		groupsCount;	/* number of groups with equal presorted keys */
+	/* slot for pivot tuple defining values of presorted keys within group */
+	TupleTableSlot *grpPivotSlot;
+	bool		am_worker;		/* are we a worker? */
+	SharedIncrementalSortInfo *shared_info;	/* one entry per worker */
+} IncrementalSortState;
+
 /* ---------------------
  *	GroupState information
  * ---------------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 443de22704..4bc270ed6f 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -74,6 +74,7 @@ typedef enum NodeTag
 	T_HashJoin,
 	T_Material,
 	T_Sort,
+	T_IncrementalSort,
 	T_Group,
 	T_Agg,
 	T_WindowAgg,
@@ -126,6 +127,7 @@ typedef enum NodeTag
 	T_HashJoinState,
 	T_MaterialState,
 	T_SortState,
+	T_IncrementalSortState,
 	T_GroupState,
 	T_AggState,
 	T_WindowAggState,
@@ -241,6 +243,7 @@ typedef enum NodeTag
 	T_ProjectionPath,
 	T_ProjectSetPath,
 	T_SortPath,
+	T_IncrementalSortPath,
 	T_GroupPath,
 	T_UpperUniquePath,
 	T_AggPath,
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index c922216b7d..974112d086 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -753,6 +753,17 @@ typedef struct Sort
 	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */
 } Sort;
 
+
+/* ----------------
+ *		incremental sort node
+ * ----------------
+ */
+typedef struct IncrementalSort
+{
+	Sort		sort;
+	int			presortedCols;	/* number of presorted columns */
+} IncrementalSort;
+
 /* ---------------
  *	 group node -
  *		Used for queries with GROUP BY (but no aggregates) specified.
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index abbbda9e91..86db5098e4 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1524,6 +1524,16 @@ typedef struct SortPath
 } SortPath;
 
 /*
+ * IncrementalSortPath
+ */
+typedef struct IncrementalSortPath
+{
+	SortPath	spath;
+	int			presortedCols;	/* number of presorted columns */
+} IncrementalSortPath;
+
+
+/*
  * GroupPath represents grouping (of presorted input)
  *
  * groupClause represents the columns to be grouped on; the input path
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index d3269eae71..60edbd996f 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -61,6 +61,7 @@ extern PGDLLIMPORT bool enable_indexonlyscan;
 extern PGDLLIMPORT bool enable_bitmapscan;
 extern PGDLLIMPORT bool enable_tidscan;
 extern PGDLLIMPORT bool enable_sort;
+extern PGDLLIMPORT bool enable_incrementalsort;
 extern PGDLLIMPORT bool enable_hashagg;
 extern PGDLLIMPORT bool enable_nestloop;
 extern PGDLLIMPORT bool enable_material;
@@ -106,8 +107,9 @@ extern void cost_namedtuplestorescan(Path *path, PlannerInfo *root,
 						 RelOptInfo *baserel, ParamPathInfo *param_info);
 extern void cost_recursive_union(Path *runion, Path *nrterm, Path *rterm);
 extern void cost_sort(Path *path, PlannerInfo *root,
-		  List *pathkeys, Cost input_cost, double tuples, int width,
-		  Cost comparison_cost, int sort_mem,
+		  List *pathkeys, int presorted_keys,
+		  Cost input_startup_cost, Cost input_total_cost,
+		  double tuples, int width, Cost comparison_cost, int sort_mem,
 		  double limit_tuples);
 extern void cost_append(AppendPath *path);
 extern void cost_merge_append(Path *path, PlannerInfo *root,
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 50e180c554..26787a6221 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -189,6 +189,8 @@ typedef enum
 
 extern PathKeysComparison compare_pathkeys(List *keys1, List *keys2);
 extern bool pathkeys_contained_in(List *keys1, List *keys2);
+extern bool pathkeys_common_contained_in(List *keys1, List *keys2, int *n_common);
+extern int pathkeys_common(List *keys1, List *keys2);
 extern Path *get_cheapest_path_for_pathkeys(List *paths, List *pathkeys,
 							   Relids required_outer,
 							   CostSelector cost_criterion,
@@ -229,6 +231,7 @@ extern List *make_inner_pathkeys_for_merge(PlannerInfo *root,
 extern List *trim_mergeclauses_for_inner_pathkeys(PlannerInfo *root,
 									 List *mergeclauses,
 									 List *pathkeys);
+extern int pathkeys_useful_for_ordering(List *query_pathkeys, List *pathkeys);
 extern List *truncate_useless_pathkeys(PlannerInfo *root,
 						  RelOptInfo *rel,
 						  List *pathkeys);
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index d2e6754f04..eb260dfd8b 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -193,7 +193,7 @@ extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
 					 Oid *sortOperators, Oid *sortCollations,
 					 bool *nullsFirstFlags,
 					 int workMem, SortCoordinate coordinate,
-					 bool randomAccess);
+					 bool randomAccess, bool skipAbbrev);
 extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
 						Relation indexRel, int workMem,
 						SortCoordinate coordinate, bool randomAccess);
@@ -240,6 +240,8 @@ extern bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples,
 
 extern void tuplesort_end(Tuplesortstate *state);
 
+extern void tuplesort_reset(Tuplesortstate *state);
+
 extern void tuplesort_get_stats(Tuplesortstate *state,
 					TuplesortInstrumentation *stats);
 extern const char *tuplesort_method_name(TuplesortMethod m);
diff --git a/src/test/isolation/expected/drop-index-concurrently-1.out b/src/test/isolation/expected/drop-index-concurrently-1.out
index 75dff56bc4..e11fb617b5 100644
--- a/src/test/isolation/expected/drop-index-concurrently-1.out
+++ b/src/test/isolation/expected/drop-index-concurrently-1.out
@@ -19,9 +19,10 @@ Sort
 step explains: EXPLAIN (COSTS OFF) EXECUTE getrow_seq;
 QUERY PLAN     
 
-Sort           
+Incremental Sort
   Sort Key: id, data
-  ->  Seq Scan on test_dc
+  Presorted Key: id
+  ->  Index Scan using test_dc_pkey on test_dc
         Filter: ((data)::text = '34'::text)
 step select2: SELECT * FROM test_dc WHERE data=34 ORDER BY id,data;
 id             data           
diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out
index f56151fc1e..f643422d5b 100644
--- a/src/test/regress/expected/inherit.out
+++ b/src/test/regress/expected/inherit.out
@@ -1517,6 +1517,7 @@ NOTICE:  drop cascades to table matest1
 set enable_seqscan = off;
 set enable_indexscan = on;
 set enable_bitmapscan = off;
+set enable_incrementalsort = off;
 -- Check handling of duplicated, constant, or volatile targetlist items
 explain (costs off)
 SELECT thousand, tenthous FROM tenk1
@@ -1657,9 +1658,45 @@ FROM generate_series(1, 3) g(i);
  {3,7,8,10,13,13,16,18,19,22}
 (3 rows)
 
+set enable_incrementalsort = on;
+-- check incremental sort is used when enabled
+explain (costs off)
+SELECT thousand, tenthous FROM tenk1
+UNION ALL
+SELECT thousand, thousand FROM tenk1
+ORDER BY thousand, tenthous;
+                               QUERY PLAN                                
+-------------------------------------------------------------------------
+ Merge Append
+   Sort Key: tenk1.thousand, tenk1.tenthous
+   ->  Index Only Scan using tenk1_thous_tenthous on tenk1
+   ->  Incremental Sort
+         Sort Key: tenk1_1.thousand, tenk1_1.thousand
+         Presorted Key: tenk1_1.thousand
+         ->  Index Only Scan using tenk1_thous_tenthous on tenk1 tenk1_1
+(7 rows)
+
+explain (costs off)
+SELECT x, y FROM
+  (SELECT thousand AS x, tenthous AS y FROM tenk1 a
+   UNION ALL
+   SELECT unique2 AS x, unique2 AS y FROM tenk1 b) s
+ORDER BY x, y;
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Merge Append
+   Sort Key: a.thousand, a.tenthous
+   ->  Index Only Scan using tenk1_thous_tenthous on tenk1 a
+   ->  Incremental Sort
+         Sort Key: b.unique2, b.unique2
+         Presorted Key: b.unique2
+         ->  Index Only Scan using tenk1_unique2 on tenk1 b
+(7 rows)
+
 reset enable_seqscan;
 reset enable_indexscan;
 reset enable_bitmapscan;
+reset enable_incrementalsort;
 --
 -- Check handling of a constant-null CHECK constraint
 --
diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out
index 84c6e9b5a4..78728f873a 100644
--- a/src/test/regress/expected/join.out
+++ b/src/test/regress/expected/join.out
@@ -2347,18 +2347,21 @@ select count(*) from
   left join
   (select * from tenk1 y order by y.unique2) y
   on x.thousand = y.unique2 and x.twothousand = y.hundred and x.fivethous = y.unique2;
-                                    QUERY PLAN                                    
-----------------------------------------------------------------------------------
+                                                  QUERY PLAN                                                  
+--------------------------------------------------------------------------------------------------------------
  Aggregate
    ->  Merge Left Join
-         Merge Cond: (x.thousand = y.unique2)
-         Join Filter: ((x.twothousand = y.hundred) AND (x.fivethous = y.unique2))
+         Merge Cond: ((x.thousand = y.unique2) AND (x.twothousand = y.hundred) AND (x.fivethous = y.unique2))
          ->  Sort
                Sort Key: x.thousand, x.twothousand, x.fivethous
                ->  Seq Scan on tenk1 x
          ->  Materialize
-               ->  Index Scan using tenk1_unique2 on tenk1 y
-(9 rows)
+               ->  Incremental Sort
+                     Sort Key: y.unique2, y.hundred
+                     Presorted Key: y.unique2
+                     ->  Subquery Scan on y
+                           ->  Index Scan using tenk1_unique2 on tenk1 y_1
+(12 rows)
 
 select count(*) from
   (select * from tenk1 x order by x.thousand, x.twothousand, x.fivethous) x
diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out
index 76a8209ec2..39c17c6f03 100644
--- a/src/test/regress/expected/partition_aggregate.out
+++ b/src/test/regress/expected/partition_aggregate.out
@@ -176,9 +176,11 @@ EXPLAIN (COSTS OFF)
 SELECT c, sum(a), avg(b), count(*) FROM pagg_tab GROUP BY 1 HAVING avg(d) < 15 ORDER BY 1, 2, 3;
                               QUERY PLAN                               
 -----------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_p1.c, (sum(pagg_tab_p1.a)), (avg(pagg_tab_p1.b))
-   ->  Append
+   Presorted Key: pagg_tab_p1.c
+   ->  Merge Append
+         Sort Key: pagg_tab_p1.c
          ->  GroupAggregate
                Group Key: pagg_tab_p1.c
                Filter: (avg(pagg_tab_p1.d) < '15'::numeric)
@@ -197,7 +199,7 @@ SELECT c, sum(a), avg(b), count(*) FROM pagg_tab GROUP BY 1 HAVING avg(d) < 15 O
                ->  Sort
                      Sort Key: pagg_tab_p3.c
                      ->  Seq Scan on pagg_tab_p3
-(21 rows)
+(23 rows)
 
 SELECT c, sum(a), avg(b), count(*) FROM pagg_tab GROUP BY 1 HAVING avg(d) < 15 ORDER BY 1, 2, 3;
   c   | sum  |         avg         | count 
@@ -215,8 +217,9 @@ EXPLAIN (COSTS OFF)
 SELECT a, sum(b), avg(b), count(*) FROM pagg_tab GROUP BY 1 HAVING avg(d) < 15 ORDER BY 1, 2, 3;
                               QUERY PLAN                               
 -----------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_p1.a, (sum(pagg_tab_p1.b)), (avg(pagg_tab_p1.b))
+   Presorted Key: pagg_tab_p1.a
    ->  Finalize GroupAggregate
          Group Key: pagg_tab_p1.a
          Filter: (avg(pagg_tab_p1.d) < '15'::numeric)
@@ -237,7 +240,7 @@ SELECT a, sum(b), avg(b), count(*) FROM pagg_tab GROUP BY 1 HAVING avg(d) < 15 O
                      ->  Sort
                            Sort Key: pagg_tab_p3.a
                            ->  Seq Scan on pagg_tab_p3
-(22 rows)
+(23 rows)
 
 SELECT a, sum(b), avg(b), count(*) FROM pagg_tab GROUP BY 1 HAVING avg(d) < 15 ORDER BY 1, 2, 3;
  a  | sum  |         avg         | count 
@@ -356,9 +359,11 @@ EXPLAIN (COSTS OFF)
 SELECT c, sum(b order by a) FROM pagg_tab GROUP BY c ORDER BY 1, 2;
                                QUERY PLAN                               
 ------------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_p1.c, (sum(pagg_tab_p1.b ORDER BY pagg_tab_p1.a))
-   ->  Append
+   Presorted Key: pagg_tab_p1.c
+   ->  Merge Append
+         Sort Key: pagg_tab_p1.c
          ->  GroupAggregate
                Group Key: pagg_tab_p1.c
                ->  Sort
@@ -374,7 +379,7 @@ SELECT c, sum(b order by a) FROM pagg_tab GROUP BY c ORDER BY 1, 2;
                ->  Sort
                      Sort Key: pagg_tab_p3.c
                      ->  Seq Scan on pagg_tab_p3
-(18 rows)
+(20 rows)
 
 -- Since GROUP BY clause does not match with PARTITION KEY; we need to do
 -- partial aggregation. However, ORDERED SET are not partial safe and thus
@@ -383,8 +388,9 @@ EXPLAIN (COSTS OFF)
 SELECT a, sum(b order by a) FROM pagg_tab GROUP BY a ORDER BY 1, 2;
                                QUERY PLAN                               
 ------------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_p1.a, (sum(pagg_tab_p1.b ORDER BY pagg_tab_p1.a))
+   Presorted Key: pagg_tab_p1.a
    ->  GroupAggregate
          Group Key: pagg_tab_p1.a
          ->  Sort
@@ -393,7 +399,7 @@ SELECT a, sum(b order by a) FROM pagg_tab GROUP BY a ORDER BY 1, 2;
                      ->  Seq Scan on pagg_tab_p1
                      ->  Seq Scan on pagg_tab_p2
                      ->  Seq Scan on pagg_tab_p3
-(10 rows)
+(11 rows)
 
 -- JOIN query
 CREATE TABLE pagg_tab1(x int, y int) PARTITION BY RANGE(x);
@@ -487,8 +493,9 @@ EXPLAIN (COSTS OFF)
 SELECT t1.y, sum(t1.x), count(*) FROM pagg_tab1 t1, pagg_tab2 t2 WHERE t1.x = t2.y GROUP BY t1.y HAVING avg(t1.x) > 10 ORDER BY 1, 2, 3;
                                QUERY PLAN                                
 -------------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: t1.y, (sum(t1.x)), (count(*))
+   Presorted Key: t1.y
    ->  Finalize GroupAggregate
          Group Key: t1.y
          Filter: (avg(t1.x) > '10'::numeric)
@@ -521,7 +528,7 @@ SELECT t1.y, sum(t1.x), count(*) FROM pagg_tab1 t1, pagg_tab2 t2 WHERE t1.x = t2
                                  ->  Seq Scan on pagg_tab2_p3 t2_2
                                  ->  Hash
                                        ->  Seq Scan on pagg_tab1_p3 t1_2
-(34 rows)
+(35 rows)
 
 SELECT t1.y, sum(t1.x), count(*) FROM pagg_tab1 t1, pagg_tab2 t2 WHERE t1.x = t2.y GROUP BY t1.y HAVING avg(t1.x) > 10 ORDER BY 1, 2, 3;
  y  | sum  | count 
@@ -1068,8 +1075,9 @@ EXPLAIN (COSTS OFF)
 SELECT b, sum(a), count(*) FROM pagg_tab_ml GROUP BY b ORDER BY 1, 2, 3;
                             QUERY PLAN                             
 -------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_ml_p1.b, (sum(pagg_tab_ml_p1.a)), (count(*))
+   Presorted Key: pagg_tab_ml_p1.b
    ->  Finalize GroupAggregate
          Group Key: pagg_tab_ml_p1.b
          ->  Sort
@@ -1090,7 +1098,7 @@ SELECT b, sum(a), count(*) FROM pagg_tab_ml GROUP BY b ORDER BY 1, 2, 3;
                      ->  Partial HashAggregate
                            Group Key: pagg_tab_ml_p3_s2.b
                            ->  Seq Scan on pagg_tab_ml_p3_s2
-(22 rows)
+(23 rows)
 
 SELECT b, sum(a), count(*) FROM pagg_tab_ml GROUP BY b HAVING avg(a) < 15 ORDER BY 1, 2, 3;
  b |  sum  | count 
@@ -1159,9 +1167,11 @@ EXPLAIN (COSTS OFF)
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a HAVING avg(b) < 3 ORDER BY 1, 2, 3;
                                     QUERY PLAN                                    
 ----------------------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_ml_p1.a, (sum(pagg_tab_ml_p1.b)), (count(*))
-   ->  Append
+   Presorted Key: pagg_tab_ml_p1.a
+   ->  Merge Append
+         Sort Key: pagg_tab_ml_p1.a
          ->  Finalize GroupAggregate
                Group Key: pagg_tab_ml_p1.a
                Filter: (avg(pagg_tab_ml_p1.b) < '3'::numeric)
@@ -1200,7 +1210,7 @@ SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a HAVING avg(b) < 3 ORDER B
                                  ->  Partial HashAggregate
                                        Group Key: pagg_tab_ml_p3_s2.a
                                        ->  Parallel Seq Scan on pagg_tab_ml_p3_s2
-(41 rows)
+(43 rows)
 
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a HAVING avg(b) < 3 ORDER BY 1, 2, 3;
  a  | sum  | count 
@@ -1222,8 +1232,9 @@ EXPLAIN (COSTS OFF)
 SELECT b, sum(a), count(*) FROM pagg_tab_ml GROUP BY b ORDER BY 1, 2, 3;
                                  QUERY PLAN                                 
 ----------------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_ml_p1.b, (sum(pagg_tab_ml_p1.a)), (count(*))
+   Presorted Key: pagg_tab_ml_p1.b
    ->  Finalize GroupAggregate
          Group Key: pagg_tab_ml_p1.b
          ->  Gather Merge
@@ -1246,7 +1257,7 @@ SELECT b, sum(a), count(*) FROM pagg_tab_ml GROUP BY b ORDER BY 1, 2, 3;
                            ->  Partial HashAggregate
                                  Group Key: pagg_tab_ml_p3_s2.b
                                  ->  Parallel Seq Scan on pagg_tab_ml_p3_s2
-(24 rows)
+(25 rows)
 
 SELECT b, sum(a), count(*) FROM pagg_tab_ml GROUP BY b HAVING avg(a) < 15 ORDER BY 1, 2, 3;
  b |  sum  | count 
@@ -1327,8 +1338,9 @@ EXPLAIN (COSTS OFF)
 SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3;
                                       QUERY PLAN                                      
 --------------------------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_para_p1.x, (sum(pagg_tab_para_p1.y)), (avg(pagg_tab_para_p1.y))
+   Presorted Key: pagg_tab_para_p1.x
    ->  Finalize GroupAggregate
          Group Key: pagg_tab_para_p1.x
          Filter: (avg(pagg_tab_para_p1.y) < '7'::numeric)
@@ -1346,7 +1358,7 @@ SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) <
                            ->  Partial HashAggregate
                                  Group Key: pagg_tab_para_p3.x
                                  ->  Parallel Seq Scan on pagg_tab_para_p3
-(19 rows)
+(20 rows)
 
 SELECT x, sum(y), avg(y), count(*) FROM pagg_tab_para GROUP BY x HAVING avg(y) < 7 ORDER BY 1, 2, 3;
  x  | sum  |        avg         | count 
@@ -1364,8 +1376,9 @@ EXPLAIN (COSTS OFF)
 SELECT y, sum(x), avg(x), count(*) FROM pagg_tab_para GROUP BY y HAVING avg(x) < 12 ORDER BY 1, 2, 3;
                                       QUERY PLAN                                      
 --------------------------------------------------------------------------------------
- Sort
+ Incremental Sort
    Sort Key: pagg_tab_para_p1.y, (sum(pagg_tab_para_p1.x)), (avg(pagg_tab_para_p1.x))
+   Presorted Key: pagg_tab_para_p1.y
    ->  Finalize GroupAggregate
          Group Key: pagg_tab_para_p1.y
          Filter: (avg(pagg_tab_para_p1.x) < '12'::numeric)
@@ -1383,7 +1396,7 @@ SELECT y, sum(x), avg(x), count(*) FROM pagg_tab_para GROUP BY y HAVING avg(x) <
                            ->  Partial HashAggregate
                                  Group Key: pagg_tab_para_p3.y
                                  ->  Parallel Seq Scan on pagg_tab_para_p3
-(19 rows)
+(20 rows)
 
 SELECT y, sum(x), avg(x), count(*) FROM pagg_tab_para GROUP BY y HAVING avg(x) < 12 ORDER BY 1, 2, 3;
  y  |  sum  |         avg         | count 
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index a19ee08749..9dec75060d 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -76,6 +76,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_gathermerge             | on
  enable_hashagg                 | on
  enable_hashjoin                | on
+ enable_incrementalsort         | on
  enable_indexonlyscan           | on
  enable_indexscan               | on
  enable_material                | on
@@ -88,7 +89,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(16 rows)
+(17 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
index 9397f72c13..cde4c2ee5a 100644
--- a/src/test/regress/sql/inherit.sql
+++ b/src/test/regress/sql/inherit.sql
@@ -546,6 +546,7 @@ drop table matest0 cascade;
 set enable_seqscan = off;
 set enable_indexscan = on;
 set enable_bitmapscan = off;
+set enable_incrementalsort = off;
 
 -- Check handling of duplicated, constant, or volatile targetlist items
 explain (costs off)
@@ -607,9 +608,26 @@ SELECT
     ORDER BY f.i LIMIT 10)
 FROM generate_series(1, 3) g(i);
 
+set enable_incrementalsort = on;
+
+-- check incremental sort is used when enabled
+explain (costs off)
+SELECT thousand, tenthous FROM tenk1
+UNION ALL
+SELECT thousand, thousand FROM tenk1
+ORDER BY thousand, tenthous;
+
+explain (costs off)
+SELECT x, y FROM
+  (SELECT thousand AS x, tenthous AS y FROM tenk1 a
+   UNION ALL
+   SELECT unique2 AS x, unique2 AS y FROM tenk1 b) s
+ORDER BY x, y;
+
 reset enable_seqscan;
 reset enable_indexscan;
 reset enable_bitmapscan;
+reset enable_incrementalsort;
 
 --
 -- Check handling of a constant-null CHECK constraint
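
For readers skimming the diff above: a minimal sketch of the idea behind the "Presorted Key" lines in the new plans. This is not code from the patch. The Row type and the incremental_sort() function are hypothetical names made up for illustration, and the sketch assumes a two-column sort on (a, b) where the input already arrives ordered by a. It walks the input group by group, using the first row of each group as the pivot (roughly the role grpPivotSlot appears to play), sorts each group on the remaining key only, and counts the groups (compare groupsCount).

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical two-column row; not a PostgreSQL type. */
typedef struct Row
{
	int			a;				/* presorted key */
	int			b;				/* key that still needs sorting */
} Row;

/* qsort comparator on the non-presorted key only. */
static int
cmp_by_b(const void *x, const void *y)
{
	const Row  *l = x;
	const Row  *r = y;

	return (l->b > r->b) - (l->b < r->b);
}

/*
 * Sort rows by (a, b), assuming they are already ordered by a.
 * Each run of equal "a" values is sorted independently on "b".
 * Returns the number of groups that were sorted.
 */
static long
incremental_sort(Row *rows, size_t n)
{
	long		groups = 0;
	size_t		start = 0;

	while (start < n)
	{
		size_t		end = start + 1;

		/* rows[start] is the pivot: extend the group while "a" matches it */
		while (end < n && rows[end].a == rows[start].a)
			end++;

		/* the input must really be presorted by "a" (assumption of the sketch) */
		assert(end == n || rows[end].a > rows[start].a);

		qsort(rows + start, end - start, sizeof(Row), cmp_by_b);
		groups++;
		start = end;
	}

	return groups;
}
```

Only one group is held and sorted at a time, which is presumably why the patch adds tuplesort_reset(): the same sort state can be reused for the next group instead of being torn down and rebuilt.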
