diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 05ceb6550d..a395283e06 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
  * temporary context before calling this, if that's a problem.
  */
 void
-heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate)
 {
 	TransactionId xid = GetCurrentTransactionId();
@@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
-	/* Toast and set header data in all the tuples */
+	/* Toast and set header data in all the slots */
 	heaptuples = palloc(ntuples * sizeof(HeapTuple));
 	for (i = 0; i < ntuples; i++)
-		heaptuples[i] = heap_prepare_insert(relation, tuples[i],
-											xid, cid, options);
+	{
+		HeapTuple tuple;
+
+		tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
+		slots[i]->tts_tableOid = RelationGetRelid(relation);
+		tuple->t_tableOid = slots[i]->tts_tableOid;
+		heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
+											options);
+	}
 
 	/*
 	 * We're about to do the actual inserts -- but check for conflict first,
@@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
 	}
 
-	/*
-	 * Copy t_self fields back to the caller's original tuples. This does
-	 * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's
-	 * probably faster to always copy than check.
-	 */
+	/* copy t_self fields back to the caller's slots */
 	for (i = 0; i < ntuples; i++)
-		tuples[i]->t_self = heaptuples[i]->t_self;
+		slots[i]->tts_tid = heaptuples[i]->t_self;
 
 	pgstat_count_heap_insert(relation, ntuples);
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 5c96fc91b7..2f71e45f4b 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2411,6 +2411,7 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
+	.multi_insert = heap_multi_insert,
 	.tuple_lock = heapam_tuple_lock,
 	.finish_bulk_insert = heapam_finish_bulk_insert,
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index c1fd7b78ce..17573883ef 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -316,14 +316,7 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
-static void CopyOneRowTo(CopyState cstate,
-			 Datum *values, bool *nulls);
-static void CopyFromInsertBatch(CopyState cstate, EState *estate,
-					CommandId mycid, int ti_options,
-					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
-					BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
-					uint64 firstBufferedLineNo);
+static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
 static int	CopyReadAttributesText(CopyState cstate);
