Hi,

The idea of this patch is to allow the leader and each worker insert the
tuples in parallel if the SELECT part of the CTAS is parallelizable. Along
with the parallel inserts, if the CTAS code path is allowed to do
table_multi_insert()[1], then the gain we achieve is as follows:

For a table with 2 integer columns, 100million tuples(more testing results
are at [2]), the exec time on the HEAD is *120sec*, where as with the
parallelism patch proposed here and multi insert patch [1], with 3 workers
and leader participation the exec time is *22sec(5.45X)*. With the current
CTAS code which does single tuple insert(see intorel_receive()), the
performance gain is limited to ~1.7X with parallelism. This is due to the
fact that the workers contend more for locks on buffer pages while
extending the table. So, the maximum benefit we could get for CTAS is with
both parallelism and multi tuple inserts.

The design:
Let the planner know that the SELECT is from CTAS in createas.c so that it
can set the number of tuples transferred from the workers to Gather node to
0. With this change, there are chances that the planner may choose the
parallel plan. After the planning, check if the upper plan node is Gather
in createas.c and mark a parallelism flag in the CTAS dest receiver. Pass
the into clause, object id, command id from the leader to workers, so that
each worker can create its own CTAS dest receiver. Leader inserts it's
share of tuples if instructed to do, and so are workers. Each worker writes
atomically it's number of inserted tuples into a shared memory variable,
the leader combines this with it's own number of inserted tuples and shares
to the client.

Below things are still pending. Thoughts are most welcome:
1. How better we can lift the "cannot insert tuples in a parallel worker"
from heap_prepare_insert() for only CTAS cases or for that matter parallel
copy? How about having a variable in any of the worker global contexts and
use that? Of course, we can remove this restriction entirely in case we
fully allow parallelism for INSERT INTO SELECT, CTAS, and COPY.
2. How to represent the parallel insert for CTAS in explain plans? The
explain CTAS shows the plan for only the SELECT part. How about having some
textual info along with the Gather node?

 -----------------------------------------------------------------------------
     Gather  (cost=1000.00..108738.90 rows=0 width=8)
     Workers Planned: 2
        ->  Parallel Seq Scan on t_test  (cost=0.00..106748.00 rows=4954
width=8)
             Filter: (many < 10000)

 -----------------------------------------------------------------------------
3. Need to restrict parallel inserts, if CTAS tries to create temp/global
tables as the workers will not have access to those tables. Need to analyze
whether to allow parallelism if CTAS has prepared statements or with no
data.
4. Need to stop unnecessary parallel shared state such as tuple queue being
created and shared to workers.
5. Addition of new test cases. Testing with more scenarios and different
data sets, sizes, tablespaces, select into. Analysis on the 2 mismatches in
write_parallel.sql regression test.

Thoughts?

Credits:
1. Thanks to DIlip Kumar for the main design idea and the discussions.
Thanks to Vignesh for the discussions.
2. Patch development, testing is by me.
3. Thanks to the authors of table_multi_insert() in CTAS patch [1].

[1] - For table_multi_insert() in CTAS, I used an in-progress patch
available at
https://www.postgresql.org/message-id/CAEET0ZG31mD5SWjTYsAt0JTLReOejPvusJorZ3kGZ1%3DN1AC-Fw%40mail.gmail.com
[2] - Table with 2 integer columns, 100million tuples, with leader
participation,with default postgresql.conf file. All readings are of
triplet form - (workers, exec time in sec, improvement).
case 1: no multi inserts -
(0,120,1X),(1,91,1.32X),(2,75,1.6X),(3,67,1.79X),(4,72,1.66X),(5,77,1.56),(6,83,1.44X)
case 2: with multi inserts -
(0,59,1X),(1,32,1.84X),(2,28,2.1X),(3,25,2.36X),(4,23,2.56X),(5,22,2.68X),(6,22,2.68X)
case 3: same table but unlogged with multi inserts -
(0,50,1X),(1,28,1.78X),(2,25,2X),(3,22,2.27X),(4,21,2.38X),(5,21,2.38X),(6,20,2.5X)

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From 9e45426a6d4d6f030ba24ed58eb0e2ff5912a972 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Sun, 20 Sep 2020 09:23:06 +0530
Subject: [PATCH v1] Parallel Inserts in CREATE TABLE AS

