On Sat, Dec 05, 2020 at 01:59:41PM -0600, Justin Pryzby wrote:
> On Thu, Dec 03, 2020 at 10:59:34AM +0530, Bharath Rupireddy wrote:
> > On Wed, Dec 2, 2020 at 10:24 PM Justin Pryzby <pry...@telsasoft.com> wrote:
> > >
> > > One loose end in this patch is how to check for volatile default 
> > > expressions.
> > 
> > I think we should be doing all the necessary checks in the planner and
> > have a flag in the planned stmt to indicate whether to go with multi
> > insert or not. For the required checks, we can have a look at how the
> > existing COPY decides to go with either CIM_MULTI or CIM_SINGLE.
> 
> Yes, you can see that I've copied the checks from copy.
> Like copy, some checks are done once, in ExecInitModifyTable, outside of the
> ExecModifyTable "loop".
> 
> This squishes some commits together.
> And uses bistate for ON CONFLICT.
> And attempts to use memory context for tuple size.

Rebased on 9dc718bdf2b1a574481a45624d42b674332e2903

I guess my patch should/may be subsumed by this other one - I'm fine with that.
https://commitfest.postgresql.org/31/2871/

Note that my interest here is just in bistate, to avoid leaving behind many
dirty buffers, not improved performance of COPY.

-- 
Justin
>From 65fd9d9352634e4d0ad49685a988c5e4e157b9d8 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Fri, 8 May 2020 02:17:32 -0500
Subject: [PATCH v9 1/4] INSERT SELECT to use BulkInsertState and multi_insert

Rename structures;
Move MultiInsert functions from copyfrom.c to (tentatively) nodeModifyTable.h;
Move into MultiInsertInfo: transition_capture and cur_lineno (via cstate->miinfo);

Dynamically switch to multi-insert mode based on the number of insertions.
This is intended to accommodate: 1) the original use case of INSERT using a
small ring buffer to avoid leaving behind dirty buffers; 2) automatically using
multi-inserts for batch operations; and 3) allowing the old behavior of leaving
behind dirty buffers, which might allow INSERT to run more quickly, at the cost
of leaving behind many dirty buffers which other backends may have to write out.

XXX: for (1), the bulk-insert state is used even if not multi-insert, including
for a VALUES.

TODO: use cstate->miinfo.cur_lineno++ instead of mtstate->miinfo->ntuples
---
 src/backend/commands/copyfrom.c          | 394 +----------------------
 src/backend/commands/copyfromparse.c     |  10 +-
 src/backend/executor/execMain.c          |   2 +-
 src/backend/executor/execPartition.c     |   2 +-
 src/backend/executor/nodeModifyTable.c   | 196 +++++++++--
 src/backend/utils/misc/guc.c             |  10 +
 src/include/commands/copyfrom_internal.h |   5 +-
 src/include/executor/nodeModifyTable.h   | 371 +++++++++++++++++++++
 src/include/nodes/execnodes.h            |  16 +-
 src/test/regress/expected/insert.out     |  43 +++
 src/test/regress/sql/insert.sql          |  20 ++
 src/tools/pgindent/typedefs.list         |   4 +-
 12 files changed, 658 insertions(+), 415 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index c39cc736ed..8221a2c5d3 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -46,54 +46,6 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * No more than this many tuples per CopyMultiInsertBuffer
- *
- * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
- * multiInsertBuffers list.  Increasing this can cause quadratic growth in
- * memory requirements during copies into partitioned tables with a large
- * number of partitions.
- */
-#define MAX_BUFFERED_TUPLES		1000
-
-/*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
- */
-#define MAX_BUFFERED_BYTES		65535
-
-/* Trim the list of buffers back down to this number after flushing */
-#define MAX_PARTITION_BUFFERS	32
-
-/* Stores multi-insert data related to a single relation in CopyFrom. */
-typedef struct CopyMultiInsertBuffer
-{
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
-	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel */
-	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
-												 * stream */
-} CopyMultiInsertBuffer;
-
-/*
- * Stores one or many CopyMultiInsertBuffers and details about the size and
- * number of tuples which are stored in them.  This allows multiple buffers to
- * exist at once when COPYing into a partitioned table.
- */
-typedef struct CopyMultiInsertInfo
-{
-	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
-	int			bufferedTuples; /* number of tuples buffered over all buffers */
-	int			bufferedBytes;	/* number of bytes from all buffered tuples */
-	CopyFromState	cstate;			/* Copy state for this CopyMultiInsertInfo */
-	EState	   *estate;			/* Executor state used for COPY */
-	CommandId	mycid;			/* Command Id used for COPY */
-	int			ti_options;		/* table insert options */
-} CopyMultiInsertInfo;
-
-
 /* non-export function prototypes */
 static char *limit_printout_length(const char *str);
 
@@ -111,7 +63,7 @@ CopyFromErrorCallback(void *arg)
 	char		curlineno_str[32];
 
 	snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
-			 cstate->cur_lineno);
+			 cstate->miinfo.cur_lineno);
 
 	if (cstate->opts.binary)
 	{
@@ -206,317 +158,6 @@ limit_printout_length(const char *str)
 	return res;
 }
 
