On Tue, Aug 1, 2023 at 10:32 PM Jacob Champion <jchamp...@timescale.com> wrote:
>
> On Tue, Aug 1, 2023 at 9:31 AM Bharath Rupireddy
> <bharath.rupireddyforpostg...@gmail.com> wrote:
> > Thanks. Finally, I  started to spend time on this. Just curious - may
> > I know the discussion in/for which this patch is referenced? What was
> > the motive? Is it captured somewhere?
>
> It may not have been the only place, but we at least touched on it
> during the unconference:
>
>     
> https://wiki.postgresql.org/wiki/PgCon_2023_Developer_Unconference#Table_AMs
>
> We discussed two related-but-separate ideas:
> 1) bulk/batch operations and
> 2) maintenance of TAM state across multiple related operations.

Thank you. I'm attaching v8 patch-set here which includes use of new
insert TAMs for COPY FROM. With this, postgres not only will have the
new TAM for inserts, but they also can make the following commands
faster - CREATE TABLE AS, SELECT INTO, CREATE MATERIALIZED VIEW,
REFRESH MATERIALIZED VIEW and COPY FROM. I'll perform some testing in
the coming days and post the results here, until then I appreciate any
feedback on the patches.

I've also added this proposal to CF -
https://commitfest.postgresql.org/47/4777/.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From cbdf2935be360017c0d62479e879630d4fec8766 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 17 Jan 2024 16:44:19 +0000
Subject: [PATCH v8] New TAMs for inserts