The idea of this patch is to allow the leader and each worker
insert the tuples in parallel if the SELECT part of the CTAS is
parallelizable.

The design:
Let the planner know that the SELECT is from CTAS in createas.c
so that it can set the number of tuples transferred from the
workers to Gather node to 0. With this change, there are chances
that the planner may choose the parallel plan. After the planning,
check if the upper plan node is Gather in createas.c and mark a
parallelism flag in the CTAS dest receiver. Pass the into clause,
object id, command id from the leader to workers, so that each
worker can create its own CTAS dest receiver. Leader inserts it's 
share of tuples if instructed to do, and so are workers. Each 
worker writes atomically it's number of inserted tuples into a 
shared memory variable, the leader combines this with it's own 
number of inserted tuples and shares to the client.
---
 src/backend/access/heap/heapam.c      |   4 +-
 src/backend/commands/createas.c       | 284 +++++++++++++++-----------
 src/backend/commands/explain.c        |  23 +++
 src/backend/executor/execMain.c       |  21 ++
 src/backend/executor/execParallel.c   |  66 +++++-
 src/backend/executor/nodeGather.c     |  66 ++++++
 src/backend/optimizer/path/costsize.c |  12 ++
 src/include/commands/createas.h       |  15 ++
 src/include/executor/execParallel.h   |   1 +
 src/include/nodes/execnodes.h         |   6 +
 src/include/nodes/parsenodes.h        |   1 +
 11 files changed, 374 insertions(+), 125 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1585861a02..50766f489a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2049,10 +2049,10 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 	 * inserts in general except for the cases where inserts generate a new
 	 * CommandId (eg. inserts into a table having a foreign key column).
 	 */
-	if (IsParallelWorker())
+	/*if (IsParallelWorker())
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot insert tuples in a parallel worker")));
+				 errmsg("cannot insert tuples in a parallel worker")));*/
 
 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index d53ec952d0..4812a55518 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -51,18 +51,6 @@
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
 
-typedef struct
-{
-	DestReceiver pub;			/* publicly-known function pointers */
-	IntoClause *into;			/* target relation specification */
-	/* These fields are filled by intorel_startup: */
-	Relation	rel;			/* relation to write to */
-	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
-} DR_intorel;
-
 /* utility functions for CTAS definition creation */
 static ObjectAddress create_ctas_internal(List *attrList, IntoClause *into);
 static ObjectAddress create_ctas_nodata(List *tlist, IntoClause *into);
@@ -328,10 +316,29 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 		query = linitial_node(Query, rewritten);
 		Assert(query->commandType == CMD_SELECT);
 
+		/*
+		 * Flag to let the planner know that the SELECT query is for
+		 * CREATE TABLE AS. This is used to make the number tuples
+		 * transferred from workers to gather node(in case parallelism
+		 * kicks in for the SELECT part of the CTAS), to zero as each
+		 * worker will parallelly insert it's share of tuples.
+		 */
+		if (!is_matview)
+			query->isForCTAS = true;
+
 		/* plan the query */
 		plan = pg_plan_query(query, pstate->p_sourcetext,
 							 CURSOR_OPT_PARALLEL_OK, params);
 
+		/*
+		 * SELECT part of the CTAS is parallelizable, so we can make
+		 * each parallel worker insert the tuples that are resulted
+		 * in it's execution into the target table.
+		 */
+		if (!is_matview &&
+			IsA(plan->planTree, Gather))
+			((DR_intorel *) dest)->is_parallel = true;
+
 		/*
 		 * Use a snapshot with an updated command ID to ensure this query sees
 		 * results of any previously executed queries.  (This could only
@@ -418,6 +425,9 @@ CreateIntoRelDestReceiver(IntoClause *intoClause)
 	self->pub.rDestroy = intorel_destroy;
 	self->pub.mydest = DestIntoRel;
 	self->into = intoClause;
+	self->is_parallel = false;
+	self->is_parallel_worker = false;
+	self->object_id = InvalidOid;
 	/* other private fields will be set during intorel_startup */
 
 	return (DestReceiver *) self;