-/*
- * Allocate memory and initialize a new CopyMultiInsertBuffer for this
- * ResultRelInfo.
- */
-static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
-	buffer->resultRelInfo = rri;
-	buffer->bistate = GetBulkInsertState();
-	buffer->nused = 0;
-
-	return buffer;
-}
-
-/*
- * Make a new buffer for this ResultRelInfo.
- */
-static inline void
-CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
-
-	buffer = CopyMultiInsertBufferInit(rri);
-
-	/* Setup back-link so we can easily find this buffer again */
-	rri->ri_CopyMultiInsertBuffer = buffer;
-	/* Record that we're tracking this buffer */
-	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-}
-
-/*
- * Initialize an already allocated CopyMultiInsertInfo.
- *
- * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
- * for that table.
- */
-static void
-CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						CopyFromState cstate, EState *estate, CommandId mycid,
-						int ti_options)
-{
-	miinfo->multiInsertBuffers = NIL;
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-	miinfo->cstate = cstate;
-	miinfo->estate = estate;
-	miinfo->mycid = mycid;
-	miinfo->ti_options = ti_options;
-
-	/*
-	 * Only setup the buffer when not dealing with a partitioned table.
-	 * Buffers for partitioned tables will just be setup when we need to send
-	 * tuples their way for the first time.
-	 */
-	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
-}
-
-/*
- * Returns true if the buffers are full
- */
-static inline bool
-CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
-{
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
-		return true;
-	return false;
-}
-
-/*
- * Returns true if we have no buffered tuples
- */
-static inline bool
-CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
-{
-	return miinfo->bufferedTuples == 0;
-}
-
-/*
- * Write the tuples stored in 'buffer' out to the table.
- */
-static inline void
-CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
-						   CopyMultiInsertBuffer *buffer)
-{
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
-	CopyFromState	cstate = miinfo->cstate;
-	EState	   *estate = miinfo->estate;
-	CommandId	mycid = miinfo->mycid;
-	int			ti_options = miinfo->ti_options;
-	bool		line_buf_valid = cstate->line_buf_valid;
-	int			nused = buffer->nused;
-	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
-	TupleTableSlot **slots = buffer->slots;
-
-	/*
-	 * Print error context information correctly, if one of the operations
-	 * below fail.
-	 */
-	cstate->line_buf_valid = false;
-	save_cur_lineno = cstate->cur_lineno;
-
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
-	{
-		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
-		 */
-		if (resultRelInfo->ri_NumIndices > 0)
-		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(resultRelInfo,
-									  buffer->slots[i], estate, false, false,
-									  NULL, NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
-								 cstate->transition_capture);
-			list_free(recheckIndexes);
-		}
-
-		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
-		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-		{
-			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
-		}
-
-		ExecClearTuple(slots[i]);
-	}
-
-	/* Mark that all slots are free */
-	buffer->nused = 0;
-
-	/* reset cur_lineno and line_buf_valid to what they were */
-	cstate->line_buf_valid = line_buf_valid;
-	cstate->cur_lineno = save_cur_lineno;
-}
-
-/*
- * Drop used slots and free member for this buffer.
- *
- * The buffer must be flushed before cleanup.
- */
-static inline void
-CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
-							 CopyMultiInsertBuffer *buffer)
-{
-	int			i;
-
-	/* Ensure buffer was flushed */
-	Assert(buffer->nused == 0);
-
-	/* Remove back-link to ourself */
-	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
-
-	FreeBulkInsertState(buffer->bistate);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
-
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
-
-	pfree(buffer);
-}
-
-/*
- * Write out all stored tuples in all buffers out to the tables.
- *
- * Once flushed we also trim the tracked buffers list down to size by removing
- * the buffers created earliest first.
- *
- * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
- * used.  When cleaning up old buffers we'll never remove the one for
- * 'curr_rri'.
- */
-static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
-{
-	ListCell   *lc;
-
-	foreach(lc, miinfo->multiInsertBuffers)
-	{
-		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
-
-		CopyMultiInsertBufferFlush(miinfo, buffer);
-	}
-
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-
-	/*
-	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
-	 * remove buffers starting with the ones we created first.  It seems less
-	 * likely that these older ones will be needed than the ones that were
-	 * just created.
-	 */
-	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
-	{
-		CopyMultiInsertBuffer *buffer;
-
-		buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
-
-		/*
-		 * We never want to remove the buffer that's currently being used, so
-		 * if we happen to find that then move it to the end of the list.
-		 */
-		if (buffer->resultRelInfo == curr_rri)
-		{
-			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-			buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
-		}
-
-		CopyMultiInsertBufferCleanup(miinfo, buffer);
-		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-	}
-}
-
-/*
- * Cleanup allocated buffers and free memory
- */
-static inline void
-CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
-{
-	ListCell   *lc;
-
-	foreach(lc, miinfo->multiInsertBuffers)
-		CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
-
-	list_free(miinfo->multiInsertBuffers);
-}
-
-/*
- * Get the next TupleTableSlot that the next tuple should be stored in.
- *
- * Callers must ensure that the buffer is not full.
- *
- * Note: 'miinfo' is unused but has been included for consistency with the
- * other functions in this area.
- */
-static inline TupleTableSlot *
-CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
-								ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-	int			nused = buffer->nused;
-
-	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
-
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
-}
-
-/*
- * Record the previously reserved TupleTableSlot that was reserved by
- * CopyMultiInsertInfoNextFreeSlot as being consumed.
- */
-static inline void
-CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						 TupleTableSlot *slot, int tuplen, uint64 lineno)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-
-	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
-
-	/* Store the line number so we can properly report any errors later */
-	buffer->linenos[buffer->nused] = lineno;
-
-	/* Record this slot as being used */
-	buffer->nused++;
-
-	/* Update how many tuples are stored and their size */
-	miinfo->bufferedTuples++;
-	miinfo->bufferedBytes += tuplen;
-}
-
 /*
  * Copy FROM file to relation.
  */
@@ -538,7 +179,6 @@ CopyFrom(CopyFromState cstate)
 	int			ti_options = 0; /* start with default options for insert */
 	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
-	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
 	uint64		processed = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
@@ -725,7 +365,7 @@ CopyFrom(CopyFromState cstate)
 		 * For partitioned tables we can't support multi-inserts when there
 		 * are any statement level insert triggers. It might be possible to
 		 * allow partitioned tables with such triggers in the future, but for
-		 * now, CopyMultiInsertInfoFlush expects that any before row insert
+		 * now, MultiInsertInfoFlush expects that any before row insert
 		 * and statement level insert triggers are on the same relation.
 		 */
 		insertMethod = CIM_SINGLE;
@@ -773,7 +413,8 @@ CopyFrom(CopyFromState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
+		MultiInsertInfoInit(&cstate->miinfo, resultRelInfo,
+								cstate->transition_capture,
 								estate, mycid, ti_options);
 	}
 
@@ -836,7 +477,7 @@ CopyFrom(CopyFromState cstate)
 			Assert(resultRelInfo == target_resultRelInfo);
 			Assert(insertMethod == CIM_MULTI);
 
-			myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+			myslot = MultiInsertInfoNextFreeSlot(&cstate->miinfo,
 													 resultRelInfo);
 		}
 
@@ -905,18 +546,18 @@ CopyFrom(CopyFromState cstate)
 				/* Set the multi-insert buffer to use for this partition. */
 				if (leafpart_use_multi_insert)
 				{
-					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
-						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
+					if (resultRelInfo->ri_MultiInsertBuffer == NULL)
+						MultiInsertInfoSetupBuffer(&cstate->miinfo,
 													   resultRelInfo);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+						 !MultiInsertInfoIsEmpty(&cstate->miinfo))
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
 					 * batching, so rows are visible to triggers etc.
 					 */
-					CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					MultiInsertInfoFlush(&cstate->miinfo, resultRelInfo);
 				}
 
 				if (bistate != NULL)
@@ -962,7 +603,7 @@ CopyFrom(CopyFromState cstate)
 				/* no other path available for partitioned table */
 				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
 
-				batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
+				batchslot = MultiInsertInfoNextFreeSlot(&cstate->miinfo,
 															resultRelInfo);
 
 				if (map != NULL)
@@ -1042,17 +683,17 @@ CopyFrom(CopyFromState cstate)
 					ExecMaterializeSlot(myslot);
 
 					/* Add this tuple to the tuple buffer */
