On Mon, Nov 23, 2020 at 3:26 PM Heikki Linnakangas <hlinn...@iki.fi> wrote:
>
> On 23/11/2020 11:15, Bharath Rupireddy wrote:
> > Attaching v2 patch, rebased on the latest master 17958972.
>
> I just broke this again with commit c532d15ddd to split up copy.c.
> Here's another rebased version.
>

Thanks! I noticed that and was about to post a new patch. Anyway,
thanks for the rebased v3 patch. I'm attaching v3 here again for
visibility.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
From 7795a83a2485440e01dfb432c23c362c37a2a32b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Mon, 23 Nov 2020 15:36:36 +0530
Subject: [PATCH v3] Multi Inserts in Create Table As.

This can improve performance, and it also benefits Create
Materialized View, which uses the Create Table As code.
---
 src/backend/commands/copyfrom.c   |  31 +++------
 src/backend/commands/createas.c   | 100 ++++++++++++++++++++++++++++--
 src/backend/executor/execTuples.c |  66 ++++++++++++++++++++
 src/include/access/tableam.h      |  15 +++++
 src/include/executor/tuptable.h   |   2 +-
 5 files changed, 185 insertions(+), 29 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb..a721600ade 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -44,34 +44,17 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list.  Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES		1000
-
-/*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
- */
-#define MAX_BUFFERED_BYTES		65535
-
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TupleTableSlot *slots[MAX_MULTI_INSERT_TUPLES]; /* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
 	BulkInsertState bistate;	/* BulkInsertState for this rel */
 	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
+	uint64		linenos[MAX_MULTI_INSERT_TUPLES];	/* Line # of tuple in copy
 												 * stream */
 } CopyMultiInsertBuffer;
 
@@ -214,7 +197,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
 	buffer->resultRelInfo = rri;
 	buffer->bistate = GetBulkInsertState();
 	buffer->nused = 0;
@@ -273,8 +256,8 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 static inline bool
 CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
 {
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
+	if (miinfo->bufferedTuples >= MAX_MULTI_INSERT_TUPLES ||
+		miinfo->bufferedBytes >= MAX_MULTI_INSERT_BUFFERED_BYTES)
 		return true;
 	return false;
 }
@@ -392,7 +375,7 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 	FreeBulkInsertState(buffer->bistate);
 
 	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+	for (i = 0; i < MAX_MULTI_INSERT_TUPLES && buffer->slots[i] != NULL; i++)
 		ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
 	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
@@ -484,7 +467,7 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 	int			nused = buffer->nused;
 
 	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
+	Assert(nused < MAX_MULTI_INSERT_TUPLES);
 
 	if (buffer->slots[nused] == NULL)
 		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 6bf6c5a310..06d67ba7e4 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -61,6 +61,13 @@ typedef struct
 	CommandId	output_cid;		/* cmin to insert in output tuples */
 	int			ti_options;		/* table_tuple_insert performance options */
 	BulkInsertState bistate;	/* bulk insert state */
+	MemoryContext	mi_context;	/* A temporary memory context for multi insert */
+	/* Buffered slots for a multi insert batch. */
+	TupleTableSlot *mi_slots[MAX_MULTI_INSERT_TUPLES];
+	/* Number of current buffered slots for a multi insert batch. */
+	int				mi_slots_num;
+	/* Total tuple size for a multi insert batch. */
+	int				mi_slots_size;
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -530,15 +537,33 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	myState->reladdr = intoRelationAddr;
 	myState->output_cid = GetCurrentCommandId(true);
 	myState->ti_options = TABLE_INSERT_SKIP_FSM;
+	memset(myState->mi_slots, 0,
+		   sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES);
+	myState->mi_slots_num = 0;
+	myState->mi_slots_size = 0;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
-	 * bulk inserts as there are no tuples to insert.
+	 * bulk inserts or the multi insert memory context, as there are no tuples
+	 * to insert.
 	 */
 	if (!into->skipData)
+	{
 		myState->bistate = GetBulkInsertState();
+
+		/*
+		 * Create a temporary memory context so that we can reset once per
+		 * multi insert batch.
+		 */
+		myState->mi_context = AllocSetContextCreate(CurrentMemoryContext,
+											"intorel_multi_insert",
+											ALLOCSET_DEFAULT_SIZES);
+	}
 	else
+	{
 		myState->bistate = NULL;
+		myState->mi_context = NULL;
+	}
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -547,6 +572,34 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber);
 }
 
+/*
+ * intorel_flush_multi_insert --- write out the buffered batch of tuples
+ */
+static void
+intorel_flush_multi_insert(DR_intorel *myState)
+{
+	int			nslots = myState->mi_slots_num;
+	MemoryContext oldcontext;
+
+	/* Do the insert in the batch context so it can be reset afterwards. */
+	oldcontext = MemoryContextSwitchTo(myState->mi_context);
+	table_multi_insert(myState->rel,
+					   myState->mi_slots,
+					   nslots,
+					   myState->output_cid,
+					   myState->ti_options,
+					   myState->bistate);
+	MemoryContextReset(myState->mi_context);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Clear the used slots and zero the counters for the next batch. */
+	for (int i = 0; i < nslots; i++)
+		ExecClearTuple(myState->mi_slots[i]);
+
+	myState->mi_slots_num = 0;
+	myState->mi_slots_size = 0;
+}
+
 /*
  * intorel_receive --- receive one tuple
  */