@@ -430,135 +440,167 @@ static void
 intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 {
 	DR_intorel *myState = (DR_intorel *) self;
-	IntoClause *into = myState->into;
-	bool		is_matview;
-	char		relkind;
-	List	   *attrList;
 	ObjectAddress intoRelationAddr;
 	Relation	intoRelationDesc;
-	RangeTblEntry *rte;
-	ListCell   *lc;
-	int			attnum;
 
-	Assert(into != NULL);		/* else somebody forgot to set it */
+	if (myState->is_parallel_worker)
+	{
+		/* In the worker */
+		intoRelationDesc = table_open(myState->object_id, AccessExclusiveLock);
+		myState->rel = intoRelationDesc;
+		myState->reladdr = InvalidObjectAddress;
+		myState->ti_options = 0;
+		myState->bistate = GetBulkInsertState();
+	}
+	else
+	{
+		IntoClause *into = myState->into;
+		bool		is_matview;
+		char		relkind;
+		List	   *attrList;
+		Relation	intoRelationDesc;
+		RangeTblEntry *rte;
+		ListCell   *lc;
+		int			attnum;
 
-	/* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */
-	is_matview = (into->viewQuery != NULL);
-	relkind = is_matview ? RELKIND_MATVIEW : RELKIND_RELATION;
+		Assert(into != NULL);		/* else somebody forgot to set it */
 
-	/*
-	 * Build column definitions using "pre-cooked" type and collation info. If
-	 * a column name list was specified in CREATE TABLE AS, override the
-	 * column names derived from the query.  (Too few column names are OK, too
-	 * many are not.)
-	 */
-	attrList = NIL;
-	lc = list_head(into->colNames);
-	for (attnum = 0; attnum < typeinfo->natts; attnum++)
-	{
-		Form_pg_attribute attribute = TupleDescAttr(typeinfo, attnum);
-		ColumnDef  *col;
-		char	   *colname;
+		/* This code supports both CREATE TABLE AS and CREATE MATERIALIZED VIEW */
+		is_matview = (into->viewQuery != NULL);
+		relkind = is_matview ? RELKIND_MATVIEW : RELKIND_RELATION;
 
-		if (lc)
+		/*
+		* Build column definitions using "pre-cooked" type and collation info. If
+		* a column name list was specified in CREATE TABLE AS, override the
+		* column names derived from the query.  (Too few column names are OK, too
+		* many are not.)
+		*/
+		attrList = NIL;
+		lc = list_head(into->colNames);
+		for (attnum = 0; attnum < typeinfo->natts; attnum++)
 		{
-			colname = strVal(lfirst(lc));
-			lc = lnext(into->colNames, lc);
-		}
-		else
-			colname = NameStr(attribute->attname);
+			Form_pg_attribute attribute = TupleDescAttr(typeinfo, attnum);
+			ColumnDef  *col;
+			char	   *colname;
 
-		col = makeColumnDef(colname,
-							attribute->atttypid,
-							attribute->atttypmod,
-							attribute->attcollation);
+			if (lc)
+			{
+				colname = strVal(lfirst(lc));
+				lc = lnext(into->colNames, lc);
+			}
+			else
+				colname = NameStr(attribute->attname);
 
-		/*
-		 * It's possible that the column is of a collatable type but the
-		 * collation could not be resolved, so double-check.  (We must check
-		 * this here because DefineRelation would adopt the type's default
-		 * collation rather than complaining.)
-		 */
-		if (!OidIsValid(col->collOid) &&
-			type_is_collatable(col->typeName->typeOid))
-			ereport(ERROR,
-					(errcode(ERRCODE_INDETERMINATE_COLLATION),
-					 errmsg("no collation was derived for column \"%s\" with collatable type %s",
-							col->colname,
-							format_type_be(col->typeName->typeOid)),
-					 errhint("Use the COLLATE clause to set the collation explicitly.")));
+			col = makeColumnDef(colname,
+								attribute->atttypid,
+								attribute->atttypmod,
+								attribute->attcollation);
 
-		attrList = lappend(attrList, col);
-	}
+			/*
+			* It's possible that the column is of a collatable type but the
+			* collation could not be resolved, so double-check.  (We must check
+			* this here because DefineRelation would adopt the type's default
+			* collation rather than complaining.)
+			*/
+			if (!OidIsValid(col->collOid) &&
+				type_is_collatable(col->typeName->typeOid))
+				ereport(ERROR,
+						(errcode(ERRCODE_INDETERMINATE_COLLATION),
+						errmsg("no collation was derived for column \"%s\" with collatable type %s",
+								col->colname,
+								format_type_be(col->typeName->typeOid)),
+						errhint("Use the COLLATE clause to set the collation explicitly.")));
 
-	if (lc != NULL)
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("too many column names were specified")));
+			attrList = lappend(attrList, col);
+		}
 