-					CopyMultiInsertInfoStore(&multiInsertInfo,
+					MultiInsertInfoStore(&cstate->miinfo,
 											 resultRelInfo, myslot,
 											 cstate->line_buf.len,
-											 cstate->cur_lineno);
+											 cstate->miinfo.cur_lineno);
 
 					/*
 					 * If enough inserts have queued up, then flush all
 					 * buffers out to their tables.
 					 */
-					if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
-						CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					if (MultiInsertInfoIsFull(&cstate->miinfo))
+						MultiInsertInfoFlush(&cstate->miinfo, resultRelInfo);
 				}
 				else
 				{
@@ -1113,8 +754,8 @@ CopyFrom(CopyFromState cstate)
 	/* Flush any remaining buffered tuples */
 	if (insertMethod != CIM_SINGLE)
 	{
-		if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+		if (!MultiInsertInfoIsEmpty(&cstate->miinfo))
+			MultiInsertInfoFlush(&cstate->miinfo, NULL);
 	}
 
 	/* Done, clean up */
@@ -1148,7 +789,7 @@ CopyFrom(CopyFromState cstate)
 
 	/* Tear down the multi-insert buffer data */
 	if (insertMethod != CIM_SINGLE)
-		CopyMultiInsertInfoCleanup(&multiInsertInfo);
+		MultiInsertInfoCleanup(&cstate->miinfo);
 
 	/* Close all the partitioned tables, leaf partitions, and their indices */
 	if (proute)
@@ -1327,7 +968,6 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->reached_eof = false;
 	cstate->eol_type = EOL_UNKNOWN;
 	cstate->cur_relname = RelationGetRelationName(cstate->rel);
-	cstate->cur_lineno = 0;
 	cstate->cur_attname = NULL;
 	cstate->cur_attval = NULL;
 
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 4c74067f84..02efad8b31 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -460,14 +460,14 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 	Assert(!cstate->opts.binary);
 
 	/* on input just throw the header line away */
-	if (cstate->cur_lineno == 0 && cstate->opts.header_line)
+	if (cstate->miinfo.cur_lineno == 0 && cstate->opts.header_line)
 	{
-		cstate->cur_lineno++;
+		cstate->miinfo.cur_lineno++;
 		if (CopyReadLine(cstate))
 			return false;		/* done */
 	}
 
-	cstate->cur_lineno++;
+	cstate->miinfo.cur_lineno++;
 
 	/* Actually read the line into memory here */
 	done = CopyReadLine(cstate);
@@ -608,7 +608,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 		int16		fld_count;
 		ListCell   *cur;
 
-		cstate->cur_lineno++;
+		cstate->miinfo.cur_lineno++;
 
 		if (!CopyGetInt16(cstate, &fld_count))
 		{
@@ -916,7 +916,7 @@ CopyReadLineText(CopyFromState cstate)
 			 * at all --- is cur_lineno a physical or logical count?)
 			 */
 			if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
-				cstate->cur_lineno++;
+				cstate->miinfo.cur_lineno++;
 		}
 
 		/* Process \r */
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index f4dd47acc7..79dbf0b3e8 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1247,7 +1247,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
 													 * ExecInitRoutingInfo */
 	resultRelInfo->ri_PartitionTupleSlot = NULL;	/* ditto */
 	resultRelInfo->ri_ChildToRootMap = NULL;
-	resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+	resultRelInfo->ri_MultiInsertBuffer = NULL;
 }
 
 /*
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 941731a0a9..067bfd11ba 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -993,7 +993,7 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
 		partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
 		partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
 
-	partRelInfo->ri_CopyMultiInsertBuffer = NULL;
+	partRelInfo->ri_MultiInsertBuffer = NULL;
 
 	/*
 	 * Keep track of it in the PartitionTupleRouting->partitions array.
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 921e695419..a53cbeeb2c 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -43,6 +43,7 @@
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "commands/trigger.h"
+#include "commands/copy.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
@@ -72,6 +73,8 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
 											   ResultRelInfo *targetRelInfo,
 											   TupleTableSlot *slot,
 											   ResultRelInfo **partRelInfo);
+/* guc */
+int bulk_insert_ntuples = 1000;
 
 /*
  * Verify that the tuples to be produced by INSERT or UPDATE match the
@@ -389,6 +392,8 @@ ExecInsert(ModifyTableState *mtstate,
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
 	OnConflictAction onconflict = node->onConflictAction;
 	PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+	TupleTableSlot *batchslot = NULL;
+	bool	use_multi_insert = false;
 
 	/*
 	 * If the input result relation is a partitioned table, find the leaf
@@ -408,6 +413,66 @@ ExecInsert(ModifyTableState *mtstate,
 
 	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 
+	/* Use bulk insert after a threshold number of tuples */
+	// XXX: maybe this should only be done if it's not a partitioned table or
+	// if the partitions don't support miinfo, which uses its own bistates
+	mtstate->ntuples++;
+	if (mtstate->bistate == NULL &&
+			mtstate->operation == CMD_INSERT &&
+			mtstate->ntuples > bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+	{
+		elog(DEBUG1, "enabling bulk insert");
+		mtstate->bistate = GetBulkInsertState();
+	}
+
+	if (!mtstate->miinfo)
+	{
+		/*
+		 * Multi-inserts aren't possible for this statement at all, so don't
+		 * check further.
+		 */
+	} else if (proute == NULL)
+	{
+		if (mtstate->miinfo->ntuples++ >= bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+			use_multi_insert = true;
+	}
+	else
+	{
+		/*
+		 * If a partitioned table itself allows multi-insert, and bistate
+		 * indicates we've inserted the threshold number of tuples, check if
+		 * the partition also supports it.
+		 */
+
+		/* Determine which triggers exist on this partition */
+		// XXX copyfrom.c only checks triggers when the partition changes,
+		// so maybe use_multi_insert should be in mtstate ?
+		bool has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+				resultRelInfo->ri_TrigDesc->trig_insert_before_row);
+
+		bool has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
+				resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
+
+		/*
+		 * Disable multi-inserts when the partition has BEFORE/INSTEAD
+		 * OF triggers, or if the partition is a foreign partition.
+		 * The number of tuples eligible for multi-insert is tracked separately
+		 * from the total number of tuples in case it's not supported for some
+		 * partitions.
+		 */
+		if (!has_before_insert_row_trig &&
+			!has_instead_insert_row_trig &&
+			resultRelInfo->ri_FdwRoutine == NULL &&
+			mtstate->miinfo->ntuples++ >= bulk_insert_ntuples &&
+			bulk_insert_ntuples >= 0)
+			use_multi_insert = true;
+	}
+
+	if (use_multi_insert && mtstate->miinfo->ntuples - 1 == bulk_insert_ntuples)
+		elog(DEBUG1, "enabling multi insert");
+
 	/*
 	 * BEFORE ROW INSERT Triggers.
 	 *
@@ -594,7 +659,7 @@ ExecInsert(ModifyTableState *mtstate,
 			table_tuple_insert_speculative(resultRelationDesc, slot,
 										   estate->es_output_cid,
 										   0,
-										   NULL,
+										   mtstate->bistate,
 										   specToken);
 
 			/* insert index entries for tuple */
@@ -629,12 +694,39 @@ ExecInsert(ModifyTableState *mtstate,
 
 			/* Since there was no insertion conflict, we're done */
 		}
+		else if (use_multi_insert)
+		{
+			if (resultRelInfo->ri_MultiInsertBuffer == NULL)
+				MultiInsertInfoSetupBuffer(mtstate->miinfo, resultRelInfo);
+
+			batchslot = MultiInsertInfoNextFreeSlot(mtstate->miinfo, resultRelInfo);
+			ExecCopySlot(batchslot, slot);
+
+			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot, 0, 0); // XXX: tuplen/lineno
+
+			if (MultiInsertInfoIsFull(mtstate->miinfo))
+				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
+		}
 		else
 		{
+			if (proute && mtstate->prevResultRelInfo != resultRelInfo)
+			{
+				if (mtstate->bistate)
+					ReleaseBulkInsertStatePin(mtstate->bistate);
+				mtstate->prevResultRelInfo = resultRelInfo;
+			}
+
+			/*
+			 * Flush pending inserts if this partition can't use
+			 * batching, so rows are visible to triggers etc.
+			 */
+			if (mtstate->miinfo)
+				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
+
 			/* insert the tuple normally */
 			table_tuple_insert(resultRelationDesc, slot,
 							   estate->es_output_cid,
-							   0, NULL);
+							   0, mtstate->bistate);
 
 			/* insert index entries for tuple */
 			if (resultRelInfo->ri_NumIndices > 0)