---
 src/backend/access/heap/heapam.c         | 224 +++++++++++++++++++++++
 src/backend/access/heap/heapam_handler.c |   9 +
 src/include/access/heapam.h              |  49 +++++
 src/include/access/tableam.h             | 143 +++++++++++++++
 4 files changed, 425 insertions(+)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 707460a536..7df305380e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -68,6 +68,7 @@
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -2446,6 +2447,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Initialize state required for an insert a single tuple or multiple tuples
+ * into a heap.
+ */
+TableInsertState *
+heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags)
+{
+	TableInsertState *tistate;
+
+	tistate = palloc0(sizeof(TableInsertState));
+	tistate->rel = rel;
+	tistate->cid = cid;
+	tistate->am_flags = am_flags;
+	tistate->insert_flags = insert_flags;
+
+	if ((am_flags & TABLEAM_MULTI_INSERTS) != 0 ||
+		(am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY))
+		tistate->am_data = palloc0(sizeof(HeapInsertState));
+
+	if ((am_flags & TABLEAM_MULTI_INSERTS) != 0)
+	{
+		HeapMultiInsertState *mistate;
+
+		mistate = palloc0(sizeof(HeapMultiInsertState));
+		mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+
+		mistate->context = AllocSetContextCreate(CurrentMemoryContext,
+												 "heap_multi_insert_v2 memory context",
+												 ALLOCSET_DEFAULT_SIZES);
+
+		((HeapInsertState *) tistate->am_data)->mistate = mistate;
+	}
+
+	if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0)
+		((HeapInsertState *) tistate->am_data)->bistate = GetBulkInsertState();
+
+	return tistate;
+}
+
+/*
+ * Insert a single tuple into a heap.
+ */
+void
+heap_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+	BulkInsertState bistate = NULL;
+
+	Assert(state->am_data != NULL &&
+		   ((HeapInsertState *) state->am_data)->mistate == NULL);
+
+	/* Update tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(state->rel);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	if (state->am_data != NULL &&
+		((HeapInsertState *) state->am_data)->bistate != NULL)
+		bistate = ((HeapInsertState *) state->am_data)->bistate;
+
+	/* Perform insertion, and copy the resulting ItemPointer */
+	heap_insert(state->rel, tuple, state->cid, state->insert_flags,
+				bistate);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+/*
+ * Create/return next free slot from multi-insert buffered slots array.
+ */
+TupleTableSlot *
+heap_multi_insert_next_free_slot(TableInsertState * state)
+{
+	TupleTableSlot *slot;
+	HeapMultiInsertState *mistate;
+
+	Assert(state->am_data != NULL &&
+		   ((HeapInsertState *) state->am_data)->mistate != NULL);
+
+	mistate = ((HeapInsertState *) state->am_data)->mistate;
+	slot = mistate->slots[mistate->cur_slots];
+
+	if (slot == NULL)
+	{
+		slot = table_slot_create(state->rel, NULL);
+		mistate->slots[mistate->cur_slots] = slot;
+	}
+	else
+		ExecClearTuple(slot);
+
+	return slot;
+}
+
+/*
+ * Store passed-in tuple into in-memory buffered slots. When full, insert
+ * multiple tuples from the buffers into heap.
+ */
+void
+heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapMultiInsertState *mistate;
+
+	Assert(state->am_data != NULL &&
+		   ((HeapInsertState *) state->am_data)->mistate != NULL);
+
+	mistate = ((HeapInsertState *) state->am_data)->mistate;
+	dstslot = mistate->slots[mistate->cur_slots];
+
+	if (dstslot == NULL)
+	{
+		dstslot = table_slot_create(state->rel, NULL);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	/*
+	 * Caller may have got the slot using heap_multi_insert_next_free_slot,
+	 * filled it and passed. So, skip copying in such a case.
+	 */
+	if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0)
+	{
+		ExecClearTuple(dstslot);
+		ExecCopySlot(dstslot, slot);
+	}
+	else
+		Assert(dstslot == slot);
+
+	mistate->cur_slots++;
+
+	/*
+	 * When passed-in slot is already materialized, memory allocated in slot's
+	 * memory context is a close approximation for us to track the required
+	 * space for the tuple in slot.
+	 *
+	 * For non-materialized slots, the flushing decision happens solely on the
+	 * number of tuples stored in the buffer.
+	 */
+	if (TTS_SHOULDFREE(slot))
+		mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false);
+
+	if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 &&
+		(mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS ||
+		 mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES))
+		heap_multi_insert_flush(state);
+}
+
+/*
+ * Return pointer to multi-insert buffered slots array and number of currently
+ * occupied slots.
+ */
+TupleTableSlot **
+heap_multi_insert_slots(TableInsertState * state, int *num_slots)
+{
+	HeapMultiInsertState *mistate;
+
+	mistate = ((HeapInsertState *) state->am_data)->mistate;
+	*num_slots = mistate->cur_slots;
+
+	return mistate->slots;
+}
+
+/*
+ * Insert multiple tuples from in-memory buffered slots into heap.
+ */
+void
+heap_multi_insert_flush(TableInsertState * state)
+{
+	HeapMultiInsertState *mistate;
+	BulkInsertState bistate = NULL;
+	MemoryContext oldcontext;
+
+	mistate = ((HeapInsertState *) state->am_data)->mistate;
+
+	if (state->am_data != NULL &&
+		((HeapInsertState *) state->am_data)->bistate != NULL)
+		bistate = ((HeapInsertState *) state->am_data)->bistate;
+
+	oldcontext = MemoryContextSwitchTo(mistate->context);
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->insert_flags, bistate);
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->context);
+
+	mistate->cur_slots = 0;
+	mistate->cur_size = 0;
+}
+
+/*
+ * Clean up state used to insert a single or multiple tuples into a heap.
+ */
+void
+heap_insert_end(TableInsertState * state)
+{
+	if (state->am_data != NULL &&
+		((HeapInsertState *) state->am_data)->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate =
+			((HeapInsertState *) state->am_data)->mistate;
+
+		/* Insert remaining tuples from multi-insert buffers */
+		if (mistate->cur_slots > 0 || mistate->cur_size > 0)
+			heap_multi_insert_flush(state);
+
+		MemoryContextDelete(mistate->context);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		pfree(mistate);
+		((HeapInsertState *) state->am_data)->mistate = NULL;
+	}
+
+	if (state->am_data != NULL &&
+		((HeapInsertState *) state->am_data)->bistate != NULL)
+		FreeBulkInsertState(((HeapInsertState *) state->am_data)->bistate);
+
+	pfree(state->am_data);
+	state->am_data = NULL;
+	pfree(state);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d15a02b2be..795177812d 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2564,6 +2564,15 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_insert_begin = heap_insert_begin,
+	.tuple_insert_v2 = heap_insert_v2,
+	.tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot,
+	.tuple_multi_insert_v2 = heap_multi_insert_v2,
+	.tuple_multi_insert_slots = heap_multi_insert_slots,
+	.tuple_multi_insert_flush = heap_multi_insert_flush,
+	.tuple_insert_end = heap_insert_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 932ec0d6f2..46dba5245c 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -225,6 +225,40 @@ htsv_get_valid_status(int status)
 	return (HTSV_Result) status;
 }
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer. For instance, increasing this can cause
+ * quadratic growth in memory requirements during copies into partitioned
+ * tables with a large number of partitions.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+/* Maximum size of all tuples that multi-insert buffers can hold */
+#define HEAP_MAX_BUFFERED_BYTES		65535
+
+typedef struct HeapMultiInsertState
+{
+	/* Memory context to use for flushing multi-insert buffers */
+	MemoryContext context;
+
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of slots that multi-insert buffers currently hold */
+	int			cur_slots;
+
+	/* Size of all tuples that multi-insert buffers currently hold */
+	Size		cur_size;
+}			HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+}			HeapInsertState;
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableInsertState * heap_insert_begin(Relation rel,
+											CommandId cid,
+											int am_flags,
+											int insert_flags);
+extern void heap_insert_v2(TableInsertState * state,
+						   TupleTableSlot *slot);
+extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state);
+extern void heap_multi_insert_v2(TableInsertState * state,
+								 TupleTableSlot *slot);
+extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state,
+												int *num_slots);
+extern void heap_multi_insert_flush(TableInsertState * state);
+extern void heap_insert_end(TableInsertState * state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 5f8474871d..8fcaf6fe5a 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */
+#define TABLEAM_MULTI_INSERTS 0x000001
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002
+
+/*
+ * Skip flushing buffered tuples automatically. Responsibility lies with the
+ * caller to flush the buffered tuples.
+ */
+#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004
+
+
+/* Holds table insert state. */
+typedef struct TableInsertState
+{
+	/* Table AM-agnostic data starts here */
+
+	Relation	rel;			/* Target relation */
+
+	/*
+	 * Command ID for this insertion. If required, change this for each pass
+	 * of insert functions.
+	 */
+	CommandId	cid;
+
+	/* Table AM options (TABLEAM_XXX macros) */
+	int			am_flags;
+
+	/* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */
+	int			insert_flags;
+
+	/* Table AM specific data starts here */
+
+	void	   *am_data;
+}			TableInsertState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -522,6 +559,20 @@ typedef struct TableAmRoutine
 	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
 								 CommandId cid, int options, struct BulkInsertStateData *bistate);
 