-	/*
-	 * Actually create the target table
-	 */
-	intoRelationAddr = create_ctas_internal(attrList, into);
+		if (lc != NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("too many column names were specified")));
 
-	/*
-	 * Finally we can open the target table
-	 */
-	intoRelationDesc = table_open(intoRelationAddr.objectId, AccessExclusiveLock);
+		/*
+		* Actually create the target table
+		*/
+		intoRelationAddr = create_ctas_internal(attrList, into);
 
-	/*
-	 * Check INSERT permission on the constructed table.
-	 *
-	 * XXX: It would arguably make sense to skip this check if into->skipData
-	 * is true.
-	 */
-	rte = makeNode(RangeTblEntry);
-	rte->rtekind = RTE_RELATION;
-	rte->relid = intoRelationAddr.objectId;
-	rte->relkind = relkind;
-	rte->rellockmode = RowExclusiveLock;
-	rte->requiredPerms = ACL_INSERT;
+		/*
+		* Finally we can open the target table
+		*/
+		intoRelationDesc = table_open(intoRelationAddr.objectId, AccessExclusiveLock);
 
-	for (attnum = 1; attnum <= intoRelationDesc->rd_att->natts; attnum++)
-		rte->insertedCols = bms_add_member(rte->insertedCols,
-										   attnum - FirstLowInvalidHeapAttributeNumber);
+		/*
+		* Check INSERT permission on the constructed table.
+		*
+		* XXX: It would arguably make sense to skip this check if into->skipData
+		* is true.
+		*/
+		rte = makeNode(RangeTblEntry);
+		rte->rtekind = RTE_RELATION;
+		rte->relid = intoRelationAddr.objectId;
+		rte->relkind = relkind;
+		rte->rellockmode = RowExclusiveLock;
+		rte->requiredPerms = ACL_INSERT;
+
+		for (attnum = 1; attnum <= intoRelationDesc->rd_att->natts; attnum++)
+			rte->insertedCols = bms_add_member(rte->insertedCols,
+											attnum - FirstLowInvalidHeapAttributeNumber);
+
+		ExecCheckRTPerms(list_make1(rte), true);
 
-	ExecCheckRTPerms(list_make1(rte), true);
+		/*
+		* Make sure the constructed table does not have RLS enabled.
+		*
+		* check_enable_rls() will ereport(ERROR) itself if the user has requested
+		* something invalid, and otherwise will return RLS_ENABLED if RLS should
+		* be enabled here.  We don't actually support that currently, so throw
+		* our own ereport(ERROR) if that happens.
+		*/
+		if (check_enable_rls(intoRelationAddr.objectId, InvalidOid, false) == RLS_ENABLED)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("policies not yet implemented for this command")));
 