@@ -647,32 +739,36 @@ ExecInsert(ModifyTableState *mtstate,
 	if (canSetTag)
 		(estate->es_processed)++;
 
-	/*
-	 * If this insert is the result of a partition key update that moved the
-	 * tuple to a new partition, put this row into the transition NEW TABLE,
-	 * if there is one. We need to do this separately for DELETE and INSERT
-	 * because they happen on different tables.
-	 */
-	ar_insert_trig_tcs = mtstate->mt_transition_capture;
-	if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
-		&& mtstate->mt_transition_capture->tcs_update_new_table)
+	/* Triggers were already run in the batch insert case */
+	if (batchslot == NULL)
 	{
-		ExecARUpdateTriggers(estate, resultRelInfo, NULL,
-							 NULL,
-							 slot,
-							 NULL,
-							 mtstate->mt_transition_capture);
-
 		/*
-		 * We've already captured the NEW TABLE row, so make sure any AR
-		 * INSERT trigger fired below doesn't capture it again.
+		 * If this insert is the result of a partition key update that moved the
+		 * tuple to a new partition, put this row into the transition NEW TABLE,
+		 * if there is one. We need to do this separately for DELETE and INSERT
+		 * because they happen on different tables.
 		 */
-		ar_insert_trig_tcs = NULL;
-	}
+		ar_insert_trig_tcs = mtstate->mt_transition_capture;
+		if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture
+			&& mtstate->mt_transition_capture->tcs_update_new_table)
+		{
+			ExecARUpdateTriggers(estate, resultRelInfo, NULL,
+								 NULL,
+								 slot,
+								 NULL,
+								 mtstate->mt_transition_capture);
+
+			/*
+			 * We've already captured the NEW TABLE row, so make sure any AR
+			 * INSERT trigger fired below doesn't capture it again.
+			 */
+			ar_insert_trig_tcs = NULL;
+		}
 
-	/* AFTER ROW INSERT Triggers */
-	ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
-						 ar_insert_trig_tcs);
+		/* AFTER ROW INSERT Triggers */
+		ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes,
+							 ar_insert_trig_tcs);
+	}
 
 	list_free(recheckIndexes);
 
@@ -2229,6 +2325,45 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans);
 	mtstate->mt_nplans = nplans;
+	mtstate->bistate = NULL;
+
+	/*
+	 * Set miinfo if it can support multi-insert. This is the equivalent of
+	 * CIM_MULTI_* et al in copyfrom.c
+	 */
+
+	if (operation != CMD_INSERT ||
+			node->onConflictAction != ONCONFLICT_NONE)
+		mtstate->miinfo = NULL;
+	else if (mtstate->rootResultRelInfo->ri_TrigDesc != NULL &&
+			(mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+			 // mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_after_row || // XXX or any row level triggers at all?
+			 mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_instead_row))
+		/*
+		 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
+		 * triggers on the table.
+		 */
+		mtstate->miinfo = NULL;
+	else if (node->rootRelation > 0 &&
+			mtstate->rootResultRelInfo->ri_TrigDesc != NULL &&
+			 mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_new_table)
+		/*
+		 * For partitioned tables we can't support multi-inserts when there
+		 * are any statement level insert triggers.
+		 */
+		mtstate->miinfo = NULL;
+	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL
+			/* || cstate->volatile_defexprs */ )
+		// XXX contain_volatile_functions_not_nextval((Node *) defexpr);
+		/* Can't support multi-inserts to foreign tables (XXX: nor volatile defaults) */
+		mtstate->miinfo = NULL;
+	else
+	{
+		mtstate->miinfo = calloc(sizeof(*mtstate->miinfo), 1);
+		MultiInsertInfoInit(mtstate->miinfo, mtstate->rootResultRelInfo,
+				mtstate->mt_transition_capture,
+				estate, GetCurrentCommandId(true), 0);
+	}
 
 	/* set up epqstate with dummy subplan data for the moment */
 	EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam);
@@ -2693,6 +2828,19 @@ ExecEndModifyTable(ModifyTableState *node)
 														   resultRelInfo);
 	}
 
+	if (node->bistate)
+	{
+		FreeBulkInsertState(node->bistate);
+		table_finish_bulk_insert(node->rootResultRelInfo->ri_RelationDesc, 0);
+	}
+
+	if (node->miinfo)
+	{
+		if (!MultiInsertInfoIsEmpty(node->miinfo))
+			 MultiInsertInfoFlush(node->miinfo, node->resultRelInfo); // root ?
+		MultiInsertInfoCleanup(node->miinfo);
+	}
+
 	/*
 	 * Close all the partitioned tables, leaf partitions, and their indices
 	 * and release the slot used for tuple routing, if set.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 17579eeaca..b6f493e0c8 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -54,6 +54,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/cost.h"
+#include "executor/nodeModifyTable.h"
 #include "optimizer/geqo.h"
 #include "optimizer/optimizer.h"
 #include "optimizer/paths.h"
@@ -3445,6 +3446,15 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"bulk_insert_ntuples", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Enable bulk insertions after this number of tuples."),
+			gettext_noop("A ring buffer of limited size will be used and updates done in batch"),
+		},
+		&bulk_insert_ntuples,
+		1000, -1, INT_MAX,
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index e37942df39..372414b1c0 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -15,6 +15,7 @@
 #define COPYFROM_INTERNAL_H
 
 #include "commands/copy.h"
+#include "executor/nodeModifyTable.h"
 #include "commands/trigger.h"
 
 /*
@@ -92,10 +93,12 @@ typedef struct CopyFromStateData
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
-	uint64		cur_lineno;		/* line number for error messages */
 	const char *cur_attname;	/* current att for error messages */
 	const char *cur_attval;		/* current att value for error messages */
 
