On Wed, Jun 5, 2024 at 12:42 PM Bharath Rupireddy
<bharath.rupireddyforpostg...@gmail.com> wrote:
>
> Please find the v22 patches with the above changes.

Please find attached the v23 patches, with 0005 rebased and 0004 adapted
for commit 9758174e2e.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 511d0a6aa3851408b88a5d5cccb1a31af26aa089 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 26 Aug 2024 04:50:09 +0000
Subject: [PATCH v23 5/5] Use new multi insert table AM for COPY

This commit uses the new multi insert table AM added by commit
<<CHANGE_ME>> for the COPY ... FROM command.

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/commands/copyfrom.c          | 234 +++++++++++++++--------
 src/include/commands/copyfrom_internal.h |   4 +-
 src/tools/pgindent/typedefs.list         |   1 +
 3 files changed, 158 insertions(+), 81 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 2d3462913e..29e0e497c1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -74,14 +74,25 @@
  */
 #define MAX_PARTITION_BUFFERS	32
 
+typedef struct CopyModifyBufferFlushContext
+{
+	CopyFromState cstate;		/* COPY state read by the flush callback */
+	ResultRelInfo *resultRelInfo;	/* target rel of the buffered inserts */
+	EState	   *estate;			/* passed to index/trigger executor calls */
+} CopyModifyBufferFlushContext;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableModifyState *mstate;	/* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
+	TupleTableSlot *multislot;
+	CopyModifyBufferFlushContext *modify_buffer_flush_context;
 	int			nused;			/* number of 'slots' containing tuples */
+	int			currslotno;		/* Current buffered slot number that's being
+								 * flushed; Used to get correct cur_lineno for
+								 * errors while in flush callback. */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
 } CopyMultiInsertBuffer;
@@ -102,6 +113,7 @@ typedef struct CopyMultiInsertInfo
 	int			ti_options;		/* table insert options */
 } CopyMultiInsertInfo;
 
+static void CopyModifyBufferFlushCallback(void *context, TupleTableSlot *slot);
 
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
@@ -221,14 +233,37 @@ CopyLimitPrintoutLength(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  CopyFromState cstate, EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		buffer->modify_buffer_flush_context = (CopyModifyBufferFlushContext *) palloc(sizeof(CopyModifyBufferFlushContext));
+		buffer->modify_buffer_flush_context->cstate = cstate;
+		buffer->modify_buffer_flush_context->resultRelInfo = rri;
+		buffer->modify_buffer_flush_context->estate = estate;
+
+		buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+											TM_FLAG_BAS_BULKWRITE,
+											miinfo->mycid,
+											miinfo->ti_options,
+											CopyModifyBufferFlushCallback,
+											buffer->modify_buffer_flush_context);
+		buffer->slots = NULL;
+		buffer->multislot = NULL;
+	}
+	else
+	{
+		buffer->mstate = NULL;
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		buffer->multislot = NULL;
+	}
+
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
 
 	return buffer;
@@ -239,11 +274,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
  */
 static inline void
 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+							   ResultRelInfo *rri, CopyFromState cstate,
+							   EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -276,7 +312,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	 * tuples their way for the first time.
 	 */
 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+		CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
 }
 
 /*
@@ -320,8 +356,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -393,13 +427,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -407,56 +436,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
-		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
-		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-		for (i = 0; i < nused; i++)
-		{
-			/*
-			 * If there are any indexes, update them for all the inserted
-			 * tuples, and run AFTER ROW INSERT triggers.
-			 */
-			if (resultRelInfo->ri_NumIndices > 0)
-			{
-				List	   *recheckIndexes;
-
-				cstate->cur_lineno = buffer->linenos[i];
-				recheckIndexes =
-					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
-										  false, NULL, NIL, false);
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], recheckIndexes,
-									 cstate->transition_capture);
-				list_free(recheckIndexes);
-			}
+		table_modify_buffer_flush(buffer->mstate);
 
-			/*
-			 * There's no indexes, but see if we need to run AFTER ROW INSERT
-			 * triggers anyway.
-			 */
-			else if (resultRelInfo->ri_TrigDesc != NULL &&
-					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-			{
-				cstate->cur_lineno = buffer->linenos[i];
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], NIL,
-									 cstate->transition_capture);
-			}
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-			ExecClearTuple(slots[i]);
-		}
+		/*
+		 * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+		 * in the flush callback CopyModifyBufferFlushCallback.
+		 */
 
 		/* Update the row counter and progress of the COPY command */
 		*processed += nused;
@@ -472,6 +463,60 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	buffer->nused = 0;
 }
 
+static void
+CopyModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	CopyModifyBufferFlushContext *ctx = (CopyModifyBufferFlushContext *) context;
+	CopyFromState cstate = ctx->cstate;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+	/* Quick exit: no indexes and no AFTER ROW INSERT triggers to process */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, insert index entries for this tuple, and run
+	 * AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 cstate->transition_capture);
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There are no indexes, but see if we need to run AFTER ROW INSERT
+	 * triggers anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 cstate->transition_capture);
+	}
+
+	Assert(buffer->currslotno <= buffer->nused);
+}
+
 /*
  * Drop used slots and free member for this buffer.
  *
@@ -492,19 +537,18 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
 	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
+		table_modify_end(buffer->mstate);
+		ExecDropSingleTupleTableSlot(buffer->multislot);
+		pfree(buffer->modify_buffer_flush_context);
 	}
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -598,15 +642,34 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(buffer->nused < MAX_BUFFERED_TUPLES);
 
 	nused = buffer->nused;
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		if (buffer->multislot == NULL)
+			buffer->multislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+												   &TTSOpsVirtual);
+
+		/* Caller must clear the slot */
+		slot = buffer->multislot;
+	}
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -620,7 +683,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 
 	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+	if (rri->ri_FdwRoutine != NULL)
+		Assert(slot == buffer->slots[buffer->nused]);
+#endif
 
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
@@ -628,6 +695,14 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	/* Record this slot as being used */
 	buffer->nused++;
 
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		Assert(slot == buffer->multislot);
+		buffer->currslotno = 0;
+
+		table_modify_buffer_insert(buffer->mstate, slot);
+	}
+
 	/* Update how many tuples are stored and their size */
 	miinfo->bufferedTuples++;
 	miinfo->bufferedBytes += tuplen;