-	/*
-	 * Make sure the constructed table does not have RLS enabled.
-	 *
-	 * check_enable_rls() will ereport(ERROR) itself if the user has requested
-	 * something invalid, and otherwise will return RLS_ENABLED if RLS should
-	 * be enabled here.  We don't actually support that currently, so throw
-	 * our own ereport(ERROR) if that happens.
-	 */
-	if (check_enable_rls(intoRelationAddr.objectId, InvalidOid, false) == RLS_ENABLED)
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("policies not yet implemented for this command")));
+		/*
+		* Tentatively mark the target as populated, if it's a matview and we're
+		* going to fill it; otherwise, no change needed.
+		*/
+		if (is_matview && !into->skipData)
+			SetMatViewPopulatedState(intoRelationDesc, true);
 
-	/*
-	 * Tentatively mark the target as populated, if it's a matview and we're
-	 * going to fill it; otherwise, no change needed.
-	 */
-	if (is_matview && !into->skipData)
-		SetMatViewPopulatedState(intoRelationDesc, true);
+		/*
+		* Fill private fields of myState for use by later routines
+		*/
+		myState->rel = intoRelationDesc;
+		myState->reladdr = intoRelationAddr;
+		myState->output_cid = GetCurrentCommandId(true);
+		myState->ti_options = TABLE_INSERT_SKIP_FSM;
+		myState->bistate = GetBulkInsertState();
 
-	/*
-	 * Fill private fields of myState for use by later routines
-	 */
-	myState->rel = intoRelationDesc;
-	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
-	myState->bistate = GetBulkInsertState();
+		/*
+		* Valid smgr_targblock implies something already wrote to the relation.
+		* This may be harmless, but this function hasn't planned for it.
+		*/
+		Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
 