+	/* For bulk inserts and for error callback */
+	MultiInsertInfo miinfo;
+
 	/*
 	 * Working state
 	 */
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 83e2965531..30542a542a 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -13,8 +13,12 @@
 #ifndef NODEMODIFYTABLE_H
 #define NODEMODIFYTABLE_H
 
+#include "commands/trigger.h"
+#include "executor/executor.h" // XXX
 #include "nodes/execnodes.h"
 
+extern PGDLLIMPORT int bulk_insert_ntuples;
+
 extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 									   EState *estate, TupleTableSlot *slot,
 									   CmdType cmdtype);
@@ -23,4 +27,371 @@ extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate,
 extern void ExecEndModifyTable(ModifyTableState *node);
 extern void ExecReScanModifyTable(ModifyTableState *node);
 
+/* Bulk insert stuff which used to live in copy.c */
+
+/*
+ * No more than this many tuples per MultiInsertBuffer
+ *
+ * Caution: Don't make this too big, as we could end up with this many
+ * MultiInsertBuffer items stored in MultiInsertInfo's
+ * multiInsertBuffers list.  Increasing this can cause quadratic growth in
+ * memory requirements during copies into partitioned tables with a large
+ * number of partitions.
+ */
+#define MAX_BUFFERED_TUPLES		1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size, of tuples stored.
+ */
+#define MAX_BUFFERED_BYTES		65535
+
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS	32
+
+/* Stores multi-insert data for a single relation, for COPY FROM or INSERT. */
+typedef struct MultiInsertBuffer
+{
+	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	ResultRelInfo *resultRelInfo;	/* ResultRelInfo of the target relation */
+	BulkInsertState bistate;	/* BulkInsertState for this rel */
+	int			nused;			/* number of 'slots' containing tuples */
+	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
+												 * stream */
+} MultiInsertBuffer;
+
+/*
+ * Stores one or many MultiInsertBuffers and details about the size and
+ * number of tuples which are stored in them.  This allows multiple buffers to
+ * exist at once when COPY/INSERTing into a partitioned table.
+ */
+typedef struct MultiInsertInfo
+{
+	List	   *multiInsertBuffers; /* List of tracked MultiInsertBuffers */
+	int			bufferedTuples; /* number of tuples buffered over all buffers */
+	int			bufferedBytes;	/* number of bytes from all buffered tuples */
+	TransitionCaptureState	*transition_capture;	/* for AFTER ROW triggers */
+	EState	   *estate;			/* Executor state */
+	CommandId	mycid;			/* Command Id */
+	int			ti_options;		/* table insert options */
+	size_t		ntuples;		/* Number of rows *eligible* for multi-insert */
+
+	/* Line number for errors in copyfrom.c */
+	uint64		cur_lineno;
+	bool		line_buf_valid;	/* cleared while a buffer is being flushed */
+} MultiInsertInfo;
+
+
+/*
+ * Allocate a MultiInsertBuffer for 'rri' with palloc and initialize it: all
+ * slot pointers NULL, no slots in use, and a fresh BulkInsertState.
+ */
+static inline MultiInsertBuffer *
+MultiInsertBufferInit(ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer;
+
+	buffer = (MultiInsertBuffer *) palloc(sizeof(MultiInsertBuffer));
+	memset(buffer->slots, 0, sizeof(buffer->slots));
+	buffer->resultRelInfo = rri;
+	buffer->bistate = GetBulkInsertState();
+	buffer->nused = 0;
+
+	return buffer;
+}
+
+/*
+ * Create a buffer for this ResultRelInfo and start tracking it in
+ * 'miinfo'.
+ */
+static inline void
+MultiInsertInfoSetupBuffer(MultiInsertInfo *miinfo,
+							   ResultRelInfo *rri)
+{
+	MultiInsertBuffer *newbuf = MultiInsertBufferInit(rri);
+
+	/* Back-link from the relation, so the buffer can be found via 'rri' */
+	rri->ri_MultiInsertBuffer = newbuf;
+
+	/* Append to the list of buffers this MultiInsertInfo knows about */
+	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, newbuf);
+}
+
+/*
+ * Initialize an already allocated MultiInsertInfo; every field is set here.
+ *
+ * If rri is a non-partitioned table, a MultiInsertBuffer is set up for it.
+ */
+static inline void
+MultiInsertInfoInit(MultiInsertInfo *miinfo, ResultRelInfo *rri,
+						TransitionCaptureState *transition_capture,
+						EState *estate, CommandId mycid, int ti_options)
+{
+	miinfo->multiInsertBuffers = NIL;
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+	miinfo->transition_capture = transition_capture;
+	miinfo->estate = estate;
+	miinfo->mycid = mycid;
+	miinfo->ti_options = ti_options;
+	miinfo->ntuples = 0;
+	miinfo->cur_lineno = 0;
+	miinfo->line_buf_valid = false;
+
+	/*
+	 * Buffers for a partitioned table's partitions are set up lazily, the
+	 * first time a tuple needs to be sent their way.
+	 */
+	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		MultiInsertInfoSetupBuffer(miinfo, rri);
+}
+
+/*
+ * Report whether either buffering limit (tuple count or bytes) is reached.
+ */
+static inline bool
+MultiInsertInfoIsFull(MultiInsertInfo *miinfo)
+{
+	bool		tuples_full = miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES;
+	bool		bytes_full = miinfo->bufferedBytes >= MAX_BUFFERED_BYTES;
+
+	return tuples_full || bytes_full;
+}
+
+/*
+ * Report whether no tuples are currently buffered in any buffer.
+ */
+static inline bool
+MultiInsertInfoIsEmpty(MultiInsertInfo *miinfo)
+{
+	return (0 == miinfo->bufferedTuples);
+}
+
+/*
+ * Flush 'buffer': insert its tuples, then update indexes and fire triggers.
+ */
+static inline void
+MultiInsertBufferFlush(MultiInsertInfo *miinfo,
+						   MultiInsertBuffer *buffer)
+{
+	MemoryContext oldcontext;
+	int			i;
+	uint64		save_cur_lineno;
+	EState	   *estate = miinfo->estate;
+	CommandId	mycid = miinfo->mycid;
+	int			ti_options = miinfo->ti_options;
+	bool		line_buf_valid = miinfo->line_buf_valid;
+	int			nused = buffer->nused;
+	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+	TupleTableSlot **slots = buffer->slots;
+
+	/*
+	 * Print error context information correctly, if one of the operations
+	 * below fails.
+	 */
+	miinfo->line_buf_valid = false;
+	save_cur_lineno = miinfo->cur_lineno;
+
+	/*
+	 * table_multi_insert may leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); // XXX requires executor.h
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   slots,
+					   nused,
+					   mycid,
+					   ti_options,
+					   buffer->bistate);
+	MemoryContextSwitchTo(oldcontext);
+
+	for (i = 0; i < nused; i++)
+	{
+		/*
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			List	   *recheckIndexes;
+
+			miinfo->cur_lineno = buffer->linenos[i];
+			recheckIndexes =
+				ExecInsertIndexTuples(resultRelInfo,
+									  buffer->slots[i], estate, false, false,
+									  NULL, NIL);
+
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 miinfo->transition_capture);
+			list_free(recheckIndexes);
+		}
+
+		/*
+		 * There are no indexes, but see if we need to run AFTER ROW INSERT
+		 * triggers anyway.
+		 */
+		else if (resultRelInfo->ri_TrigDesc != NULL &&
+				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		{
+			miinfo->cur_lineno = buffer->linenos[i];
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL, miinfo->transition_capture);
+		}
+
+		ExecClearTuple(slots[i]);
+	}
+
+	/* Mark that all slots are free */
+	buffer->nused = 0;
+
+	/* reset cur_lineno and line_buf_valid to what they were */
+	miinfo->line_buf_valid = line_buf_valid;
+	miinfo->cur_lineno = save_cur_lineno;
+}
+
+/*
+ * Release everything owned by this buffer, then the buffer itself.
+ *
+ * The caller must already have flushed the buffer.
+ */
+static inline void
+MultiInsertBufferCleanup(MultiInsertInfo *miinfo,
+							 MultiInsertBuffer *buffer)
+{
+	ResultRelInfo *rri = buffer->resultRelInfo;
+	int			slotno;
+
+	/* A buffer still holding tuples must not be thrown away */
+	Assert(buffer->nused == 0);
+
+	/* The relation must no longer point at this buffer */
+	rri->ri_MultiInsertBuffer = NULL;
+
+	FreeBulkInsertState(buffer->bistate);
+
+	/* Slots are created on demand, in order: stop at the first NULL one */
+	for (slotno = 0; slotno < MAX_BUFFERED_TUPLES && buffer->slots[slotno] != NULL; slotno++)
+		ExecDropSingleTupleTableSlot(buffer->slots[slotno]);
+
+	table_finish_bulk_insert(rri->ri_RelationDesc, miinfo->ti_options);
+
+	pfree(buffer);
+}
+
+/*
+ * Write all stored tuples in all buffers out to the tables.
+ *
+ * Once flushed we also trim the tracked buffers list down to size by removing
+ * the buffers created earliest first.
+ *
+ * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
+ * used.  When cleaning up old buffers we'll never remove the one for
+ * 'curr_rri'.
+ */
+static inline void
+MultiInsertInfoFlush(MultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+	{
+		MultiInsertBuffer *buffer = (MultiInsertBuffer *) lfirst(lc);
+
+		MultiInsertBufferFlush(miinfo, buffer);
+	}
+
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+
+	/*
+	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
+	 * remove buffers starting with the ones we created first.  It seems less
+	 * likely that these older ones will be needed than the ones that were
+	 * just created.
+	 */
+	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+	{
+		MultiInsertBuffer *buffer;
+
+		buffer = (MultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+
+		/*
+		 * We never want to remove the buffer that's currently being used, so
+		 * if we happen to find that then move it to the end of the list.
+		 */
+		if (buffer->resultRelInfo == curr_rri)
+		{
+			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+			buffer = (MultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+		}
+
+		MultiInsertBufferCleanup(miinfo, buffer);
+		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+	}
+}
+
+/*
+ * Clean up every tracked buffer, then free the list that tracked them.
+ */
+static inline void
+MultiInsertInfoCleanup(MultiInsertInfo *miinfo)
+{
+	ListCell   *cell;
+
+	foreach(cell, miinfo->multiInsertBuffers)
+		MultiInsertBufferCleanup(miinfo, (MultiInsertBuffer *) lfirst(cell));
+
+	list_free(miinfo->multiInsertBuffers);
+}
+
+/*
+ * Get the TupleTableSlot that the next tuple for 'rri' should be stored in,
+ * creating the slot on first use.
+ *
+ * 'rri' must already have a buffer, and that buffer must not be full.
+ *
+ * Note: 'miinfo' is unused; it is kept for consistency with its siblings.
+ */
+static inline TupleTableSlot *
+MultiInsertInfoNextFreeSlot(MultiInsertInfo *miinfo,
+								ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer = rri->ri_MultiInsertBuffer;
+	int			nused;
+
+	Assert(buffer != NULL);
+	nused = buffer->nused;		/* read only after the NULL check above */
+	Assert(nused < MAX_BUFFERED_TUPLES);
+	if (buffer->slots[nused] == NULL)
+		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+	return buffer->slots[nused];
+}
+
+/*
+ * Mark the slot previously handed out by MultiInsertInfoNextFreeSlot as
+ * consumed, and account for the tuple stored in it.
+ */
+static inline void
+MultiInsertInfoStore(MultiInsertInfo *miinfo, ResultRelInfo *rri,
+						 TupleTableSlot *slot, int tuplen, uint64 lineno)
+{
+	MultiInsertBuffer *buffer = rri->ri_MultiInsertBuffer;
+
+	Assert(buffer != NULL);
+	Assert(slot == buffer->slots[buffer->nused]);
+
+	/*
+	 * Remember the tuple's line number for later error reports, and bump
+	 * the used-slot count in the same step.
+	 */
+	buffer->linenos[buffer->nused++] = lineno;
+
+	/* Keep the totals across all buffers up to date */
+	miinfo->bufferedTuples += 1;
+	miinfo->bufferedBytes += tuplen;
+}
+
 #endif							/* NODEMODIFYTABLE_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 48c3f570fa..8c7ca6627d 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -14,6 +14,7 @@
 #ifndef EXECNODES_H
 #define EXECNODES_H
 
+#include "access/heapam.h"
 #include "access/tupconvert.h"
 #include "executor/instrument.h"
 #include "fmgr.h"
@@ -32,6 +33,9 @@
 #include "utils/tuplesort.h"
 #include "utils/tuplestore.h"
 
+/* This would be a circular inclusion */
+// #include "executor/nodeModifyTable.h"
+
 struct PlanState;				/* forward references in this file */
 struct ParallelHashJoinState;
 struct ExecRowMark;
@@ -39,8 +43,8 @@ struct ExprState;
 struct ExprContext;
 struct RangeTblEntry;			/* avoid including parsenodes.h here */
 struct ExprEvalStep;			/* avoid including execExpr.h everywhere */
-struct CopyMultiInsertBuffer;
-
+// struct MultiInsertBuffer;
+// struct MultiInsertInfo;
 
 /* ----------------
  *		ExprState node
@@ -498,8 +502,8 @@ typedef struct ResultRelInfo
 	 */
 	TupleConversionMap *ri_ChildToRootMap;
 
-	/* for use by copyfrom.c when performing multi-inserts */
-	struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
+	/* for use by copyfrom.c/modifyTable when performing multi-inserts */
+	struct MultiInsertBuffer *ri_MultiInsertBuffer;
 } ResultRelInfo;
 
 /* ----------------
@@ -1165,6 +1169,10 @@ typedef struct ModifyTableState
 	List	  **mt_arowmarks;	/* per-subplan ExecAuxRowMark lists */
 	EPQState	mt_epqstate;	/* for evaluating EvalPlanQual rechecks */
 	bool		fireBSTriggers; /* do we need to fire stmt triggers? */
+	BulkInsertState	bistate;	/* state for bulk insert like INSERT SELECT, when miinfo cannot be used */
+	ResultRelInfo	*prevResultRelInfo; /* last child inserted with bistate */
+	struct MultiInsertInfo	*miinfo;
+	size_t		ntuples;	/* Number of tuples inserted; */
 
 	/*
 	 * Slot for storing tuples in the root partitioned table's rowtype during
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index da50ee3b67..bc4c1a4fc2 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -462,6 +462,49 @@ Partitions: part_aa_bb FOR VALUES IN ('aa', 'bb'),
             part_xx_yy FOR VALUES IN ('xx', 'yy'), PARTITIONED,
             part_default DEFAULT, PARTITIONED
 
+-- bulk inserts
+truncate hash_parted;
+begin;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+ a  
+----
+ 12
+ 11
+(2 rows)
+
+-- exercise bulk insert to partitions
+SET client_min_messages=debug1;
+insert into hash_parted select generate_series(1,9999);
+DEBUG:  enabling bulk insert
+DEBUG:  enabling multi insert
+RESET client_min_messages;
+select count(1) from hash_parted;
+ count 
+-------
+ 10001
+(1 row)
+
+commit;
+-- test that index was updated
+vacuum analyze hash_parted;
+explain(costs off)
+select * from hash_parted where a=13;
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Index Only Scan using hpart1_a_idx on hpart1 hash_parted
+   Index Cond: (a = 13)
+(2 rows)
+
+select * from hash_parted where a=13;
+ a  
+----
+ 13
+(1 row)
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 963faa1614..a74eb3826a 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -280,6 +280,26 @@ from hash_parted order by part;
 -- partitions
 \d+ list_parted
 
+-- bulk inserts
+truncate hash_parted;
+begin;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+-- exercise bulk insert to partitions
+SET client_min_messages=debug1;
+insert into hash_parted select generate_series(1,9999);
+RESET client_min_messages;
+select count(1) from hash_parted;
+commit;
+-- test that index was updated
+vacuum analyze hash_parted;
+explain(costs off)
+select * from hash_parted where a=13;
+select * from hash_parted where a=13;
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 943142ced8..2274f78843 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -423,8 +423,6 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyInsertMethod
-CopyMultiInsertBuffer
-CopyMultiInsertInfo
 CopyState
 CopyStateData
 CopyStmt
@@ -1401,6 +1399,8 @@ ModifyTableState
 MorphOpaque
 MsgType
 MultiAssignRef
+MultiInsertBuffer
+MultiInsertInfo
 MultiSortSupport
 MultiSortSupportData
 MultiXactId
-- 
2.17.0

>From 9443f342039fb35db6df7940183ef324777db1c5 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Tue, 1 Dec 2020 23:11:31 -0600
Subject: [PATCH v9 2/4] WIP: Check for volatile defaults

We want to check whether any column *uses* a volatile default value, but after
parsing and rewriting, the information about which column values are defaults
and which were specified appears to be lost.  insertedcols doesn't appear to
be useful for this.  So add a field to track whether a TargetEntry was planned
with the column default.
---
 src/backend/executor/nodeModifyTable.c | 64 ++++++++++++++++++++++++--
 src/backend/nodes/copyfuncs.c          |  1 +
 src/backend/nodes/equalfuncs.c         |  1 +
 src/backend/nodes/makefuncs.c          |  1 +
 src/backend/nodes/outfuncs.c           |  1 +
 src/backend/nodes/readfuncs.c          |  1 +
 src/backend/optimizer/util/tlist.c     |  1 +
 src/backend/rewrite/rewriteHandler.c   |  3 ++
 src/include/nodes/primnodes.h          |  2 +
 9 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index a53cbeeb2c..2059428e2a 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -50,6 +50,7 @@
 #include "foreign/fdwapi.h"
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
+#include "optimizer/optimizer.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -2258,6 +2259,61 @@ ExecModifyTable(PlanState *pstate)
 	return NULL;
 }
 
+/*
+ * Return true if the INSERT planned in 'node' uses (by omission or by an
+ * explicit DEFAULT) any column default of the target table that contains a
+ * volatile function other than nextval().  This works only for INSERT.
+ */
+static bool
+has_volatile_defaults(ResultRelInfo *resultRelInfo, ModifyTable *node)
+{
+	TupleDesc	tupDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc);
+	Plan		*plan;
+
+	Assert(list_length(node->plans) == 1);
+	plan = linitial(node->plans);
+
+	for (int attnum = 1; attnum <= tupDesc->natts; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+		Expr		*defexpr;
+		TargetEntry	*tle;
+
+		/* We don't need to check dropped/generated attributes */
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		tle = list_nth(plan->targetlist, attnum - 1);
+		Assert(tle != NULL);
+		Assert(tle->resno == attnum);
+
+		/*
+		 * If the column was specified with a non-default value, then don't
+		 * check the volatility of its default
+		 */
+		if (!tle->isdefault)
+			continue;
+
+		/* Check the column's default value if one exists */
+		defexpr = (Expr *) build_column_default(resultRelInfo->ri_RelationDesc, attnum);
+		if (defexpr == NULL)
+			continue;
+
+		/*
+		 * expression_planner() returns the planned tree; keep the result.
+		 */
+		defexpr = expression_planner(defexpr);
+
+		if (contain_volatile_functions_not_nextval((Node *) defexpr))
+		{
+			elog(DEBUG1, "found volatile att %d", attnum);
+			return true;
+		}
+	}
+
+	return false;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitModifyTable
  * ----------------------------------------------------------------
@@ -2352,10 +2408,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 		 * are any statement level insert triggers.
 		 */
 		mtstate->miinfo = NULL;
-	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL
-			/* || cstate->volatile_defexprs */ )
-		// XXX contain_volatile_functions_not_nextval((Node *) defexpr);
-		/* Can't support multi-inserts to foreign tables or if there are any */
+	else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL ||
+			has_volatile_defaults(mtstate->rootResultRelInfo, node))
+		/* Can't support multi-inserts to foreign tables or if there are any
+		 * volatile default expressions in the table. */
 		mtstate->miinfo = NULL;
 	else
 	{
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index ba3ccc712c..2008e8e5d6 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2178,6 +2178,7 @@ _copyTargetEntry(const TargetEntry *from)
 	COPY_SCALAR_FIELD(resorigtbl);
 	COPY_SCALAR_FIELD(resorigcol);
 	COPY_SCALAR_FIELD(resjunk);
+	COPY_SCALAR_FIELD(isdefault);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index a2ef853dc2..aa3cdf3729 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -770,6 +770,7 @@ _equalTargetEntry(const TargetEntry *a, const TargetEntry *b)
 	COMPARE_SCALAR_FIELD(resorigtbl);
 	COMPARE_SCALAR_FIELD(resorigcol);
 	COMPARE_SCALAR_FIELD(resjunk);
+	COMPARE_SCALAR_FIELD(isdefault);
 
 	return true;
 }