@@ -842,7 +917,7 @@ CopyFrom(CopyFromState cstate)
 	/*
 	 * It's generally more efficient to prepare a bunch of tuples for
 	 * insertion, and insert them in one
-	 * table_multi_insert()/ExecForeignBatchInsert() call, than call
+	 * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
 	 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 	 * However, there are a number of reasons why we might not be able to do
 	 * this.  These are explained below.
@@ -1092,7 +1167,8 @@ CopyFrom(CopyFromState cstate)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+													   resultRelInfo, cstate,
+													   estate);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
 typedef enum CopyInsertMethod
 {
 	CIM_SINGLE,					/* use table_tuple_insert or ExecForeignInsert */
-	CIM_MULTI,					/* always use table_multi_insert or
+	CIM_MULTI,					/* always use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert */
-	CIM_MULTI_CONDITIONAL,		/* use table_multi_insert or
+	CIM_MULTI_CONDITIONAL,		/* use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 07a61d086d..e882d4ab17 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -497,6 +497,7 @@ CopyHeaderChoice
 CopyInsertMethod
 CopyLogVerbosityChoice
 CopyMethod
+CopyModifyBufferFlushContext
 CopyMultiInsertBuffer
 CopyMultiInsertInfo
 CopyOnErrorChoice
-- 
2.43.0

From beb1928bbbfaf6fb1466a58d7fa1deb8412e47af Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 26 Aug 2024 04:46:36 +0000
Subject: [PATCH v23 2/5] Optimize various SQL commands with new multi insert 
 table AM

This commit optimizes the following commands for the heap AM using the
new multi insert table AM added by commit <<CHANGE_ME>>:
- CREATE TABLE AS
- CREATE MATERIALIZED VIEW
- REFRESH MATERIALIZED VIEW
- ALTER TABLE flavours resulting in table rewrites

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/commands/createas.c  | 26 ++++++++++----------------
 src/backend/commands/matview.c   | 25 ++++++++++---------------
 src/backend/commands/tablecmds.c | 30 ++++++++++--------------------
 3 files changed, 30 insertions(+), 51 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 0b629b1f79..8378597f36 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -53,9 +53,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -547,17 +545,20 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->rel = intoRelationDesc;
 	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
 
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
 	 * bulk inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+		myState->mstate = table_modify_begin(intoRelationDesc,
+											 TM_FLAG_BAS_BULKWRITE,
+											 GetCurrentCommandId(true),
+											 TABLE_INSERT_SKIP_FSM,
+											 NULL,
+											 NULL);
 	else
-		myState->bistate = NULL;
+		myState->mstate = NULL;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -585,11 +586,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+		table_modify_buffer_insert(myState->mstate, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -607,10 +604,7 @@ intorel_shutdown(DestReceiver *self)
 	IntoClause *into = myState->into;
 
 	if (!into->skipData)
-	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
-	}
+		table_modify_end(myState->mstate);
 
 	/* close rel, but keep lock until commit */
 	table_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 91f0fd6ea3..f036abbeb3 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -48,9 +48,7 @@ typedef struct
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
 	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableModifyState *mstate;	/* table insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -491,9 +489,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * Fill private fields of myState for use by later routines
 	 */
 	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->mstate = table_modify_begin(transientrel,
+										 TM_FLAG_BAS_BULKWRITE,
+										 GetCurrentCommandId(true),
+										 TABLE_INSERT_SKIP_FSM |
+										 TABLE_INSERT_FROZEN,
+										 NULL,
+										 NULL);
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -518,12 +520,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_modify_buffer_insert(myState->mstate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -538,9 +535,7 @@ transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
 
-	FreeBulkInsertState(myState->bistate);
-
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_modify_end(myState->mstate);
 
 	/* close transientrel, but keep lock until commit */
 	table_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index dac39df83a..3ca5448a72 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5954,10 +5954,8 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	int			i;
 	ListCell   *l;
 	EState	   *estate;
-	CommandId	mycid;
-	BulkInsertState bistate;
-	int			ti_options;
 	ExprState  *partqualstate = NULL;
+	TableModifyState *mstate = NULL;
 
 	/*
 	 * Open the relation(s).  We have surely already locked the existing
@@ -5976,18 +5974,14 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	 * Prepare a BulkInsertState and options for table_tuple_insert.  The FSM
 	 * is empty, so don't bother using it.
 	 */
-	if (newrel)
-	{
-		mycid = GetCurrentCommandId(true);
-		bistate = GetBulkInsertState();
-		ti_options = TABLE_INSERT_SKIP_FSM;
-	}
-	else
+	if (newrel && mstate == NULL)
 	{
-		/* keep compiler quiet about using these uninitialized */
-		mycid = 0;
-		bistate = NULL;
-		ti_options = 0;
+		mstate = table_modify_begin(newrel,
+									TM_FLAG_BAS_BULKWRITE,
+									GetCurrentCommandId(true),
+									TABLE_INSERT_SKIP_FSM,
+									NULL,
+									NULL);
 	}
 
 	/*
@@ -6285,8 +6279,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 
 			/* Write the tuple out to the new relation */
 			if (newrel)
-				table_tuple_insert(newrel, insertslot, mycid,
-								   ti_options, bistate);
+				table_modify_buffer_insert(mstate, insertslot);
 
 			ResetExprContext(econtext);
 
@@ -6307,10 +6300,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	table_close(oldrel, NoLock);
 	if (newrel)
 	{
-		FreeBulkInsertState(bistate);
-
-		table_finish_bulk_insert(newrel, ti_options);
-
+		table_modify_end(mstate);
 		table_close(newrel, NoLock);
 	}
 }
-- 
2.43.0

From a1579bcf58f74d1e3121b81c249c029e470297f0 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 26 Aug 2024 04:49:29 +0000
Subject: [PATCH v23 4/5] Optimize Logical Replication Apply with new multi
 insert table AM

This commit optimizes Logical Replication Apply for the heap AM
using the new multi insert table AM added by commit <<CHANGE_ME>>.

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/executor/execReplication.c   |  41 ++-
 src/backend/replication/logical/proto.c  |  24 ++
 src/backend/replication/logical/worker.c | 375 ++++++++++++++++++++++-
 src/include/executor/executor.h          |   4 +
 src/include/replication/conflict.h       |   6 +
 src/include/replication/logicalproto.h   |   2 +
 src/tools/pgindent/typedefs.list         |   2 +
 7 files changed, 440 insertions(+), 14 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 1086cbc962..bc1ba3e5a8 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -531,7 +531,7 @@ retry:
  * Check all the unique indexes in 'recheckIndexes' for conflict with the
  * tuple in 'remoteslot' and report if found.
  */