+	TableInsertState *(*tuple_insert_begin) (Relation rel,
+											 CommandId cid,
+											 int am_flags,
+											 int insert_flags);
+	void		(*tuple_insert_v2) (TableInsertState * state,
+									TupleTableSlot *slot);
+	void		(*tuple_multi_insert_v2) (TableInsertState * state,
+										  TupleTableSlot *slot);
+	TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state);
+	TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state,
+												  int *num_slots);
+	void		(*tuple_multi_insert_flush) (TableInsertState * state);
+	void		(*tuple_insert_end) (TableInsertState * state);
+
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -1456,6 +1507,98 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 								  cid, options, bistate);
 }
 
+static inline TableInsertState *
+table_insert_begin(Relation rel, CommandId cid, int am_flags,
+				   int insert_flags)
+{
+	if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin)
+		return rel->rd_tableam->tuple_insert_begin(rel, cid, am_flags,
+												   insert_flags);
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("table_insert_begin access method is not implemented for relation \"%s\"",
+					   RelationGetRelationName(rel)));
+}
+
+static inline void
+table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_insert_v2)
+		state->rel->rd_tableam->tuple_insert_v2(state, slot);
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("table_tuple_insert_v2 access method is not implemented for relation \"%s\"",
+					   RelationGetRelationName(state->rel)));
+}
+
+static inline void
+table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_multi_insert_v2)
+		state->rel->rd_tableam->tuple_multi_insert_v2(state, slot);
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("table_multi_insert_v2 access method is not implemented for relation \"%s\"",
+					   RelationGetRelationName(state->rel)));
+}
+
+static inline TupleTableSlot *
+table_multi_insert_next_free_slot(TableInsertState * state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_multi_insert_next_free_slot)
+		return state->rel->rd_tableam->tuple_multi_insert_next_free_slot(state);
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"",
+					   RelationGetRelationName(state->rel)));
+}
+
+static inline TupleTableSlot **
+table_multi_insert_slots(TableInsertState * state, int *num_slots)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_multi_insert_slots)
+		return state->rel->rd_tableam->tuple_multi_insert_slots(state, num_slots);
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("table_multi_insert_slots access method is not implemented for relation \"%s\"",
+					   RelationGetRelationName(state->rel)));
+}
+
+static inline void
+table_multi_insert_flush(TableInsertState * state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_multi_insert_flush)
+		state->rel->rd_tableam->tuple_multi_insert_flush(state);
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("table_multi_insert_flush access method is not implemented for relation \"%s\"",
+					   RelationGetRelationName(state->rel)));
+}
+
+static inline void
+table_insert_end(TableInsertState * state)
+{
+	if (state->rel->rd_tableam &&
+		state->rel->rd_tableam->tuple_insert_end)
+		state->rel->rd_tableam->tuple_insert_end(state);
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("table_insert_end access method is not implemented for relation \"%s\"",
+					   RelationGetRelationName(state->rel)));
+}
+
 /*
  * Delete a tuple.
  *
-- 
2.34.1

From 4835495e675bb178ecb67d84e6b00de15751ce8b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 17 Jan 2024 15:23:38 +0000
Subject: [PATCH v8] Optimize CTAS with multi inserts

---
 src/backend/commands/createas.c | 25 +++++++++----------------
 1 file changed, 9 insertions(+), 16 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 16a2fe65e6..3a02ea9578 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -58,9 +58,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableInsertState *ti_state; /* table insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -557,17 +555,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
 	 * bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+		myState->ti_state = table_insert_begin(intoRelationDesc,
+											   GetCurrentCommandId(true),
+											   TABLEAM_MULTI_INSERTS |
+											   TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY,
+											   TABLE_INSERT_SKIP_FSM);
 	else
-		myState->bistate = NULL;
+		myState->ti_state = NULL;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -595,11 +595,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+		table_multi_insert_v2(myState->ti_state, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -617,10 +613,7 @@ intorel_shutdown(DestReceiver *self)
 	IntoClause *into = myState->into;
 
 	if (!into->skipData)
-	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
-	}
+		table_insert_end(myState->ti_state);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
-- 
2.34.1

From d5fd779aa51c624662eefee8349f2d3f6517c3c5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 17 Jan 2024 15:27:37 +0000
Subject: [PATCH v8] Optimize RMV with multi inserts

---
 src/backend/commands/matview.c | 34 ++++++++++++----------------------
 1 file changed, 12 insertions(+), 22 deletions(-)

diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 1dcfbe879b..f84c79f5f0 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -52,10 +52,7 @@ typedef struct
 	DestReceiver pub;			/* publicly-known function pointers */
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
-	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableInsertState *ti_state; /* table insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -457,13 +454,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 
 	transientrel = table_open(myState->transientoid, NoLock);
 
-	/*
-	 * Fill private fields of myState for use by later routines
-	 */
-	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	/* Fill private fields of myState for use by later routines */
+	myState->ti_state = table_insert_begin(transientrel,
+										   GetCurrentCommandId(true),
+										   TABLEAM_MULTI_INSERTS |
+										   TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY,
+										   TABLE_INSERT_SKIP_FSM |
+										   TABLE_INSERT_FROZEN);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -488,12 +485,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_multi_insert_v2(myState->ti_state, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -507,14 +499,12 @@ static void
 transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
+	Relation	transientrel = myState->ti_state->rel;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_insert_end(myState->ti_state);
 
 	/* close transientrel, but keep lock until commit */
-	table_close(myState->transientrel, NoLock);
-	myState->transientrel = NULL;
+	table_close(transientrel, NoLock);
 }
 
 /*
-- 
2.34.1

From 24062422b0f213f188bad844b2191923ff258807 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 17 Jan 2024 16:49:52 +0000
Subject: [PATCH v8] Use new multi insert TAM for COPY FROM

---
 src/backend/commands/copyfrom.c | 92 ++++++++++++++++++---------------
 1 file changed, 50 insertions(+), 42 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 4058b08134..a6c703a99e 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -77,10 +77,9 @@
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableInsertState *ti_state; /* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
 	int			nused;			/* number of 'slots' containing tuples */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