@@ -2073,33 +2066,27 @@ CopyTo(CopyState cstate)
 
 	if (cstate->rel)
 	{
-		Datum	   *values;
-		bool	   *nulls;
+		TupleTableSlot *slot;
 		TableScanDesc scandesc;
-		HeapTuple	tuple;
-
-		values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
-		nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
 
 		scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+		slot = table_slot_create(cstate->rel, NULL);
 
 		processed = 0;
-		while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
+		while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
 		{
 			CHECK_FOR_INTERRUPTS();
 
-			/* Deconstruct the tuple ... faster than repeated heap_getattr */
-			heap_deform_tuple(tuple, tupDesc, values, nulls);
+			/* Deconstruct the tuple ... */
+			slot_getallattrs(slot);
 
 			/* Format and send the data */
-			CopyOneRowTo(cstate, values, nulls);
+			CopyOneRowTo(cstate, slot);
 			processed++;
 		}
 
+		ExecDropSingleTupleTableSlot(slot);
 		table_endscan(scandesc);
-
-		pfree(values);
-		pfree(nulls);
 	}
 	else
 	{
@@ -2125,7 +2112,7 @@ CopyTo(CopyState cstate)
  * Emit one row during CopyTo().
  */
 static void
-CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
+CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
 {
 	bool		need_delim = false;
 	FmgrInfo   *out_functions = cstate->out_functions;
@@ -2142,11 +2129,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
 		CopySendInt16(cstate, list_length(cstate->attnumlist));
 	}
 
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
 	foreach(cur, cstate->attnumlist)
 	{
 		int			attnum = lfirst_int(cur);
-		Datum		value = values[attnum - 1];
-		bool		isnull = nulls[attnum - 1];
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
 
 		if (!cstate->binary)
 		{
@@ -2305,49 +2295,444 @@ limit_printout_length(const char *str)
 	return res;
 }
 
+/*
+ * Multi-Insert handling code for COPY FROM.  Inserts are performed in
+ * batches of up to MAX_BUFFERED_TUPLES.
+ *
+ * When COPY FROM is used on a partitioned table, we attempt to maintain
+ * only MAX_PARTITION_BUFFERS buffers at once.  Buffers that are completely
+ * unused in each batch are removed so that we end up not keeping buffers for
+ * partitions that we might not insert into again.
+ */
+#define MAX_BUFFERED_TUPLES		1000
+#define MAX_BUFFERED_BYTES		65535
+#define MAX_PARTITION_BUFFERS	15
+
+/*
+ * CopyMultiInsertBuffer
+ *		Stores multi-insert data related to a single relation in CopyFrom.
+ */
+typedef struct
+{
+	Oid			relid;			/* Relation ID this insert data is for */
+	TupleTableSlot **slots;		/* Array of MAX_BUFFERED_TUPLES to store
+								 * tuples */
+	uint64	   *linenos;		/* Line # of tuple in copy stream */
+	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
+	BulkInsertState bistate;	/* BulkInsertState for this rel */
+	int			nused;			/* number of 'slots' containing tuples */
+} CopyMultiInsertBuffer;
+
+/*
+ * CopyMultiInsertInfo
+ *		Stores one or many CopyMultiInsertBuffers and details about the
+ *		size and number of tuples which are stored in them.  This allows
+ *		multiple buffers to exist at once when COPYing into a partitioned
+ *		table.
+ */
+typedef struct
+{
+	HTAB	   *multiInsertBufferTab;	/* Hash table by relid to find the
+										 * corresponding CopyMultiInsertBuffer */
+	CopyMultiInsertBuffer *buffer;	/* Current table/partition's buffer */
+	int			bufferedTuples; /* number of tuples buffered over all buffers */
+	int			bufferedBytes;	/* number of bytes from all buffered tuples */
+	int			nbuffers;		/* number of buffers we're tracking */
+} CopyMultiInsertInfo;
+
+static void
+CopyMultiInsertInfo_Init(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						 bool partitioned)
+{
+	/*
+	 * When dealing with partitioned tables we may need multiple buffers. Here
+	 * We'll just set up a hash table that these can be stored in.  We'll
+	 * build buffers for individual partitions when we route a tuple to them
+	 * for the first time.
+	 */
+	if (partitioned)
+	{
+		HASHCTL		ctl;
+		HTAB	   *htab;
+
+		memset(&ctl, 0, sizeof(ctl));
+		ctl.keysize = sizeof(Oid);
+		ctl.entrysize = sizeof(CopyMultiInsertBuffer);
+		ctl.hcxt = CurrentMemoryContext;
+
+		htab = hash_create("Copy multi-insert buffer table", MAX_PARTITION_BUFFERS + 1,
+						   &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+		miinfo->multiInsertBufferTab = htab;
+		miinfo->buffer = NULL;
+		miinfo->nbuffers = 0;
+	}
+	else
+	{
+		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
+
+		/* Non-partitioned table.  Just setup the buffer for the table. */
+		buffer->relid = RelationGetRelid(rri->ri_RelationDesc);
+		buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
+		buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
+		buffer->resultRelInfo = rri;
+		buffer->bistate = GetBulkInsertState();
+		buffer->nused = 0;
+		miinfo->multiInsertBufferTab = NULL;
+		miinfo->buffer = buffer;
+		miinfo->nbuffers = 1;
+	}
+
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+}
+
+/*
+ * CopyMultiInsertInfo_SetCurrentBuffer
+ *		Find or make a buffer for this 'rri'.
+ */
+static inline void
+CopyMultiInsertInfo_SetCurrentBuffer(CopyMultiInsertInfo *miinfo,
+									 ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer;
+	Oid			relid = RelationGetRelid(rri->ri_RelationDesc);
+	bool		found;
+
+	Assert(miinfo->multiInsertBufferTab != NULL);
+
+	buffer = hash_search(miinfo->multiInsertBufferTab, &relid, HASH_ENTER, &found);
+
+	if (!found)
+	{
+		buffer->relid = relid;
+		buffer->slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
+		buffer->linenos = palloc(MAX_BUFFERED_TUPLES * sizeof(uint64));
+		buffer->resultRelInfo = rri;
+		buffer->bistate = GetBulkInsertState();
+		buffer->nused = 0;
+		miinfo->nbuffers++;
+	}
+
+	miinfo->buffer = buffer;
+}
+
+/*
+ * CopyMultiInsertInfo_IsFull
+ *		Returns true if the buffers are full
+ */
+static inline bool
+CopyMultiInsertInfo_IsFull(CopyMultiInsertInfo *miinfo)
+{
+	/*
+	 * We also use this function to let us know when we're tracking enough
+	 * buffers already by returning true when nbuffers reaches
+	 * MAX_PARTITION_BUFFERS.  If we flush the tuples when all buffers contain
+	 * at least one tuple we won't remove any unused buffers, so another tuple
+	 * can mean we end up going over MAX_PARTITION_BUFFERS, however that batch
+	 * will only be 1-tuple big, and the subsequent flush will remove all the
+	 * buffers that were previously flushed.
+	 */
+	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES ||
+		miinfo->nbuffers >= MAX_PARTITION_BUFFERS)
+		return true;
+	return false;
+}
+
+/*
+ * CopyMultiInsertInfo_IsEmpty
+ *		Returns true if we have no buffered tuples
+ */
+static inline bool
+CopyMultiInsertInfo_IsEmpty(CopyMultiInsertInfo *miinfo)
+{
+	return miinfo->bufferedTuples == 0;
+}
+
+/*
+ * CopyMultiInsertInfo_FlushSingleBuffer
+ *		Write the tuples stored in 'buffer' out to the table.
+ */
+static inline void
+CopyMultiInsertInfo_FlushSingleBuffer(CopyMultiInsertBuffer *buffer,
+									  CopyState cstate, EState *estate,
+									  CommandId mycid, int ti_options)
+{
+	MemoryContext oldcontext;
+	int			i;
+	uint64		save_cur_lineno;
+	bool		line_buf_valid = cstate->line_buf_valid;
+	int			nBufferedTuples = buffer->nused;
+	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+	TupleTableSlot **bufferedSlots = buffer->slots;
+
+
+	/*
+	 * Print error context information correctly, if one of the operations
+	 * below fail.
+	 */
+	cstate->line_buf_valid = false;
+	save_cur_lineno = cstate->cur_lineno;
+
+	/*
+	 * heap_multi_insert leaks memory, so switch to short-lived memory context
+	 * before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   bufferedSlots,
+					   nBufferedTuples,
+					   mycid,
+					   ti_options,
+					   buffer->bistate);
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		for (i = 0; i < nBufferedTuples; i++)
+		{
+			List	   *recheckIndexes;
+
+			cstate->cur_lineno = buffer->linenos[i];
+			recheckIndexes =
+				ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
+									  NIL);
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 buffer->slots[i],
+								 recheckIndexes, cstate->transition_capture);
+			list_free(recheckIndexes);
+		}
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		for (i = 0; i < nBufferedTuples; i++)
+		{
+			cstate->cur_lineno = buffer->linenos[i];
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 bufferedSlots[i],
+								 NIL, cstate->transition_capture);
+		}
+	}
+
+	for (i = 0; i < nBufferedTuples; i++)
+		ExecClearTuple(bufferedSlots[i]);
+
+	/* Mark that all slots are free */
+	buffer->nused = 0;
+
+	/* reset cur_lineno and line_buf_valid to what they were */
+	cstate->line_buf_valid = line_buf_valid;
+	cstate->cur_lineno = save_cur_lineno;
+}
+
+/*
+ * CopyMultiInsertBuffer_Cleanup
+ *		Drop used slots and free member for this buffer.  The buffer
+ *		must be flushed before cleanup.
+ */
+static inline void
+CopyMultiInsertBuffer_Cleanup(CopyMultiInsertBuffer *buffer)
+{
+	int			i;
+
+	ReleaseBulkInsertStatePin(buffer->bistate);
+
+	/* Since we only create slots ondemand, just drop the non-null ones. */
+	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+
+	pfree(buffer->slots);
+	pfree(buffer->linenos);
+}
+
+/*
+ * CopyMultiInsertBuffer_RemoveBuffer
+ *		Remove a buffer from being tracked by miinfo
+ */
+static inline void
+CopyMultiInsertBuffer_RemoveBuffer(CopyMultiInsertInfo *miinfo,
+								   CopyMultiInsertBuffer *buffer)
+{
+	Oid			relid = buffer->relid;
+
+	CopyMultiInsertBuffer_Cleanup(buffer);
+
+	hash_search(miinfo->multiInsertBufferTab, (void *) &relid, HASH_REMOVE,
+				NULL);
+	miinfo->nbuffers--;
+}
+
+/*
+ * CopyMultiInsertInfo_Flush
+ *		Write out all stored tuples in all buffers out to the tables.
+ *
+ * To save us from ending up with buffers for 1000s of partitions we remove
+ * buffers belonging to partitions that we've seen no tuples for in this batch
+ */
+static inline void
+CopyMultiInsertInfo_Flush(CopyMultiInsertInfo *miinfo, CopyState cstate,
+						  EState *estate, CommandId mycid, int ti_options)
+{
+	CopyMultiInsertBuffer *buffer;
+
+	/*
+	 * If just storing buffers for a non-partitioned table, then just flush
+	 * that buffer.
+	 */
+	if (miinfo->multiInsertBufferTab == NULL)
+	{
+		buffer = miinfo->buffer;
+
+		CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate, mycid,
+											  ti_options);
+	}
+	else
+	{
+		HASH_SEQ_STATUS status;
+
+		/*
+		 * Otherwise make a pass over the hash table and flush all buffers
+		 * that have any tuples stored in them.
+		 */
+		hash_seq_init(&status, miinfo->multiInsertBufferTab);
+
+		while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
+		{
+			if (buffer->nused > 0)
+			{
+				/* Flush the buffer if it was used */
+				CopyMultiInsertInfo_FlushSingleBuffer(buffer, cstate, estate,
+													  mycid, ti_options);
+			}
+			else
+			{
+				/*
+				 * Otherwise just remove it.  If we saw no tuples for it this
+				 * batch, then likely its best to make way for buffers for
+				 * other partitions.
+				 */
+				CopyMultiInsertBuffer_RemoveBuffer(miinfo, buffer);
+			}
+		}
+	}
+
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+}
+
+/*
+ * CopyMultiInsertInfo_Cleanup
+ *		Cleanup allocated buffers and free memory
+ */
+static inline void
+CopyMultiInsertInfo_Cleanup(CopyMultiInsertInfo *miinfo)
+{
+	if (miinfo->multiInsertBufferTab == NULL)
+		CopyMultiInsertBuffer_Cleanup(miinfo->buffer);
+	else
+	{
+		HASH_SEQ_STATUS status;
+		CopyMultiInsertBuffer *buffer;
+
+		hash_seq_init(&status, miinfo->multiInsertBufferTab);
+
+		while ((buffer = (CopyMultiInsertBuffer *) hash_seq_search(&status)) != NULL)
+		{
+			Assert(buffer->nused == 0);
+			CopyMultiInsertBuffer_Cleanup(buffer);
+		}
+
+		hash_destroy(miinfo->multiInsertBufferTab);
+	}
+}
+
+/*
+ * CopyMultiInsertInfo_NextFreeSlot
+ *		Get the next TupleTableSlot that the next tuple should be stored in.
+ *
+ * Callers must ensure that the buffer is not full.
+ */
+static inline TupleTableSlot *
+CopyMultiInsertInfo_NextFreeSlot(CopyMultiInsertInfo *miinfo,
+								 ResultRelInfo *rri)
+{
+	CopyMultiInsertBuffer *buffer = miinfo->buffer;
+	int			nused = buffer->nused;
+
+	Assert(nused < MAX_BUFFERED_TUPLES);
+
+	if (buffer->slots[nused] == NULL)
+		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+	return buffer->slots[nused];
+}
+
+/*
+ * CopyMultiInsertInfo_Store
+ *		Consume the previously reserved TupleTableSlot that was reserved by
+ *		CopyMultiInsertInfo_NextFreeSlot.
+ */
+static inline void
+CopyMultiInsertInfo_Store(CopyMultiInsertInfo *miinfo, TupleTableSlot *slot,
+						  int tuplen, uint64 lineno)
+{
+	CopyMultiInsertBuffer *buffer = miinfo->buffer;
+
+	Assert(slot == buffer->slots[buffer->nused]);
+
+	/* Store the line number so we can properly report any errors later */
+	buffer->linenos[buffer->nused] = lineno;
+
+	/* Record this slot as being used */
+	buffer->nused++;
+
+	/* Update how many tuples are stored and their size */
+	miinfo->bufferedTuples++;
+	miinfo->bufferedBytes += tuplen;
+}
+
 /*
  * Copy FROM file to relation.
  */
 uint64
 CopyFrom(CopyState cstate)
 {
-	HeapTuple	tuple;
-	TupleDesc	tupDesc;
-	Datum	   *values;
-	bool	   *nulls;
 	ResultRelInfo *resultRelInfo;
 	ResultRelInfo *target_resultRelInfo;
 	ResultRelInfo *prevResultRelInfo = NULL;
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *myslot;
+	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
-	MemoryContext batchcontext;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			ti_options = 0; /* start with default table_insert options */
-	BulkInsertState bistate;
+	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
+	CopyMultiInsertInfo multiInsertInfo;
 	uint64		processed = 0;
-	int			nBufferedTuples = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
 
-#define MAX_BUFFERED_TUPLES 1000
-#define RECHECK_MULTI_INSERT_THRESHOLD 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
-	uint64		firstBufferedLineNo = 0;
-	uint64		lastPartitionSampleLineNo = 0;
-	uint64		nPartitionChanges = 0;
-	double		avgTuplesPerPartChange = 0;
-
 	Assert(cstate->rel);
 
+	memset(&multiInsertInfo, 0, sizeof(CopyMultiInsertInfo));
+
 	/*
 	 * The target must be a plain, foreign, or partitioned relation, or have
 	 * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
@@ -2382,8 +2767,6 @@ CopyFrom(CopyState cstate)
 							RelationGetRelationName(cstate->rel))));
 	}
 
-	tupDesc = RelationGetDescr(cstate->rel);
-
 	/*----------
 	 * Check to see if we can avoid writing WAL
 	 *
@@ -2467,8 +2850,8 @@ CopyFrom(CopyState cstate)
 		if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		{
 			ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					errmsg("cannot perform FREEZE on a partitioned table")));
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("cannot perform FREEZE on a partitioned table")));
 		}
 
 		/*
@@ -2518,10 +2901,6 @@ CopyFrom(CopyState cstate)
 
 	ExecInitRangeTable(estate, cstate->range_table);
 
-	/* Set up a tuple slot too */
-	myslot = ExecInitExtraTupleSlot(estate, tupDesc,
-									&TTSOpsHeapTuple);
-
 	/*
 	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
 	 * foreign-table result relation(s).
@@ -2565,10 +2944,11 @@ CopyFrom(CopyState cstate)
 												&mtstate->ps);
 
 	/*
-	 * It's more efficient to prepare a bunch of tuples for insertion, and
-	 * insert them in one heap_multi_insert() call, than call heap_insert()
-	 * separately for every tuple. However, there are a number of reasons why
-	 * we might not be able to do this.  These are explained below.
+	 * It's generally more efficient to prepare a bunch of tuples for
+	 * insertion, and insert them in one table_multi_insert() call, than call
+	 * table_insert() separately for every tuple. However, there are a number
+	 * of reasons why we might not be able to do this.  These are explained
+	 * below.
 	 */
 	if (resultRelInfo->ri_TrigDesc != NULL &&
 		(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
@@ -2589,8 +2969,8 @@ CopyFrom(CopyState cstate)
 		 * For partitioned tables we can't support multi-inserts when there
 		 * are any statement level insert triggers. It might be possible to
 		 * allow partitioned tables with such triggers in the future, but for
-		 * now, CopyFromInsertBatch expects that any before row insert and
-		 * statement level insert triggers are on the same relation.
+		 * now, CopyMultiInsertInfo_Flush expects that any before row insert
+		 * and statement level insert triggers are on the same relation.
 		 */
 		insertMethod = CIM_SINGLE;
 	}
@@ -2622,8 +3002,7 @@ CopyFrom(CopyState cstate)
 	{
 		/*
 		 * For partitioned tables, we may still be able to perform bulk
-		 * inserts for sets of consecutive tuples which belong to the same
-		 * partition.  However, the possibility of this depends on which types
+		 * inserts.  However, the possibility of this depends on which types
 		 * of triggers exist on the partition.  We must disable bulk inserts
 		 * if the partition is a foreign table or it has any before row insert
 		 * or insert instead triggers (same as we checked above for the parent
@@ -2632,18 +3011,27 @@ CopyFrom(CopyState cstate)
 		 * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
 		 * flag that we must later determine if we can use bulk-inserts for
 		 * the partition being inserted into.
-		 *
-		 * Normally, when performing bulk inserts we just flush the insert
-		 * buffer whenever it becomes full, but for the partitioned table
-		 * case, we flush it whenever the current tuple does not belong to the
-		 * same partition as the previous tuple.
 		 */
 		if (proute)
 			insertMethod = CIM_MULTI_CONDITIONAL;
 		else
 			insertMethod = CIM_MULTI;
 
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		CopyMultiInsertInfo_Init(&multiInsertInfo, resultRelInfo,
+								 proute != NULL);
+	}
+
+	/*
+	 * If not using batch mode (which allocates slots as needed) set up a
+	 * tuple slot too. When inserting into a partitioned table, we also need
+	 * one, even if we might batch insert, to read the tuple in the root
+	 * partition's form.
+	 */
+	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
+	{
+		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+									   &estate->es_tupleTable);
+		bistate = GetBulkInsertState();
 	}
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
@@ -2660,10 +3048,6 @@ CopyFrom(CopyState cstate)
 	 */
 	ExecBSInsertTriggers(estate, resultRelInfo);
 
-	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-
-	bistate = GetBulkInsertState();
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2672,17 +3056,9 @@ CopyFrom(CopyState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
-	/*
-	 * Set up memory context for batches. For cases without batching we could
-	 * use the per-tuple context, but it's simpler to just use it every time.
-	 */
-	batchcontext = AllocSetContextCreate(CurrentMemoryContext,
-										 "batch context",
-										 ALLOCSET_DEFAULT_SIZES);
-
 	for (;;)
 	{
-		TupleTableSlot *slot;
+		TupleTableSlot *myslot;
 		bool		skip_tuple;
 
 		CHECK_FOR_INTERRUPTS();
@@ -2693,20 +3069,33 @@ CopyFrom(CopyState cstate)
 		 */
 		ResetPerTupleExprContext(estate);
 
+		if (insertMethod == CIM_SINGLE || proute)
+		{
+			myslot = singleslot;
+			Assert(myslot != NULL);
+		}
+		else
+		{
+			Assert(resultRelInfo == target_resultRelInfo);
+			Assert(insertMethod == CIM_MULTI);
+
+			myslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
+													  resultRelInfo);
+		}
+
 		/*
 		 * Switch to per-tuple context before calling NextCopyFrom, which does
 		 * evaluate default expressions etc. and requires per-tuple context.
 		 */
 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
-		if (!NextCopyFrom(cstate, econtext, values, nulls))
-			break;
+		ExecClearTuple(myslot);
 
-		/* Switch into per-batch memory context before forming the tuple. */
-		MemoryContextSwitchTo(batchcontext);
+		/* Directly store the values/nulls array in the slot */
+		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+			break;
 
-		/* And now we can form the input tuple. */
-		tuple = heap_form_tuple(tupDesc, values, nulls);
+		ExecStoreVirtualTuple(myslot);
 
 		/*
 		 * Constraints might reference the tableoid column, so (re-)initialize
@@ -2717,18 +3106,15 @@ CopyFrom(CopyState cstate)
 		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
 
-		/* Place tuple in tuple slot --- but slot shouldn't free it */
-		slot = myslot;
-		ExecStoreHeapTuple(tuple, slot, false);
-
 		if (cstate->whereClause)
 		{
 			econtext->ecxt_scantuple = myslot;
+			/* Skip items that don't match the COPY's WHERE clause */
 			if (!ExecQual(cstate->qualexpr, econtext))
 				continue;
 		}
 
-		/* Determine the partition to heap_insert the tuple into */
+		/* Determine the partition to table_insert the tuple into */
 		if (proute)
 		{
 			TupleConversionMap *map;
@@ -2739,80 +3125,10 @@ CopyFrom(CopyState cstate)
 			 * if the found partition is not suitable for INSERTs.
 			 */
 			resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
-											  proute, slot, estate);
+											  proute, myslot, estate);
 
 			if (prevResultRelInfo != resultRelInfo)
 			{
-				/* Check if we can multi-insert into this partition */
-				if (insertMethod == CIM_MULTI_CONDITIONAL)
-				{
-					/*
-					 * When performing bulk-inserts into partitioned tables we
-					 * must insert the tuples seen so far to the heap whenever
-					 * the partition changes.
-					 */
-					if (nBufferedTuples > 0)
-					{
-						MemoryContext	oldcontext;
-
-						CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-											prevResultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/*
-						 * The tuple is already allocated in the batch context, which
-						 * we want to reset.  So to keep the tuple we copy it into the
-						 * short-lived (per-tuple) context, reset the batch context
-						 * and then copy it back into the per-batch one.
-						 */
-						oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/* cleanup the old batch */
-						MemoryContextReset(batchcontext);
-
-						/* copy the tuple back to the per-batch context */
-						oldcontext = MemoryContextSwitchTo(batchcontext);
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/*
-						 * Also push the tuple copy to the slot (resetting the context
-						 * invalidated the slot contents).
-						 */
-						ExecStoreHeapTuple(tuple, slot, false);
-					}
-
-					nPartitionChanges++;
-
-					/*
-					 * Here we adaptively enable multi-inserts based on the
-					 * average number of tuples from recent multi-insert
-					 * batches.  We recalculate the average every
-					 * RECHECK_MULTI_INSERT_THRESHOLD tuples instead of taking
-					 * the average over the whole copy.  This allows us to
-					 * enable multi-inserts when we get periods in the copy
-					 * stream that have tuples commonly belonging to the same
-					 * partition, and disable when the partition is changing
-					 * too often.
-					 */
-					if (unlikely(lastPartitionSampleLineNo <= (cstate->cur_lineno -
-															   RECHECK_MULTI_INSERT_THRESHOLD)
-								 && cstate->cur_lineno >= RECHECK_MULTI_INSERT_THRESHOLD))
-					{
-						avgTuplesPerPartChange =
-							(cstate->cur_lineno - lastPartitionSampleLineNo) /
-							(double) nPartitionChanges;
-
-						lastPartitionSampleLineNo = cstate->cur_lineno;
-						nPartitionChanges = 0;
-					}
-				}
-
 				/* Determine which triggers exist on this partition */
 				has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
 											  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
@@ -2821,22 +3137,19 @@ CopyFrom(CopyState cstate)
 											   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
 
 				/*
-				 * Tests have shown that using multi-inserts when the
-				 * partition changes on every tuple slightly decreases the
-				 * performance, however, there are benefits even when only
-				 * some batches have just 2 tuples, so let's enable
-				 * multi-inserts even when the average is quite low.
+				 * Enable multi-inserts when the partition has BEFORE/INSTEAD
+				 * OF triggers, or if the partition is a foreign partition.
 				 */
 				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
-					avgTuplesPerPartChange >= 1.3 &&
 					!has_before_insert_row_trig &&
 					!has_instead_insert_row_trig &&
 					resultRelInfo->ri_FdwRoutine == NULL;
 
-				/*
-				 * We'd better make the bulk insert mechanism gets a new
-				 * buffer when the partition being inserted into changes.
-				 */
+				/* Set the multi-insert buffer to use for this partition. */
+				if (leafpart_use_multi_insert)
+					CopyMultiInsertInfo_SetCurrentBuffer(&multiInsertInfo,
+														 resultRelInfo);
+
 				ReleaseBulkInsertStatePin(bistate);
 				prevResultRelInfo = resultRelInfo;
 			}
@@ -2879,26 +3192,48 @@ CopyFrom(CopyState cstate)
 			 * rowtype.
 			 */
 			map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
-			if (map != NULL)
+			if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
 			{
-				TupleTableSlot *new_slot;
-				MemoryContext oldcontext;
-
-				new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
-				Assert(new_slot != NULL);
-
-				slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
+				/* non batch insert */
+				if (map != NULL)
+				{
+					TupleTableSlot *new_slot;
 
+					new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
+					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+				}
+			}
+			else
+			{
 				/*
-				 * Get the tuple in the per-batch context, so that it will be
-				 * freed after each batch insert.
+				 * Batch insert into partitioned table.
 				 */
-				oldcontext = MemoryContextSwitchTo(batchcontext);
-				tuple = ExecCopySlotHeapTuple(slot);
-				MemoryContextSwitchTo(oldcontext);
+				TupleTableSlot *nextslot;
+
+				/* no other path available for partitioned table */
+				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
+
+				nextslot = CopyMultiInsertInfo_NextFreeSlot(&multiInsertInfo,
+															resultRelInfo);
+
+				if (map != NULL)
+					myslot = execute_attr_map_slot(map->attrMap, myslot, nextslot);
+				else
+				{
+					/*
+					 * This looks more expensive than it is (Believe me, I
+					 * optimized it away. Twice). The input is in virtual
+					 * form, and we'll materialize the slot below - for most
+					 * slot types the copy performs the work materialization
+					 * would later require anyway.
+					 */
+					ExecCopySlot(nextslot, myslot);
+					myslot = nextslot;
+				}
 			}
 
-			slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+			/* ensure that triggers etc see the right relation  */
+			myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 		}
 
 		skip_tuple = false;
@@ -2906,7 +3241,7 @@ CopyFrom(CopyState cstate)
 		/* BEFORE ROW INSERT Triggers */
 		if (has_before_insert_row_trig)
 		{
-			if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
 				skip_tuple = true;	/* "do nothing" */
 		}
 
@@ -2919,7 +3254,7 @@ CopyFrom(CopyState cstate)
 			 */
 			if (has_instead_insert_row_trig)
 			{
-				ExecIRInsertTriggers(estate, resultRelInfo, slot);
+				ExecIRInsertTriggers(estate, resultRelInfo, myslot);
 			}
 			else
 			{
@@ -2931,12 +3266,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
-				{
-					ExecComputeStoredGenerated(estate, slot);
-					MemoryContextSwitchTo(batchcontext);
-					tuple = ExecCopySlotHeapTuple(slot);
-					MemoryContextSwitchTo(oldcontext);
-				}
+					ExecComputeStoredGenerated(estate, myslot);
 
 				/*
 				 * If the target is a plain table, check the constraints of
@@ -2944,7 +3274,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_FdwRoutine == NULL &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr)
-					ExecConstraints(resultRelInfo, slot, estate);
+					ExecConstraints(resultRelInfo, myslot, estate);
 
 				/*
 				 * Also check the tuple against the partition constraint, if
@@ -2954,7 +3284,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_PartitionCheck &&
 					(proute == NULL || has_before_insert_row_trig))
-					ExecPartitionCheck(resultRelInfo, slot, estate, true);
+					ExecPartitionCheck(resultRelInfo, myslot, estate, true);
 
 				/*
 				 * Perform multi-inserts when enabled, or when loading a
@@ -2963,31 +3293,21 @@ CopyFrom(CopyState cstate)
 				 */
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
-					/* Add this tuple to the tuple buffer */
-					if (nBufferedTuples == 0)
-						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
-
 					/*
-					 * If the buffer filled up, flush it.  Also flush if the
-					 * total size of all the tuples in the buffer becomes
-					 * large, to avoid using large amounts of memory for the
-					 * buffer when the tuples are exceptionally wide.
+					 * The slot previously might point into the per-tuple
+					 * context. For batching it needs to be longer lived.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
-					{
-						CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/* free memory occupied by tuples from the batch */
-						MemoryContextReset(batchcontext);
-					}
+					ExecMaterializeSlot(myslot);
+
+					/* Add this tuple to the tuple buffer */
+					CopyMultiInsertInfo_Store(&multiInsertInfo, myslot,
+											  cstate->line_buf.len,
+											  cstate->cur_lineno);
+
+					/* If the buffer filled up, flush it. */
+					if (CopyMultiInsertInfo_IsFull(&multiInsertInfo))
+						CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate,
+												  estate, mycid, ti_options);
 				}
 				else
 				{
@@ -2996,12 +3316,12 @@ CopyFrom(CopyState cstate)
 					/* OK, store the tuple */
 					if (resultRelInfo->ri_FdwRoutine != NULL)
 					{
-						slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
-																			   resultRelInfo,
-																			   slot,
-																			   NULL);
+						myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
+																				 resultRelInfo,
+																				 myslot,
+																				 NULL);
 