-static void
+void
 CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
 					   ConflictType type, List *recheckIndexes,
 					   TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
@@ -646,6 +646,45 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 	}
 }
 
+void
+ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+						ResultRelInfo *resultRelInfo,
+						EState *estate, TupleTableSlot *slot)
+{
+	bool		skip_tuple = false;
+	Relation	rel = resultRelInfo->ri_RelationDesc;
+
+	/* For now we support only plain tables (RELKIND_RELATION). */
+	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+
+	CheckCmdReplicaIdentity(rel, CMD_INSERT);
+
+	/* BEFORE ROW INSERT triggers; a false result means skip this tuple */
+	if (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+	{
+		if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			skip_tuple = true;	/* "do nothing" */
+	}
+
+	if (!skip_tuple)
+	{
+		/* Compute stored generated columns */
+		if (rel->rd_att->constr &&
+			rel->rd_att->constr->has_generated_stored)
+			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
+									   CMD_INSERT);
+
+		/* Check table constraints and, for partitions, the partition check */
+		if (rel->rd_att->constr)
+			ExecConstraints(resultRelInfo, slot, estate);
+		if (rel->rd_rel->relispartition)
+			ExecPartitionCheck(resultRelInfo, slot, estate, true);
+
+		table_modify_buffer_insert(MultiInsertState, slot);
+	}
+}
+
 /*
  * Find the searchslot tuple and update it with data in the slot,
  * update the indexes, and execute any constraints and per-row triggers.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 980f6e2741..0e7050dba8 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -427,6 +427,30 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	logicalrep_write_tuple(out, rel, newslot, binary, columns);
 }
 
+LogicalRepRelId
+logicalrep_read_relid(StringInfo in)
+{
+	LogicalRepRelId relid;
+
+	/* Read the relation id (a 4-byte integer) from the message */
+	relid = pq_getmsgint(in, 4);
+
+	return relid;
+}
+
+void
+logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup)
+{
+	char		action;
+
+	action = pq_getmsgbyte(in);
+	if (action != 'N')			/* only a new tuple ('N') is expected here */
+		elog(ERROR, "expected new tuple but got %d",
+			 action);
+
+	logicalrep_read_tuple(in, newtup);
+}
+
 /*
  * Read INSERT from stream.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 38c2895307..7873152c02 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -148,7 +148,6 @@
 #include <unistd.h>
 
 #include "access/table.h"
-#include "access/tableam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
@@ -414,6 +413,30 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+typedef enum LRMultiInsertReturnStatus
+{
+	LR_MULTI_INSERT_NONE = 0,
+	LR_MULTI_INSERT_REL_SKIPPED,
+	LR_MULTI_INSERT_DISALLOWED,
+	LR_MULTI_INSERT_DONE,
+} LRMultiInsertReturnStatus;
+
+static TableModifyState *MultiInsertState = NULL;
+static LogicalRepRelMapEntry *LastRel = NULL;
+static LogicalRepRelId LastMultiInsertRelId = InvalidOid;
+static ApplyExecutionData *LastEData = NULL;
+static TupleTableSlot *LastRemoteSlot = NULL;
+
+typedef struct LRModifyBufferFlushContext
+{
+	ResultRelInfo *resultRelInfo;	/* read by LRModifyBufferFlushCallback */
+	EState	   *estate;			/* read by LRModifyBufferFlushCallback */
+} LRModifyBufferFlushContext;
+
+static LRModifyBufferFlushContext *modify_buffer_flush_context = NULL;
+static void LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot);
+static void FinishMultiInserts(void);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -1015,6 +1038,8 @@ apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit(s, &commit_data);
 
 	if (commit_data.commit_lsn != remote_final_lsn)
@@ -1041,6 +1066,8 @@ apply_handle_begin_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData begin_data;
 
+	FinishMultiInserts();
+
 	/* Tablesync should never receive prepare. */
 	if (am_tablesync_worker())
 		ereport(ERROR,
@@ -1107,6 +1134,8 @@ apply_handle_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_prepare(s, &prepare_data);
 
 	if (prepare_data.prepare_lsn != remote_final_lsn)
@@ -1179,6 +1208,8 @@ apply_handle_commit_prepared(StringInfo s)
 	LogicalRepCommitPreparedTxnData prepare_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit_prepared(s, &prepare_data);
 	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
 
@@ -1228,6 +1259,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	LogicalRepRollbackPreparedTxnData rollback_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_rollback_prepared(s, &rollback_data);
 	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
 
@@ -1290,6 +1323,8 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1317,6 +1352,8 @@ apply_handle_stream_prepare(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
 								   prepare_data.xid, prepare_data.prepare_lsn);
 
+			FinishMultiInserts();
+
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
 
@@ -1428,6 +1465,8 @@ apply_handle_stream_prepare(StringInfo s)
 static void
 apply_handle_origin(StringInfo s)
 {
+	FinishMultiInserts();
+
 	/*
 	 * ORIGIN message can only come inside streaming transaction or inside
 	 * remote transaction and before any actual writes.
@@ -1494,6 +1533,8 @@ apply_handle_stream_start(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1649,6 +1690,8 @@ apply_handle_stream_stop(StringInfo s)
 	ParallelApplyWorkerInfo *winfo;
 	TransApplyAction apply_action;
 
+	FinishMultiInserts();
+
 	if (!in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1842,6 +1885,8 @@ apply_handle_stream_abort(StringInfo s)
 	StringInfoData original_msg = *s;
 	bool		toplevel_xact;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2159,6 +2204,8 @@ apply_handle_stream_commit(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2180,6 +2227,8 @@ apply_handle_stream_commit(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
 								   commit_data.commit_lsn);
 
+			FinishMultiInserts();
+
 			apply_handle_commit_internal(&commit_data);
 
 			/* Unlink the files with serialized changes and subxact info. */
@@ -2323,6 +2372,8 @@ apply_handle_relation(StringInfo s)
 {
 	LogicalRepRelation *rel;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
 		return;
 
@@ -2346,6 +2397,8 @@ apply_handle_type(StringInfo s)
 {
 	LogicalRepTyp typ;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
 		return;
 
@@ -2384,16 +2437,150 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
 						RelationGetRelationName(rel))));
 }
 