diff --git a/src/backend/nodes/makefuncs.c b/src/backend/nodes/makefuncs.c
index 01c110cd2f..aeeba7032f 100644
--- a/src/backend/nodes/makefuncs.c
+++ b/src/backend/nodes/makefuncs.c
@@ -254,6 +254,7 @@ makeTargetEntry(Expr *expr,
 	tle->ressortgroupref = 0;
 	tle->resorigtbl = InvalidOid;
 	tle->resorigcol = 0;
+	tle->isdefault = false;
 
 	tle->resjunk = resjunk;
 
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 8392be6d44..924ffda1c6 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1661,6 +1661,7 @@ _outTargetEntry(StringInfo str, const TargetEntry *node)
 	WRITE_OID_FIELD(resorigtbl);
 	WRITE_INT_FIELD(resorigcol);
 	WRITE_BOOL_FIELD(resjunk);
+	WRITE_BOOL_FIELD(isdefault);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index d2c8d58070..642b6c63c5 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1275,6 +1275,7 @@ _readTargetEntry(void)
 	READ_OID_FIELD(resorigtbl);
 	READ_INT_FIELD(resorigcol);
 	READ_BOOL_FIELD(resjunk);
+	READ_BOOL_FIELD(isdefault);
 
 	READ_DONE();
 }