-						if (slot == NULL)	/* "do nothing" */
+						if (myslot == NULL) /* "do nothing" */
 							continue;	/* next tuple please */
 
 						/*
@@ -3009,27 +3329,26 @@ CopyFrom(CopyState cstate)
 						 * column, so (re-)initialize tts_tableOid before
 						 * evaluating them.
 						 */
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
 					else
 					{
-						tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
-						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-									mycid, ti_options, bistate);
-						ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						/* OK, store the tuple and create index entries for it */
+						table_insert(resultRelInfo->ri_RelationDesc, myslot,
+									 mycid, ti_options, bistate);
 					}
 
+
 					/* And create index entries for it */
 					if (resultRelInfo->ri_NumIndices > 0)
-						recheckIndexes = ExecInsertIndexTuples(slot,
+						recheckIndexes = ExecInsertIndexTuples(myslot,
 															   estate,
 															   false,
 															   NULL,
 															   NIL);
 
 					/* AFTER ROW INSERT Triggers */
-					ExecARInsertTriggers(estate, resultRelInfo, slot,
+					ExecARInsertTriggers(estate, resultRelInfo, myslot,
 										 recheckIndexes, cstate->transition_capture);
 
 					list_free(recheckIndexes);
@@ -3045,32 +3364,25 @@ CopyFrom(CopyState cstate)
 		}
 	}
 