-/*
- * Handle INSERT message.
- */
+static void
+FinishMultiInserts(void)
+{
+	LogicalRepMsgType saved_command;
+
+	if (MultiInsertState == NULL)
+		return;
+
+	Assert(OidIsValid(LastMultiInsertRelId));
+	Assert(LastEData != NULL);
+
+	/* Point the error callback at the relation whose inserts are pending */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Report errors as INSERT; the caller's command is restored below */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	ExecDropSingleTupleTableSlot(LastRemoteSlot);
+	LastRemoteSlot = NULL;
+
+	table_modify_end(MultiInsertState);
+	MultiInsertState = NULL;
+	LastMultiInsertRelId = InvalidOid;
+
+	pfree(modify_buffer_flush_context);
+	modify_buffer_flush_context = NULL;
+
+	ExecCloseIndices(LastEData->targetRelInfo);
+
+	finish_edata(LastEData);
+	LastEData = NULL;
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Restore the command that was current on entry */
+	apply_error_callback_arg.command = saved_command;
+
+	logicalrep_rel_close(LastRel, NoLock);
+	LastRel = NULL;
+
+	end_replication_step();
+}
 
 static void
-apply_handle_insert(StringInfo s)
+LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	LRModifyBufferFlushContext *ctx = (LRModifyBufferFlushContext *) context;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	LogicalRepMsgType saved_command;
+
+	/* Quick exit if there are neither indexes nor AFTER INSERT triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Report errors as INSERT; the previous command is restored below */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, insert this tuple's index entries, and then
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+		List	   *conflictindexes;
+		bool		conflict = false;
+
+		conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  conflictindexes ? true : false,
+								  &conflict,
+								  conflictindexes, false);
+
+		/*
+		 * Check the conflict indexes to fetch the conflicting local tuple
+		 * and report the conflict. We perform this check here, instead of
+		 * performing an additional index scan before the actual insertion and
+		 * reporting the conflict if any conflicting tuples are found. This is
+		 * to avoid the overhead of executing the extra scan for each INSERT
+		 * operation, even when no conflict arises, which could introduce
+		 * significant overhead to replication, particularly in cases where
+		 * conflicts are rare.
+		 *
+		 * XXX OTOH, this could lead to clean-up effort for dead tuples added
+		 * in heap and index in case of conflicts. But since conflicts should
+		 * not be frequent, we prefer to avoid the performance overhead of an
+		 * extra scan before each insertion.
+		 */
+		if (conflict)
+			CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
+								   recheckIndexes, NULL, slot);
+
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 NULL);
+
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There are no indexes, but see if we need to run AFTER ROW INSERT
+	 * triggers anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 NULL);
+	}
+
+	/*
+	 * XXX we should in theory pass a TransitionCaptureState object to the
+	 * above to capture transition tuples, but after statement triggers don't
+	 * actually get fired by replication yet anyway
+	 */
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Restore the command that was current on entry */
+	apply_error_callback_arg.command = saved_command;
+}
+
+static LRMultiInsertReturnStatus
+do_multi_inserts(StringInfo s, LogicalRepRelId *relid)
 {
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
-	LogicalRepRelId relid;
 	UserContext ucxt;
 	ApplyExecutionData *edata;
 	EState	   *estate;
@@ -2401,17 +2588,143 @@ apply_handle_insert(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	if (MultiInsertState == NULL)
+		begin_replication_step();
+
+	*relid = logicalrep_read_relid(s);
+
+	if (MultiInsertState != NULL &&
+		(LastMultiInsertRelId != InvalidOid &&
+		 *relid != InvalidOid &&
+		 LastMultiInsertRelId != *relid))
+		FinishMultiInserts();
+
+	if (MultiInsertState == NULL)
+		rel = logicalrep_rel_open(*relid, RowExclusiveLock);
+	else
+		rel = LastRel;
+
+	if (!should_apply_changes_for_rel(rel))
+	{
+		Assert(MultiInsertState == NULL);
+
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_REL_SKIPPED;
+	}
+
+	/* For a partitioned table, let's not do multi inserts. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Assert(MultiInsertState == NULL);
+
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_DISALLOWED;
+	}
+
 	/*
-	 * Quick return if we are skipping data modification changes or handling
-	 * streamed transactions.
+	 * Make sure that any user-supplied code runs as the table owner, unless
+	 * the user has opted out of that behavior.
 	 */
-	if (is_skipping_changes() ||
-		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
-		return;
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = rel;
+
+	if (MultiInsertState == NULL)
+	{
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+		/* Initialize the executor state. */
+		LastEData = edata = create_edata_for_relation(rel);
+		estate = edata->estate;
+
+		LastRemoteSlot = remoteslot = MakeTupleTableSlot(RelationGetDescr(rel->localrel),
+														 &TTSOpsVirtual);
+
+		modify_buffer_flush_context = (LRModifyBufferFlushContext *) palloc(sizeof(LRModifyBufferFlushContext));
+		modify_buffer_flush_context->resultRelInfo = edata->targetRelInfo;
+		modify_buffer_flush_context->estate = estate;
+
+		MultiInsertState = table_modify_begin(edata->targetRelInfo->ri_RelationDesc,
+											  TM_FLAG_BAS_BULKWRITE,
+											  GetCurrentCommandId(true),
+											  0,
+											  LRModifyBufferFlushCallback,
+											  modify_buffer_flush_context);
+		LastRel = rel;
+		LastMultiInsertRelId = *relid;
+
+		/* We must open indexes here. */
+		ExecOpenIndices(edata->targetRelInfo, true);
+		InitConflictIndexes(edata->targetRelInfo);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		CommandId	cid;
+
+		edata = LastEData;
+		estate = edata->estate;
+		ResetExprContext(GetPerTupleExprContext(estate));
+		ExecClearTuple(LastRemoteSlot);
+		remoteslot = LastRemoteSlot;
+		cid = GetCurrentCommandId(true);
+		MultiInsertState->cid = cid;
+		estate->es_output_cid = cid;
+	}
+
+	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
+	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	slot_store_data(remoteslot, rel, &newtup);
+	slot_fill_defaults(rel, estate, remoteslot);
+	MemoryContextSwitchTo(oldctx);
+
+	TargetPrivilegesCheck(edata->targetRelInfo->ri_RelationDesc, ACL_INSERT);
+	ExecRelationMultiInsert(MultiInsertState, edata->targetRelInfo, estate, remoteslot);
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	Assert(MultiInsertState != NULL);
+
+	CommandCounterIncrement();
+
+	return LR_MULTI_INSERT_DONE;
+}
+
+static bool
+do_single_inserts(StringInfo s, LogicalRepRelId relid)
+{
+	LogicalRepRelMapEntry *rel;
+	LogicalRepTupleData newtup;
+	UserContext ucxt;
+	ApplyExecutionData *edata;
+	EState	   *estate;
+	TupleTableSlot *remoteslot;
+	MemoryContext oldctx;
+	bool		run_as_owner;
+
+	Assert(relid != InvalidOid);
 
 	begin_replication_step();
 
-	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
 	if (!should_apply_changes_for_rel(rel))
 	{
@@ -2421,7 +2734,7 @@ apply_handle_insert(StringInfo s)
 		 */
 		logicalrep_rel_close(rel, RowExclusiveLock);
 		end_replication_step();
-		return;
+		return false;
 	}
 
 	/*
@@ -2443,6 +2756,7 @@ apply_handle_insert(StringInfo s)
 										&TTSOpsVirtual);
 
 	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel, &newtup);
 	slot_fill_defaults(rel, estate, remoteslot);
@@ -2467,6 +2781,35 @@ apply_handle_insert(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	end_replication_step();
+
+	return true;
+}
+
+/*
+ * Handle INSERT message: try multi insert first, else fall back to single.
+ */
+static void
+apply_handle_insert(StringInfo s)
+{
+	LRMultiInsertReturnStatus mi_status;
+	LogicalRepRelId relid;
+
+	/*
+	 * Quick return if we are skipping data modification changes or handling
+	 * streamed transactions.
+	 */
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+		return;
+
+	mi_status = do_multi_inserts(s, &relid);
+	if (mi_status == LR_MULTI_INSERT_REL_SKIPPED ||
+		mi_status == LR_MULTI_INSERT_DONE)
+		return;
+
+	do_single_inserts(s, relid);
+
+	return;
 }
 
 /*
@@ -2554,6 +2897,8 @@ apply_handle_update(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -2761,6 +3106,8 @@ apply_handle_delete(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -3245,6 +3592,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 69c3ebff00..17b2e42683 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "access/tableam.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
@@ -668,6 +669,9 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
+extern void ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+									ResultRelInfo *resultRelInfo,
+									EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 									 EState *estate, EPQState *epqstate,
 									 TupleTableSlot *searchslot, TupleTableSlot *slot);
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index 02cb84da7e..3b7c910b03 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -55,4 +55,10 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								RepOriginId localorigin, TimestampTz localts);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
 
+extern void CheckAndReportConflict(ResultRelInfo *resultRelInfo,
+								   EState *estate, ConflictType type,
+								   List *recheckIndexes,
+								   TupleTableSlot *searchslot,
+								   TupleTableSlot *remoteslot);
+
 #endif
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..3f3a7f0a31 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -226,6 +226,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
 									bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_relid(StringInfo in);
+extern void logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index de5699e078..07a61d086d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1475,6 +1475,8 @@ LPTHREAD_START_ROUTINE
 LPTSTR
 LPVOID
 LPWSTR
+LRModifyBufferFlushContext
+LRMultiInsertReturnStatus
 LSEG
 LUID
 LVRelState
-- 
2.43.0

From 9846d4e9d845d25c9b57a054b42803019e41c0e5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 26 Aug 2024 04:45:47 +0000
Subject: [PATCH v23 1/5] Introduce new Table AM for multi inserts

Until now, only the COPY ... FROM command has used multi inserts
(i.e. buffering some tuples and inserting them into the table at once).
Various other commands can benefit from this multi insert
logic [Reusable].

Also, there's a need to have these multi insert AMs
(Access Methods) as a scan-like API [Usability].

Also, there's a need to allow various table AMs to define their own
buffering and flushing strategy [Flexibility].

This commit introduces new table AMs for multi inserts to help
achieve all of the above.

Upcoming commits will use these new table AMs for
various other commands.

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3fxxx@alap3.anarazel.de
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/access/heap/heapam.c         | 197 ++++++++++++++++++++++-
 src/backend/access/heap/heapam_handler.c |   6 +
 src/backend/access/table/tableamapi.c    |   5 +
 src/include/access/heapam.h              |  38 +++++
 src/include/access/tableam.h             |  80 +++++++++
 src/tools/pgindent/typedefs.list         |   3 +
 6 files changed, 328 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91b20147a0..86d60e476b 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -65,6 +65,7 @@
 #include "utils/datum.h"
 #include "utils/injection_point.h"
 #include "utils/inval.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -113,7 +114,7 @@ static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
-
+static void heap_modify_insert_end_callback(TableModifyState *state);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -2612,6 +2613,200 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Create heap modify state in its own context under TopTransactionContext.
+ */
+TableModifyState *
+heap_modify_begin(Relation rel, int modify_flags,
+				  CommandId cid, int options,
+				  TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				  void *modify_buffer_flush_context)
+{
+	TableModifyState *state;
+	MemoryContext context;
+	MemoryContext oldcontext;
+
+	context = AllocSetContextCreate(TopTransactionContext,
+									"heap_modify memory context",
+									ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(context);
+	state = palloc0(sizeof(TableModifyState));
+	state->rel = rel;
+	state->modify_flags = modify_flags;
+	state->mem_cxt = context;
+	state->cid = cid;
+	state->options = options;
+	state->modify_buffer_flush_callback = modify_buffer_flush_callback;
+	state->modify_buffer_flush_context = modify_buffer_flush_context;
+	state->modify_end_callback = NULL;	/* Set on first buffered insert */
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * Copy the passed-in tuple into the next in-memory buffered slot. Once all
+ * HEAP_MAX_BUFFERED_SLOTS slots are in use, flush them into the heap.
+ */
+void
+heap_modify_buffer_insert(TableModifyState *state,
+						  TupleTableSlot *slot)
+{
+	TupleTableSlot *dstslot;
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(state->mem_cxt);
+
+	/* First time through, initialize heap insert state */
+	if (state->data == NULL)
+	{
+		istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState));
+		istate->bistate = NULL;
+		istate->mistate = NULL;
+		state->data = istate;
+		mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState));
+		mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS);
+		istate->mistate = mistate;
+		mistate->mem_cxt = AllocSetContextCreate(CurrentMemoryContext,
+												 "heap_multi_insert memory context",
+												 ALLOCSET_DEFAULT_SIZES);
+
+		if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0)
+			istate->bistate = GetBulkInsertState();
+
+		state->modify_end_callback = heap_modify_insert_end_callback;
+	}
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	dstslot = mistate->slots[mistate->cur_slots];
+	if (dstslot == NULL)
+	{
+		/*
+		 * We use virtual tuple slots for the buffered slots to leverage the
+		 * optimization they provide to minimize physical data copying. The
+		 * virtual slot gets materialized when we copy (via ExecCopySlot
+		 * below) the tuple from the source slot, which can be of any type.
+		 * This ensures that the tuple storage doesn't depend on external
+		 * memory, because all the datums that aren't passed by value are
+		 * copied into the slot's memory context.
+		 */
+		dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel),
+									 &TTSOpsVirtual);
+		mistate->slots[mistate->cur_slots] = dstslot;
+	}
+
+	Assert(TTS_IS_VIRTUAL(dstslot));
+
+	/*
+	 * Note that the copy clears the previous destination slot contents, so
+	 * there's no need for an explicit ExecClearTuple here.
+	 */
+	ExecCopySlot(dstslot, slot);
+
+	mistate->cur_slots++;
+
+	if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS)
+		heap_modify_buffer_flush(state);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Insert all buffered tuples into heap, then run the flush callback on each.
+ */
+void
+heap_modify_buffer_flush(TableModifyState *state)
+{
+	HeapInsertState *istate;
+	HeapMultiInsertState *mistate;
+	MemoryContext oldcontext;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+	Assert(istate->mistate != NULL);
+	mistate = istate->mistate;
+
+	/* Quick exit if nothing is buffered (e.g. we have flushed already) */
+	if (mistate->cur_slots == 0)
+		return;
+
+	/*
+	 * heap_multi_insert may leak memory, so call it in a short-lived memory
+	 * context that is reset right afterwards.
+	 */
+	oldcontext = MemoryContextSwitchTo(mistate->mem_cxt);
+
+	heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots,
+					  state->cid, state->options, istate->bistate);
+
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(mistate->mem_cxt);
+
+	if (state->modify_buffer_flush_callback != NULL)
+	{
+		for (int i = 0; i < mistate->cur_slots; i++)
+			state->modify_buffer_flush_callback(state->modify_buffer_flush_context,
+												mistate->slots[i]);
+	}
+
+	mistate->cur_slots = 0;
+}
+
+/*
+ * Heap insert specific end callback: flush any still-buffered tuples, drop
+ * the buffered slots, and free the multi-insert memory context and the bulk
+ * insert state, if any.
+ */
+static void
+heap_modify_insert_end_callback(TableModifyState *state)
+{
+	HeapInsertState *istate;
+
+	/* Quick exit if we haven't inserted anything yet */
+	if (state->data == NULL)
+		return;
+
+	istate = (HeapInsertState *) state->data;
+
+	if (istate->mistate != NULL)
+	{
+		HeapMultiInsertState *mistate = istate->mistate;
+
+		heap_modify_buffer_flush(state);
+
+		Assert(mistate->cur_slots == 0);
+
+		for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(mistate->slots[i]);
+
+		MemoryContextDelete(mistate->mem_cxt);
+	}
+
+	if (istate->bistate != NULL)
+		FreeBulkInsertState(istate->bistate);
+}
+
+/*
+ * Clean up heap modify state: run the end callback, then delete its context.
+ */
+void
+heap_modify_end(TableModifyState *state)
+{
+	if (state->modify_end_callback != NULL)
+		state->modify_end_callback(state);
+
+	MemoryContextDelete(state->mem_cxt);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 1c6da286d4..3cacfdf871 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2611,6 +2611,12 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+
+	.tuple_modify_begin = heap_modify_begin,
+	.tuple_modify_buffer_insert = heap_modify_buffer_insert,
+	.tuple_modify_buffer_flush = heap_modify_buffer_flush,
+	.tuple_modify_end = heap_modify_end,
+
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index e9b598256f..772f29b1b5 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -97,6 +97,11 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->scan_sample_next_block != NULL);
 	Assert(routine->scan_sample_next_tuple != NULL);
 
