On 2017/01/05 3:26, Robert Haas wrote:
> On Tue, Dec 27, 2016 at 8:41 PM, Amit Langote
> <langote_amit...@lab.ntt.co.jp> wrote:
>> On 2016/12/27 19:07, Amit Langote wrote:
>>> Attached should fix that.
>>
>> Here are the last two patches with additional information like other
>> patches.  Forgot to do that yesterday.
> 
> 0001 has the disadvantage that get_partition_for_tuple() acquires a
> side effect.  That seems undesirable.  At the least, it needs to be
> documented in the function's header comment.

That's true.  How about we save the original ecxt_scantuple at entry
and restore it just before returning from the function?  That way there
would be no side effect.  0001 implements that.
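
In essence (this is just an excerpt of what 0001 below does inside
get_partition_for_tuple(); shown here to make the contract concrete):

	ExprContext *ecxt = GetPerTupleExprContext(estate);
	TupleTableSlot *ecxt_scantuple_old = ecxt->ecxt_scantuple;

	...

	/* Point ecxt_scantuple at whichever slot holds the current tuple. */
	ecxt->ecxt_scantuple = slot;
	FormPartitionKeyDatum(parent, slot, estate, values, isnull);

	...

	/* Restore the caller's value before returning; no side effect. */
	ecxt->ecxt_scantuple = ecxt_scantuple_old;
	return result;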

> It's unclear to me why we need to do 0002.  It doesn't seem like it
> should be necessary, it doesn't seem like a good idea, and the commit
> message you proposed is uninformative.

If a single BulkInsertState object is passed to
heap_insert()/heap_multi_insert() for different heaps corresponding to
different partitions (from one input tuple to the next), tuples might end
up going into the wrong heaps (as demonstrated in one of the reports [1]).
A simple solution is to disable bulk-insert in the case of partitioned
tables.
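
To see why, here is BulkInsertStateData as it stands in
src/include/access/hio.h:

typedef struct BulkInsertStateData
{
	BufferAccessStrategy strategy;	/* our BULKWRITE strategy object */
	Buffer		current_buf;	/* current insertion target page */
} BulkInsertStateData;

If I'm reading hio.c right, current_buf keeps a pin on the last insertion
target page, and RelationGetBufferForTuple()/ReadBufferBI() hand that
pinned buffer back whenever its block number matches the target block,
without checking which relation the buffer actually belongs to.  So if
the heap relation changes from one call to the next while the same
bistate is passed, a tuple can land on a page of the previously used
partition's heap.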

But my patch (or its motivation) was slightly wrongheaded, in that I
conflated the multi-insert and bulk-insert considerations.  I revised
0002 so that it no longer does that.

However, if we disable bulk-insert mode, COPY loses its purported
performance benefit over INSERT.  Patch 0003 is a proposal to implement
bulk-insert mode even for partitioned tables.  Basically, allocate a
separate BulkInsertState object for each partition and switch to the
appropriate one just before calling heap_insert()/heap_multi_insert().
Then, to be able to use heap_multi_insert(), we must also manage buffered
tuples separately for each partition.  I didn't, however, modify the
limits on the number and total size of buffered tuples, which control
when we pause buffering and do heap_multi_insert() on the buffered
tuples.  Maybe they should work slightly differently in the partitioned
table case; for example, we could increase the overall limits on both the
number of tuples and the tuple size (I observed that increasing them 10x
or 100x helped to some degree).  A rough sketch of that idea follows
below.  Thoughts on this?
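
(This is only an illustrative sketch, not part of 0003;
compute_buffer_limits() and the 100x cap are made-up names/numbers, and
Min() is the usual macro from c.h.)

#define MAX_BUFFERED_TUPLES			1000
#define MAX_BUFFERED_TUPLES_SIZE	65535

/*
 * Hypothetical: scale the flush thresholds by the number of partitions,
 * so that each partition's buffer can grow to a useful size even when
 * input tuples are randomly ordered, while capping the multiplier to
 * bound the total memory used for buffered tuples.
 */
static void
compute_buffer_limits(int num_heaps, int *max_tuples, Size *max_size)
{
	int			scale = Min(num_heaps, 100);	/* cap at 100x */

	*max_tuples = MAX_BUFFERED_TUPLES * scale;
	*max_size = (Size) MAX_BUFFERED_TUPLES_SIZE * scale;
}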

Thanks,
Amit

[1]
https://www.postgresql.org/message-id/CAFmBtr32FDOqofo8yG-4mjzL1HnYHxXK5S9OGFJ%3D%3DcJpgEW4vA%40mail.gmail.com
From 332c67c258a0f25f76c29ced23199fe0ee8e153e Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Wed, 28 Dec 2016 10:10:26 +0900
Subject: [PATCH 1/3] Set ecxt_scantuple correctly for tuple-routing

In 2ac3ef7a01df859c62d0a02333b646d65eaec5ff, we changed things so that
it's possible for a different TupleTableSlot to be used for partitioned
tables at successively lower levels.  If we do end up changing the slot
from the original, we must update ecxt_scantuple to point to the new one
for the partition key of the tuple to be computed correctly.

Also update the regression tests so that the code manipulating
ecxt_scantuple is covered.

Reported by: Rajkumar Raghuwanshi
Patch by: Amit Langote
Reports: https://www.postgresql.org/message-id/CAKcux6%3Dm1qyqB2k6cjniuMMrYXb75O-MB4qGQMu8zg-iGGLjDw%40mail.gmail.com
---
 src/backend/catalog/partition.c      | 29 ++++++++++++++++++++++-------
 src/backend/executor/execMain.c      |  2 --
 src/test/regress/expected/insert.out |  2 +-
 src/test/regress/sql/insert.sql      |  2 +-
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index f54e1bdf3f..0de1cf245a 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -1643,7 +1643,10 @@ get_partition_for_tuple(PartitionDispatch *pd,
 	bool		isnull[PARTITION_MAX_KEYS];
 	int			cur_offset,
 				cur_index;
-	int			i;
+	int			i,
+				result;
+	ExprContext *ecxt = GetPerTupleExprContext(estate);
+	TupleTableSlot *ecxt_scantuple_old = ecxt->ecxt_scantuple;
 
 	/* start with the root partitioned table */
 	parent = pd[0];
@@ -1672,7 +1675,14 @@ get_partition_for_tuple(PartitionDispatch *pd,
 			slot = myslot;
 		}
 
-		/* Extract partition key from tuple */
+		/*
+		 * Extract partition key from tuple; FormPartitionKeyDatum() expects
+		 * ecxt_scantuple to point to the correct tuple slot (which might be
+		 * different from the slot we received from the caller if the
+		 * partitioned table at the current level has a different tuple
+		 * descriptor from its parent).
+		 */
+		ecxt->ecxt_scantuple = slot;
 		FormPartitionKeyDatum(parent, slot, estate, values, isnull);
 
 		if (key->strategy == PARTITION_STRATEGY_RANGE)
@@ -1727,16 +1737,21 @@ get_partition_for_tuple(PartitionDispatch *pd,
 		 */
 		if (cur_index < 0)
 		{
+			result = -1;
 			*failed_at = RelationGetRelid(parent->reldesc);
-			return -1;
+			break;
 		}
-		else if (parent->indexes[cur_index] < 0)
-			parent = pd[-parent->indexes[cur_index]];
-		else
+		else if (parent->indexes[cur_index] >= 0)
+		{
+			result = parent->indexes[cur_index];
 			break;
+		}
+		else
+			parent = pd[-parent->indexes[cur_index]];
 	}
 
-	return parent->indexes[cur_index];
+	ecxt->ecxt_scantuple = ecxt_scantuple_old;
+	return result;
 }
 
 /*
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index ff277d300a..6a9bc8372f 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -3167,9 +3167,7 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
 {
 	int		result;
 	Oid		failed_at;
-	ExprContext *econtext = GetPerTupleExprContext(estate);
 
-	econtext->ecxt_scantuple = slot;
 	result = get_partition_for_tuple(pd, slot, estate, &failed_at);
 	if (result < 0)
 	{
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index ca3134c34c..1c7b8047ee 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -302,7 +302,7 @@ drop cascades to table part_ee_ff1
 drop cascades to table part_ee_ff2
 -- more tests for certain multi-level partitioning scenarios
 create table p (a int, b int) partition by range (a, b);
-create table p1 (b int, a int not null) partition by range (b);
+create table p1 (b int not null, a int not null) partition by range ((b+0));
 create table p11 (like p1);
 alter table p11 drop a;
 alter table p11 add a int;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 09c9879da1..c25dc14575 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -173,7 +173,7 @@ drop table list_parted cascade;
 
 -- more tests for certain multi-level partitioning scenarios
 create table p (a int, b int) partition by range (a, b);
-create table p1 (b int, a int not null) partition by range (b);
+create table p1 (b int not null, a int not null) partition by range ((b+0));
 create table p11 (like p1);
 alter table p11 drop a;
 alter table p11 add a int;
-- 
2.11.0

From 12b50bbda59be122b411c3847a0a04a03b17d013 Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Wed, 28 Dec 2016 10:28:37 +0900
Subject: [PATCH 2/3] No multi-insert and bulk-insert when COPYing into
 partitioned tables
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Since tuple-routing can change the target heap from one row to the next,
a single BulkInsertState object cannot be safely reused, so disable
bulk-insert and multi-insert when copying into partitioned tables.

We might consider alleviating this restriction by allocating a
BulkInsertState object and managing tuple-buffering per partition.

Reported by: 高增琦, Venkata B Nagothi
Patch by: Amit Langote (with pointers from 高增琦)
Reports: https://www.postgresql.org/message-id/CAFmBtr32FDOqofo8yG-4mjzL1HnYHxXK5S9OGFJ%3D%3DcJpgEW4vA%40mail.gmail.com
         https://www.postgresql.org/message-id/CAEyp7J9WiX0L3DoiNcRrY-9iyw%3DqP%2Bj%3DDLsAnNFF1xT2J1ggfQ%40mail.gmail.com
---
 src/backend/commands/copy.c | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f56b2ac49b..322f4260fd 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2296,7 +2296,7 @@ CopyFrom(CopyState cstate)
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
-	BulkInsertState bistate;
+	BulkInsertState bistate = NULL;
 	uint64		processed = 0;
 	bool		useHeapMultiInsert;
 	int			nBufferedTuples = 0;
@@ -2455,8 +2455,8 @@ CopyFrom(CopyState cstate)
 	if ((resultRelInfo->ri_TrigDesc != NULL &&
 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
 		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		cstate->partition_dispatch_info != NULL ||
-		cstate->volatile_defexprs)
+		cstate->volatile_defexprs ||
+		cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
 		useHeapMultiInsert = false;
 	}
@@ -2480,7 +2480,13 @@ CopyFrom(CopyState cstate)
 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
 
-	bistate = GetBulkInsertState();
+	/*
+	 * FIXME: We don't engage the bulk-insert mode for partitioned tables,
+	 * because the heap relation is most likely to change from one row to
+	 * the next due to tuple-routing.
+	 */
+	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		bistate = GetBulkInsertState();
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2701,7 +2707,8 @@ CopyFrom(CopyState cstate)
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	FreeBulkInsertState(bistate);
+	if (bistate != NULL)
+		FreeBulkInsertState(bistate);
 
 	MemoryContextSwitchTo(oldcontext);
 
-- 
2.11.0

From 2a5c2bb661a956e118ccf3a3d93c4dc18943c1fd Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Fri, 6 Jan 2017 10:28:04 +0900
Subject: [PATCH 3/3] Support bulk-insert mode for partitioned tables

The heap layer (hio.c) supports a bulk-insert mode, which is currently
used by certain callers in copy.c, createas.c, etc.
Callers must pass a BulkInsertState object down to heapam routines
like heap_insert() or heap_multi_insert() along with the input row(s)
to engage this mode.

A single BulkInsertState object is good only for a given heap relation.
In case of a partitioned table, successive input rows may be mapped to
different partitions, and hence to different heap relations.  We must use
a separate BulkInsertState object for each partition and switch to the
appropriate one every time a given partition is selected.

Also, if we are able to use multi-insert mode in CopyFrom() and hence
will buffer tuples, we must maintain separate buffer space and buffered
tuple counts for every partition.  The maximum limits on the number of
buffered tuples and the buffered tuple size (across partitions), however,
are still the old compile-time constants, not scaled based on, say, the
number of partitions.  It might be possible to raise those limits so that
enough tuples are buffered per partition even in the worst case, where
input tuples are randomly ordered.
---
 src/backend/commands/copy.c | 167 +++++++++++++++++++++++++++++---------------
 1 file changed, 109 insertions(+), 58 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 322f4260fd..c4397b4f78 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2296,15 +2296,20 @@ CopyFrom(CopyState cstate)
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
-	BulkInsertState bistate = NULL;
 	uint64		processed = 0;
-	bool		useHeapMultiInsert;
-	int			nBufferedTuples = 0;
 
-#define MAX_BUFFERED_TUPLES 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
-	int			firstBufferedLineNo = 0;
+#define MAX_BUFFERED_TUPLES			1000
+#define MAX_BUFFERED_TUPLES_SIZE	65535
+	int		num_heaps;
+	bool   *useHeapMultiInsert = NULL;
+	BulkInsertState *bistate = NULL;
+	HeapTuple **bufferedTuples = NULL;	/* initialize to silence warning */
+	Size	   *bufferedTuplesSize = NULL;
+	int		   *firstBufferedLineNo = NULL;
+	int		   *nBufferedTuples = NULL;
+	Size		bufferedTuplesSize_total = 0;
+	int			nBufferedTuples_total = 0;
+	int			i;
 
 	Assert(cstate->rel);
 
@@ -2449,21 +2454,44 @@ CopyFrom(CopyState cstate)
 	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
 	 * expressions. Such triggers or expressions might query the table we're
 	 * inserting to, and act differently if the tuples that have already been
-	 * processed and prepared for insertion are not there.  We also can't do
-	 * it if the table is partitioned.
+	 * processed and prepared for insertion are not there.
+	 *
+	 * In case of a regular table there is only one heap, whereas in case of
+	 * a partitioned table, there are as many heaps as there are partitions.
+	 * We must manage buffered tuples separately for each heap.
 	 */
-	if ((resultRelInfo->ri_TrigDesc != NULL &&
-		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
-		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		cstate->volatile_defexprs ||
-		cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-	{
-		useHeapMultiInsert = false;
-	}
-	else
+	num_heaps = cstate->num_partitions > 0 ? cstate->num_partitions : 1;
+
+	bufferedTuples = (HeapTuple **) palloc0(num_heaps * sizeof(HeapTuple *));
+	useHeapMultiInsert = (bool *) palloc(num_heaps * sizeof(bool));
+	nBufferedTuples = (int *) palloc0(num_heaps * sizeof(int));
+	bufferedTuplesSize = (Size *) palloc0(num_heaps * sizeof(Size));
+	firstBufferedLineNo = (int *) palloc0(num_heaps * sizeof(int));
+
+	/* Also, maintain separate bulk-insert state for every heap */
+	bistate = (BulkInsertState *) palloc(num_heaps * sizeof(BulkInsertState));
+
+	for (i = 0; i < num_heaps; i++)
 	{
-		useHeapMultiInsert = true;
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		/*
+		 * In case of a partitioned table, we check the individual partition's
+		 * TriggerDesc and not the root parent table's.
+		 */
+		ResultRelInfo *cur_rel = cstate->partitions ? cstate->partitions + i
+													: resultRelInfo;
+
+		if ((cur_rel->ri_TrigDesc != NULL &&
+			(cur_rel->ri_TrigDesc->trig_insert_before_row ||
+			 cur_rel->ri_TrigDesc->trig_insert_instead_row)) ||
+			cstate->volatile_defexprs)
+			useHeapMultiInsert[i] = false;
+		else
+			useHeapMultiInsert[i] = true;
+
+		if (useHeapMultiInsert[i])
+			bufferedTuples[i] = palloc(MAX_BUFFERED_TUPLES *
+									   sizeof(HeapTuple));
+		bistate[i] = GetBulkInsertState();
 	}
 
 	/* Prepare to catch AFTER triggers. */
@@ -2480,13 +2508,6 @@ CopyFrom(CopyState cstate)
 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
 
-	/*
-	 * FIXME: We don't engage the bulk-insert mode for partitioned tables,
-	 * because the heap relation is most likely to change from one row to
-	 * the next due to tuple-routing.
-	 */
-	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		bistate = GetBulkInsertState();
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2501,15 +2522,16 @@ CopyFrom(CopyState cstate)
 					   *oldslot;
 		bool		skip_tuple;
 		Oid			loaded_oid = InvalidOid;
+		int			cur_heap = 0;
 
 		CHECK_FOR_INTERRUPTS();
 
-		if (nBufferedTuples == 0)
+		if (nBufferedTuples_total == 0)
 		{
 			/*
-			 * Reset the per-tuple exprcontext. We can only do this if the
-			 * tuple buffer is empty. (Calling the context the per-tuple
-			 * memory context is a bit of a misnomer now.)
+			 * Reset the per-tuple exprcontext. We can only do this if
+			 * there are no buffered tuples. (Calling the context the
+			 * per-tuple memory context is a bit of a misnomer now.)
 			 */
 			ResetPerTupleExprContext(estate);
 		}
@@ -2560,6 +2582,7 @@ CopyFrom(CopyState cstate)
 												estate);
 			Assert(leaf_part_index >= 0 &&
 				   leaf_part_index < cstate->num_partitions);
+			cur_heap = leaf_part_index;
 
 			/*
 			 * Save the old ResultRelInfo and switch to the one corresponding
@@ -2588,7 +2611,13 @@ CopyFrom(CopyState cstate)
 			{
 				Relation	partrel = resultRelInfo->ri_RelationDesc;
 
+				/*
+				 * Allocate memory for the converted tuple in the per-tuple
+				 * context just like the original tuple.
+				 */
+				MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 				tuple = do_convert_tuple(tuple, map);
+				MemoryContextSwitchTo(oldcontext);
 
 				/*
 				 * We must use the partition's tuple descriptor from this
@@ -2598,7 +2627,7 @@ CopyFrom(CopyState cstate)
 				slot = cstate->partition_tuple_slot;
 				Assert(slot != NULL);
 				ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
-				ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+				ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 			}
 
 			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
@@ -2633,29 +2662,47 @@ CopyFrom(CopyState cstate)
 					resultRelInfo->ri_PartitionCheck)
 					ExecConstraints(resultRelInfo, slot, oldslot, estate);
 
-				if (useHeapMultiInsert)
+				if (useHeapMultiInsert[cur_heap])
 				{
-					/* Add this tuple to the tuple buffer */
-					if (nBufferedTuples == 0)
-						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
+					/* Add this tuple to the corresponding tuple buffer */
+					if (nBufferedTuples[cur_heap] == 0)
+						firstBufferedLineNo[cur_heap] = cstate->cur_lineno;
+					bufferedTuples[cur_heap][nBufferedTuples[cur_heap]++] =
+																	tuple;
+					bufferedTuplesSize[cur_heap] += tuple->t_len;
+
+					/* Count the current tuple toward the totals */
+					nBufferedTuples_total++;
+					bufferedTuplesSize_total += tuple->t_len;
 
 					/*
-					 * If the buffer filled up, flush it.  Also flush if the
-					 * total size of all the tuples in the buffer becomes
-					 * large, to avoid using large amounts of memory for the
-					 * buffer when the tuples are exceptionally wide.
+					 * If enough tuples are buffered, flush them from the
+					 * individual buffers. Also flush if the total size of
+					 * all the buffered tuples becomes large, to avoid using
+					 * large amounts of buffer memory when the tuples are
+					 * exceptionally wide.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
+					if (nBufferedTuples_total == MAX_BUFFERED_TUPLES ||
+						bufferedTuplesSize_total > MAX_BUFFERED_TUPLES_SIZE)
 					{
-						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
+						for (i = 0; i < num_heaps; i++)
+						{
+							ResultRelInfo *cur_rel = cstate->partitions
+												   ? cstate->partitions + i
+												   : resultRelInfo;
+
+							CopyFromInsertBatch(cstate, estate, mycid,
+												hi_options, cur_rel,
+												myslot, bistate[i],
+												nBufferedTuples[i],
+												bufferedTuples[i],
+												firstBufferedLineNo[i]);
+							nBufferedTuples[i] = 0;
+							bufferedTuplesSize[i] = 0;
+						}
+
+						nBufferedTuples_total = 0;
+						bufferedTuplesSize_total = 0;
 					}
 				}
 				else
@@ -2664,7 +2711,7 @@ CopyFrom(CopyState cstate)
 
 					/* OK, store the tuple and create index entries for it */
 					heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
-								hi_options, bistate);
+								hi_options, bistate[cur_heap]);
 
 					if (resultRelInfo->ri_NumIndices > 0)
 						recheckIndexes = ExecInsertIndexTuples(slot,
@@ -2698,18 +2745,22 @@ CopyFrom(CopyState cstate)
 	}
 
 	/* Flush any remaining buffered tuples */
-	if (nBufferedTuples > 0)
+	for (i = 0; i < num_heaps; i++)
+	{
+		ResultRelInfo *cur_rel = cstate->partitions ? cstate->partitions + i
+													: resultRelInfo;
+
 		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-							resultRelInfo, myslot, bistate,
-							nBufferedTuples, bufferedTuples,
-							firstBufferedLineNo);
+							cur_rel, myslot, bistate[i],
+							nBufferedTuples[i], bufferedTuples[i],
+							firstBufferedLineNo[i]);
+
+		FreeBulkInsertState(bistate[i]);
+	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (bistate != NULL)
-		FreeBulkInsertState(bistate);
-
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
@@ -2802,7 +2853,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(cstate->rel,
+	heap_multi_insert(resultRelInfo->ri_RelationDesc,
 					  bufferedTuples,
 					  nBufferedTuples,
 					  mycid,
-- 
2.11.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
