diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c
index baed981..639451a 100644
--- a/src/backend/access/common/printtup.c
+++ b/src/backend/access/common/printtup.c
@@ -26,9 +26,9 @@
 
 static void printtup_startup(DestReceiver *self, int operation,
 				 TupleDesc typeinfo);
-static void printtup(TupleTableSlot *slot, DestReceiver *self);
-static void printtup_20(TupleTableSlot *slot, DestReceiver *self);
-static void printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup_20(TupleTableSlot *slot, DestReceiver *self);
+static bool printtup_internal_20(TupleTableSlot *slot, DestReceiver *self);
 static void printtup_shutdown(DestReceiver *self);
 static void printtup_destroy(DestReceiver *self);
 
@@ -299,7 +299,7 @@ printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs)
  *		printtup --- print a tuple in protocol 3.0
  * ----------------
  */
-static void
+static bool
 printtup(TupleTableSlot *slot, DestReceiver *self)
 {
 	TupleDesc	typeinfo = slot->tts_tupleDescriptor;
@@ -376,13 +376,15 @@ printtup(TupleTableSlot *slot, DestReceiver *self)
 	/* Return to caller's context, and flush row's temporary memory */
 	MemoryContextSwitchTo(oldcontext);
 	MemoryContextReset(myState->tmpcontext);
+
+	return true;
 }
 
 /* ----------------
  *		printtup_20 --- print a tuple in protocol 2.0
  * ----------------
  */
-static void
+static bool
 printtup_20(TupleTableSlot *slot, DestReceiver *self)
 {
 	TupleDesc	typeinfo = slot->tts_tupleDescriptor;
@@ -452,6 +454,8 @@ printtup_20(TupleTableSlot *slot, DestReceiver *self)
 	/* Return to caller's context, and flush row's temporary memory */
 	MemoryContextSwitchTo(oldcontext);
 	MemoryContextReset(myState->tmpcontext);
+
+	return true;
 }
 
 /* ----------------
@@ -528,7 +532,7 @@ debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
  *		debugtup - print one tuple for an interactive backend
  * ----------------
  */
-void
+bool
 debugtup(TupleTableSlot *slot, DestReceiver *self)
 {
 	TupleDesc	typeinfo = slot->tts_tupleDescriptor;
@@ -553,6 +557,8 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
 		printatt((unsigned) i + 1, typeinfo->attrs[i], value);
 	}
 	printf("\t----\n");
+
+	return true;
 }
 
 /* ----------------
@@ -564,7 +570,7 @@ debugtup(TupleTableSlot *slot, DestReceiver *self)
  * This is largely same as printtup_20, except we use binary formatting.
  * ----------------
  */
-static void
+static bool
 printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
 {
 	TupleDesc	typeinfo = slot->tts_tupleDescriptor;
@@ -636,4 +642,6 @@ printtup_internal_20(TupleTableSlot *slot, DestReceiver *self)
 	/* Return to caller's context, and flush row's temporary memory */
 	MemoryContextSwitchTo(oldcontext);
 	MemoryContextReset(myState->tmpcontext);
+
+	return true;
 }
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f409aa7..42d4a44 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -4411,7 +4411,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * copy_dest_receive --- receive one tuple
  */
-static void
+static bool
 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_copy    *myState = (DR_copy *) self;
@@ -4423,6 +4423,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 	/* And send the data */
 	CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
 	myState->processed++;
+
+	return true;
 }
 
 /*
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 41183f6..418b0f6 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -62,7 +62,7 @@ typedef struct
 static ObjectAddress CreateAsReladdr = {InvalidOid, InvalidOid, 0};
 
 static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void intorel_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self);
 static void intorel_shutdown(DestReceiver *self);
 static void intorel_destroy(DestReceiver *self);
 
@@ -482,7 +482,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * intorel_receive --- receive one tuple
  */
-static void
+static bool
 intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_intorel *myState = (DR_intorel *) self;
@@ -507,6 +507,8 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 				myState->bistate);
 
 	/* We know this is a newly created relation, so there are no indexes */
+
+	return true;
 }
 
 /*
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index f0d9e94..7f14fd9 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -20,6 +20,7 @@
 #include "commands/defrem.h"
 #include "commands/prepare.h"
 #include "executor/hashjoin.h"
+#include "executor/nodeGather.h"
 #include "foreign/fdwapi.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/clauses.h"
@@ -730,6 +731,7 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used)
 	{
 		case T_SeqScan:
 		case T_SampleScan:
+		case T_Gather:
 		case T_IndexScan:
 		case T_IndexOnlyScan:
 		case T_BitmapHeapScan:
@@ -853,6 +855,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_SampleScan:
 			pname = sname = "Sample Scan";
 			break;
+		case T_Gather:
+			pname = sname = "Gather";
+			break;
 		case T_IndexScan:
 			pname = sname = "Index Scan";
 			break;
@@ -1003,6 +1008,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	{
 		case T_SeqScan:
 		case T_SampleScan:
+		case T_Gather:
 		case T_BitmapHeapScan:
 		case T_TidScan:
 		case T_SubqueryScan:
@@ -1147,6 +1153,15 @@ ExplainNode(PlanState *planstate, List *ancestors,
 	}
 
 	/*
+	 * Aggregate instrumentation information from all backend workers for a
+	 * Gather node.  Though we already accumulate this information when the
+	 * last tuple is fetched from the Gather node, this also covers cases
+	 * where we don't fetch all tuples, such as under a Limit node.
+	 */
+	if (es->analyze && IsA(plan, Gather))
+		DestroyParallelSetupAndAccumStats((GatherState *) planstate);
+
+	/*
 	 * We have to forcibly clean up the instrumentation state because we
 	 * haven't done ExecutorEnd yet.  This is pretty grotty ...
 	 *
@@ -1276,6 +1291,14 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				show_instrumentation_count("Rows Removed by Filter", 1,
 										   planstate, es);
 			break;
+		case T_Gather:
+			show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
+			if (plan->qual)
+				show_instrumentation_count("Rows Removed by Filter", 1,
+										   planstate, es);
+			ExplainPropertyInteger("Number of Workers",
+								   ((Gather *) plan)->num_workers, es);
+			break;
 		case T_FunctionScan:
 			if (es->verbose)
 			{
@@ -2335,6 +2358,7 @@ ExplainTargetRel(Plan *plan, Index rti, ExplainState *es)
 	{
 		case T_SeqScan:
 		case T_SampleScan:
+		case T_Gather:
 		case T_IndexScan:
 		case T_IndexOnlyScan:
 		case T_BitmapHeapScan:
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 5492e59..750a59c 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -56,7 +56,7 @@ typedef struct
 static int	matview_maintenance_depth = 0;
 
 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
 static void transientrel_shutdown(DestReceiver *self);
 static void transientrel_destroy(DestReceiver *self);
 static void refresh_matview_datafill(DestReceiver *dest, Query *query,
@@ -422,7 +422,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * transientrel_receive --- receive one tuple
  */
-static void
+static bool
 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
@@ -441,6 +441,8 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 				myState->bistate);
 
 	/* We know this is a newly created relation, so there are no indexes */
+
+	return true;
 }
 
 /*
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index f5e1e1a..51edd4c 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -17,8 +17,8 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
        execScan.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
        nodeBitmapAnd.o nodeBitmapOr.o \
-       nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \
-       nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
+       nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeGather.o \
+       nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
        nodeLimit.o nodeLockRows.o \
        nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
        nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 93e1e9a..163650c 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -24,6 +24,7 @@
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeFunctionscan.h"
+#include "executor/nodeGather.h"
 #include "executor/nodeGroup.h"
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
@@ -160,6 +161,10 @@ ExecReScan(PlanState *node)
 			ExecReScanSampleScan((SampleScanState *) node);
 			break;
 
+		case T_GatherState:
+			ExecReScanGather((GatherState *) node);
+			break;
+
 		case T_IndexScanState:
 			ExecReScanIndexScan((IndexScanState *) node);
 			break;
@@ -467,6 +472,9 @@ ExecSupportsBackwardScan(Plan *node)
 			/* Simplify life for tablesample methods by disallowing this */
 			return false;
 
+		case T_Gather:
+			return false;
+
 		case T_IndexScan:
 			return IndexSupportsBackwardScan(((IndexScan *) node)->indexid) &&
 				TargetListSupportsBackwardScan(node->targetlist);
diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c
index bcd287f..fd89204 100644
--- a/src/backend/executor/execCurrent.c
+++ b/src/backend/executor/execCurrent.c
@@ -262,6 +262,7 @@ search_plan_tree(PlanState *node, Oid table_oid)
 			 */
 		case T_SeqScanState:
 		case T_SampleScanState:
+		case T_GatherState:
 		case T_IndexScanState:
 		case T_IndexOnlyScanState:
 		case T_BitmapHeapScanState:
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 85ff46b..66e015b 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -45,9 +45,11 @@
 #include "commands/matview.h"
 #include "commands/trigger.h"
 #include "executor/execdebug.h"
+#include "executor/execParallel.h"
 #include "foreign/fdwapi.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "nodes/nodeFuncs.h"
 #include "optimizer/clauses.h"
 #include "parser/parsetree.h"
 #include "storage/bufmgr.h"
@@ -354,7 +356,11 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 		(*dest->rShutdown) (dest);
 
 	if (queryDesc->totaltime)
+	{
+		/* Accumulate stats from parallel workers before stopping node */
+		(void) ExecParallelBufferUsageAccum((Node *) queryDesc->planstate);
 		InstrStopNode(queryDesc->totaltime, estate->es_processed);
+	}
 
 	MemoryContextSwitchTo(oldcontext);
 }
@@ -1581,7 +1587,15 @@ ExecutePlan(EState *estate,
 		 * practice, this is probably always the case at this point.)
 		 */
 		if (sendTuples)
-			(*dest->receiveSlot) (slot, dest);
+		{
+			/*
+			 * If we are not able to send the tuple, we assume the destination
+			 * has closed and no more tuples can be sent. If that's the case,
+			 * end the loop.
+			 */
+			if (!((*dest->receiveSlot) (slot, dest)))
+				break;
+		}
 
 		/*
 		 * Count tuples processed, if this is a SELECT.  (For other operation
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index a409a9a..3a3e0de 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -17,6 +17,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeGather.h"
 #include "executor/tqueue.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/planmain.h"
@@ -39,22 +40,6 @@
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
-/* DSM structure for accumulating per-PlanState instrumentation. */
-typedef struct SharedPlanStateInstrumentation
-{
-	int plan_node_id;
-	slock_t mutex;
-	Instrumentation	instr;
-} SharedPlanStateInstrumentation;
-
-/* DSM structure for accumulating per-PlanState instrumentation. */
-struct SharedExecutorInstrumentation
-{
-	int instrument_options;
-	int ps_ninstrument;			/* # of ps_instrument structures following */
-	SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER];
-};
-
 /* Context object for ExecParallelEstimate. */
 typedef struct ExecParallelEstimateContext
 {
@@ -531,6 +516,33 @@ ExecParallelReportInstrumentation(PlanState *planstate,
 }
 
 /*
+ * ExecParallelBufferUsageAccum
+ *
+ * Recursively accumulate the stats for all the Gather nodes in a plan
+ * state tree.
+ */
+bool
+ExecParallelBufferUsageAccum(Node *node)
+{
+	if (node == NULL)
+		return false;
+
+	switch (nodeTag(node))
+	{
+		case T_GatherState:
+			{
+				DestroyParallelSetupAndAccumStats((GatherState *) node);
+				return true;
+			}
+			break;
+		default:
+			break;
+	}
+
+	return planstate_tree_walker((PlanState *) node, ExecParallelBufferUsageAccum, NULL);
+}
+
+/*
  * Main entrypoint for parallel query worker processes.
  *
  * We reach this function from ParallelMain, so the setup necessary to create
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 03c2feb..f24d39e 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -100,6 +100,7 @@
 #include "executor/nodeMergejoin.h"
 #include "executor/nodeModifyTable.h"
 #include "executor/nodeNestloop.h"
+#include "executor/nodeGather.h"
 #include "executor/nodeRecursiveunion.h"
 #include "executor/nodeResult.h"
 #include "executor/nodeSamplescan.h"
@@ -196,6 +197,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 													  estate, eflags);
 			break;
 
+		case T_Gather:
+			result = (PlanState *) ExecInitGather((Gather *) node,
+												  estate, eflags);
+			break;
+
 		case T_IndexScan:
 			result = (PlanState *) ExecInitIndexScan((IndexScan *) node,
 													 estate, eflags);
@@ -416,6 +422,10 @@ ExecProcNode(PlanState *node)
 			result = ExecSampleScan((SampleScanState *) node);
 			break;
 
+		case T_GatherState:
+			result = ExecGather((GatherState *) node);
+			break;
+
 		case T_IndexScanState:
 			result = ExecIndexScan((IndexScanState *) node);
 			break;
@@ -658,6 +668,10 @@ ExecEndNode(PlanState *node)
 			ExecEndSampleScan((SampleScanState *) node);
 			break;
 
+		case T_GatherState:
+			ExecEndGather((GatherState *) node);
+			break;
+
 		case T_IndexScanState:
 			ExecEndIndexScan((IndexScanState *) node);
 			break;
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index a05d8b1..d5619bd 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -1313,7 +1313,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
 	ExecStoreVirtualTuple(slot);
 
 	/* send the tuple to the receiver */
-	(*tstate->dest->receiveSlot) (slot, tstate->dest);
+	(void) (*tstate->dest->receiveSlot) (slot, tstate->dest);
 
 	/* clean up */
 	ExecClearTuple(slot);
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 812a610..863bd64 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot,
 static void sql_exec_error_callback(void *arg);
 static void ShutdownSQLFunction(Datum arg);
 static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
-static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
+static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
 static void sqlfunction_shutdown(DestReceiver *self);
 static void sqlfunction_destroy(DestReceiver *self);
 
@@ -1903,7 +1903,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 /*
  * sqlfunction_receive --- receive one tuple
  */
-static void
+static bool
 sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_sqlfunction *myState = (DR_sqlfunction *) self;
@@ -1913,6 +1913,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Store the filtered tuple into the tuplestore */
 	tuplestore_puttupleslot(myState->tstore, slot);
+
+	return true;
 }
 
 /*
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
new file mode 100644
index 0000000..9ec0474
--- /dev/null
+++ b/src/backend/executor/nodeGather.c
@@ -0,0 +1,313 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeGather.c
+ *	  Support routines for scanning a relation via multiple workers.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodeGather.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ * INTERFACE ROUTINES
+ *		ExecGather				scans a relation using worker backends.
+ *		ExecInitGather			creates and initializes a Gather node.
+ *		ExecEndGather			releases any storage allocated.
+ *		ExecReScanGather		re-initializes the workers and rescans the relation via them.
+ */
+#include "postgres.h"
+
+#include "access/relscan.h"
+#include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/nodeGather.h"
+#include "executor/nodeSubplan.h"
+#include "utils/rel.h"
+
+
+static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+
+
+/* ----------------------------------------------------------------
+ *		ExecInitGather
+ * ----------------------------------------------------------------
+ */
+GatherState *
+ExecInitGather(Gather *node, EState *estate, int eflags)
+{
+	GatherState *gatherstate;
+
+	/* Gather node doesn't have innerPlan node. */
+	Assert(innerPlan(node) == NULL);
+
+	/*
+	 * create state structure
+	 */
+	gatherstate = makeNode(GatherState);
+	gatherstate->ss.ps.plan = (Plan *) node;
+	gatherstate->ss.ps.state = estate;
+	gatherstate->fs_workersReady = false;
+
+	/*
+	 * Miscellaneous initialization
+	 *
+	 * create expression context for node
+	 */
+	ExecAssignExprContext(estate, &gatherstate->ss.ps);
+
+	/*
+	 * initialize child expressions
+	 */
+	gatherstate->ss.ps.targetlist = (List *)
+		ExecInitExpr((Expr *) node->scan.plan.targetlist,
+					 (PlanState *) gatherstate);
+	gatherstate->ss.ps.qual = (List *)
+		ExecInitExpr((Expr *) node->scan.plan.qual,
+					 (PlanState *) gatherstate);
+
+	/*
+	 * tuple table initialization
+	 */
+	ExecInitResultTupleSlot(estate, &gatherstate->ss.ps);
+	ExecInitScanTupleSlot(estate, &gatherstate->ss);
+
+	/*
+	 * now initialize outer plan
+	 */
+	outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags);
+
+
+	gatherstate->ss.ps.ps_TupFromTlist = false;
+
+	/*
+	 * Initialize result tuple type and projection info.
+	 */
+	ExecAssignResultTypeFromTL(&gatherstate->ss.ps);
+	ExecAssignProjectionInfo(&gatherstate->ss.ps, NULL);
+
+	return gatherstate;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecGather(node)
+ *
+ *		Scans the relation via multiple workers and returns
+ *		the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGather(GatherState *node)
+{
+	int			i;
+	TupleTableSlot *slot;
+
+	/*
+	 * Initialize the parallel context and workers on first execution. We do
+	 * this on first execution rather than during node initialization because
+	 * it needs to allocate a large dynamic shared memory segment, so it is
+	 * better to do so only if it is really needed.
+	 */
+	if (!node->pei || !node->pei->pcxt)
+	{
+		EState	   *estate = node->ss.ps.state;
+		bool		any_worker_launched = false;
+
+		/* Initialize the workers required to execute Gather node. */
+		node->pei = ExecInitParallelPlan(node->ss.ps.lefttree,
+										 estate,
+							   ((Gather *) (node->ss.ps.plan))->num_workers);
+
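+		/*
+		 * Pass the parallel context's table of contents down to the child
+		 * node, so that it can locate shared state (such as the parallel
+		 * scan descriptor) in the dynamic shared memory segment.
+		 */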
+		outerPlanState(node)->toc = node->pei->pcxt->toc;
+
+		/*
+		 * Register backend workers.  If the required number of workers is
+		 * not available, we perform the scan with however many workers we
+		 * could launch; if no workers are available at all, the Gather node
+		 * simply scans locally.
+		 */
+		LaunchParallelWorkers(node->pei->pcxt);
+
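+		/* Set up a funnel to collect tuples from the per-worker queues. */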
+		node->funnel = CreateTupleQueueFunnel();
+
+		for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+		{
+			if (node->pei->pcxt->worker[i].bgwhandle)
+			{
+				shm_mq_set_handle((node->pei->tqueue)[i], node->pei->pcxt->worker[i].bgwhandle);
+				RegisterTupleQueueOnFunnel(node->funnel, (node->pei->tqueue)[i]);
+				any_worker_launched = true;
+			}
+		}
+
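+		/*
+		 * Remember whether any workers were actually launched; if none were,
+		 * gather_getnext() will simply run the plan locally.
+		 */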
+		if (any_worker_launched)
+			node->fs_workersReady = true;
+	}
+
+	slot = gather_getnext(node);
+
+	if (TupIsNull(slot))
+	{
+		/*
+		 * Destroy the parallel context once we have fetched all the tuples.
+		 * This ensures that if the same statement needs Gather nodes for
+		 * multiple parts of the plan, we don't accumulate a lot of DSM
+		 * segments, and the workers become available for use by other parts
+		 * of the statement.
+		 */
+		DestroyParallelSetupAndAccumStats(node);
+	}
+	return slot;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecEndGather
+ *
+ *		frees any storage allocated through C routines.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndGather(GatherState *node)
+{
+	/*
+	 * Free the exprcontext
+	 */
+	ExecFreeExprContext(&node->ss.ps);
+
+	/*
+	 * clean out the tuple table
+	 */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+
+	ExecEndNode(outerPlanState(node));
+
+	DestroyParallelSetupAndAccumStats(node);
+}
+
+/*
+ * gather_getnext
+ *
+ * Get the next tuple from the shared memory queues.  This function is
+ * responsible for fetching tuples from all the queues associated with the
+ * worker backends used in Gather node execution; if no data is available
+ * from the queues or no worker is available, it fetches the data by
+ * executing the plan locally.
+ */
+static TupleTableSlot *
+gather_getnext(GatherState *gatherstate)
+{
+	PlanState  *outerPlan;
+	TupleTableSlot *outerTupleSlot;
+	TupleTableSlot *slot;
+	HeapTuple	tup;
+
+	/*
+	 * We can use the Gather node's projection info for tuples received from
+	 * worker backends, since currently the workers always send tuples that
+	 * are already projected as the Gather node requires.
+	 */
+	slot = gatherstate->ss.ps.ps_ProjInfo->pi_slot;
+
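+	/*
+	 * Loop until we get a tuple, either from one of the worker queues or
+	 * from the local scan, or until both sources are exhausted.
+	 */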
+	while ((!gatherstate->all_workers_done && gatherstate->fs_workersReady) ||
+		   !gatherstate->local_scan_done)
+	{
+		if (!gatherstate->all_workers_done && gatherstate->fs_workersReady)
+		{
+			/* wait only if local scan is done */
+			tup = TupleQueueFunnelNext(gatherstate->funnel,
+									   !gatherstate->local_scan_done,
+									   &gatherstate->all_workers_done);
+
+			if (HeapTupleIsValid(tup))
+			{
+				ExecStoreTuple(tup,		/* tuple to store */
+							   slot,	/* slot to store in */
+							   InvalidBuffer,	/* buffer associated with this
+												 * tuple */
+							   true);	/* pfree this pointer if not from heap */
+
+				return slot;
+			}
+		}
+		if (!gatherstate->local_scan_done)
+		{
+			outerPlan = outerPlanState(gatherstate);
+
+			outerTupleSlot = ExecProcNode(outerPlan);
+
+			if (!TupIsNull(outerTupleSlot))
+				return outerTupleSlot;
+
+			gatherstate->local_scan_done = true;
+		}
+	}
+
+	return ExecClearTuple(slot);
+}
+
+/* ----------------------------------------------------------------
+ *		DestroyParallelSetupAndAccumStats
+ *
+ *		Destroy the setup for parallel workers.  Collect all the
+ *		stats after the workers are stopped, else some of the work
+ *		done by the workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+DestroyParallelSetupAndAccumStats(GatherState *node)
+{
+	if (node->pei && node->pei->pcxt)
+	{
+		/*
+		 * Make sure all workers have finished before destroying the parallel
+		 * context, so that we get a clean exit.
+		 */
+		if (node->fs_workersReady)
+		{
+			DestroyTupleQueueFunnel(node->funnel);
+			node->funnel = NULL;
+		}
+
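+		/*
+		 * Wait for the workers to finish and accumulate their stats, so that
+		 * the work done by workers is accounted for.
+		 */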
+		ExecParallelFinish(node->pei);
+
+		/* destroy parallel context. */
+		DestroyParallelContext(node->pei->pcxt);
+		node->pei->pcxt = NULL;
+
+		node->fs_workersReady = false;
+		node->all_workers_done = false;
+		node->local_scan_done = false;
+	}
+}
+
+/* ----------------------------------------------------------------
+ *						Join Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecReScanGather
+ *
+ *		Re-initializes the workers and rescans the relation via them.
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanGather(GatherState *node)
+{
+	/*
+	 * Re-initialize the parallel context and workers to perform a rescan of
+	 * the relation.  We want to gracefully shut down all the workers so that
+	 * they can propagate any error or other information to the master
+	 * backend before exiting.
+	 */
+	DestroyParallelSetupAndAccumStats(node);
+
+	ExecReScan(node->ss.ps.lefttree);
+}
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 300401e..a60f228 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
  *		store tuple retrieved by Executor into SPITupleTable
  *		of current SPI procedure
  */
-void
+bool
 spi_printtup(TupleTableSlot *slot, DestReceiver *self)
 {
 	SPITupleTable *tuptable;
@@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self)
 	(tuptable->free)--;
 
 	MemoryContextSwitchTo(oldcxt);
+
+	return true;
 }
 
 /*
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 67143d3..28ba49f 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -41,14 +41,24 @@ struct TupleQueueFunnel
 /*
  * Receive a tuple.
  */
-static void
+static bool
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 	HeapTuple	tuple;
+	shm_mq_result result;
 
 	tuple = ExecMaterializeSlot(slot);
-	shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+	result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+
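+	/*
+	 * If the receiver has detached from the queue, report failure so that
+	 * the caller stops sending tuples; any other send failure is an error.
+	 */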
+	if (result == SHM_MQ_DETACHED)
+		return false;
+	else if (result != SHM_MQ_SUCCESS)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not send tuples")));
+
+	return true;
 }
 
 /*
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index c1fdeb7..b0862ae 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -37,8 +37,8 @@ typedef struct
 } TStoreState;
 
 
-static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
-static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
 
 
 /*
@@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
  * Receive a tuple from the executor and store it in the tuplestore.
  * This is for the easy case where we don't have to detoast.
  */
-static void
+static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
 {
 	TStoreState *myState = (TStoreState *) self;
 
 	tuplestore_puttupleslot(myState->tstore, slot);
+
+	return true;
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
  * This is for the case where we have to detoast any toasted values.
  */
-static void
+static bool
 tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 {
 	TStoreState *myState = (TStoreState *) self;
@@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 	/* And release any temporary detoasted values */
 	for (i = 0; i < nfree; i++)
 		pfree(DatumGetPointer(myState->tofree[i]));
+
+	return true;
 }
 
 /*
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 4b4ddec..f308063 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -383,6 +383,27 @@ _copySampleScan(const SampleScan *from)
 }
 
 /*
+ * _copyGather
+ */
+static Gather *
+_copyGather(const Gather *from)
+{
+	Gather	   *newnode = makeNode(Gather);
+
+	/*
+	 * copy node superclass fields
+	 */
+	CopyScanFields((const Scan *) from, (Scan *) newnode);
+
+	/*
+	 * copy remainder of node
+	 */
+	COPY_SCALAR_FIELD(num_workers);
+
+	return newnode;
+}
+
+/*
  * _copyIndexScan
  */
 static IndexScan *
@@ -4241,6 +4262,9 @@ copyObject(const void *from)
 		case T_SampleScan:
 			retval = _copySampleScan(from);
 			break;
+		case T_Gather:
+			retval = _copyGather(from);
+			break;
 		case T_IndexScan:
 			retval = _copyIndexScan(from);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index ee9c360..bc1ba61 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -459,6 +459,16 @@ _outSampleScan(StringInfo str, const SampleScan *node)
 }
 
 static void
+_outGather(StringInfo str, const Gather *node)
+{
+	WRITE_NODE_TYPE("GATHER");
+
+	_outScanInfo(str, (const Scan *) node);
+
+	WRITE_UINT_FIELD(num_workers);
+}
+
+static void
 _outIndexScan(StringInfo str, const IndexScan *node)
 {
 	WRITE_NODE_TYPE("INDEXSCAN");
@@ -3009,6 +3019,9 @@ _outNode(StringInfo str, const void *obj)
 			case T_SampleScan:
 				_outSampleScan(str, obj);
 				break;
+			case T_Gather:
+				_outGather(str, obj);
+				break;
 			case T_IndexScan:
 				_outIndexScan(str, obj);
 				break;
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index d107d76..4bb3a48 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -11,6 +11,8 @@
  *	cpu_tuple_cost		Cost of typical CPU time to process a tuple
  *	cpu_index_tuple_cost  Cost of typical CPU time to process an index tuple
  *	cpu_operator_cost	Cost of CPU time to execute an operator or function
+ *	cpu_tuple_comm_cost Cost of CPU time to pass a tuple from worker to master backend
+ *	parallel_setup_cost Cost of setting up shared memory for parallelism
  *
  * We expect that the kernel will typically do some amount of read-ahead
  * optimization; this in conjunction with seek costs means that seq_page_cost
@@ -102,11 +104,15 @@ double		random_page_cost = DEFAULT_RANDOM_PAGE_COST;
 double		cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST;
 double		cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST;
 double		cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST;
+double		cpu_tuple_comm_cost = DEFAULT_CPU_TUPLE_COMM_COST;
+double		parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST;
 
 int			effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE;
 
 Cost		disable_cost = 1.0e10;
 
+int			degree_of_parallelism = 0;
+
 bool		enable_seqscan = true;
 bool		enable_indexscan = true;
 bool		enable_indexonlyscan = true;
@@ -290,6 +296,42 @@ cost_samplescan(Path *path, PlannerInfo *root,
 }
 
 /*
+ * cost_gather
+ *	  Determines and returns the cost of a Gather path.
+ *
+ * 'baserel' is the relation to be scanned
+ * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
+ */
+void
+cost_gather(GatherPath *path, PlannerInfo *root,
+			RelOptInfo *baserel, ParamPathInfo *param_info)
+{
+	Cost		startup_cost = 0;
+	Cost		run_cost = 0;
+
+	/* Should only be applied to base relations */
+	Assert(baserel->relid > 0);
+	Assert(baserel->rtekind == RTE_RELATION);
+
+	/* Mark the path with the correct row estimate */
+	if (param_info)
+		path->path.rows = param_info->ppi_rows;
+	else
+		path->path.rows = baserel->rows;
+
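+	/* Start from the cost of the subpath that each worker will execute. */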
+	startup_cost = path->subpath->startup_cost;
+
+	run_cost = path->subpath->total_cost - path->subpath->startup_cost;
+
+	/* Parallel setup and communication cost. */
+	startup_cost += parallel_setup_cost;
+	run_cost += cpu_tuple_comm_cost * baserel->tuples;
+
+	path->path.startup_cost = startup_cost;
+	path->path.total_cost = (startup_cost + run_cost);
+}
+
+/*
  * cost_index
  *	  Determines and returns the cost of scanning a relation using an index.
  *
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 404c6f5..183e77c 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -60,6 +60,8 @@ static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path,
 					List *tlist, List *scan_clauses);
 static SampleScan *create_samplescan_plan(PlannerInfo *root, Path *best_path,
 					   List *tlist, List *scan_clauses);
+static Gather *create_gather_plan(PlannerInfo *root,
+				   GatherPath *best_path);
 static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path,
 					  List *tlist, List *scan_clauses, bool indexonly);
 static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root,
@@ -104,6 +106,9 @@ static void copy_plan_costsize(Plan *dest, Plan *src);
 static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid);
 static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid,
 				TableSampleClause *tsc);
+static Gather *make_gather(List *qptlist, List *qpqual,
+			Index scanrelid, int nworkers,
+			Plan *subplan);
 static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid,
 			   Oid indexid, List *indexqual, List *indexqualorig,
 			   List *indexorderby, List *indexorderbyorig,
@@ -273,6 +278,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path)
 			plan = create_unique_plan(root,
 									  (UniquePath *) best_path);
 			break;
+		case T_Gather:
+			plan = (Plan *) create_gather_plan(root,
+											   (GatherPath *) best_path);
+			break;
 		default:
 			elog(ERROR, "unrecognized node type: %d",
 				 (int) best_path->pathtype);
@@ -560,6 +569,7 @@ disuse_physical_tlist(PlannerInfo *root, Plan *plan, Path *path)
 	{
 		case T_SeqScan:
 		case T_SampleScan:
+		case T_Gather:
 		case T_IndexScan:
 		case T_IndexOnlyScan:
 		case T_BitmapHeapScan:
@@ -1194,6 +1204,66 @@ create_samplescan_plan(PlannerInfo *root, Path *best_path,
 }
 
 /*
+ * create_gather_plan
+ *
+ * Returns a gather plan for the base relation scanned by
+ * 'best_path'.
+ */
+static Gather *
+create_gather_plan(PlannerInfo *root, GatherPath *best_path)
+{
+	Gather	   *gather_plan;
+	Plan	   *subplan;
+	List	   *tlist;
+	RelOptInfo *rel = best_path->path.parent;
+	Index		scan_relid = best_path->path.parent->relid;
+
+	/*
+	 * For table scans, rather than using the relation targetlist (which is
+	 * only those Vars actually needed by the query), we prefer to generate a
+	 * tlist containing all Vars in order.  This will allow the executor to
+	 * optimize away projection of the table tuples, if possible.  (Note that
+	 * planner.c may replace the tlist we generate here, forcing projection to
+	 * occur.)
+	 */
+	if (use_physical_tlist(root, rel))
+	{
+		tlist = build_physical_tlist(root, rel);
+		/* if fail because of dropped cols, use regular method */
+		if (tlist == NIL)
+			tlist = build_path_tlist(root, &best_path->path);
+	}
+	else
+	{
+		tlist = build_path_tlist(root, &best_path->path);
+	}
+
+	/* it should be a base rel... */
+	Assert(scan_relid > 0);
+	Assert(best_path->path.parent->rtekind == RTE_RELATION);
+
+	subplan = create_plan_recurse(root, best_path->subpath);
+
+	/*
+	 * The quals for the subplan and the top-level plan are the same, since
+	 * either all quals are pushed down to the subplan (partial seq scan
+	 * plan) or the parallel plan won't be chosen.
+	 */
+	gather_plan = make_gather(tlist,
+							  subplan->qual,
+							  scan_relid,
+							  best_path->num_workers,
+							  subplan);
+
+	copy_path_costsize(&gather_plan->scan.plan, &best_path->path);
+
+	/* use parallel mode for parallel plans. */
+	root->glob->parallelModeNeeded = true;
+
+	return gather_plan;
+}
+
+/*
  * create_indexscan_plan
  *	  Returns an indexscan plan for the base relation scanned by 'best_path'
  *	  with restriction clauses 'scan_clauses' and targetlist 'tlist'.
@@ -3462,6 +3532,27 @@ make_samplescan(List *qptlist,
 	return node;
 }
 
+static Gather *
+make_gather(List *qptlist,
+			List *qpqual,
+			Index scanrelid,
+			int nworkers,
+			Plan *subplan)
+{
+	Gather	   *node = makeNode(Gather);
+	Plan	   *plan = &node->scan.plan;
+
+	/* cost should be inserted by caller */
+	plan->targetlist = qptlist;
+	plan->qual = qpqual;
+	plan->lefttree = subplan;
+	plan->righttree = NULL;
+	node->scan.scanrelid = scanrelid;
+	node->num_workers = nworkers;
+
+	return node;
+}
+
 static IndexScan *
 make_indexscan(List *qptlist,
 			   List *qpqual,
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 3c81697..97a4156 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -470,6 +470,26 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 					fix_scan_expr(root, (Node *) splan->tablesample, rtoffset);
 			}
 			break;
+		case T_Gather:
+			{
+				Gather	   *splan = (Gather *) plan;
+
+				/*
+				 * The target list for the lefttree of the Gather plan should
+				 * be the same as for the Gather scan, as both nodes need to
+				 * produce the same projection.  We don't want to do this
+				 * assignment after fixing references, as that will be done
+				 * separately for the lefttree node.
+				 */
+				splan->scan.plan.lefttree->targetlist = splan->scan.plan.targetlist;
+
+				splan->scan.scanrelid += rtoffset;
+				splan->scan.plan.targetlist =
+					fix_scan_list(root, splan->scan.plan.targetlist, rtoffset);
+				splan->scan.plan.qual =
+					fix_scan_list(root, splan->scan.plan.qual, rtoffset);
+			}
+			break;
 		case T_IndexScan:
 			{
 				IndexScan  *splan = (IndexScan *) plan;
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index d0bc412..78f3ce1 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2243,6 +2243,10 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 			context.paramids = bms_add_members(context.paramids, scan_params);
 			break;
 
+		case T_Gather:
+			context.paramids = bms_add_members(context.paramids, scan_params);
+			break;
+
 		case T_IndexScan:
 			finalize_primnode((Node *) ((IndexScan *) plan)->indexqual,
 							  &context);
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 935bc2b..18ef9dc 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -732,6 +732,32 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer
 }
 
 /*
+ * create_gather_path
+ *
+ *	  Creates a path corresponding to a gather scan, returning the
+ *	  pathnode.
+ */
+GatherPath *
+create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
+				   Relids required_outer, int nworkers)
+{
+	GatherPath *pathnode = makeNode(GatherPath);
+
+	pathnode->path.pathtype = T_Gather;
+	pathnode->path.parent = rel;
+	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
+														  required_outer);
+	pathnode->path.pathkeys = NIL;		/* Gather has unordered result */
+
+	pathnode->subpath = subpath;
+	pathnode->num_workers = nworkers;
+
+	cost_gather(pathnode, root, rel, pathnode->path.param_info);
+
+	return pathnode;
+}
+
+/*
  * create_index_path
  *	  Creates a path node for an index scan.
  *
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index baa43b2..58fc6d3 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -103,6 +103,7 @@
 #include "miscadmin.h"
 #include "pg_getopt.h"
 #include "pgstat.h"
+#include "optimizer/cost.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/fork_process.h"
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index d645751..bed1ef2 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -45,9 +45,10 @@
  *		dummy DestReceiver functions
  * ----------------
  */
-static void
+static bool
 donothingReceive(TupleTableSlot *slot, DestReceiver *self)
 {
+	return true;
 }
 
 static void
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index d1f43c5..3781d81 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -42,6 +42,8 @@
 #include "catalog/pg_type.h"
 #include "commands/async.h"
 #include "commands/prepare.h"
+#include "executor/execParallel.h"
+#include "executor/tqueue.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 0df86a2..5eab231 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1117,7 +1117,13 @@ RunFromStore(Portal portal, ScanDirection direction, long count,
 			if (!ok)
 				break;
 
-			(*dest->receiveSlot) (slot, dest);
+			/*
+			 * If we are not able to send the tuple, we assume the destination
+			 * has closed and no more tuples can be sent. If that's the case,
+			 * end the loop.
+			 */
+			if (!((*dest->receiveSlot) (slot, dest)))
+				break;
 
 			ExecClearTuple(slot);
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 17053af..f14b24a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -588,6 +588,8 @@ const char *const config_group_names[] =
 	gettext_noop("Statistics / Query and Index Statistics Collector"),
 	/* AUTOVACUUM */
 	gettext_noop("Autovacuum"),
+	/* PARALLEL_QUERY */
+	gettext_noop("Parallel Query"),
 	/* CLIENT_CONN */
 	gettext_noop("Client Connection Defaults"),
 	/* CLIENT_CONN_STATEMENT */
@@ -2535,6 +2537,16 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"degree_of_parallelism", PGC_SUSET, PARALLEL_QUERY,
+			gettext_noop("Sets the maximum number of simultaneously running backend worker processes."),
+			NULL
+		},
+		&degree_of_parallelism,
+		0, 0, MAX_BACKENDS,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
 			gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
 			NULL,
@@ -2711,6 +2723,26 @@ static struct config_real ConfigureNamesReal[] =
 		DEFAULT_CPU_OPERATOR_COST, 0, DBL_MAX,
 		NULL, NULL, NULL
 	},
+	{
+		{"cpu_tuple_comm_cost", PGC_USERSET, QUERY_TUNING_COST,
+			gettext_noop("Sets the planner's estimate of the cost of "
+				  "passing each tuple (row) from worker to master backend."),
+			NULL
+		},
+		&cpu_tuple_comm_cost,
+		DEFAULT_CPU_TUPLE_COMM_COST, 0, DBL_MAX,
+		NULL, NULL, NULL
+	},
+	{
+		{"parallel_setup_cost", PGC_USERSET, QUERY_TUNING_COST,
+			gettext_noop("Sets the planner's estimate of the cost of "
+				  "setting up environment (shared memory) for parallelism."),
+			NULL
+		},
+		&parallel_setup_cost,
+		DEFAULT_PARALLEL_SETUP_COST, 0, DBL_MAX,
+		NULL, NULL, NULL
+	},
 
 	{
 		{"cursor_tuple_fraction", PGC_USERSET, QUERY_TUNING_OTHER,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 8c65287..16b574c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -290,6 +290,8 @@
 #cpu_tuple_cost = 0.01			# same scale as above
 #cpu_index_tuple_cost = 0.005		# same scale as above
 #cpu_operator_cost = 0.0025		# same scale as above
+#cpu_tuple_comm_cost = 0.1		# same scale as above
+#parallel_setup_cost = 1000.0	# same scale as above
 #effective_cache_size = 4GB
 
 # - Genetic Query Optimizer -
@@ -502,6 +504,11 @@
 					# autovacuum, -1 means use
 					# vacuum_cost_limit
 
+#------------------------------------------------------------------------------
+# PARALLEL_QUERY PARAMETERS
+#------------------------------------------------------------------------------
+
+#degree_of_parallelism = 0		# max number of worker backend subprocesses
 
 #------------------------------------------------------------------------------
 # CLIENT CONNECTION DEFAULTS
diff --git a/src/include/access/printtup.h b/src/include/access/printtup.h
index 46c4148..92ec882 100644
--- a/src/include/access/printtup.h
+++ b/src/include/access/printtup.h
@@ -25,11 +25,11 @@ extern void SendRowDescriptionMessage(TupleDesc typeinfo, List *targetlist,
 
 extern void debugStartup(DestReceiver *self, int operation,
 			 TupleDesc typeinfo);
-extern void debugtup(TupleTableSlot *slot, DestReceiver *self);
+extern bool debugtup(TupleTableSlot *slot, DestReceiver *self);
 
 /* XXX these are really in executor/spi.c */
 extern void spi_dest_startup(DestReceiver *self, int operation,
 				 TupleDesc typeinfo);
-extern void spi_printtup(TupleTableSlot *slot, DestReceiver *self);
+extern bool spi_printtup(TupleTableSlot *slot, DestReceiver *self);
 
 #endif   /* PRINTTUP_H */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 4fc797a..131df82 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -13,24 +13,12 @@
 #ifndef EXECPARALLEL_H
 #define EXECPARALLEL_H
 
-#include "access/parallel.h"
 #include "nodes/execnodes.h"
-#include "nodes/parsenodes.h"
-#include "nodes/plannodes.h"
 
-typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
-
-typedef struct ParallelExecutorInfo
-{
-	PlanState *planstate;
-	ParallelContext *pcxt;
-	BufferUsage *buffer_usage;
-	SharedExecutorInstrumentation *instrumentation;
-	shm_mq_handle **tqueue;
-}	ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 					 EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
+extern bool ExecParallelBufferUsageAccum(Node *node);
 
 #endif   /* EXECPARALLEL_H */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index f28e56c..795564c 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -14,6 +14,7 @@
 #define INSTRUMENT_H
 
 #include "portability/instr_time.h"
+#include "storage/spin.h"
 
 
 typedef struct BufferUsage
@@ -63,6 +64,22 @@ typedef struct Instrumentation
 	BufferUsage bufusage;		/* Total buffer usage */
 } Instrumentation;
 
+/* DSM structure for accumulating per-PlanState instrumentation. */
+typedef struct SharedPlanStateInstrumentation
+{
+	int plan_node_id;
+	slock_t mutex;
+	Instrumentation	instr;
+} SharedPlanStateInstrumentation;
+
+/* DSM structure for accumulating per-PlanState instrumentation. */
+typedef struct SharedExecutorInstrumentation
+{
+	int instrument_options;
+	int ps_ninstrument;			/* # of ps_instrument structures following */
+	SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER];
+} SharedExecutorInstrumentation;
+
 extern PGDLLIMPORT BufferUsage pgBufferUsage;
 
 extern Instrumentation *InstrAlloc(int n, int instrument_options);
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
new file mode 100644
index 0000000..fc99633
--- /dev/null
+++ b/src/include/executor/nodeGather.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeGather.h
+ *		prototypes for nodeGather.c
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeGather.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEGATHER_H
+#define NODEGATHER_H
+
+#include "nodes/execnodes.h"
+
+extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
+extern TupleTableSlot *ExecGather(GatherState *node);
+extern void ExecEndGather(GatherState *node);
+extern void DestroyParallelSetupAndAccumStats(GatherState *node);
+extern void ExecReScanGather(GatherState *node);
+
+#endif   /* NODEGATHER_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4ae2f3e..6a8c107 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -16,7 +16,9 @@
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/parallel.h"
 #include "executor/instrument.h"
+#include "executor/tqueue.h"
 #include "lib/pairingheap.h"
 #include "nodes/params.h"
 #include "nodes/plannodes.h"
@@ -421,7 +423,6 @@ typedef struct EState
 	bool	   *es_epqScanDone; /* true if EPQ tuple has been fetched */
 } EState;
 
-
 /*
  * ExecRowMark -
  *	   runtime representation of FOR [KEY] UPDATE/SHARE clauses
@@ -1049,6 +1050,13 @@ typedef struct PlanState
 	Bitmapset  *chgParam;		/* set of IDs of changed Params */
 
 	/*
+	 * At execution time, the parallel scan descriptor is initialized and
+	 * stored in a dynamic shared memory segment by the master backend, and
+	 * parallel workers retrieve it from shared memory.
+	 */
+	shm_toc    *toc;
+
+	/*
 	 * Other run-time state needed by most if not all node types.
 	 */
 	TupleTableSlot *ps_ResultTupleSlot; /* slot for my result tuples */
@@ -1058,6 +1066,15 @@ typedef struct PlanState
 								 * functions in targetlist */
 } PlanState;
 
+typedef struct ParallelExecutorInfo
+{
+	PlanState *planstate;
+	ParallelContext *pcxt;
+	BufferUsage *buffer_usage;
+	SharedExecutorInstrumentation *instrumentation;
+	shm_mq_handle **tqueue;
+} ParallelExecutorInfo;
+
 /* ----------------
  *	these are defined to avoid confusion problems with "left"
  *	and "right" and "inner" and "outer".  The convention is that
@@ -1273,6 +1290,27 @@ typedef struct SampleScanState
 } SampleScanState;
 
 /*
+ * GatherState extends ScanState by storing additional information
+ * related to parallel workers.
+ *		pei					parallel execution info for managing the generic state
+ *							information required for parallelism.
+ *		funnel				maintains runtime information about the queues used to
+ *							receive data from parallel workers.
+ *		fs_workersReady		indicates that workers have been launched.
+ *		all_workers_done	indicates that all the data from workers has been received.
+ *		local_scan_done		indicates that the local scan is complete.
+ */
+typedef struct GatherState
+{
+	ScanState	ss;				/* its first field is NodeTag */
+	ParallelExecutorInfo *pei;
+	TupleQueueFunnel *funnel;
+	bool		fs_workersReady;
+	bool		all_workers_done;
+	bool		local_scan_done;
+} GatherState;
+
+/*
  * These structs store information about index quals that don't have simple
  * constant right-hand sides.  See comments for ExecIndexBuildScanKeys()
  * for discussion.
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 274480e..c014532 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -52,6 +52,7 @@ typedef enum NodeTag
 	T_Scan,
 	T_SeqScan,
 	T_SampleScan,
+	T_Gather,
 	T_IndexScan,
 	T_IndexOnlyScan,
 	T_BitmapIndexScan,
@@ -99,6 +100,7 @@ typedef enum NodeTag
 	T_ScanState,
 	T_SeqScanState,
 	T_SampleScanState,
+	T_GatherState,
 	T_IndexScanState,
 	T_IndexOnlyScanState,
 	T_BitmapIndexScanState,
@@ -223,6 +225,7 @@ typedef enum NodeTag
 	T_IndexOptInfo,
 	T_ParamPathInfo,
 	T_Path,
+	T_GatherPath,
 	T_IndexPath,
 	T_BitmapHeapPath,
 	T_BitmapAndPath,
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 1e2d2bb..d1fea12 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -297,6 +297,16 @@ typedef struct SampleScan
 	struct TableSampleClause *tablesample;
 } SampleScan;
 
+/* ------------
+ *		Gather node
+ * ------------
+ */
+typedef struct Gather
+{
+	Scan		scan;
+	int			num_workers;
+} Gather;
+
 /* ----------------
  *		index scan node
  *
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 961b5d1..20192c9 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -763,6 +763,13 @@ typedef struct Path
 	/* pathkeys is a List of PathKey nodes; see above */
 } Path;
 
+typedef struct GatherPath
+{
+	Path		path;
+	Path	   *subpath;		/* path for each worker */
+	int			num_workers;
+} GatherPath;
+
 /* Macro for extracting a path's parameterization relids; beware double eval */
 #define PATH_REQ_OUTER(path)  \
 	((path)->param_info ? (path)->param_info->ppi_req_outer : (Relids) NULL)
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index dd43e45..7d536dc 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -26,6 +26,13 @@
 #define DEFAULT_CPU_TUPLE_COST	0.01
 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005
 #define DEFAULT_CPU_OPERATOR_COST  0.0025
+#define DEFAULT_CPU_TUPLE_COMM_COST 0.1
+/*
+ * XXX - We have kept a reasonably high value for the default parallel
+ * setup cost.  In the future we might want to change this value based
+ * on results.
+ */
+#define DEFAULT_PARALLEL_SETUP_COST  1000.0
 
 #define DEFAULT_EFFECTIVE_CACHE_SIZE  524288	/* measured in pages */
 
@@ -48,8 +55,11 @@ extern PGDLLIMPORT double random_page_cost;
 extern PGDLLIMPORT double cpu_tuple_cost;
 extern PGDLLIMPORT double cpu_index_tuple_cost;
 extern PGDLLIMPORT double cpu_operator_cost;
+extern PGDLLIMPORT double cpu_tuple_comm_cost;
+extern PGDLLIMPORT double parallel_setup_cost;
 extern PGDLLIMPORT int effective_cache_size;
 extern Cost disable_cost;
+extern int	degree_of_parallelism;
 extern bool enable_seqscan;
 extern bool enable_indexscan;
 extern bool enable_indexonlyscan;
@@ -70,6 +80,8 @@ extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 			 ParamPathInfo *param_info);
 extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
 				ParamPathInfo *param_info);
+extern void cost_gather(GatherPath *path, PlannerInfo *root,
+			RelOptInfo *baserel, ParamPathInfo *param_info);
 extern void cost_index(IndexPath *path, PlannerInfo *root,
 		   double loop_count);
 extern void cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 161644c..cc00ba5 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -34,6 +34,9 @@ extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel,
 					Relids required_outer);
 extern Path *create_samplescan_path(PlannerInfo *root, RelOptInfo *rel,
 					   Relids required_outer);
+extern GatherPath *create_gather_path(PlannerInfo *root,
+				   RelOptInfo *rel, Path *subpath, Relids required_outer,
+				   int nworkers);
 extern IndexPath *create_index_path(PlannerInfo *root,
 				  IndexOptInfo *index,
 				  List *indexclauses,
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index b560672..91acd60 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -104,7 +104,9 @@ typedef enum
  *		pointers that the executor must call.
  *
  * Note: the receiveSlot routine must be passed a slot containing a TupleDesc
- * identical to the one given to the rStartup routine.
+ * identical to the one given to the rStartup routine.  It returns bool where
+ * a "true" value means "continue processing" and a "false" value means
+ * "stop early, just as if we'd reached the end of the scan".
  * ----------------
  */
 typedef struct _DestReceiver DestReceiver;
@@ -112,7 +114,7 @@ typedef struct _DestReceiver DestReceiver;
 struct _DestReceiver
 {
 	/* Called for each tuple to be output: */
-	void		(*receiveSlot) (TupleTableSlot *slot,
+	bool		(*receiveSlot) (TupleTableSlot *slot,
 											DestReceiver *self);
 	/* Per-executor-run initialization and shutdown: */
 	void		(*rStartup) (DestReceiver *self,
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 96c5b8b..8ae7a16 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -19,6 +19,7 @@
 #ifndef TCOPPROT_H
 #define TCOPPROT_H
 
+#include "executor/execParallel.h"
 #include "nodes/params.h"
 #include "nodes/parsenodes.h"
 #include "nodes/plannodes.h"
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index 7a58ddb..3505d31 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -85,6 +85,7 @@ enum config_group
 	STATS_MONITORING,
 	STATS_COLLECTOR,
 	AUTOVACUUM,
+	PARALLEL_QUERY,
 	CLIENT_CONN,
 	CLIENT_CONN_STATEMENT,
 	CLIENT_CONN_LOCALE,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 0e149ea..6e1456b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -707,6 +707,9 @@ FunctionParameterMode
 FunctionScan
 FunctionScanPerFuncState
 FunctionScanState
+Gather
+GatherPath
+GatherState
 FuzzyAttrMatchState
 GBT_NUMKEY
 GBT_NUMKEY_R