diff --git a/src/backend/optimizer/util/tlist.c b/src/backend/optimizer/util/tlist.c
index 89853a0630..7ef1517027 100644
--- a/src/backend/optimizer/util/tlist.c
+++ b/src/backend/optimizer/util/tlist.c
@@ -349,6 +349,7 @@ apply_tlist_labeling(List *dest_tlist, List *src_tlist)
 		dest_tle->resorigtbl = src_tle->resorigtbl;
 		dest_tle->resorigcol = src_tle->resorigcol;
 		dest_tle->resjunk = src_tle->resjunk;
+		dest_tle->isdefault = src_tle->isdefault;
 	}
 }
 
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
index 0c7508a0d8..7ab13e51e5 100644
--- a/src/backend/rewrite/rewriteHandler.c
+++ b/src/backend/rewrite/rewriteHandler.c
@@ -986,10 +986,13 @@ rewriteTargetListIU(List *targetList,
 			}
 
 			if (new_expr)
+			{
 				new_tle = makeTargetEntry((Expr *) new_expr,
 										  attrno,
 										  pstrdup(NameStr(att_tup->attname)),
 										  false);
+				new_tle->isdefault = true;
+			}
 		}
 
 		/*
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index d4ce037088..888bd36a07 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -1437,6 +1437,8 @@ typedef struct TargetEntry
 	AttrNumber	resorigcol;		/* column's number in source table */
 	bool		resjunk;		/* set to true to eliminate the attribute from
 								 * final target list */
