On Thu, Oct 15, 2020 at 3:18 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > > 1. How to represent the parallel insert for CTAS in explain plans?
> > > > explain CTAS shows the plan for only the SELECT part. How about
> > > > some textual info along with the Gather node? I'm not quite sure on
> > > > this point, any suggestions are welcome.
> > >
> > > I am also not sure about this point because we don't display anything
> > > for the DDL part in explain. Can you propose by showing some example
> > > of what you have in mind?
> >
> > I thought we could have something like this.
> >
> >      Gather  (cost=1000.00..108738.90 rows=0 width=8)
> >      Workers Planned: 2 Parallel Insert on t_test1
> >         ->  Parallel Seq Scan on t_test  (cost=0.00..106748.00 rows=4954 width=8)
> >              Filter: (many < 10000)
> >
> maybe something like below:
> Gather  (cost=1000.00..108738.90 rows=0 width=8)
>    -> Create t_test1
>        ->  Parallel Seq Scan on t_test
> I don't know what is the best thing to do here. I think for the
> temporary purpose you can keep something like above then once the
> patch is matured then we can take a separate opinion for this.

Agreed. Here's a snapshot of explain with the change suggested.

FROM t1;
                                   QUERY PLAN
 Gather (actual time=970.524..972.913 rows=0 loops=1)
   ->  Create t1_test
     Workers Planned: 2
     Workers Launched: 2
     ->  Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333
 Planning Time: 0.049 ms
 Execution Time: 973.733 ms

> I think there is no reason why one can't use ORDER BY in the
> statements we are talking about here. But I think the reason we can't
> enable parallelism for GatherMerge is that for that node we always need to
> fetch the data in the leader backend to perform the final merge phase.
> So, I was expecting a small comment saying something along those lines.

Added comments.
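
For anyone following the GatherMerge restriction, here is a minimal standalone sketch (plain C, not PostgreSQL code; merge_sorted_streams and MAX_STREAMS are hypothetical names used only for illustration) of why the final merge has to happen in one place. Producing globally ordered output means repeatedly comparing the head of every worker's sorted stream, and only a single consumer that holds all the streams (the leader) can do that.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_STREAMS 8	/* illustrative limit, not a PostgreSQL constant */

/*
 * Merge nstreams individually sorted int arrays into out[] and return the
 * number of values written. Each array stands in for one worker's sorted
 * output; the caller plays the role of the leader doing the final merge.
 * out[] must have room for the sum of all lens[].
 */
size_t
merge_sorted_streams(const int *const streams[], const size_t lens[],
					 size_t nstreams, int *out)
{
	size_t		pos[MAX_STREAMS] = {0};	/* next unread index per stream */
	size_t		nout = 0;

	assert(nstreams <= MAX_STREAMS);

	for (;;)
	{
		size_t		best = nstreams;	/* nstreams means "none found yet" */

		/* Look at the head of every stream and pick the smallest value. */
		for (size_t i = 0; i < nstreams; i++)
		{
			if (pos[i] >= lens[i])
				continue;		/* this stream is exhausted */
			if (best == nstreams ||
				streams[i][pos[i]] < streams[best][pos[best]])
				best = i;
		}

		if (best == nstreams)
			break;				/* all streams are exhausted */

		out[nout++] = streams[best][pos[best]++];
	}

	return nout;
}
```

Since every output value depends on the current heads of all streams, no single worker can decide on its own which tuple comes next, which is the property that rules out worker-side inserts under GatherMerge.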

> 2. Addition of new test cases.

Added new test cases.

> Analysis on the 2 mismatches in write_parallel.sql regression test.

Done. It needed a small code change in costsize.c. Now, both make check and
make check-world pass.

Apart from above, a couple of other things I have finished with the v3

1. Both make check and make check-world with force_parallel_mode = regress
2. I enabled parallel inserts in case of materialized views. Hope that's

Attaching v3 patch herewith.

I'm done with all the open points in my list. Please review the v3 patch
and provide comments.
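
For reviewers who want to look at the row-count handoff in isolation, here is a minimal sketch of the idea, assuming plain C11 atomics and pthreads rather than PostgreSQL's shared memory and background workers. All names in it (SharedCount, WorkerArg, worker_main, count_inserted_tuples, MAX_WORKERS) are hypothetical stand-ins; the patch itself uses pg_atomic_add_fetch_u64() on a field in FixedParallelExecutorState.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

#define MAX_WORKERS 8	/* illustrative limit only */

/* Hypothetical stand-in for the counter the patch keeps in shared memory. */
typedef struct SharedCount
{
	atomic_uint_fast64_t processed;	/* tuples inserted by all workers */
} SharedCount;

typedef struct WorkerArg
{
	SharedCount *shared;
	uint64_t	ntuples;		/* tuples this worker "inserts" */
} WorkerArg;

static void *
worker_main(void *p)
{
	WorkerArg  *arg = (WorkerArg *) p;
	uint64_t	local = 0;

	/* Each loop iteration stands in for one tuple insert. */
	for (uint64_t i = 0; i < arg->ntuples; i++)
		local++;

	/* Publish this worker's count with a single atomic add. */
	atomic_fetch_add(&arg->shared->processed, local);
	return NULL;
}

/*
 * Launch up to nworkers threads, let the "leader" account for its own share,
 * wait for the workers, then combine both counts. If a thread cannot be
 * started, the leader takes over that worker's share.
 */
uint64_t
count_inserted_tuples(int nworkers, uint64_t per_worker, uint64_t leader_share)
{
	SharedCount shared;
	pthread_t	threads[MAX_WORKERS];
	WorkerArg	args[MAX_WORKERS];
	int			launched = 0;
	uint64_t	total = leader_share;

	assert(nworkers >= 0 && nworkers <= MAX_WORKERS);
	atomic_init(&shared.processed, 0);

	for (int i = 0; i < nworkers; i++)
	{
		args[i].shared = &shared;
		args[i].ntuples = per_worker;
		if (pthread_create(&threads[i], NULL, worker_main, &args[i]) != 0)
			break;
		launched++;
	}

	/* Shares of workers that could not be launched fall to the leader. */
	total += (uint64_t) (nworkers - launched) * per_worker;

	/* Wait for the workers, then add their combined count. */
	for (int i = 0; i < launched; i++)
		pthread_join(threads[i], NULL);

	total += (uint64_t) atomic_load(&shared.processed);
	return total;
}
```

Calling count_inserted_tuples(4, 1000, 250) models four workers inserting 1000 tuples each and a leader inserting 250, and returns 4250. The leader reads the shared counter only after joining the workers, which corresponds to the patch reading the count after ExecShutdownGatherWorkers().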

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From b80460c0390317cceecd66ce9780feafd55bd5b2 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 19 Oct 2020 22:06:41 +0530
Subject: [PATCH v3] Parallel Inserts in CREATE TABLE AS

The idea of this patch is to allow the leader and each worker
to insert the tuples in parallel if the SELECT part of the CTAS is

The design:
Let the planner know in createas.c that the SELECT is from CTAS,
so that it can set the number of tuples transferred from the
workers to the Gather node to 0. With this change, the planner
may choose a parallel plan. After planning, check in createas.c
whether the upper plan node is Gather and set a parallelism flag
in the CTAS dest receiver. Pass the into clause, object id and
command id from the leader to the workers, so that each worker
can create its own CTAS dest receiver. The leader inserts its
share of tuples if instructed to do so, and so do the workers.
Each worker atomically adds its number of inserted tuples to a
shared memory variable; the leader combines this with its own
number of inserted tuples and reports the total to the client.
 src/backend/access/heap/heapam.c             |  11 -
 src/backend/access/transam/xact.c            |  30 +-
 src/backend/commands/createas.c              | 341 ++++++++++++-------
 src/backend/commands/explain.c               |  36 ++
 src/backend/executor/execMain.c              |  19 ++
 src/backend/executor/execParallel.c          |  60 +++-
 src/backend/executor/nodeGather.c            | 101 +++++-
 src/backend/optimizer/path/costsize.c        |  12 +-
 src/include/access/xact.h                    |   1 +
 src/include/commands/createas.h              |  20 ++
 src/include/executor/execParallel.h          |   1 +
 src/include/nodes/execnodes.h                |   5 +
 src/include/nodes/parsenodes.h               |   1 +
 src/test/regress/expected/write_parallel.out | 143 ++++++++
 src/test/regress/sql/write_parallel.sql      |  65 ++++
 15 files changed, 694 insertions(+), 152 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1585861a02..1602525d4a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2043,17 +2043,6 @@ static HeapTuple
 heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 					CommandId cid, int options)
-	/*
-	 * To allow parallel inserts, we need to ensure that they are safe to be
-	 * performed in workers. We have the infrastructure to allow parallel
-	 * inserts in general except for the cases where inserts generate a new
-	 * CommandId (eg. inserts into a table having a foreign key column).
-	 */
-	if (IsParallelWorker())
-		ereport(ERROR,
-				 errmsg("cannot insert tuples in a parallel worker")));
 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
 	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index af6afcebb1..809774c4bb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -763,18 +763,34 @@ GetCurrentCommandId(bool used)
 	/* this is global to a transaction, not subtransaction-local */
 	if (used)
-		/*
-		 * Forbid setting currentCommandIdUsed in a parallel worker, because
-		 * we have no provision for communicating this back to the leader.  We
-		 * could relax this restriction when currentCommandIdUsed was already
-		 * true at the start of the parallel operation.
-		 */
-		Assert(!IsParallelWorker());
+		 /*
+		  * This is a temporary hack for all common parallel insert cases i.e.
+		  * insert into, ctas, copy from. To be changed later. In a parallel
+		  * worker, set currentCommandIdUsed to true only if it was not set to
+		  * true at the start of the parallel operation (by way of
+		  * SetCurrentCommandIdUsedForWorker()). We have to do this because
+		  * GetCurrentCommandId(true) may be called from anywhere, especially
+		  * for parallel inserts, within parallel worker.
+		  */
+		Assert(!(IsParallelWorker() && !currentCommandIdUsed));
 		currentCommandIdUsed = true;
 	return currentCommandId;
+ *	SetCurrentCommandIdUsedForWorker
+ *
+ * For a parallel worker, record that the currentCommandId has been used.
+ * This must only be called at the start of a parallel operation.
+	Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId);
+	currentCommandIdUsed = true;
  *	SetParallelStartTimestamps
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index d53ec952d0..9df8a7face 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -51,18 +51,6 @@
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
-typedef struct
-	DestReceiver pub;			/* publicly-known function pointers */
-	IntoClause *into;			/* target relation specification */
-	/* These fields are filled by intorel_startup: */
-	Relation	rel;			/* relation to write to */
-	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
-} DR_intorel;
 /* utility functions for CTAS definition creation */
 static ObjectAddress create_ctas_internal(List *attrList, IntoClause *into);
 static ObjectAddress create_ctas_nodata(List *tlist, IntoClause *into);
@@ -328,10 +316,27 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		query = linitial_node(Query, rewritten);
 		Assert(query->commandType == CMD_SELECT);
+		/*
+		 * Flag to let the planner know that the SELECT query is for CTAS. The
+		 * planner uses it to set the tuple transfer cost from workers to the
+		 * Gather node to zero (if parallelism kicks in for the SELECT part of
+		 * the CTAS), since each worker inserts its own share of tuples.
+		 */
+		if (IsParallelInsertInCTASAllowed(into, NULL))
+			query->isForCTAS = true;
 		/* plan the query */
 		plan = pg_plan_query(query, pstate->p_sourcetext,
 							 CURSOR_OPT_PARALLEL_OK, params);
+		/*
+		 * SELECT part of the CTAS is parallelizable, so we can make each
+		 * parallel worker insert the tuples produced by its own
+		 * execution into the target table.
+		 */
+		if (IsParallelInsertInCTASAllowed(into, plan))
+			((DR_intorel *) dest)->is_parallel = true;
 		 * Use a snapshot with an updated command ID to ensure this query sees
 		 * results of any previously executed queries.  (This could only
@@ -418,6 +423,9 @@ CreateIntoRelDestReceiver(IntoClause *intoClause)
 	self->pub.rDestroy = intorel_destroy;
 	self->pub.mydest = DestIntoRel;
 	self->into = intoClause;
+	self->is_parallel = false;
+	self->is_parallel_worker = false;
+	self->object_id = InvalidOid;
 	/* other private fields will be set during intorel_startup */
 	return (DestReceiver *) self;
@@ -430,135 +438,180 @@ static void
 intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	DR_intorel *myState = (DR_intorel *) self;
-	IntoClause *into = myState->into;
-	bool		is_matview;
-	char		relkind;
-	List	   *attrList;
 	ObjectAddress intoRelationAddr;
 	Relation	intoRelationDesc;
-	RangeTblEntry *rte;
-	ListCell   *lc;
-	int			attnum;
-	Assert(into != NULL);		/* else somebody forgot to set it */
-	/* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */
-	is_matview = (into->viewQuery != NULL);
-	relkind = is_matview ? RELKIND_MATVIEW : RELKIND_RELATION;
+	if (myState->is_parallel_worker)
+	{
+		/* In the worker */
+		intoRelationDesc = table_open(myState->object_id, AccessExclusiveLock);
+		myState->rel = intoRelationDesc;
+		myState->reladdr = InvalidObjectAddress;
+		myState->ti_options = 0;
+		myState->bistate = GetBulkInsertState();
-	/*
-	 * Build column definitions using "pre-cooked" type and collation info. If
-	 * a column name list was specified in CREATE TABLE AS, override the
-	 * column names derived from the query.  (Too few column names are OK, too
-	 * many are not.)
-	 */
-	attrList = NIL;
-	lc = list_head(into->colNames);
-	for (attnum = 0; attnum < typeinfo->natts; attnum++)
+		/*
+		 * Right after the table is created in the leader, the command id is
+		 * incremented (in create_ctas_internal()). The new command id is
+		 * marked as used in intorel_startup(), then the parallel mode is
+		 * entered. The command id and transaction id are serialized into
+		 * parallel DSM, where they are available to all parallel workers. All
+		 * the workers need to mark the command id as used before insertion.
+		 */
+		(void) SetCurrentCommandIdUsedForWorker();
+		myState->output_cid = GetCurrentCommandId(false);
+	}
+	else
-		Form_pg_attribute attribute = TupleDescAttr(typeinfo, attnum);
-		ColumnDef  *col;
-		char	   *colname;
+		IntoClause *into = myState->into;
+		bool		is_matview;
+		char		relkind;
+		List	   *attrList;
+		Relation	intoRelationDesc;
+		RangeTblEntry *rte;
+		ListCell   *lc;
+		int			attnum;
-		if (lc)
-		{
-			colname = strVal(lfirst(lc));
-			lc = lnext(into->colNames, lc);
-		}
-		else
-			colname = NameStr(attribute->attname);
+		Assert(into != NULL);		/* else somebody forgot to set it */
-		col = makeColumnDef(colname,
-							attribute->atttypid,
-							attribute->atttypmod,
-							attribute->attcollation);
+		/* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */
+		is_matview = (into->viewQuery != NULL);
+		relkind = is_matview ? RELKIND_MATVIEW : RELKIND_RELATION;
-		 * It's possible that the column is of a collatable type but the
-		 * collation could not be resolved, so double-check.  (We must check
-		 * this here because DefineRelation would adopt the type's default
-		 * collation rather than complaining.)
-		 */
-		if (!OidIsValid(col->collOid) &&
-			type_is_collatable(col->typeName->typeOid))
-			ereport(ERROR,
-					 errmsg("no collation was derived for column \"%s\" with collatable type %s",
-							col->colname,
-							format_type_be(col->typeName->typeOid)),
-					 errhint("Use the COLLATE clause to set the collation explicitly.")));
+		* Build column definitions using "pre-cooked" type and collation info. If
+		* a column name list was specified in CREATE TABLE AS, override the
+		* column names derived from the query.  (Too few column names are OK, too
+		* many are not.)
+		*/
+		attrList = NIL;
+		lc = list_head(into->colNames);
+		for (attnum = 0; attnum < typeinfo->natts; attnum++)
+		{
+			Form_pg_attribute attribute = TupleDescAttr(typeinfo, attnum);
+			ColumnDef  *col;
+			char	   *colname;
-		attrList = lappend(attrList, col);
-	}
+			if (lc)
+			{
+				colname = strVal(lfirst(lc));
+				lc = lnext(into->colNames, lc);
+			}
+			else
+				colname = NameStr(attribute->attname);
-	if (lc != NULL)
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("too many column names were specified")));
+			col = makeColumnDef(colname,
+								attribute->atttypid,
+								attribute->atttypmod,
+								attribute->attcollation);
-	/*
-	 * Actually create the target table
-	 */
-	intoRelationAddr = create_ctas_internal(attrList, into);
+			/*
+			* It's possible that the column is of a collatable type but the
+			* collation could not be resolved, so double-check.  (We must check
+			* this here because DefineRelation would adopt the type's default
+			* collation rather than complaining.)
+			*/
+			if (!OidIsValid(col->collOid) &&
+				type_is_collatable(col->typeName->typeOid))
+				ereport(ERROR,
+						errmsg("no collation was derived for column \"%s\" with collatable type %s",
+								col->colname,
+								format_type_be(col->typeName->typeOid)),
+						errhint("Use the COLLATE clause to set the collation explicitly.")));
-	/*
-	 * Finally we can open the target table
-	 */
-	intoRelationDesc = table_open(intoRelationAddr.objectId, AccessExclusiveLock);
+			attrList = lappend(attrList, col);
+		}
-	/*
-	 * Check INSERT permission on the constructed table.
-	 *
-	 * XXX: It would arguably make sense to skip this check if into->skipData
-	 * is true.
-	 */
-	rte = makeNode(RangeTblEntry);
-	rte->rtekind = RTE_RELATION;
-	rte->relid = intoRelationAddr.objectId;
-	rte->relkind = relkind;
-	rte->rellockmode = RowExclusiveLock;
-	rte->requiredPerms = ACL_INSERT;
+		if (lc != NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("too many column names were specified")));
-	for (attnum = 1; attnum <= intoRelationDesc->rd_att->natts; attnum++)
-		rte->insertedCols = bms_add_member(rte->insertedCols,
-										   attnum - FirstLowInvalidHeapAttributeNumber);
+		/*
+		* Actually create the target table
+		*/
+		intoRelationAddr = create_ctas_internal(attrList, into);
-	ExecCheckRTPerms(list_make1(rte), true);
+		/*
+		* Finally we can open the target table
+		*/
+		intoRelationDesc = table_open(intoRelationAddr.objectId, AccessExclusiveLock);
-	/*
-	 * Make sure the constructed table does not have RLS enabled.
-	 *
-	 * check_enable_rls() will ereport(ERROR) itself if the user has requested
-	 * something invalid, and otherwise will return RLS_ENABLED if RLS should
-	 * be enabled here.  We don't actually support that currently, so throw
-	 * our own ereport(ERROR) if that happens.
-	 */
-	if (check_enable_rls(intoRelationAddr.objectId, InvalidOid, false) == RLS_ENABLED)
-		ereport(ERROR,
-				 errmsg("policies not yet implemented for this command")));
+		/*
+		* Check INSERT permission on the constructed table.
+		*
+		* XXX: It would arguably make sense to skip this check if into->skipData
+		* is true.
+		*/
+		rte = makeNode(RangeTblEntry);
+		rte->rtekind = RTE_RELATION;
+		rte->relid = intoRelationAddr.objectId;
+		rte->relkind = relkind;
+		rte->rellockmode = RowExclusiveLock;
+		rte->requiredPerms = ACL_INSERT;
+		for (attnum = 1; attnum <= intoRelationDesc->rd_att->natts; attnum++)
+			rte->insertedCols = bms_add_member(rte->insertedCols,
+											attnum - FirstLowInvalidHeapAttributeNumber);
+		ExecCheckRTPerms(list_make1(rte), true);
-	/*
-	 * Tentatively mark the target as populated, if it's a matview and we're
-	 * going to fill it; otherwise, no change needed.
-	 */
-	if (is_matview && !into->skipData)
-		SetMatViewPopulatedState(intoRelationDesc, true);
+		/*
+		* Make sure the constructed table does not have RLS enabled.
+		*
+		* check_enable_rls() will ereport(ERROR) itself if the user has requested
+		* something invalid, and otherwise will return RLS_ENABLED if RLS should
+		* be enabled here.  We don't actually support that currently, so throw
+		* our own ereport(ERROR) if that happens.
+		*/
+		if (check_enable_rls(intoRelationAddr.objectId, InvalidOid, false) == RLS_ENABLED)
+			ereport(ERROR,
+					errmsg("policies not yet implemented for this command")));
-	/*
-	 * Fill private fields of myState for use by later routines
-	 */
-	myState->rel = intoRelationDesc;
-	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
-	myState->bistate = GetBulkInsertState();
+		/*
+		* Tentatively mark the target as populated, if it's a matview and we're
+		* going to fill it; otherwise, no change needed.
+		*/
+		if (is_matview && !into->skipData)
+			SetMatViewPopulatedState(intoRelationDesc, true);
-	/*
-	 * Valid smgr_targblock implies something already wrote to the relation.
-	 * This may be harmless, but this function hasn't planned for it.
-	 */
-	Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
+		/*
+		* Fill private fields of myState for use by later routines
+		*/
+		myState->rel = intoRelationDesc;
+		myState->reladdr = intoRelationAddr;
+		myState->output_cid = GetCurrentCommandId(true);
+		myState->ti_options = TABLE_INSERT_SKIP_FSM;
+		myState->bistate = GetBulkInsertState();
+		/*
+		* Valid smgr_targblock implies something already wrote to the relation.
+		* This may be harmless, but this function hasn't planned for it.
+		*/
+		Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
+		if (myState->is_parallel == true)
+		{
+			myState->object_id = intoRelationAddr.objectId;
+			/*
+			 * We don't need to skip contacting the FSM while inserting tuples
+			 * in parallel mode: while extending the relation, a worker,
+			 * instead of blocking on a page another worker is inserting into,
+			 * can check the FSM for another page that can accommodate the
+			 * tuples. This is a major benefit for parallel inserts.
+			 */
+			myState->ti_options = 0;
+			/*
+			 * rd_createSubid is marked invalid; otherwise, the workers are
+			 * not allowed to extend the table.
+			 */
+			myState->rel->rd_createSubid = InvalidSubTransactionId;
+		}
+	}
@@ -614,3 +667,49 @@ intorel_destroy(DestReceiver *self)
+ * IsParallelInsertInCTASAllowed --- determine whether or not parallel
+ * insertion is possible.
+ */
+bool IsParallelInsertInCTASAllowed(IntoClause *into, PlannedStmt *plannedstmt)
+	bool allowed = false;
+	if (into != NULL &&
+		IsA(into, IntoClause))
+	{
+		if (into->rel != NULL &&
+			into->rel->relpersistence != RELPERSISTENCE_TEMP)
+			allowed = true;
+		if (plannedstmt != NULL && allowed)
+		{
+			/*
+			 * We allow parallel inserts by the workers only if the upper node
+			 * is Gather. We cannot let workers do parallel inserts when a
+			 * GatherMerge node is involved, as the leader backend performs the
+			 * final phase (merging the results from the workers).
+			 */
+			if (plannedstmt->parallelModeNeeded &&
+				plannedstmt->planTree != NULL &&
+				IsA(plannedstmt->planTree, Gather) &&
+				plannedstmt->planTree->lefttree != NULL &&
+				plannedstmt->planTree->lefttree->parallel_aware &&
+				plannedstmt->planTree->lefttree->parallel_safe)
+			{
+				/*
+				 * Since no rows are transferred from the workers to the
+				 * Gather node, we set plan_rows to 0 so that this is visible
+				 * in explain plans. Note that this has already been accounted
+				 * for in the cost calculations in cost_gather().
+				 */
+				plannedstmt->planTree->plan_rows = 0;
+			}
+			else
+				allowed = false;
+		}
+	}
+	return allowed;
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 41317f1837..b559d2d6e1 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -371,6 +371,15 @@ ExplainOneQuery(Query *query, int cursorOptions,
+	/*
+	 * Flag to let the planner know that the SELECT query is for CTAS. The
+	 * planner uses it to set the tuple transfer cost from workers to the
+	 * Gather node to zero (if parallelism kicks in for the SELECT part of the
+	 * CTAS), since each worker inserts its own share of tuples.
+	 */
+	if (IsParallelInsertInCTASAllowed(into, NULL))
+		query->isForCTAS = true;
 	/* if an advisor plugin is present, let it manage things */
 	if (ExplainOneQuery_hook)
 		(*ExplainOneQuery_hook) (query, cursorOptions, into, es,
@@ -536,7 +545,17 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 	 * AS, we'd better use the appropriate tuple receiver.
 	if (into)
+	{
 		dest = CreateIntoRelDestReceiver(into);
+		/*
+		 * SELECT part of the CTAS is parallelizable, so we can make each
+		 * parallel worker insert the tuples produced by its own
+		 * execution into the target table.
+		 */
+		if (IsParallelInsertInCTASAllowed(into, plannedstmt))
+			((DR_intorel *) dest)->is_parallel = true;
+	}
 		dest = None_Receiver;
@@ -1753,6 +1772,23 @@ ExplainNode(PlanState *planstate, List *ancestors,
 				Gather	   *gather = (Gather *) plan;
+				if (IsA(planstate, GatherState) &&
+					planstate->intoclause != NULL &&
+					IsA(planstate->intoclause,IntoClause) &&
+					planstate->dest != NULL &&
+					planstate->dest->mydest == DestIntoRel &&
+					((DR_intorel *) planstate->dest)->is_parallel == true &&
+					planstate->intoclause->rel != NULL &&
+					planstate->intoclause->rel->relname != NULL)
+				{
+					ExplainIndentText(es);
+					appendStringInfoString(es->str, "->  ");
+					appendStringInfoString(es->str, "Create ");
+					appendStringInfo(es->str, "%s\n", planstate->intoclause->rel->relname);
+					es->indent++;
+					ExplainIndentText(es);
+				}
 				show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
 				if (plan->qual)
 					show_instrumentation_count("Rows Removed by Filter", 1,
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index aea0479448..5482ba4e3a 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -45,6 +45,7 @@
 #include "access/xact.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_publication.h"
+#include "commands/createas.h"
 #include "commands/matview.h"
 #include "commands/trigger.h"
 #include "executor/execdebug.h"
@@ -352,6 +353,24 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 	if (sendTuples)
 		dest->rStartup(dest, operation, queryDesc->tupDesc);
+	/*
+	 * For parallelizing inserts in CTAS, i.e. making each
+	 * parallel worker insert its tuples, we must send
+	 * information such as the into clause (for each worker
+	 * to build its own dest receiver) and the object id (for
+	 * each worker to open the table).
+	 */
+	if (queryDesc->plannedstmt->parallelModeNeeded == true &&
+		dest != NULL &&
+		dest->mydest == DestIntoRel &&
+		((DR_intorel *) dest)->is_parallel == true &&
+		((DR_intorel *) dest)->is_parallel_worker != true)
+	{
+		queryDesc->planstate->intoclause = ((DR_intorel *) dest)->into;
+		queryDesc->planstate->objectid = ((DR_intorel *) dest)->object_id;
+		queryDesc->planstate->dest = dest;
+	}
 	 * run plan
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index befde52691..b2aa06102f 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -23,6 +23,7 @@
 #include "postgres.h"
+#include "commands/createas.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
@@ -65,6 +66,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_INTO_CLAUSE		UINT64CONST(0xE00000000000000B)
@@ -77,6 +79,8 @@ typedef struct FixedParallelExecutorState
 	dsa_pointer param_exec;
 	int			eflags;
 	int			jit_flags;
+	Oid			objectid;		/* used by workers to open the target relation */
+	pg_atomic_uint64	processed;	/* number of tuples inserted by all the workers */
 } FixedParallelExecutorState;
@@ -600,6 +604,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	Size		dsa_minsize = dsa_minimum_size();
 	char	   *query_string;
 	int			query_len;
+	char 		*intoclausestr = NULL;
 	 * Force any initplan outputs that we're going to pass to workers to be
@@ -712,6 +717,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/* Estimate space for into clause for CTAS. */
+	if (ISCTAS(planstate->intoclause))
+	{
+		intoclausestr = nodeToString(planstate->intoclause);
+		shm_toc_estimate_chunk(&pcxt->estimator, strlen(intoclausestr) + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
 	/* Everyone's had a chance to ask for space, so now create the DSM. */
@@ -729,6 +742,15 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	fpes->param_exec = InvalidDsaPointer;
 	fpes->eflags = estate->es_top_eflags;
 	fpes->jit_flags = estate->es_jit_flags;
+	pg_atomic_init_u64(&fpes->processed, 0);
+	pei->processed = &fpes->processed;
+	if (intoclausestr != NULL &&
+		planstate->objectid != InvalidOid)
+		fpes->objectid = planstate->objectid;
+	else
+		fpes->objectid = InvalidOid;
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
 	/* Store query string */
@@ -758,8 +780,17 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
 	pei->wal_usage = walusage_space;
+	if (intoclausestr != NULL)
+	{
+		char *shmptr = (char *)shm_toc_allocate(pcxt->toc,
+												strlen(intoclausestr) + 1);
+		strcpy(shmptr, intoclausestr);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, shmptr);
+	}
 	/* Set up the tuple queues that the workers will write into. */
-	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
+	if (intoclausestr == NULL)
+		pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 	/* We don't need the TupleQueueReaders yet, though. */
 	pei->reader = NULL;
@@ -1387,12 +1418,28 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 	ParallelWorkerContext pwcxt;
+	char		*intoclausestr = NULL;
+	IntoClause	*intoclause = NULL;
 	/* Get fixed-size state. */
 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
-	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
-	receiver = ExecParallelGetReceiver(seg, toc);
+	intoclausestr = shm_toc_lookup(toc, PARALLEL_KEY_INTO_CLAUSE, true);
+	if (intoclausestr != NULL)
+	{
+		/*
+		 * If the worker is for parallel insert in CTAS, then use
+		 * the proper dest receiver.
+		 */
+		intoclause = (IntoClause *) stringToNode(intoclausestr);
+		receiver = CreateIntoRelDestReceiver(intoclause);
+		((DR_intorel *)receiver)->is_parallel_worker = true;
+		((DR_intorel *)receiver)->object_id = fpes->objectid;
+	}
+	else
+		/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
+		receiver = ExecParallelGetReceiver(seg, toc);
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
 	if (instrumentation != NULL)
 		instrument_options = instrumentation->instrument_options;
@@ -1471,6 +1518,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
+	/*
+	 * Write out the number of tuples this worker has inserted. The leader
+	 * will include it in the count reported to the client.
+	 */
+	if (intoclausestr != NULL)
+		pg_atomic_add_fetch_u64(&fpes->processed, queryDesc->estate->es_processed);
 	/* Must do this after capturing instrumentation. */
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index a01b46af14..99211e4941 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -32,6 +32,7 @@
 #include "access/relscan.h"
 #include "access/xact.h"
+#include "commands/createas.h"
 #include "executor/execdebug.h"
 #include "executor/execParallel.h"
 #include "executor/nodeGather.h"
@@ -48,7 +49,7 @@ static TupleTableSlot *ExecGather(PlanState *pstate);
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
 static MinimalTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
+static void ExecParallelInsertInCTAS(GatherState *node);
 /* ----------------------------------------------------------------
  *		ExecInitGather
@@ -131,6 +132,69 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	return gatherstate;
+/* ----------------------------------------------------------------
+ *		ExecParallelInsertInCTAS(node)
+ *
+ *		Facilitates parallel inserts by parallel workers and/or
+ *		leader for Create Table AS.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecParallelInsertInCTAS(GatherState *node)
+	/* Enable leader to insert in case no parallel workers were launched. */
+	if (node->nworkers_launched == 0 &&
+		!node->need_to_scan_locally)
+		node->need_to_scan_locally = true;
+	/*
+	 * By now, the parallel workers (if any were launched) would have started
+	 * their work, i.e. insertion into the target table. If the leader is chosen
+	 * to participate in parallel inserts in CTAS, finish its share before
+	 * waiting for the parallel workers to finish.
+	 */
+	if (node->need_to_scan_locally == true &&
+		node->ps.dest != NULL &&
+		node->ps.dest->mydest == DestIntoRel)
+	{
+		EState	   *estate = node->ps.state;
+		TupleTableSlot *outerTupleSlot;
+		for(;;)
+		{
+			/* Install our DSA area while executing the plan. */
+			estate->es_query_dsa =
+					node->pei ? node->pei->area : NULL;
+			outerTupleSlot = ExecProcNode(node->ps.lefttree);
+			estate->es_query_dsa = NULL;
+			if (!TupIsNull(outerTupleSlot))
+			{
+				(void) node->ps.dest->receiveSlot(outerTupleSlot, node->ps.dest);
+				node->ps.state->es_processed++;
+			}
+			if(TupIsNull(outerTupleSlot))
+				break;
+		}
+		node->need_to_scan_locally = false;
+	}
+	/* Wait for the parallel workers to finish. */
+	if (node->nworkers_launched > 0)
+	{
+		ExecShutdownGatherWorkers(node);
+		/*
+		 * Add the total tuples inserted by all workers to the tuples
+		 * inserted by the leader (if any). The sum is reported to the client.
+		 */
+		node->ps.state->es_processed += pg_atomic_read_u64(node->pei->processed);
+	}
 /* ----------------------------------------------------------------
  *		ExecGather(node)
@@ -166,6 +230,16 @@ ExecGather(PlanState *pstate)
 			ParallelContext *pcxt;
+			/*
+			 * Take the necessary information to be passed to workers for
+			 * parallel inserts in CTAS.
+			 */
+			if (ISCTAS(node->ps.intoclause))
+			{
+				node->ps.lefttree->intoclause = node->ps.intoclause;
+				node->ps.lefttree->objectid = node->ps.objectid;
+			}
 			/* Initialize, or re-initialize, shared state needed by workers. */
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
@@ -190,13 +264,16 @@ ExecGather(PlanState *pstate)
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
-				ExecParallelCreateReaders(node->pei);
-				/* Make a working array showing the active readers */
-				node->nreaders = pcxt->nworkers_launched;
-				node->reader = (TupleQueueReader **)
-					palloc(node->nreaders * sizeof(TupleQueueReader *));
-				memcpy(node->reader, node->pei->reader,
-					   node->nreaders * sizeof(TupleQueueReader *));
+				if (!(ISCTAS(node->ps.intoclause)))
+				{
+					ExecParallelCreateReaders(node->pei);
+					/* Make a working array showing the active readers */
+					node->nreaders = pcxt->nworkers_launched;
+					node->reader = (TupleQueueReader **)
+						palloc(node->nreaders * sizeof(TupleQueueReader *));
+					memcpy(node->reader, node->pei->reader,
+						node->nreaders * sizeof(TupleQueueReader *));
+				}
@@ -208,7 +285,8 @@ ExecGather(PlanState *pstate)
 		/* Run plan locally if no workers or enabled and not single-copy. */
-		node->need_to_scan_locally = (node->nreaders == 0)
+		node->need_to_scan_locally = (node->nreaders == 0 &&
+			!(ISCTAS(node->ps.intoclause)))
 			|| (!gather->single_copy && parallel_leader_participation);
 		node->initialized = true;
@@ -220,6 +298,11 @@ ExecGather(PlanState *pstate)
 	econtext = node->ps.ps_ExprContext;
+	if (ISCTAS(node->ps.intoclause))
+	{
+		ExecParallelInsertInCTAS(node);
+		return NULL;
+	}
 	/*
 	 * Get next tuple, either from one of our workers, or by running the plan
 	 * ourselves.
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 733f7ea543..72307e0927 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -393,7 +393,17 @@ cost_gather(GatherPath *path, PlannerInfo *root,
 	/* Parallel setup and communication cost. */
 	startup_cost += parallel_setup_cost;
-	run_cost += parallel_tuple_cost * path->path.rows;
+	/*
+	 * For parallel inserts in CTAS, no tuples are transferred from the
+	 * workers to the Gather node, because each worker itself inserts the
+	 * tuples produced by its share of the plan. So charge no tuple transfer
+	 * cost; this may make the parallel plan the cheapest among all plans and
+	 * lead the planner to choose it.
+	 */
+	if (!(root->parse->isForCTAS &&
+		root->query_level == 1))
+		run_cost += parallel_tuple_cost * path->path.rows;
 	path->path.startup_cost = startup_cost;
 	path->path.total_cost = (startup_cost + run_cost);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7320de345c..5beae6c617 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -389,6 +389,7 @@ extern FullTransactionId GetCurrentFullTransactionIdIfAny(void);
 extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
 extern CommandId GetCurrentCommandId(bool used);
+extern void SetCurrentCommandIdUsedForWorker(void);
 extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts);
 extern TimestampTz GetCurrentTransactionStartTimestamp(void);
 extern TimestampTz GetCurrentStatementStartTimestamp(void);
diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h
index 7629230254..af72ffcfe2 100644
--- a/src/include/commands/createas.h
+++ b/src/include/commands/createas.h
@@ -14,12 +14,30 @@
 #ifndef CREATEAS_H
 #define CREATEAS_H
+#include "access/heapam.h"
 #include "catalog/objectaddress.h"
 #include "nodes/params.h"
+#include "nodes/plannodes.h"
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 #include "utils/queryenvironment.h"
+typedef struct
+{
+	DestReceiver pub;			/* publicly-known function pointers */
+	IntoClause *into;			/* target relation specification */
+	/* These fields are filled by intorel_startup: */
+	Relation	rel;			/* relation to write to */
+	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
+	CommandId	output_cid;		/* cmin to insert in output tuples */
+	int			ti_options;		/* table_tuple_insert performance options */
+	BulkInsertState bistate;	/* bulk insert state */
+	bool		is_parallel;		/* true if parallelism is to be considered */
+	bool		is_parallel_worker; /* true for parallel worker */
+	Oid			object_id;			/* used for table open by parallel worker */
+} DR_intorel;
+#define ISCTAS(intoclause) \
+	((intoclause) != NULL && IsA((intoclause), IntoClause))
 extern ObjectAddress ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 									   ParamListInfo params, QueryEnvironment *queryEnv,
@@ -29,4 +47,6 @@ extern int	GetIntoRelEFlags(IntoClause *intoClause);
 extern DestReceiver *CreateIntoRelDestReceiver(IntoClause *intoClause);
+extern bool IsParallelInsertInCTASAllowed(IntoClause *intoClause, PlannedStmt *plannedstmt);
 #endif							/* CREATEAS_H */
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a39a5b29c..77f69946bf 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -35,6 +35,7 @@ typedef struct ParallelExecutorInfo
 	/* These two arrays have pcxt->nworkers_launched entries: */
 	shm_mq_handle **tqueue;		/* tuple queues for worker output */
 	struct TupleQueueReader **reader;	/* tuple reader/writer support */
+	volatile pg_atomic_uint64	*processed;	/* number of tuples inserted by all workers */
 } ParallelExecutorInfo;
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6c0a7d68d6..5083c4cfb5 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -23,6 +23,7 @@
 #include "nodes/tidbitmap.h"
 #include "partitioning/partdefs.h"
 #include "storage/condition_variable.h"
+#include "tcop/dest.h"
 #include "utils/hsearch.h"
 #include "utils/queryenvironment.h"
 #include "utils/reltrigger.h"
@@ -1020,6 +1021,10 @@ typedef struct PlanState
 	bool		outeropsset;
 	bool		inneropsset;
 	bool		resultopsset;
+	/* Information related to parallel inserts in CTAS. */
+	IntoClause	*intoclause;
+	Oid			objectid;
+	DestReceiver *dest;
 } PlanState;
 /* ----------------
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 60c2f45466..fe43dc941e 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -180,6 +180,7 @@ typedef struct Query
 	int			stmt_location;	/* start location, or -1 if unknown */
 	int			stmt_len;		/* length in bytes; 0 means "rest of string" */
+	bool		isForCTAS;		/* true if the select query is for create table as */
 } Query;
diff --git a/src/test/regress/expected/write_parallel.out b/src/test/regress/expected/write_parallel.out
index 0c4da2591a..11ef18b8a4 100644
--- a/src/test/regress/expected/write_parallel.out
+++ b/src/test/regress/expected/write_parallel.out
@@ -76,4 +76,147 @@ explain (costs off) create table parallel_write as execute prep_stmt;
 create table parallel_write as execute prep_stmt;
 drop table parallel_write;
+-- Test parallel inserts in create table as/select into/create materialized view
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select length(stringu1) from tenk1;
+                          QUERY PLAN                           
+ Gather (actual rows=0 loops=1)
+   ->  Create parallel_write
+     Workers Planned: 4
+     Workers Launched: 4
+     ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+drop table parallel_write;
+-- parallel inserts must not occur
+explain (costs off, analyze on, timing off, summary off)
+create temporary table parallel_write as select length(stringu1) from tenk1;
+                         QUERY PLAN                          
+ Gather (actual rows=10000 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(4 rows)
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create unlogged table parallel_write as select length(stringu1) from tenk1;
+                          QUERY PLAN                           
+ Gather (actual rows=0 loops=1)
+   ->  Create parallel_write
+     Workers Planned: 4
+     Workers Launched: 4
+     ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into parallel_write from tenk1;
+                          QUERY PLAN                           
+ Gather (actual rows=0 loops=1)
+   ->  Create parallel_write
+     Workers Planned: 4
+     Workers Launched: 4
+     ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+drop table parallel_write;
+-- parallel inserts must not occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into temporary parallel_write from tenk1;
+                         QUERY PLAN                          
+ Gather (actual rows=10000 loops=1)
+   Workers Planned: 4
+   Workers Launched: 4
+   ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(4 rows)
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into unlogged parallel_write from tenk1;
+                          QUERY PLAN                           
+ Gather (actual rows=0 loops=1)
+   ->  Create parallel_write
+     Workers Planned: 4
+     Workers Launched: 4
+     ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+drop table parallel_write;
+-- parallel inserts must not occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select length(stringu1) from tenk1 for update;
+                     QUERY PLAN                      
+ LockRows (actual rows=10000 loops=1)
+   ->  Seq Scan on tenk1 (actual rows=10000 loops=1)
+(2 rows)
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create materialized view parallel_mat_view as
+select length(stringu1) from tenk1;
+                          QUERY PLAN                           
+ Gather (actual rows=0 loops=1)
+   ->  Create parallel_mat_view
+     Workers Planned: 4
+     Workers Launched: 4
+     ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+drop materialized view parallel_mat_view;
+-- parallel inserts must occur
+prepare parallel_write_prep as select length(stringu1) from tenk1;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as execute parallel_write_prep;
+                          QUERY PLAN                           
+ Gather (actual rows=0 loops=1)
+   ->  Create parallel_write
+     Workers Planned: 4
+     Workers Launched: 4
+     ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+deallocate parallel_write_prep;
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select now(), four from tenk1;
+                          QUERY PLAN                           
+ Gather (actual rows=0 loops=1)
+   ->  Create parallel_write
+     Workers Planned: 4
+     Workers Launched: 4
+     ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=5)
+(5 rows)
+drop table parallel_write;
+-- parallel inserts must not occur
+create sequence parallel_write_sequence;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+select nextval('parallel_write_sequence'), four from tenk1;
+                  QUERY PLAN                   
+ Seq Scan on tenk1 (actual rows=10000 loops=1)
+(1 row)
+drop table parallel_write;
+drop sequence parallel_write_sequence;
diff --git a/src/test/regress/sql/write_parallel.sql b/src/test/regress/sql/write_parallel.sql
index 78b479cedf..dd4233b399 100644
--- a/src/test/regress/sql/write_parallel.sql
+++ b/src/test/regress/sql/write_parallel.sql
@@ -39,4 +39,69 @@ explain (costs off) create table parallel_write as execute prep_stmt;
 create table parallel_write as execute prep_stmt;
 drop table parallel_write;
+-- Test parallel inserts in create table as/select into/create materialized view
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select length(stringu1) from tenk1;
+drop table parallel_write;
+-- parallel inserts must not occur
+explain (costs off, analyze on, timing off, summary off)
+create temporary table parallel_write as select length(stringu1) from tenk1;
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create unlogged table parallel_write as select length(stringu1) from tenk1;
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into parallel_write from tenk1;
+drop table parallel_write;
+-- parallel inserts must not occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into temporary parallel_write from tenk1;
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+select length(stringu1) into unlogged parallel_write from tenk1;
+drop table parallel_write;
+-- parallel inserts must not occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select length(stringu1) from tenk1 for update;
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create materialized view parallel_mat_view as
+select length(stringu1) from tenk1;
+drop materialized view parallel_mat_view;
+-- parallel inserts must occur
+prepare parallel_write_prep as select length(stringu1) from tenk1;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as execute parallel_write_prep;
+deallocate parallel_write_prep;
+drop table parallel_write;
+-- parallel inserts must occur
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as select now(), four from tenk1;
+drop table parallel_write;
+-- parallel inserts must not occur
+create sequence parallel_write_sequence;
+explain (costs off, analyze on, timing off, summary off)
+create table parallel_write as
+select nextval('parallel_write_sequence'), four from tenk1;
+drop table parallel_write;
+drop sequence parallel_write_sequence;
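
For reviewers, the tuple accounting done in ExecParallelInsertInCTAS can be sketched outside the executor roughly as below. This is only an illustration under stated assumptions, not code from the patch: SharedInsertState, worker_insert and run_parallel_insert are hypothetical names, C11 atomics stand in for pg_atomic_uint64, and the workers are simulated by plain function calls rather than separate backend processes.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for the shared counter hanging off pei->processed. */
typedef struct SharedInsertState
{
	_Atomic uint64_t processed;	/* tuples inserted by all workers */
} SharedInsertState;

/*
 * Simulated worker: "inserts" its share of tuples, then publishes the count
 * with a single atomic add. In the patch the workers are separate processes;
 * here they are ordinary function calls (an assumption of this sketch).
 */
static void
worker_insert(SharedInsertState *shared, uint64_t ntuples)
{
	uint64_t	inserted = 0;

	for (uint64_t i = 0; i < ntuples; i++)
		inserted++;				/* stands in for one tuple insert */

	atomic_fetch_add(&shared->processed, inserted);
}

/*
 * Simulated leader: run the workers, insert the leader's own share, then add
 * the workers' total to the leader's count, in the same order as the patch
 * (leader finishes its share before collecting the workers' count).
 */
static uint64_t
run_parallel_insert(int nworkers, uint64_t per_worker, uint64_t leader_share)
{
	SharedInsertState shared;
	uint64_t	es_processed = 0;

	assert(nworkers >= 0);
	atomic_init(&shared.processed, 0);

	for (int i = 0; i < nworkers; i++)
		worker_insert(&shared, per_worker);

	/* Leader participates: count the tuples it inserts itself. */
	for (uint64_t i = 0; i < leader_share; i++)
		es_processed++;

	/* Only read the shared counter if workers were launched. */
	if (nworkers > 0)
		es_processed += atomic_load(&shared.processed);

	return es_processed;
}
```

With four workers and a participating leader each handling 2000 tuples, run_parallel_insert(4, 2000, 2000) returns 10000, which lines up with the "rows=2000 loops=5" lines in the expected output above.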