-	/* Flush any remaining buffered tuples */
-	if (nBufferedTuples > 0)
+	if (insertMethod != CIM_SINGLE)
 	{
-		if (insertMethod == CIM_MULTI_CONDITIONAL)
-		{
-			CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-								prevResultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
-								firstBufferedLineNo);
-		}
-		else
-			CopyFromInsertBatch(cstate, estate, mycid, ti_options,
-								resultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
-								firstBufferedLineNo);
+		/* Flush any remaining buffered tuples */
+		if (!CopyMultiInsertInfo_IsEmpty(&multiInsertInfo))
+			CopyMultiInsertInfo_Flush(&multiInsertInfo, cstate, estate, mycid,
+									  ti_options);
+
+		/* Tear down the multi-insert data */
+		CopyMultiInsertInfo_Cleanup(&multiInsertInfo);
 	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	FreeBulkInsertState(bistate);
+	if (bistate != NULL)
+		ReleaseBulkInsertStatePin(bistate);
 
 	MemoryContextSwitchTo(oldcontext);
 
-	MemoryContextDelete(batchcontext);
-
 	/*
 	 * In the old protocol, tell pqcomm that we can process normal protocol
 	 * messages again.
@@ -3084,9 +3396,6 @@ CopyFrom(CopyState cstate)
 	/* Handle queued AFTER triggers */
 	AfterTriggerEndQuery(estate);
 