-	/*
-	 * Valid smgr_targblock implies something already wrote to the relation.
-	 * This may be harmless, but this function hasn't planned for it.
-	 */
-	Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
+		if (myState->is_parallel == true)
+		{
+			myState->object_id = intoRelationAddr.objectId;
+			/*
+			 * We don't need to skip contacting FSM while inserting tuples
+			 * for parallel mode, while extending the relations, workers
+			 * instead of blocking on a page while another worker is inserting,
+			 * can check the FSM for another page that can accommodate the
+			 * tuples. This results in major benefit for parallel inserts.
+			 */
+			myState->ti_options = 0;
+			/*
+			 * rd_createSubid is marked invalid, otherwise, the table is
+			 * not allowed to extend by the workers.
+			 */
+			myState->rel->rd_createSubid = InvalidSubTransactionId;
+		}
+	}
 }
 
 /*
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index c98c9b5547..5a9ab2ce39 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -371,6 +371,18 @@ ExplainOneQuery(Query *query, int cursorOptions,
 		return;
 	}
 
+	/*
+	 * Flag to let the planner know that the SELECT query is for
+	 * CREATE TABLE AS. This is used to make the number tuples
+	 * transferred from workers to gather node(in case parallelism
+	 * kicks in for the SELECT part of the CTAS), to zero as each
+	 * worker will parallelly insert it's share of tuples.
+	 */
+	if (into != NULL &&
+		into->type == T_IntoClause &&
+		into->viewQuery == NULL)
+		query->isForCTAS = true;
+
 	/* if an advisor plugin is present, let it manage things */
 	if (ExplainOneQuery_hook)
 		(*ExplainOneQuery_hook) (query, cursorOptions, into, es,
@@ -536,7 +548,18 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 	 * AS, we'd better use the appropriate tuple receiver.
 	 */
 	if (into)
+	{
 		dest = CreateIntoRelDestReceiver(into);
+		/*
+		 * SELECT part of the CTAS is parallelizable, so we can make
+		 * each parallel worker insert the tuples that are resulted
+		 * in it's execution into the target table.
+		 */
+		if (into->type == T_IntoClause &&
+			into->viewQuery == NULL &&
+			IsA(plannedstmt->planTree, Gather))
+			((DR_intorel *) dest)->is_parallel = true;
+	}
 	else
 		dest = None_Receiver;
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 2e27e26ba4..ceb45d194d 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -45,6 +45,7 @@
 #include "access/xact.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_publication.h"
+#include "commands/createas.h"
 #include "commands/matview.h"
 #include "commands/trigger.h"
 #include "executor/execdebug.h"
@@ -352,6 +353,26 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 	if (sendTuples)
 		dest->rStartup(dest, operation, queryDesc->tupDesc);
 
+	/*
+	 * For parallelizing inserts in CTAS i.e. making each
+	 * parallel worker inerst it's tuples, we must send
+	 * information such as intoclause(for each worker
+	 * building it's own dest receiver), object id(for each
+	 * worker to open the table), cid(command id for each
+	 * worker to insert properly) from leader to workers.
+	 */
+	if (queryDesc->plannedstmt->parallelModeNeeded == true &&
+		dest != NULL &&
+		dest->mydest == DestIntoRel &&
+		((DR_intorel *) dest)->is_parallel == true &&
+		((DR_intorel *) dest)->is_parallel_worker != true)
+	{
+		queryDesc->planstate->intoclause = ((DR_intorel *) dest)->into;
+		queryDesc->planstate->objectid = ((DR_intorel *) dest)->object_id;
+		queryDesc->planstate->cid 	=	((DR_intorel *) dest)->output_cid;
+		queryDesc->planstate->dest = dest;
+	}
+
 	/*
 	 * run plan
 	 */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 382e78fb7f..c0143d4e0e 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -23,6 +23,7 @@
 
 #include "postgres.h"
 
+#include "commands/createas.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
@@ -65,6 +66,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_INTO_CLAUSE		UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -77,6 +79,9 @@ typedef struct FixedParallelExecutorState
 	dsa_pointer param_exec;
 	int			eflags;
 	int			jit_flags;
+	CommandId 	cid;			/* workers to insert appropriately. */
+	Oid			objectid;		/* workers to open relation/table.  */
+	pg_atomic_uint64	processed; /* number tuples inserted by all the workers. */
 } FixedParallelExecutorState;
 
 /*
@@ -601,6 +606,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	Size		dsa_minsize = dsa_minimum_size();
 	char	   *query_string;
 	int			query_len;
+	char 		*intoclausestr = NULL;
 
 	/*
 	 * Force any initplan outputs that we're going to pass to workers to be
@@ -713,6 +719,15 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	/* Estimate space for into clause for CTAS. */
+	if (planstate->intoclause != NULL &&
+		planstate->intoclause->type == T_IntoClause)
+	{
+		intoclausestr = nodeToString(planstate->intoclause);
+		shm_toc_estimate_chunk(&pcxt->estimator, strlen(intoclausestr) + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
 	/* Everyone's had a chance to ask for space, so now create the DSM. */
 	InitializeParallelDSM(pcxt);
 
@@ -730,6 +745,22 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	fpes->param_exec = InvalidDsaPointer;
 	fpes->eflags = estate->es_top_eflags;
 	fpes->jit_flags = estate->es_jit_flags;
+	pg_atomic_init_u64(&fpes->processed, 0);
+	pei->processed = &fpes->processed;
+
+	if (intoclausestr != NULL &&
+		planstate->objectid != InvalidOid &&
+		planstate->cid != InvalidCommandId)
+	{
+		fpes->objectid = planstate->objectid;
+		fpes->cid = planstate->cid;
+	}
+	else
+	{
+		fpes->objectid = InvalidOid;
+		fpes->cid = InvalidCommandId;
+	}
+
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
 
 	/* Store query string */
@@ -759,6 +790,13 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
 	pei->wal_usage = walusage_space;
 
+	if (intoclausestr != NULL)
+	{
+		char *shmptr = (char *)shm_toc_allocate(pcxt->toc,
+												strlen(intoclausestr) + 1);
+		strcpy(shmptr, intoclausestr);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, shmptr);
+	}
 	/* Set up the tuple queues that the workers will write into. */
 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
@@ -1388,12 +1426,29 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 	ParallelWorkerContext pwcxt;
+	char		*intoclausestr = NULL;
+	IntoClause	*intoclause = NULL;
 
 	/* Get fixed-size state. */
 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
 
-	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
-	receiver = ExecParallelGetReceiver(seg, toc);
+	intoclausestr = shm_toc_lookup(toc, PARALLEL_KEY_INTO_CLAUSE, true);
+	if (intoclausestr != NULL)
+	{
+		/*
+		 * If the worker is for parallel insert in CTAS, then use
+		 * the proper dest receiver.
+		 */
+		intoclause = (IntoClause *) stringToNode(intoclausestr);
+		receiver = CreateIntoRelDestReceiver(intoclause);
+		((DR_intorel *)receiver)->is_parallel_worker = true;
+		((DR_intorel *)receiver)->object_id = fpes->objectid;
+		((DR_intorel *)receiver)->output_cid = fpes->cid;
+	}
+	else
+		/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
+		receiver = ExecParallelGetReceiver(seg, toc);
+
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
 	if (instrumentation != NULL)
 		instrument_options = instrumentation->instrument_options;
@@ -1472,6 +1527,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 			queryDesc->estate->es_jit->instr;
 	}
 