+	Assert(routine->tuple_modify_begin != NULL);
+	Assert(routine->tuple_modify_buffer_insert != NULL);
+	Assert(routine->tuple_modify_buffer_flush != NULL);
+	Assert(routine->tuple_modify_end != NULL);
+
 	return routine;
 }
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 9e9aec88a6..8c44a7808d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -271,6 +271,32 @@ typedef enum
 	PRUNE_VACUUM_CLEANUP,		/* VACUUM 2nd heap pass */
 } PruneReason;
 
+/*
+ * Maximum number of slots that multi-insert buffers can hold.
+ *
+ * Caution: Don't make this too big, as we could end up with this many tuples
+ * stored in multi insert buffer.
+ */
+#define HEAP_MAX_BUFFERED_SLOTS		1000
+
+typedef struct HeapMultiInsertState
+{
+	/* Array of buffered slots */
+	TupleTableSlot **slots;
+
+	/* Number of buffered slots currently held */
+	int			cur_slots;
+
+	MemoryContext mem_cxt;
+} HeapMultiInsertState;
+
+typedef struct HeapInsertState
+{
+	struct BulkInsertStateData *bistate;
+	HeapMultiInsertState *mistate;
+} HeapInsertState;
+
+
 /* ----------------
  *		function prototypes for heap access method
  *
@@ -321,6 +347,18 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableModifyState *heap_modify_begin(Relation rel,
+										   int modify_flags,
+										   CommandId cid,
+										   int options,
+										   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+										   void *modify_buffer_flush_context);
+extern void heap_modify_buffer_insert(TableModifyState *state,
+									  TupleTableSlot *slot);
+extern void heap_modify_buffer_flush(TableModifyState *state);
+extern void heap_modify_end(TableModifyState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index da661289c1..083d9ac820 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -255,6 +255,39 @@ typedef struct TM_IndexDeleteOp
 	TM_IndexStatus *status;
 } TM_IndexDeleteOp;
 
+/* Table modify flags */
+
+/* Use BAS_BULKWRITE buffer access strategy */
+#define TM_FLAG_BAS_BULKWRITE	0x000001
+
+struct TableModifyState;
+
+/* Callback invoked for each tuple that gets flushed to disk from buffer */
+typedef void (*TableModifyBufferFlushCallback) (void *context,
+												TupleTableSlot *slot);
+
+/* Table AM specific callback that gets called in table_modify_end() */
+typedef void (*TableModifyEndCallback) (struct TableModifyState *state);
+
+/* Holds table modify state */
+typedef struct TableModifyState
+{
+	Relation	rel;
+	int			modify_flags;
+	MemoryContext mem_cxt;
+	CommandId	cid;
+	int			options;
+
+	/* Flush callback and its context */
+	TableModifyBufferFlushCallback modify_buffer_flush_callback;
+	void	   *modify_buffer_flush_context;
+
+	/* Table AM specific data */
+	void	   *data;
+
+	TableModifyEndCallback modify_end_callback;
+} TableModifyState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -578,6 +611,21 @@ typedef struct TableAmRoutine
 	void		(*finish_bulk_insert) (Relation rel, int options);
 
 