@@ -223,14 +222,31 @@ limit_printout_length(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		int			num_slots;
+
+		buffer->ti_state = table_insert_begin(rri->ri_RelationDesc,
+											  miinfo->mycid,
+											  TABLEAM_MULTI_INSERTS |
+											  TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY |
+											  TABLEAM_SKIP_MULTI_INSERTS_FLUSH,
+											  miinfo->ti_options);
+		buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots);
+	}
+	else
+	{
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		buffer->ti_state = NULL;
+	}
+
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -245,7 +261,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -322,8 +338,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -395,13 +409,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -409,18 +418,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
-		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
-		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
+		table_multi_insert_flush(buffer->ti_state);
 
 		for (i = 0; i < nused; i++)
 		{
@@ -435,7 +433,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 				cstate->cur_lineno = buffer->linenos[i];
 				recheckIndexes =
 					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
+										  slots[i], estate, false,
 										  false, NULL, NIL, false);
 				ExecARInsertTriggers(estate, resultRelInfo,
 									 slots[i], recheckIndexes,
@@ -493,20 +491,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 	resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
-	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
-	}
+		table_insert_end(buffer->ti_state);
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -593,13 +586,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused = buffer->nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(nused < MAX_BUFFERED_TUPLES);
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+		slot = table_multi_insert_next_free_slot(buffer->ti_state);
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -615,6 +620,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	Assert(buffer != NULL);
 	Assert(slot == buffer->slots[buffer->nused]);
 
+	if (rri->ri_FdwRoutine == NULL)
+		table_multi_insert_v2(buffer->ti_state, slot);
+
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
 
-- 
2.34.1

Reply via email to