+	/*
+	 * Write out the number of tuples this worker has inserted. Leader
+	 * will use it to inform to the end client.
+	 */
+	if (intoclausestr != NULL)
+		pg_atomic_add_fetch_u64(&fpes->processed, queryDesc->estate->es_processed);
+
 	/* Must do this after capturing instrumentation. */
 	ExecutorEnd(queryDesc);
 
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index a01b46af14..bda90a23f9 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -166,6 +166,18 @@ ExecGather(PlanState *pstate)
 		{
 			ParallelContext *pcxt;
 
+			/*
+			 * Take the necessary information to be passed to workers for
+			 * parallel inserts in CTAS.
+			 */
+			if (node->ps.intoclause != NULL &&
+				node->ps.intoclause->type == T_IntoClause)
+			{
+				node->ps.lefttree->intoclause = node->ps.intoclause;
+				node->ps.lefttree->objectid = node->ps.objectid;
+				node->ps.lefttree->cid		= node->ps.cid;
+			}
+
 			/* Initialize, or re-initialize, shared state needed by workers. */
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
@@ -220,6 +232,60 @@ ExecGather(PlanState *pstate)
 	econtext = node->ps.ps_ExprContext;
 	ResetExprContext(econtext);
 
+	if (node->ps.intoclause != NULL &&
+		node->ps.intoclause->type == T_IntoClause)
+	{
+		/*
+		 * By now, for parallel workers (if launched any),
+		 * would have started their work i.e. insertion to
+		 * target table.
+		 * In case the leader is chosen to participate for
+		 * parallel inserts in CTAS, then finish it's share
+		 * before going to wait for the parallel workers to
+		 * finish.
+		 */
+		if (node->need_to_scan_locally == true &&
+			node->ps.dest != NULL &&
+			node->ps.dest->mydest == DestIntoRel)
+		{
+			EState	   *estate = node->ps.state;
+			TupleTableSlot *outerTupleSlot;
+
+			for(;;)
+			{
+				/* Install our DSA area while executing the plan. */
+				estate->es_query_dsa =
+					node->pei ? node->pei->area : NULL;
+
+				outerTupleSlot = ExecProcNode(node->ps.lefttree);
+
+				estate->es_query_dsa = NULL;
+
+				if (!TupIsNull(outerTupleSlot))
+				{
+					(void) node->ps.dest->receiveSlot(outerTupleSlot, node->ps.dest);
+					node->ps.state->es_processed++;
+				}
+
+				if(TupIsNull(outerTupleSlot))
+					break;
+			}
+		}
+
+		/* Wait for the parallel workers to finish. */
+		if (node->nworkers_launched > 0)
+		{
+			ExecShutdownGatherWorkers(node);
+			/*
+			 * Add up the total tuples inserted by all workers, to
+			 * the tuples inserted by the leader(if any). This will
+			 * be shared to client.
+			 */
+			node->ps.state->es_processed += pg_atomic_read_u64(node->pei->processed);
+		}
+
+		return NULL;
+	}
 	/*
 	 * Get next tuple, either from one of our workers, or by running the plan
 	 * ourselves.
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index cd3716d494..9ed671415b 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -377,6 +377,18 @@ cost_gather(GatherPath *path, PlannerInfo *root,
 	else
 		path->path.rows = rel->rows;
 
+	/*
+	 * Make the number of tuples that are transferred from
+	 * workers to gather/leader node zero as each worker
+	 * parallelly insert the tuples that are resulted from
+	 * it's chunk of plan execution. This change may make
+	 * the parallel plan cheap among all other plans, and
+	 * influence the planner to consdier this parallel plan.
+	 */
+	if (root->parse->isForCTAS &&
+		root->query_level == 1)
+		path->path.rows = 0;
+
 	startup_cost = path->subpath->startup_cost;
 
 	run_cost = path->subpath->total_cost - path->subpath->startup_cost;
diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h
index 7629230254..1564687efe 100644
--- a/src/include/commands/createas.h
+++ b/src/include/commands/createas.h
@@ -14,12 +14,27 @@
 #ifndef CREATEAS_H
 #define CREATEAS_H
 
+#include "access/heapam.h"
 #include "catalog/objectaddress.h"
 #include "nodes/params.h"
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 #include "utils/queryenvironment.h"
 
+typedef struct
+{
+	DestReceiver pub;			/* publicly-known function pointers */
+	IntoClause *into;			/* target relation specification */
+	/* These fields are filled by intorel_startup: */
+	Relation	rel;			/* relation to write to */
+	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
+	CommandId	output_cid;		/* cmin to insert in output tuples */
+	int			ti_options;		/* table_tuple_insert performance options */
+	BulkInsertState bistate;	/* bulk insert state */
+	bool		is_parallel;		/* true if parallelism is to be considered */
+	bool		is_parallel_worker; /* true for parallel worker */
+	Oid			object_id;			/* used for table open by parallel worker */
+} DR_intorel;
 
 extern ObjectAddress ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
 									   ParamListInfo params, QueryEnvironment *queryEnv,
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a39a5b29c..77f69946bf 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -35,6 +35,7 @@ typedef struct ParallelExecutorInfo
 	/* These two arrays have pcxt->nworkers_launched entries: */
 	shm_mq_handle **tqueue;		/* tuple queues for worker output */
 	struct TupleQueueReader **reader;	/* tuple reader/writer support */
+	volatile pg_atomic_uint64	*processed;	/* number of tuples inserted by all workers */
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a5ab1aed14..478ffeb74c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -23,6 +23,7 @@
 #include "nodes/tidbitmap.h"
 #include "partitioning/partdefs.h"
 #include "storage/condition_variable.h"
+#include "tcop/dest.h"
 #include "utils/hsearch.h"
 #include "utils/queryenvironment.h"
 #include "utils/reltrigger.h"
@@ -1025,6 +1026,11 @@ typedef struct PlanState
 	bool		outeropsset;
 	bool		inneropsset;
 	bool		resultopsset;
+	/* Below is parallel inserts in CTAS related info. */
+	IntoClause	*intoclause;
+	Oid			objectid;
+	CommandId	cid;
+	DestReceiver *dest;
 } PlanState;
 
 /* ----------------
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 60c2f45466..fe43dc941e 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -180,6 +180,7 @@ typedef struct Query
 	 */
 	int			stmt_location;	/* start location, or -1 if unknown */
 	int			stmt_len;		/* length in bytes; 0 means "rest of string" */
+	bool		isForCTAS;		/* true if the select query is for create table as */
 } Query;
 
 
-- 
2.25.1

Reply via email to