On Sat, Mar 2, 2024 at 12:02 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> On Mon, Jan 29, 2024 at 5:16 PM Bharath Rupireddy
> <bharath.rupireddyforpostg...@gmail.com> wrote:
> >
> > > Please find the attached v9 patch set.
>
> I've had to rebase the patches due to commit 874d817, please find the
> attached v11 patch set.

Rebase needed. Please see the v12 patch set.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 8a3552e65e62afc40db99fbd7bf4f98990d45390 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 8 Mar 2024 10:11:17 +0000
Subject: [PATCH v12 1/4] New TAMs for inserts

---
 src/backend/access/heap/heapam.c         | 224 +++++++++++++++++++++++
 src/backend/access/heap/heapam_handler.c |   9 +
 src/include/access/heapam.h              |  49 +++++
 src/include/access/tableam.h             | 138 ++++++++++++++
 4 files changed, 420 insertions(+)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 34bc60f625..497940d74a 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -64,6 +64,7 @@
 #include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -2442,6 +2443,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Initialize state required for inserting a single tuple or multiple tuples
+ * into a heap.  The heap-private am_data is always allocated.
+ */
+TableInsertState *
+heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags)
+{
+	TableInsertState *tistate;
+	HeapInsertState *istate;
+
+	tistate = palloc0(sizeof(TableInsertState));
+	tistate->rel = rel;
+	tistate->cid = cid;
+	tistate->am_flags = am_flags;
+	tistate->insert_flags = insert_flags;
+
+	/* Allocate unconditionally so am_data is never NULL for heap */
+	istate = palloc0(sizeof(HeapInsertState));
+	tistate->am_data = istate;
+
+	if ((am_flags & TABLEAM_MULTI_INSERTS) != 0)
+	{
+		HeapMultiInsertState *mistate;
+
+		mistate = palloc0(sizeof(HeapMultiInsertState));
+		mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+		mistate->context = AllocSetContextCreate(CurrentMemoryContext,
+												 "heap_multi_insert_v2 memory context",
+												 ALLOCSET_DEFAULT_SIZES);
+
+		istate->mistate = mistate;
+	}
+
+	if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0)
+		istate->bistate = GetBulkInsertState();
+
+	return tistate;
+}
+
+/*
+ * Insert a single tuple into a heap.  Must not be used with a state that was
+ * set up for multi inserts.
+ */
+void
+heap_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+	HeapInsertState *istate = (HeapInsertState *) state->am_data;
+
+	/*
+	 * am_data may be NULL (no heap-private state), which is fine for a plain
+	 * single insert; only states set up for multi inserts are rejected here.
+	 */
+	Assert(istate == NULL || istate->mistate == NULL);
+
+	/* Update tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(state->rel);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	/* Perform insertion, and copy the resulting ItemPointer */
+	heap_insert(state->rel, tuple, state->cid, state->insert_flags,
+				istate != NULL ? istate->bistate : NULL);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+/*
+ * Create/return next free slot from multi-insert buffered slots array.
+ *
+ * Slots are created on first use and cleared when they are being reused.
+ */
+TupleTableSlot *
+heap_multi_insert_next_free_slot(TableInsertState * state)
+{
+	HeapMultiInsertState *mistate;
+
+	Assert(state->am_data != NULL &&
+		   ((HeapInsertState *) state->am_data)->mistate != NULL);
+
+	mistate = ((HeapInsertState *) state->am_data)->mistate;
+
+	/* The buffer must be flushed before the slots array is exhausted */
+	Assert(mistate->cur_slots < HEAP_MAX_BUFFERED_SLOTS);
+
+	if (mistate->slots[mistate->cur_slots] == NULL)
+		mistate->slots[mistate->cur_slots] = table_slot_create(state->rel, NULL);
+	else
+		ExecClearTuple(mistate->slots[mistate->cur_slots]);
+
+	return mistate->slots[mistate->cur_slots];
+}
+
+/*
+ * Buffer the passed-in tuple in the next multi-insert slot.  Unless
+ * TABLEAM_SKIP_MULTI_INSERTS_FLUSH is set, flush the buffer once it is full.
+ */
+void
+heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapMultiInsertState *mistate;
+
+	Assert(state->am_data != NULL &&
+		   ((HeapInsertState *) state->am_data)->mistate != NULL);
+
+	mistate = ((HeapInsertState *) state->am_data)->mistate;
+	dstslot = mistate->slots[mistate->cur_slots];
+
+	if (dstslot == NULL)
+	{
+		dstslot = table_slot_create(state->rel, NULL);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	/*
+	 * With TABLEAM_SKIP_MULTI_INSERTS_FLUSH the caller is expected to pass the
+	 * slot it got from heap_multi_insert_next_free_slot, so no copy is needed.
+	 */
+	if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0)
+	{
+		ExecClearTuple(dstslot);
+		ExecCopySlot(dstslot, slot);
+	}
+	else
+		Assert(dstslot == slot);
+
+	mistate->cur_slots++;
+
+	/*
+	 * For a materialized passed-in slot, the memory allocated in the slot's
+	 * memory context (tts_mcxt) is added as an approximation of the space
+	 * the buffered tuple needs.  NOTE(review): confirm tts_mcxt is not shared
+	 * with other allocations, otherwise cur_size over-counts.
+	 *
+	 * For non-materialized slots, the flushing decision rests solely on the
+	 * number of tuples stored in the buffer.
+	 */
+	if (TTS_SHOULDFREE(slot))
+		mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false);
+
+	if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 &&
+		(mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		 mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES))
+		heap_multi_insert_flush(state);
+}
+
+/*
+ * Return the multi-insert slots array; *num_slots gets the occupied count.
+ */
+TupleTableSlot **
+heap_multi_insert_slots(TableInsertState * state, int *num_slots)
+{
+	HeapInsertState *istate = (HeapInsertState *) state->am_data;
+
+	/* Only valid for states set up with TABLEAM_MULTI_INSERTS */
+	Assert(istate != NULL && istate->mistate != NULL);
+
+	*num_slots = istate->mistate->cur_slots;
+	return istate->mistate->slots;
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_multi_insert_flush(TableInsertState * state)
+{
+	HeapInsertState *istate = (HeapInsertState *) state->am_data;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	Assert(istate != NULL && istate->mistate != NULL);
+	mistate = istate->mistate;
+	if (mistate->cur_slots == 0)
+		return;					/* nothing buffered, nothing to insert */
+
+	/* heap_multi_insert may leak memory, so run it in a resettable context */
+	oldcontext = MemoryContextSwitchTo(mistate->context);
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->insert_flags, istate->bistate);
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->context);
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+}
+
+/*
+ * Clean up state used to insert a single or multiple tuples into a heap.
+ */
+void
+heap_insert_end(TableInsertState * state)
+{
+	HeapInsertState *istate = (HeapInsertState *) state->am_data;
+
+	if (istate != NULL && istate->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate = istate->mistate;
+
+		/* Insert remaining tuples from multi-insert buffers */
+		if (mistate->cur_slots > 0 || mistate->cur_size > 0)
+			heap_multi_insert_flush(state);
+
+		MemoryContextDelete(mistate->context);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		pfree(mistate->slots);
+		pfree(mistate);
+	}
+
+	if (istate != NULL && istate->bistate != NULL)
+		FreeBulkInsertState(istate->bistate);
+
+	/* am_data may be NULL, and pfree() must not be handed a NULL pointer */
+	if (istate != NULL)
+		pfree(istate);
+	pfree(state);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 680a50bf8b..84793f324e 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2562,6 +2562,15 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_insert_begin = heap_insert_begin,
+	.tuple_insert_v2 = heap_insert_v2,
+	.tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot,
+	.tuple_multi_insert_v2 = heap_multi_insert_v2,
+	.tuple_multi_insert_slots = heap_multi_insert_slots,
+	.tuple_multi_insert_flush = heap_multi_insert_flush,
+	.tuple_insert_end = heap_insert_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4b133f6859..053be18110 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -225,6 +225,40 @@ htsv_get_valid_status(int status)
 	return (HTSV_Result) status;
 }
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer. For instance, increasing this can cause
+ * quadratic growth in memory requirements during copies into partitioned
+ * tables with a large number of partitions.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES		65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Memory context heap_multi_insert runs in; reset after every flush */
+	MemoryContext context;
+
+	/* Array of HEAP_MAX_BUFFERED_SLOTS slots; entries are created on demand */
+	TupleTableSlot **slots;
+
+	/* Number of slots that multi-insert buffers currently hold */
+	int			cur_slots;
+
+	/* Approximate size of all tuples that multi-insert buffers currently hold */
+	Size		cur_size;
+}			HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;	/* bulk insert state, or NULL */
+	HeapMultiInsertState *mistate;	/* multi-insert buffers, or NULL */
+}			HeapInsertState;
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableInsertState * heap_insert_begin(Relation rel,
+											CommandId cid,
+											int am_flags,
+											int insert_flags);
+extern void heap_insert_v2(TableInsertState * state,
+						   TupleTableSlot *slot);
+extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state);
+extern void heap_multi_insert_v2(TableInsertState * state,
+								 TupleTableSlot *slot);
+extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state,
+												int *num_slots);
+extern void heap_multi_insert_flush(TableInsertState * state);
+extern void heap_insert_end(TableInsertState * state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 5f8474871d..834de15b9b 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TABLEAM_MULTI_INSERTS 0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002
+
+/*
+ * Skip flushing buffered tuples automatically. Responsibility lies with the
+ * caller to flush the buffered tuples.
+ */
+#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004
+
+
+/* Insert state returned by table_insert_begin and used by later insert calls. */
+typedef struct TableInsertState
+{
+	/* Table AM-agnostic data starts here */
+
+	Relation	rel;			/* Target relation */
+
+	/*
+	 * Command ID for this insertion. If required, change this for each pass
+	 * of insert functions.
+	 */
+	CommandId	cid;
+
+	/* Table AM options (TABLEAM_XXX macros) */
+	int			am_flags;
+
+	/* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */
+	int			insert_flags;
+
+	/* Table AM specific data starts here; opaque to callers */
+
+	void	   *am_data;
+}			TableInsertState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -522,6 +559,20 @@ typedef struct TableAmRoutine
 	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
 								 CommandId cid, int options, struct BulkInsertStateData *bistate);
 