+	/* ------------------------------------------------------------------------
+	 * Table Modify related functions.
+	 * ------------------------------------------------------------------------
+	 */
+	TableModifyState *(*tuple_modify_begin) (Relation rel,
+											 int modify_flags,
+											 CommandId cid,
+											 int options,
+											 TableModifyBufferFlushCallback modify_buffer_flush_callback,
+											 void *modify_buffer_flush_context);
+	void		(*tuple_modify_buffer_insert) (TableModifyState *state,
+											   TupleTableSlot *slot);
+	void		(*tuple_modify_buffer_flush) (TableModifyState *state);
+	void		(*tuple_modify_end) (TableModifyState *state);
+
 	/* ------------------------------------------------------------------------
 	 * DDL related functionality.
 	 * ------------------------------------------------------------------------
@@ -1599,6 +1647,38 @@ table_finish_bulk_insert(Relation rel, int options)
 		rel->rd_tableam->finish_bulk_insert(rel, options);
 }
 
+/* ------------------------------------------------------------------------
+ * Table Modify related functions.
+ * ------------------------------------------------------------------------
+ */
+static inline TableModifyState *
+table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options,
+				   TableModifyBufferFlushCallback modify_buffer_flush_callback,
+				   void *modify_buffer_flush_context)
+{
+	return rel->rd_tableam->tuple_modify_begin(rel, modify_flags,
+											   cid, options,
+											   modify_buffer_flush_callback,
+											   modify_buffer_flush_context);
+}
+
+static inline void
+table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot);
+}
+
+static inline void
+table_modify_buffer_flush(TableModifyState *state)
+{
+	state->rel->rd_tableam->tuple_modify_buffer_flush(state);
+}
+
+static inline void
+table_modify_end(TableModifyState *state)
+{
+	state->rel->rd_tableam->tuple_modify_end(state);
+}
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9e951a9e6f..538132e6f4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1145,6 +1145,8 @@ HeadlineJsonState
 HeadlineParsedText
 HeadlineWordEntry
 HeapCheckContext