+	bool		isdefault;		/* true if using the column default, either
+								 * by "DEFAULT" or omission of the column */
 } TargetEntry;
 
 
-- 
2.17.0

>From 1705f4a471d59032e716db6618882e2cf7decebd Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 5 Dec 2020 08:52:14 -0600
Subject: [PATCH v9 3/4] COPY: flush multi-insert buffer based on accumulated
 size of tuples..

..rather than line length
---
 src/backend/commands/copyfrom.c        | 2 +-
 src/include/executor/nodeModifyTable.h | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 8221a2c5d3..e5ea909dfe 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -685,7 +685,7 @@ CopyFrom(CopyFromState cstate)
 					/* Add this tuple to the tuple buffer */
 					MultiInsertInfoStore(&cstate->miinfo,
 											 resultRelInfo, myslot,
-											 cstate->line_buf.len,
+											 MemoryContextMemAllocated(myslot->tts_mcxt, true),
 											 cstate->miinfo.cur_lineno);
 
 					/*
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 30542a542a..87b689099c 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -41,10 +41,10 @@ extern void ExecReScanModifyTable(ModifyTableState *node);
 #define MAX_BUFFERED_TUPLES		1000
 
 /*
- * Flush buffers if there are >= this many bytes, as counted by the input
- * size, of tuples stored.
+ * Flush buffers if there are >= this many bytes of tuples stored, as counted
+ * by the slot's memory contexts.
  */
-#define MAX_BUFFERED_BYTES		65535
+#define MAX_BUFFERED_BYTES		(1024*1024*8)
 
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
-- 
2.17.0

>From 9443ca50187f46eaceaa2b53383d9a668afecca9 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Tue, 1 Dec 2020 17:20:25 -0600
Subject: [PATCH v9 4/4] WIP: check tuple size

Or maybe INSERT should flush the buffer based only on the *number* of tuples,
and not on their size?
---
 src/backend/executor/nodeModifyTable.c | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 2059428e2a..04b0598183 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -703,7 +703,13 @@ ExecInsert(ModifyTableState *mtstate,
 			batchslot = MultiInsertInfoNextFreeSlot(mtstate->miinfo, resultRelInfo);
 			ExecCopySlot(batchslot, slot);
 
-			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot, 0, 0); // XXX: tuplen/lineno
+			MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot,
+					// sizeof(void*) * batchslot->tts_nvalid, /* tuple size - underestimate */
+					MemoryContextMemAllocated(batchslot->tts_mcxt, true), /* tuple size */
+					mtstate->ntuples); /* lineno */
+
+			elog(DEBUG2, "bufferedBytes %d; tuples %ld",
+					mtstate->miinfo->bufferedBytes, mtstate->ntuples);
 
 			if (MultiInsertInfoIsFull(mtstate->miinfo))
 				MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo);
-- 
2.17.0

Reply via email to