@@ -554,9 +607,36 @@ static bool
 intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 {
 	DR_intorel *myState = (DR_intorel *) self;
+	TupleTableSlot  *batchslot;
+	Size            sz = 0;
 
 	/* Nothing to insert if WITH NO DATA is specified. */
-	if (!myState->into->skipData)
+	if (myState->into->skipData)
+		return true;
+
+	sz = GetTupleSize(slot, MAX_MULTI_INSERT_BUFFERED_BYTES);
+
+	/* In case the computed tuple size is 0, we go for single inserts. */
+	if (sz != 0)
+	{
+		if (myState->mi_slots[myState->mi_slots_num] == NULL)
+		{
+			batchslot = table_slot_create(myState->rel, NULL);
+			myState->mi_slots[myState->mi_slots_num] = batchslot;
+		}
+		else
+			batchslot = myState->mi_slots[myState->mi_slots_num];
+
+		ExecCopySlot(batchslot, slot);
+
+		myState->mi_slots_num++;
+		myState->mi_slots_size += sz;
+
+		if (myState->mi_slots_num >= MAX_MULTI_INSERT_TUPLES ||
+			myState->mi_slots_size >= MAX_MULTI_INSERT_BUFFERED_BYTES)
+			intorel_flush_multi_insert(myState);
+	}
+	else
 	{
 		/*
 		 * Note that the input slot might not be of the type of the target
@@ -585,12 +665,24 @@ static void
 intorel_shutdown(DestReceiver *self)
 {
 	DR_intorel *myState = (DR_intorel *) self;
-	IntoClause *into = myState->into;
 
-	if (!into->skipData)
+	if (!myState->into->skipData)
 	{
+		int                     i;
+
+		if (myState->mi_slots_num != 0)
+			intorel_flush_multi_insert(myState);
+
+		for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->mi_slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(myState->mi_slots[i]);
+
 		FreeBulkInsertState(myState->bistate);
 		table_finish_bulk_insert(myState->rel, myState->ti_options);
+
+		if (myState->mi_context)
+			MemoryContextDelete(myState->mi_context);
+
+		myState->mi_context = NULL;
 	}
 
 	/* close rel, but keep lock until commit */
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..5ee4135151 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -2318,3 +2318,69 @@ end_tup_output(TupOutputState *tstate)
 	ExecDropSingleTupleTableSlot(tstate->slot);
 	pfree(tstate);
 }
+
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap, buffer and minimal tuple slots, return the length of the tuple
+ * the slot holds.  A virtual slot has no stored tuple length, so its data
+ * size is computed from the non-null attributes; that computation stops
+ * early once the size reaches maxsize (pass 0 for no limit).
+ *
+ * Returns 0 when no size can be determined, e.g. for a slot without a tuple
+ * or a virtual tuple whose attributes are all NULL.
+ */
+Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+	Size		sz = 0;
+	HeapTuple	tuple = NULL;
+
+	if (TTS_IS_HEAPTUPLE(slot))
+		tuple = ((HeapTupleTableSlot *) slot)->tuple;
+	else if (TTS_IS_BUFFERTUPLE(slot))
+		tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+	else if (TTS_IS_MINIMALTUPLE(slot))
+		tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+	else if (TTS_IS_VIRTUAL(slot))
+	{
+		/* Size calculation is inspired from tts_virtual_materialize(). */
+		TupleDesc	desc = slot->tts_tupleDescriptor;
+
+		for (int natt = 0; natt < desc->natts; natt++)
+		{
+			Form_pg_attribute att = TupleDescAttr(desc, natt);
+			Datum		val;
+
+			/*
+			 * NULLs contribute nothing.  Fixed-length attributes, by-value
+			 * ones included, are added by att_addlength_datum() below, so
+			 * they must not be counted separately here.
+			 */
+			if (slot->tts_isnull[natt])
+				continue;
+
+			val = slot->tts_values[natt];
+			sz = att_align_nominal(sz, att->attalign);
+
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+				sz += EOH_get_flat_size(DatumGetEOHP(val));
+			else
+				sz = att_addlength_datum(sz, att->attlen, val);
+
+			/*
+			 * We are not interested in proceeding further if the computed size
+			 * crosses maxsize limit that we are looking for.
+			 */
+			if (maxsize != 0 && sz >= maxsize)
+				break;
+		}
+	}
+
+	/* Non-virtual slots: report the length of the tuple they hold, if any. */
+	if (tuple != NULL)
+		sz = tuple->t_len;
+
+	return sz;
+}
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..087aabe880 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -140,6 +140,21 @@ typedef struct TM_FailureData
 /* Follow update chain and lock latest version of tuple */
 #define TUPLE_LOCK_FLAG_FIND_LAST_VERSION		(1 << 1)
 
+/*
+ * No more than this many tuples per multi insert buffer
+ *
+ * Caution: Don't make this too big. We could end up with this many multi
+ * insert buffer items stored as a list. Increasing this can cause quadratic
+ * growth in memory requirements during copies into partitioned tables with a
+ * large number of partitions.
+ */
+#define MAX_MULTI_INSERT_TUPLES        1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size of the tuples stored.
+ */
+#define MAX_MULTI_INSERT_BUFFERED_BYTES		65535
 
 /* Typedef for callback function for table_index_build_scan */
 typedef void (*IndexBuildCallback) (Relation index,
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..4336cc8ec8 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -329,7 +329,7 @@ extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
 extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
 								 int lastAttNum);
 extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
-
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
 
 #ifndef FRONTEND
 
-- 
2.25.1

Reply via email to