-	pfree(values);
-	pfree(nulls);
-
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
 	/* Allow the FDW to shut down */
@@ -3111,88 +3420,6 @@ CopyFrom(CopyState cstate)
 	return processed;
 }
 
-/*
- * A subroutine of CopyFrom, to write the current batch of buffered heap
- * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
- * triggers.
- */
-static void
-CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
-					int ti_options, ResultRelInfo *resultRelInfo,
-					TupleTableSlot *myslot, BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
-					uint64 firstBufferedLineNo)
-{
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
-	bool		line_buf_valid = cstate->line_buf_valid;
-
-	/*
-	 * Print error context information correctly, if one of the operations
-	 * below fail.
-	 */
-	cstate->line_buf_valid = false;
-	save_cur_lineno = cstate->cur_lineno;
-
-	/*
-	 * heap_multi_insert leaks memory, so switch to short-lived memory context
-	 * before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(resultRelInfo->ri_RelationDesc,
-					  bufferedTuples,
-					  nBufferedTuples,
-					  mycid,
-					  ti_options,
-					  bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	/*
-	 * If there are any indexes, update them for all the inserted tuples, and
-	 * run AFTER ROW INSERT triggers.
-	 */
-	if (resultRelInfo->ri_NumIndices > 0)
-	{
-		for (i = 0; i < nBufferedTuples; i++)
-		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
-			recheckIndexes =
-				ExecInsertIndexTuples(myslot,
-									  estate, false, NULL, NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
-								 recheckIndexes, cstate->transition_capture);
-			list_free(recheckIndexes);
-		}
-	}
-
-	/*
-	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
-	 * anyway.
-	 */
-	else if (resultRelInfo->ri_TrigDesc != NULL &&
-			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-	{
-		for (i = 0; i < nBufferedTuples; i++)
-		{
-			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
-								 NIL, cstate->transition_capture);
-		}
-	}
-
-	/* reset cur_lineno and line_buf_valid to what they were */
-	cstate->line_buf_valid = line_buf_valid;
-	cstate->cur_lineno = save_cur_lineno;
-}
-
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -4990,11 +5217,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 	DR_copy    *myState = (DR_copy *) self;
 	CopyState	cstate = myState->cstate;
 
