On Tue, Nov 3, 2020 at 2:28 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Nov 2, 2020 at 12:40 PM Heikki Linnakangas <hlinn...@iki.fi> wrote:
> >
> > On 02/11/2020 08:14, Amit Kapila wrote:
> > > > On Fri, Oct 30, 2020 at 10:11 PM Heikki Linnakangas <hlinn...@iki.fi> wrote:
> > >>
> > >> In this design, you don't need to keep line boundaries in shared memory,
> > >> because each worker process is responsible for finding the line
> > >> boundaries of its own block.
> > >>
> > >> There's a point of serialization here, in that the next block cannot be
> > >> processed, until the worker working on the previous block has finished
> > >> scanning the EOLs, and set the starting position on the next block,
> > >> putting it in READY state. That's not very different from your patch,
> > >> where you had a similar point of serialization because the leader
> > >> scanned the EOLs,
> > >
> > > But in the design (single producer, multiple consumers) used by the
> > > patch, a worker doesn't need to wait until the complete block is
> > > processed; it can start processing the lines already found. This
> > > also allows workers to start processing data much earlier, as they
> > > don't need to wait for all the offsets of a 64K block to be
> > > ready. However, in the design where each worker processes a 64K
> > > block, it can lead to much longer waits. I think this will impact the
> > > COPY STDIN case more, where in most cases (200-300 byte tuples) we
> > > receive data line-by-line from the client and the leader finds the
> > > line endings. If the leader doesn't find the line endings, the workers
> > > need to wait until the leader fills the entire 64K chunk; OTOH, with
> > > the current approach a worker can start as soon as the leader is able
> > > to populate some minimum number of line endings.
> >
> > You can use a smaller block size.
> >
>
> Sure, but the same problem can happen if the last line in that block
> is too long and we need to peek into the next block. And then there
> could be cases where a single line could be greater than 64K.
>
> > However, the point of parallel copy is
> > to maximize bandwidth.
> >
>
> Okay, but this first phase (finding the line boundaries) cannot be
> done in parallel anyway, and we have seen in some of the initial
> benchmarking that this initial phase is a small part of the work,
> especially when the table has indexes, constraints, etc. So, I think
> it won't matter much if this splitting is done in a single process or
> multiple processes.
>
> > If the workers ever have to sit idle, it means
> > that the bottleneck is in receiving data from the client, i.e. the
> > backend is fast enough, and you can't make the overall COPY finish any
> > faster no matter how you do it.
> >
> > > The other point is that the leader backend won't be used completely as
> > > it is only doing a very small part (primarily reading the file) of the
> > > overall work.
> >
> > An idle process doesn't cost anything. If you have free CPU resources,
> > use more workers.
> >
> > > We have discussed both these approaches (a) single producer multiple
> > > consumer, and (b) all workers doing the processing as you are saying
> > > in the beginning and concluded that (a) is better, see some of the
> > > relevant emails [1][2][3].
> > >
> > > [1] - https://www.postgresql.org/message-id/20200413201633.cki4nsptynq7blhg%40alap3.anarazel.de
> > > [2] - https://www.postgresql.org/message-id/20200415181913.4gjqcnuzxfzbbzxa%40alap3.anarazel.de
> > > [3] - https://www.postgresql.org/message-id/78C0107E-62F2-4F76-BFD8-34C73B716944%40anarazel.de
> >
> > Sorry I'm late to the party. I don't think the design I proposed was
> > discussed in those threads.
> >
>
> I think something close to that was discussed, as you have noticed in
> your next email, but IIRC, because many people (Andres, Ants, myself,
> and the author) favoured the current approach (single reader and
> multiple consumers), we decided to go with that. I feel this patch is
> very much in the POC stage, due to which the code doesn't look good,
> and as we move forward we need to see what is the better way to
> improve it; maybe one of the ways is to split it as you are
> suggesting so that it is easier to review. I think the other
> important thing which this patch has not addressed properly is the
> parallel-safety checks, as pointed out by me earlier. There are two
> things to solve there: (a) the lower-level code (like heap_* APIs,
> CommandCounterIncrement, xact.c APIs, etc.) has checks which don't
> allow any writes; we need to see which of those we can open up now
> (or do some additional work to get past those checks) after some of
> the work done for parallel writes in PG-13[1][2], and (b) in which
> cases parallel writes (parallel copy) can be allowed; for example, we
> need to identify whether the table or one of its partitions has any
> constraint/expression which is parallel-unsafe.
>

I have worked on a patch for the parallel safety checks. It checks
whether parallel copy can be performed. Parallel copy cannot be
performed in the following cases:
a) the relation is a temporary table
b) the relation is a foreign table
c) the relation has index expressions that are not parallel safe
d) the relation has triggers of any type other than BEFORE STATEMENT
e) the relation has check constraints that are not parallel safe
f) the relation is partitioned and any partition falls under one of
   the above cases
The parallel copy implementation will use this patch.
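
To make the intent of conditions a) to f) easier to follow, here is a
rough, self-contained sketch of the decision. It is not code from the
patch: SketchRel and sketch_parallel_copy_allowed() are hypothetical
names, and the boolean fields only stand in for the catalog and
relcache lookups that the attached patch performs.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for a relation (not a PostgreSQL type). */
typedef struct SketchRel
{
	bool		is_temp;				/* (a) temporary table */
	bool		is_foreign;				/* (b) foreign table */
	bool		unsafe_index_expr;		/* (c) index expression not parallel safe */
	bool		has_disallowed_trigger; /* (d) trigger other than BEFORE STATEMENT */
	bool		unsafe_check;			/* (e) CHECK constraint not parallel safe */
	const struct SketchRel *parts;		/* (f) partitions, or NULL if none */
	size_t		nparts;
} SketchRel;

/* Returns true only if none of the conditions (a) to (f) applies. */
bool
sketch_parallel_copy_allowed(const SketchRel *rel)
{
	size_t		i;

	if (rel->is_temp || rel->is_foreign)
		return false;

	if (rel->unsafe_index_expr ||
		rel->has_disallowed_trigger ||
		rel->unsafe_check)
		return false;

	/* (f): apply the same checks recursively to every partition. */
	for (i = 0; i < rel->nparts; i++)
	{
		if (!sketch_parallel_copy_allowed(&rel->parts[i]))
			return false;
	}

	return true;
}
```

In this sketch, a single unsafe partition is enough to disallow
parallel copy for the whole partitioned table, which is what case f)
is meant to express.
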
Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
From 18b5efabc5cf82870c8b5d015f78f4b7d3fe18ef Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Tue, 10 Nov 2020 18:24:09 +0530
Subject: [PATCH v10 2/6] Check if parallel copy can be performed.

Check whether parallel copy can be performed. Parallel copy cannot be
performed in the following cases:
a) the relation is a temporary table
b) the relation is a foreign table
c) the relation has index expressions that are not parallel safe
d) the relation has triggers of any type other than BEFORE STATEMENT
e) the relation has check constraints that are not parallel safe
f) the relation is partitioned and any partition falls under one of
   the above cases
This patch will be used by the parallel copy implementation patch.
---
 src/backend/access/heap/heapam.c     |  11 -
 src/backend/access/transam/xact.c    |  26 ++-
 src/backend/commands/Makefile        |   1 +
 src/backend/commands/copyparallel.c  | 103 +++++++++
 src/backend/optimizer/util/clauses.c | 405 +++++++++++++++++++++++++++++++++++
 src/include/access/xact.h            |   1 +
 src/include/commands/copy.h          |   2 +
 src/include/optimizer/clauses.h      |   1 +
 8 files changed, 534 insertions(+), 16 deletions(-)
 create mode 100644 src/backend/commands/copyparallel.c

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1585861..1602525 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2043,17 +2043,6 @@ static HeapTuple
 heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 					CommandId cid, int options)
 {
-	/*
-	 * To allow parallel inserts, we need to ensure that they are safe to be
-	 * performed in workers. We have the infrastructure to allow parallel
-	 * inserts in general except for the cases where inserts generate a new
-	 * CommandId (eg. inserts into a table having a foreign key column).
-	 */
-	if (IsParallelWorker())
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot insert tuples in a parallel worker")));
-
 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
 	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index af6afce..d6d449f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -764,18 +764,34 @@ GetCurrentCommandId(bool used)
 	if (used)
 	{
 		/*
-		 * Forbid setting currentCommandIdUsed in a parallel worker, because
-		 * we have no provision for communicating this back to the leader.  We
-		 * could relax this restriction when currentCommandIdUsed was already
-		 * true at the start of the parallel operation.
+		 * If in a parallel worker, only allow setting currentCommandIdUsed
+		 * if it was already true at the start of the parallel operation
+		 * (by way of SetCurrentCommandIdUsedForWorker()); otherwise forbid
+		 * setting currentCommandIdUsed, because we have no provision for
+		 * communicating this back to the leader.
 		 */
-		Assert(!IsParallelWorker());
+		Assert(!(IsParallelWorker() && !currentCommandIdUsed));
 		currentCommandIdUsed = true;
 	}
 	return currentCommandId;
 }
 
 /*
+ *	SetCurrentCommandIdUsedForWorker
+ *
+ * For a parallel worker, record that the currentCommandId has been used.
+ * This must only be called at the start of a parallel operation.
+ */
+void
+SetCurrentCommandIdUsedForWorker(void)
+{
+	Assert(IsParallelWorker() && !currentCommandIdUsed &&
+		   (currentCommandId != InvalidCommandId));
+
+	currentCommandIdUsed = true;
+}
+
+/*
  *	SetParallelStartTimestamps
  *
  * In a parallel worker, we should inherit the parent transaction's
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3..a224aac 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -24,6 +24,7 @@ OBJS = \
 	constraint.o \
 	conversioncmds.o \
 	copy.o \
+	copyparallel.o \
 	createas.o \
 	dbcommands.o \
 	define.o \
diff --git a/src/backend/commands/copyparallel.c b/src/backend/commands/copyparallel.c
new file mode 100644
index 0000000..290d532
--- /dev/null
+++ b/src/backend/commands/copyparallel.c
@@ -0,0 +1,103 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyparallel.c
+ *              Implements the Parallel COPY utility command
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *        src/backend/commands/copyparallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "catalog/pg_proc_d.h"
+#include "commands/copy.h"
+#include "libpq/libpq.h"
+#include "optimizer/clauses.h"
+#include "optimizer/optimizer.h"
+#include "pgstat.h"
+#include "utils/lsyscache.h"
+
+/*
+ * CheckExprParallelSafety
+ *
+ * Determine whether the WHERE clause and default expressions are parallel
+ * safe and contain no volatile functions. Return true if so, otherwise
+ * return false.
+ */
+static bool
+CheckExprParallelSafety(CopyState cstate)
+{
+	if (max_parallel_hazard((Query *) cstate->whereClause) != PROPARALLEL_SAFE)
+		return false;
+
+	/*
+	 * Can't support parallel copy if there are any volatile function
+	 * expressions in WHERE clause as such expressions may query the table
+	 * we're inserting into.
+	 */
+	if (contain_volatile_functions(cstate->whereClause))
+		return false;
+
+	/*
+	 * Check if any of the columns has a default expression. If so, and any
+	 * of them is not parallel safe, then parallelism is not allowed. For
+	 * instance, if there are any serial/bigserial columns, whose nextval()
+	 * default expression is parallel unsafe, parallelism should not be
+	 * allowed.
+	 */
+	if (cstate->defexprs != NULL && cstate->num_defaults != 0)
+	{
+		int			i;
+
+		for (i = 0; i < cstate->num_defaults; i++)
+		{
+			if (max_parallel_hazard((Query *) cstate->defexprs[i]->expr) !=
+				PROPARALLEL_SAFE)
+				return false;
+
+			/*
+			 * Can't support parallel copy if there are any volatile function
+			 * expressions in default expressions as such expressions may
+			 * query the table we're inserting into.
+			 */
+			if (contain_volatile_functions((Node *) cstate->defexprs[i]->expr))
+				return false;
+		}
+	}
+
+	return true;
+}
+
+/*
+ * IsParallelCopyAllowed
+ *
+ * Check if parallel copy can be allowed.
+ */
+bool
+IsParallelCopyAllowed(CopyState cstate, Oid relid)
+{
+	/*
+	 * Check if parallel operation can be performed based on local
+	 * table/foreign table/index/check constraints/triggers present for the
+	 * relation and also by doing similar checks recursively for each of the
+	 * associated partitions, if any.
+	 */
+	if (MaxParallelHazardForModify(relid) != PROPARALLEL_SAFE)
+		return false;
+
+	/*
+	 * Disallow parallelism if the WHERE clause or any default expression
+	 * is not parallel safe or contains volatile functions; see
+	 * CheckExprParallelSafety().
+	 */
+	if (!CheckExprParallelSafety(cstate))
+		return false;
+
+	return true;
+}
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 85ef873..c82f5f2 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -20,12 +20,19 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "access/genam.h"
+#include "access/table.h"
+#include "catalog/index.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_constraint.h"
+#include "catalog/pg_constraint_d.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/functions.h"
 #include "funcapi.h"
@@ -42,7 +49,9 @@
 #include "parser/parse_agg.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_func.h"
+#include "partitioning/partdesc.h"
 #include "rewrite/rewriteManip.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -50,6 +59,7 @@
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
@@ -1073,6 +1083,401 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 								  context);
 }
 
+/*
+ * MaxTriggerDataParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for the specified trigger data.
+ */
+static char
+MaxTriggerDataParallelHazardForModify(TriggerDesc *trigdesc,
+									  max_parallel_hazard_context *context)
+{
+	int			i;
+
+	/*
+	 * When transition tables are involved (if after statement triggers are
+	 * present), we collect minimal tuples in the tuple store after processing
+	 * them so that later after statement triggers can access them.  Now, if
+	 * we want to enable parallelism for such cases, we instead need to store
+	 * and access tuples from shared tuple store.  However, it does not have
+	 * the facility to store tuples in-memory, so we always need to store and
+	 * access from a file which could be costly unless we also have an
+	 * additional way to store minimal tuples in shared memory till work_mem
+	 * and then in shared tuple store. It is possible to do all this to enable
+	 * parallel copy for such cases. Currently, we can disallow parallelism
+	 * for such cases and later allow if required.
+	 *
+	 * When there are BEFORE/AFTER/INSTEAD OF row triggers on the table, we do
+	 * not allow parallelism because such triggers might query the table we
+	 * are inserting into and act differently if the tuples that have already
+	 * been processed and prepared for insertion are not there. If we allowed
+	 * parallelism with such triggers, the behaviour would depend on whether
+	 * a parallel worker has already inserted those particular tuples or
+	 * not.
+	 */
+	if (trigdesc->trig_insert_after_statement ||
+		trigdesc->trig_insert_new_table ||
+		trigdesc->trig_insert_before_row ||
+		trigdesc->trig_insert_after_row ||
+		trigdesc->trig_insert_instead_row)
+	{
+		context->max_hazard = PROPARALLEL_UNSAFE;
+		return context->max_hazard;
+	}
+
+	for (i = 0; i < trigdesc->numtriggers; i++)
+	{
+		Trigger    *trigger = &trigdesc->triggers[i];
+		int			trigtype;
+
+		/*
+		 * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in
+		 * the relation, and this would result in creation of new CommandIds
+		 * on insert/update/delete and this isn't supported in a parallel
+		 * worker (but is safe in the parallel leader).
+		 */
+		trigtype = RI_FKey_trigger_type(trigger->tgfoid);
+
+		/*
+		 * No parallelism if foreign key check trigger is present. This is
+		 * because, while performing foreign key checks, we take KEY SHARE
+		 * lock on primary key table rows, which in turn increments the
+		 * command counter and updates the snapshot.  Since we share the
+		 * snapshots at the beginning of the command, we can't allow it to be
+		 * changed later. So, unless we do something special for it, we can't
+		 * allow parallelism in such cases.
+		 */
+		if (trigtype == RI_TRIGGER_FK)
+		{
+			context->max_hazard = PROPARALLEL_UNSAFE;
+			return context->max_hazard;
+		}
+	}
+
+	return context->max_hazard;
+}
+
+/*
+ * MaxIndexExprsParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for any existing index
+ * expressions of a specified relation.
+ */
+static char
+MaxIndexExprsParallelHazardForModify(Relation rel,
+									 max_parallel_hazard_context *context)
+{
+	List	   *indexOidList;
+	ListCell   *lc;
+	LOCKMODE	lockmode = AccessShareLock;
+
+	indexOidList = RelationGetIndexList(rel);
+	foreach(lc, indexOidList)
+	{
+		Oid			indexOid = lfirst_oid(lc);
+		Relation	indexRel;
+		IndexInfo  *indexInfo;
+
+		if (ConditionalLockRelationOid(indexOid, lockmode))
+		{
+			indexRel = index_open(indexOid, NoLock);
+		}
+		else
+		{
+			context->max_hazard = PROPARALLEL_UNSAFE;
+			return context->max_hazard;
+		}
+
+		indexInfo = BuildIndexInfo(indexRel);
+
+		if (indexInfo->ii_Expressions != NIL)
+		{
+			int			i;
+			ListCell   *indexExprItem = list_head(indexInfo->ii_Expressions);
+
+			for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+			{
+				int			keycol = indexInfo->ii_IndexAttrNumbers[i];
+
+				if (keycol == 0)
+				{
+					/* Found an index expression */
+
+					Node	   *indexExpr;
+
+					if (indexExprItem == NULL)	/* shouldn't happen */
+						elog(ERROR, "too few entries in indexprs list");
+
+					indexExpr = (Node *) lfirst(indexExprItem);
+					indexExpr = (Node *) expression_planner((Expr *) indexExpr);
+
+					if (max_parallel_hazard_walker(indexExpr, context))
+					{
+						index_close(indexRel, lockmode);
+						return context->max_hazard;
+					}
+
+					indexExprItem = lnext(indexInfo->ii_Expressions, indexExprItem);
+				}
+			}
+		}
+		index_close(indexRel, lockmode);
+	}
+
+	return context->max_hazard;
+}
+
+/*
+ * MaxDomainParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for the specified DOMAIN type.
+ * Only CHECK expressions are examined for parallel safety.
+ * DEFAULT values of DOMAIN-type columns in the target-list are already
+ * being checked for parallel-safety in the max_parallel_hazard() scan of the
+ * query tree in standard_planner().
+ *
+ */
+static char
+MaxDomainParallelHazardForModify(Oid typid, max_parallel_hazard_context *context)
+{
+	Relation	conRel;
+	ScanKeyData key[1];
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	LOCKMODE	lockmode = AccessShareLock;
+
+	conRel = table_open(ConstraintRelationId, lockmode);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_constraint_contypid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(typid));
+	scan = systable_beginscan(conRel, ConstraintTypidIndexId, true,
+							  NULL, 1, key);
+
+	while (HeapTupleIsValid((tup = systable_getnext(scan))))
+	{
+		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
+
+		if (con->contype == CONSTRAINT_CHECK)
+		{
+			char	   *conbin;
+			Datum		val;
+			bool		isnull;
+			Expr	   *checkExpr;
+
+			val = SysCacheGetAttr(CONSTROID, tup,
+								  Anum_pg_constraint_conbin, &isnull);
+			if (isnull)
+				elog(ERROR, "null conbin for constraint %u", con->oid);
+			conbin = TextDatumGetCString(val);
+			checkExpr = stringToNode(conbin);
+			if (max_parallel_hazard_walker((Node *) checkExpr, context))
+			{
+				break;
+			}
+		}
+	}
+
+	systable_endscan(scan);
+	table_close(conRel, lockmode);
+	return context->max_hazard;
+}
+
+/*
+ * MaxRelParallelHazardForModify
+ *
+ * Determines the maximum parallel-mode hazard level for modification
+ * of a specified relation.
+ */
+static char
+MaxRelParallelHazardForModify(Oid relid, max_parallel_hazard_context *context)
+{
+	Relation	rel;
+	TupleDesc	tupdesc;
+	int			attnum;
+
+	LOCKMODE	lockmode = AccessShareLock;
+
+	/*
+	 * It's possible that this relation is locked for exclusive access in
+	 * another concurrent transaction (e.g. as a result of an ALTER TABLE ...
+	 * operation) until that transaction completes. If a share-lock can't be
+	 * acquired on it now, we have to assume this could be the worst-case, so
+	 * to avoid blocking here until that transaction completes, conditionally
+	 * try to acquire the lock and assume and return UNSAFE on failure.
+	 */
+	if (ConditionalLockRelationOid(relid, lockmode))
+	{
+		rel = table_open(relid, NoLock);
+	}
+	else
+	{
+		context->max_hazard = PROPARALLEL_UNSAFE;
+		return context->max_hazard;
+	}
+
+	/*
+	 * Check if copy is into foreign table. We can not allow parallelism in
+	 * this case because each worker needs to establish FDW connection and
+	 * operate in a separate transaction. Unless we have a capability to
+	 * provide two-phase commit protocol, we can not allow parallelism.
+	 *
+	 * Also check if copy is into temporary table. Since parallel workers can
+	 * not access temporary table, parallelism is not allowed.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		table_close(rel, lockmode);
+		context->max_hazard = PROPARALLEL_UNSAFE;
+		return context->max_hazard;
+	}
+
+	/*
+	 * If a partitioned table, check that each partition is safe for
+	 * modification in parallel-mode.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		int			i;
+		PartitionDesc pdesc;
+		PartitionKey pkey;
+		ListCell   *partexprs_item;
+		int			partnatts;
+		List	   *partexprs;
+
+		pkey = RelationGetPartitionKey(rel);
+
+		partnatts = get_partition_natts(pkey);
+		partexprs = get_partition_exprs(pkey);
+
+		partexprs_item = list_head(partexprs);
+		for (i = 0; i < partnatts; i++)
+		{
+			/* Check parallel-safety of partition key support functions */
+			if (OidIsValid(pkey->partsupfunc[i].fn_oid))
+			{
+				if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context))
+				{
+					table_close(rel, lockmode);
+					return context->max_hazard;
+				}
+			}
+
+			/* Check parallel-safety of any expressions in the partition key */
+			if (get_partition_col_attnum(pkey, i) == 0)
+			{
+				Node	   *checkExpr = (Node *) lfirst(partexprs_item);
+
+				if (max_parallel_hazard_walker(checkExpr, context))
+				{
+					table_close(rel, lockmode);
+					return context->max_hazard;
+				}
+
+				partexprs_item = lnext(partexprs, partexprs_item);
+			}
+		}
+
+		/* Recursively check each partition ... */
+		pdesc = RelationGetPartitionDesc(rel);
+		for (i = 0; i < pdesc->nparts; i++)
+		{
+			if (MaxRelParallelHazardForModify(pdesc->oids[i], context) != PROPARALLEL_SAFE)
+			{
+				table_close(rel, lockmode);
+				return context->max_hazard;
+			}
+		}
+	}
+
+	/*
+	 * If there are any index expressions, check that they are parallel-mode
+	 * safe.
+	 */
+	if (MaxIndexExprsParallelHazardForModify(rel, context) != PROPARALLEL_SAFE)
+	{
+		table_close(rel, lockmode);
+		return context->max_hazard;
+	}
+
+	/*
+	 * If any triggers exist, check that they are parallel safe.
+	 */
+	if (rel->trigdesc != NULL &&
+		MaxTriggerDataParallelHazardForModify(rel->trigdesc, context) != PROPARALLEL_SAFE)
+	{
+		table_close(rel, lockmode);
+		return context->max_hazard;
+	}
+
+	tupdesc = RelationGetDescr(rel);
+	for (attnum = 0; attnum < tupdesc->natts; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+		/* We don't need info for dropped or generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		/*
+		 * If the column is of a DOMAIN type, determine whether that domain
+		 * has any CHECK expressions that are not parallel-mode safe.
+		 */
+		if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN)
+		{
+			if (MaxDomainParallelHazardForModify(att->atttypid, context) != PROPARALLEL_SAFE)
+			{
+				table_close(rel, lockmode);
+				return context->max_hazard;
+			}
+		}
+	}
+
+	/*
+	 * Check if there are any CHECK constraints which are not parallel-safe.
+	 */
+	if (tupdesc->constr != NULL && tupdesc->constr->num_check > 0)
+	{
+		int			i;
+
+		ConstrCheck *check = tupdesc->constr->check;
+
+		for (i = 0; i < tupdesc->constr->num_check; i++)
+		{
+			Expr	   *checkExpr = stringToNode(check[i].ccbin);
+
+			if (max_parallel_hazard_walker((Node *) checkExpr, context))
+			{
+				table_close(rel, lockmode);
+				return context->max_hazard;
+			}
+		}
+	}
+
+	table_close(rel, lockmode);
+	return context->max_hazard;
+}
+
+/*
+ * MaxParallelHazardForModify
+ *
+ * Determines the worst parallel-mode hazard level for the specified
+ * relation. The search returns the earliest in the following list:
+ * PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED, PROPARALLEL_SAFE
+ */
+char
+MaxParallelHazardForModify(Oid relid)
+{
+	max_parallel_hazard_context context;
+
+	context.max_hazard = PROPARALLEL_SAFE;
+	context.max_interesting = PROPARALLEL_RESTRICTED;
+	context.safe_param_ids = NIL;
+
+	return (MaxRelParallelHazardForModify(relid, &context));
+}
 
 /*****************************************************************************
  *		Check clauses for nonstrict functions
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7320de3..42d4893 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -386,6 +386,7 @@ extern FullTransactionId GetTopFullTransactionId(void);
 extern FullTransactionId GetTopFullTransactionIdIfAny(void);
 extern FullTransactionId GetCurrentFullTransactionId(void);
 extern FullTransactionId GetCurrentFullTransactionIdIfAny(void);
+extern void SetCurrentCommandIdUsedForWorker(void);
 extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
 extern CommandId GetCurrentCommandId(bool used);
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index b9331af..fd3a78e 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -38,4 +38,6 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern bool IsParallelCopyAllowed(CopyState cstate, Oid relid);
+
 #endif							/* COPY_H */
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index 7ef8cce..f515de1 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -55,5 +55,6 @@ extern void CommuteOpExpr(OpExpr *clause);
 
 extern Query *inline_set_returning_function(PlannerInfo *root,
 											RangeTblEntry *rte);
+extern char MaxParallelHazardForModify(Oid relid);
 
 #endif							/* CLAUSES_H */
-- 
1.8.3.1