+HeapInsertState
+HeapMultiInsertState
 HeapPageFreeze
 HeapScanDesc
 HeapTuple
@@ -2869,6 +2871,7 @@ TableFuncScanState
 TableFuncType
 TableInfo
 TableLikeClause
+TableModifyState
 TableSampleClause
 TableScanDesc
 TableScanDescData
-- 
2.43.0

From 479a15eebfac29f6cb87ba9e603a6179fb850f9e Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 26 Aug 2024 04:47:46 +0000
Subject: [PATCH v23 3/5] Optimize INSERT INTO SELECT with new multi insert
 table AM

This commit optimizes the INSERT INTO SELECT query for heap AM
using new multi insert table AM added by commit <<CHANGE_ME>>.

Author: Bharath Rupireddy
Reviewed-by: Jeff Davis
Discussion: https://www.postgresql.org/message-id/CALj2ACVi9eTRYR%3Dgdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/8633171cb034aafc260fdf37df04b6c779aa1e2f.camel%40j-davis.com
---
 src/backend/executor/nodeModifyTable.c | 170 ++++++++++++++++++++++---
 src/tools/pgindent/typedefs.list       |   1 +
 2 files changed, 153 insertions(+), 18 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 8bf4c80d4a..03dd372227 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -123,6 +123,18 @@ typedef struct UpdateContext
 	LockTupleMode lockmode;
 } UpdateContext;
 
+typedef struct InsertModifyBufferFlushContext
+{
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+	ModifyTableState *mtstate;
+} InsertModifyBufferFlushContext;
+
+static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL;
+static TableModifyState *table_modify_state = NULL;
+
+static void InsertModifyBufferFlushCallback(void *context,
+											TupleTableSlot *slot);
 
 static void ExecBatchInsert(ModifyTableState *mtstate,
 							ResultRelInfo *resultRelInfo,
@@ -735,6 +747,55 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
 	return ExecProject(newProj);
 }
 