-	/* Make sure the tuple is fully deconstructed */
-	slot_getallattrs(slot);
-
-	/* And send the data */
-	CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
+	/* Send the data */
+	CopyOneRowTo(cstate, slot);
 	myState->processed++;
 
 	return true;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4c077755d5..ed0e2de144 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -36,6 +36,7 @@
 #define HEAP_INSERT_SPECULATIVE 0x0010
 
 typedef struct BulkInsertStateData *BulkInsertState;
+struct TupleTableSlot;
 
 #define MaxLockTupleMode	LockTupleExclusive
 
@@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			int options, BulkInsertState bistate);
-extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 4efe178ed1..c2fdedc551 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -328,6 +328,9 @@ typedef struct TableAmRoutine
 	 * ------------------------------------------------------------------------
 	 */
 
+	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
+								 CommandId cid, int options, struct BulkInsertStateData *bistate);
+
 	/* see table_insert() for reference about parameters */
 	void		(*tuple_insert) (Relation rel, TupleTableSlot *slot,
 								 CommandId cid, int options,
@@ -1157,6 +1160,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 										 lockmode, update_indexes);
 }
 
+/*
+ *	table_multi_insert	- insert multiple tuple into a table
+ */
+static inline void
+table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
+				   CommandId cid, int options, struct BulkInsertStateData *bistate)
+{
+	rel->rd_tableam->multi_insert(rel, slots, nslots,
+								  cid, options, bistate);
+}
+
 /*
  * Lock a tuple in the specified mode.
  *