+	TableInsertState *(*tuple_insert_begin) (Relation rel,
+											 CommandId cid,
+											 int am_flags,
+											 int insert_flags);
+	void		(*tuple_insert_v2) (TableInsertState * state,
+									TupleTableSlot *slot);
+	void		(*tuple_multi_insert_v2) (TableInsertState * state,
+										  TupleTableSlot *slot);
+	TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state);
+	TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state,
+												  int *num_slots);
+	void		(*tuple_multi_insert_flush) (TableInsertState * state);
+	void		(*tuple_insert_end) (TableInsertState * state);
+
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -1456,6 +1507,93 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 								  cid, options, bistate);
 }
 
+/* Start an insert operation via the AM's tuple_insert_begin callback. */
+static inline TableInsertState *
+table_insert_begin(Relation rel, CommandId cid, int am_flags,
+				   int insert_flags)
+{
+	const TableAmRoutine *routine = rel->rd_tableam;
+
+	/* ERROR out when the AM does not provide the callback */
+	if (routine == NULL || routine->tuple_insert_begin == NULL)
+		elog(ERROR, "table_insert_begin access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(rel));
+
+	return routine->tuple_insert_begin(rel, cid, am_flags, insert_flags);
+}
+
+/* Insert one tuple via the AM's tuple_insert_v2 callback, or ERROR out. */
+static inline void
+table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam == NULL ||
+		state->rel->rd_tableam->tuple_insert_v2 == NULL)
+		elog(ERROR, "table_tuple_insert_v2 access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+	state->rel->rd_tableam->tuple_insert_v2(state, slot);
+}
+
+/* Buffer one tuple via the AM's tuple_multi_insert_v2 callback, or ERROR out. */
+static inline void
+table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam == NULL ||
+		state->rel->rd_tableam->tuple_multi_insert_v2 == NULL)
+		elog(ERROR, "table_multi_insert_v2 access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+	state->rel->rd_tableam->tuple_multi_insert_v2(state, slot);
+}
+
+/* Fetch the next free buffered slot via the AM's callback. */
+static inline TupleTableSlot *
+table_multi_insert_next_free_slot(TableInsertState * state)
+{
+	const TableAmRoutine *routine = state->rel->rd_tableam;
+
+	/* ERROR out when the AM does not provide the callback */
+	if (routine == NULL || routine->tuple_multi_insert_next_free_slot == NULL)
+		elog(ERROR, "table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+
+	return routine->tuple_multi_insert_next_free_slot(state);
+}
+
+/* Fetch the buffered slots array and its fill count via the AM's callback. */
+static inline TupleTableSlot **
+table_multi_insert_slots(TableInsertState * state, int *num_slots)
+{
+	const TableAmRoutine *routine = state->rel->rd_tableam;
+
+	/* ERROR out when the AM does not provide the callback */
+	if (routine == NULL || routine->tuple_multi_insert_slots == NULL)
+		elog(ERROR, "table_multi_insert_slots access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+
+	return routine->tuple_multi_insert_slots(state, num_slots);
+}
+
+/* Flush buffered tuples via the AM's tuple_multi_insert_flush callback. */
+static inline void
+table_multi_insert_flush(TableInsertState * state)
+{
+	if (state->rel->rd_tableam == NULL ||
+		state->rel->rd_tableam->tuple_multi_insert_flush == NULL)
+		elog(ERROR, "table_multi_insert_flush access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+	state->rel->rd_tableam->tuple_multi_insert_flush(state);
+}
+
+/* Finish an insert operation via the AM's tuple_insert_end callback. */
+static inline void
+table_insert_end(TableInsertState * state)
+{
+	if (state->rel->rd_tableam == NULL ||
+		state->rel->rd_tableam->tuple_insert_end == NULL)
+		elog(ERROR, "table_insert_end access method is not implemented for relation \"%s\"",
+			 RelationGetRelationName(state->rel));
+	state->rel->rd_tableam->tuple_insert_end(state);
+}
+
 /*
  * Delete a tuple.
  *
-- 
2.34.1

From fd891115178bc33df87844417e35a724b359af96 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 8 Mar 2024 10:11:41 +0000
Subject: [PATCH v12 2/4] Optimize CTAS with multi inserts

---
 src/backend/commands/createas.c | 25 +++++++++----------------
 1 file changed, 9 insertions(+), 16 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 62050f4dc5..7a4415c62f 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableInsertState *ti_state; /* table insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -552,17 +550,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
 	 * bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+		myState->ti_state = table_insert_begin(intoRelationDesc,
+											   GetCurrentCommandId(true),
+											   TABLEAM_MULTI_INSERTS |
+											   TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY,
+											   TABLE_INSERT_SKIP_FSM);
 	else
-		myState->bistate = NULL;
+		myState->ti_state = NULL;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -590,11 +590,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+		table_multi_insert_v2(myState->ti_state, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -612,10 +608,7 @@ intorel_shutdown(DestReceiver *self)
 	IntoClause *into = myState->into;
 
 	if (!into->skipData)
-	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
-	}
+		table_insert_end(myState->ti_state);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
-- 
2.34.1

From 44caa58dc21e8e4634d214c074a88986b2311b41 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 8 Mar 2024 10:12:02 +0000
Subject: [PATCH v12 3/4] Optimize RMV with multi inserts

---
 src/backend/commands/matview.c | 34 ++++++++++++----------------------
 1 file changed, 12 insertions(+), 22 deletions(-)

diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 03373462f0..889a9a21f8 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -47,10 +47,7 @@ typedef struct
 	DestReceiver pub;			/* publicly-known function pointers */
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
-	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableInsertState *ti_state; /* table insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -453,13 +450,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 
 	transientrel = table_open(myState->transientoid, NoLock);
 
-	/*
-	 * Fill private fields of myState for use by later routines
-	 */
-	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	/* Fill private fields of myState for use by later routines */
+	myState->ti_state = table_insert_begin(transientrel,
+										   GetCurrentCommandId(true),
+										   TABLEAM_MULTI_INSERTS |
+										   TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY,
+										   TABLE_INSERT_SKIP_FSM |
+										   TABLE_INSERT_FROZEN);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -484,12 +481,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_multi_insert_v2(myState->ti_state, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -503,14 +495,12 @@ static void
 transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
+	Relation	transientrel = myState->ti_state->rel;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_insert_end(myState->ti_state);
 
 	/* close transientrel, but keep lock until commit */
-	table_close(myState->transientrel, NoLock);
-	myState->transientrel = NULL;
+	table_close(transientrel, NoLock);
 }
 
 /*
-- 
2.34.1

From d53ee9b1b31b0e68858e673a618905d7bfdcf4de Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Fri, 8 Mar 2024 10:12:32 +0000
Subject: [PATCH v12 4/4] Use new multi insert TAM for COPY FROM

---
 src/backend/commands/copyfrom.c | 92 ++++++++++++++++++---------------
 1 file changed, 50 insertions(+), 42 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 8908a440e1..c2a81d4df1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -74,10 +74,9 @@
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableInsertState *ti_state; /* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
 	int			nused;			/* number of 'slots' containing tuples */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
@@ -220,14 +219,31 @@ limit_printout_length(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		int			num_slots;
+
+		buffer->ti_state = table_insert_begin(rri->ri_RelationDesc,
+											  miinfo->mycid,
+											  TABLEAM_MULTI_INSERTS |
+											  TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY |
+											  TABLEAM_SKIP_MULTI_INSERTS_FLUSH,
+											  miinfo->ti_options);
+		buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots);
+	}
+	else
+	{
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		buffer->ti_state = NULL;
+	}
+
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -242,7 +258,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -319,8 +335,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -392,13 +406,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -406,18 +415,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
-		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
-		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
+		table_multi_insert_flush(buffer->ti_state);
 
 		for (i = 0; i < nused; i++)
 		{
@@ -432,7 +430,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 				cstate->cur_lineno = buffer->linenos[i];
 				recheckIndexes =
 					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
+										  slots[i], estate, false,
 										  false, NULL, NIL, false);
 				ExecARInsertTriggers(estate, resultRelInfo,
 									 slots[i], recheckIndexes,
@@ -490,20 +488,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 	resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
-	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
-	}
+		table_insert_end(buffer->ti_state);
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -590,13 +583,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused = buffer->nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(nused < MAX_BUFFERED_TUPLES);
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+		slot = table_multi_insert_next_free_slot(buffer->ti_state);
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -612,6 +617,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	Assert(buffer != NULL);
 	Assert(slot == buffer->slots[buffer->nused]);
 
+	if (rri->ri_FdwRoutine == NULL)
+		table_multi_insert_v2(buffer->ti_state, slot);
+
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
 
-- 
2.34.1

Reply via email to