+static void
+InsertModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	InsertModifyBufferFlushContext *ctx = (InsertModifyBufferFlushContext *) context;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	ModifyTableState *mtstate = ctx->mtstate;
+
+	/* Quick exit if there are neither indexes nor AFTER INSERT triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, insert this tuple's index entries, and then
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 mtstate->mt_transition_capture);
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There are no indexes, but see if we need to run AFTER ROW INSERT
+	 * triggers anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 mtstate->mt_transition_capture);
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecInsert
  *
@@ -760,7 +821,8 @@ ExecInsert(ModifyTableContext *context,
 		   TupleTableSlot *slot,
 		   bool canSetTag,
 		   TupleTableSlot **inserted_tuple,
-		   ResultRelInfo **insert_destrel)
+		   ResultRelInfo **insert_destrel,
+		   bool canMultiInsert)
 {
 	ModifyTableState *mtstate = context->mtstate;
 	EState	   *estate = context->estate;
@@ -773,6 +835,7 @@ ExecInsert(ModifyTableContext *context,
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
 	MemoryContext oldContext;
+	bool		ar_insert_triggers_executed = false;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -1138,17 +1201,53 @@ ExecInsert(ModifyTableContext *context,
 		}
 		else
 		{
-			/* insert the tuple normally */
-			table_tuple_insert(resultRelationDesc, slot,
-							   estate->es_output_cid,
-							   0, NULL);
+			if (canMultiInsert &&
+				proute == NULL &&
+				resultRelInfo->ri_WithCheckOptions == NIL &&
+				resultRelInfo->ri_projectReturning == NULL)
+			{
+				if (insert_modify_buffer_flush_context == NULL)
+				{
+					insert_modify_buffer_flush_context =
+						(InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext));
+					insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo;
+					insert_modify_buffer_flush_context->estate = estate;
+					insert_modify_buffer_flush_context->mtstate = mtstate;
+				}
 
-			/* insert index entries for tuple */
-			if (resultRelInfo->ri_NumIndices > 0)
-				recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
-													   slot, estate, false,
-													   false, NULL, NIL,
-													   false);
+				if (table_modify_state == NULL)
+				{
+					table_modify_state = table_modify_begin(resultRelInfo->ri_RelationDesc,
+															0,
+															estate->es_output_cid,
+															0,
+															InsertModifyBufferFlushCallback,
+															insert_modify_buffer_flush_context);
+				}
+
+				table_modify_buffer_insert(table_modify_state, slot);
+				ar_insert_triggers_executed = true;
+			}
+			else
+			{
+				/* insert the tuple normally */
+				table_tuple_insert(resultRelationDesc, slot,
+								   estate->es_output_cid,
+								   0, NULL);
+
+				/* insert index entries for tuple */
+				if (resultRelInfo->ri_NumIndices > 0)
+					recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+														   slot, estate, false,
+														   false, NULL, NIL,
+														   false);
+
+				ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+									 mtstate->mt_transition_capture);
+
+				list_free(recheckIndexes);
+				ar_insert_triggers_executed = true;
+			}
 		}
 	}
 
@@ -1182,10 +1281,12 @@ ExecInsert(ModifyTableContext *context,
 	}
 
 	/* AFTER ROW INSERT Triggers */
-	ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
-						 ar_insert_trig_tcs);
-
-	list_free(recheckIndexes);
+	if (!ar_insert_triggers_executed)
+	{
+		ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+							 ar_insert_trig_tcs);
+		list_free(recheckIndexes);
+	}
 
 	/*
 	 * Check any WITH CHECK OPTION constraints from parent views.  We are
@@ -1881,7 +1982,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
 	/* Tuple routing starts from the root table. */
 	context->cpUpdateReturningSlot =
 		ExecInsert(context, mtstate->rootResultRelInfo, slot, canSetTag,
-				   inserted_tuple, insert_destrel);
+				   inserted_tuple, insert_destrel, false);
 
 	/*
 	 * Reset the transition state that may possibly have been written by
@@ -3385,7 +3486,7 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 				mtstate->mt_merge_action = action;
 
 				rslot = ExecInsert(context, mtstate->rootResultRelInfo,
-								   newslot, canSetTag, NULL, NULL);
+								   newslot, canSetTag, NULL, NULL, false);
 				mtstate->mt_merge_inserted += 1;
 				break;
 			case CMD_NOTHING:
@@ -3770,6 +3871,10 @@ ExecModifyTable(PlanState *pstate)
 	HeapTupleData oldtupdata;
 	HeapTuple	oldtuple;
 	ItemPointer tupleid;
+	bool		canMultiInsert = false;
+
+	table_modify_state = NULL;
+	insert_modify_buffer_flush_context = NULL;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -3865,6 +3970,10 @@ ExecModifyTable(PlanState *pstate)
 		if (TupIsNull(context.planSlot))
 			break;
 
+		if (operation == CMD_INSERT &&
+			nodeTag(subplanstate) == T_SeqScanState)
+			canMultiInsert = true;
+
 		/*
 		 * When there are multiple result relations, each tuple contains a
 		 * junk column that gives the OID of the rel from which it came.
@@ -4078,7 +4187,7 @@ ExecModifyTable(PlanState *pstate)
 					ExecInitInsertProjection(node, resultRelInfo);
 				slot = ExecGetInsertNewTuple(resultRelInfo, context.planSlot);
 				slot = ExecInsert(&context, resultRelInfo, slot,
-								  node->canSetTag, NULL, NULL);
+								  node->canSetTag, NULL, NULL, canMultiInsert);
 				break;
 
 			case CMD_UPDATE:
@@ -4137,6 +4246,17 @@ ExecModifyTable(PlanState *pstate)
 			return slot;
 	}
 
+	if (table_modify_state != NULL)
+	{
+		Assert(operation == CMD_INSERT);
+
+		table_modify_end(table_modify_state);
+		table_modify_state = NULL;
+
+		pfree(insert_modify_buffer_flush_context);
+		insert_modify_buffer_flush_context = NULL;
+	}
+
 	/*
 	 * Insert remaining tuples for batch insert.
 	 */
@@ -4249,6 +4369,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	mtstate->mt_merge_updated = 0;
 	mtstate->mt_merge_deleted = 0;
 
+	table_modify_state = NULL;
+	insert_modify_buffer_flush_context = NULL;
+
 	/*----------
 	 * Resolve the target relation. This is the same as:
 	 *
@@ -4702,6 +4825,17 @@ ExecEndModifyTable(ModifyTableState *node)
 {
 	int			i;
 
+	if (table_modify_state != NULL)
+	{
+		Assert(node->operation == CMD_INSERT);
+
+		table_modify_end(table_modify_state);
+		table_modify_state = NULL;
+
+		pfree(insert_modify_buffer_flush_context);
+		insert_modify_buffer_flush_context = NULL;
+	}
+
 	/*
 	 * Allow any FDWs to shut down
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 538132e6f4..de5699e078 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1184,6 +1184,7 @@ ImportForeignSchema_function
 ImportQual
 InProgressEnt
 InProgressIO
+InsertModifyBufferFlushContext
 IncludeWal
 InclusionOpaque
 IncrementVarSublevelsUp_context
-- 
2.43.0

Reply via email to