On Sat, Mar 2, 2024 at 12:02 PM Bharath Rupireddy <bharath.rupireddyforpostg...@gmail.com> wrote: > > On Mon, Jan 29, 2024 at 5:16 PM Bharath Rupireddy > <bharath.rupireddyforpostg...@gmail.com> wrote: > > > > > Please find the attached v9 patch set. > > I've had to rebase the patches due to commit 874d817, please find the > attached v11 patch set.
The patches needed another rebase; please see the attached v12 patch set. -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com
From 8a3552e65e62afc40db99fbd7bf4f98990d45390 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Fri, 8 Mar 2024 10:11:17 +0000 Subject: [PATCH v12 1/4] New TAMs for inserts --- src/backend/access/heap/heapam.c | 224 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 9 + src/include/access/heapam.h | 49 +++++ src/include/access/tableam.h | 138 ++++++++++++++ 4 files changed, 420 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 34bc60f625..497940d74a 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -64,6 +64,7 @@ #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -2442,6 +2443,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize state required for an insert a single tuple or multiple tuples + * into a heap. 
+ */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags) +{ + TableInsertState *tistate; + + tistate = palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->am_flags = am_flags; + tistate->insert_flags = insert_flags; + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0 || + (am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + tistate->am_data = palloc0(sizeof(HeapInsertState)); + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0) + { + HeapMultiInsertState *mistate; + + mistate = palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + mistate->context = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + + ((HeapInsertState *) tistate->am_data)->mistate = mistate; + } + + if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap. + */ +void +heap_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + BulkInsertState bistate = NULL; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate == NULL); + + /* Update tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + /* Perform insertion, and copy the resulting ItemPointer */ + heap_insert(state->rel, tuple, state->cid, state->insert_flags, + bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * Create/return next free slot from multi-insert buffered slots array. 
+ */ +TupleTableSlot * +heap_multi_insert_next_free_slot(TableInsertState * state) +{ + TupleTableSlot *slot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + slot = mistate->slots[mistate->cur_slots]; + + if (slot == NULL) + { + slot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = slot; + } + else + ExecClearTuple(slot); + + return slot; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + dstslot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = dstslot; + } + + /* + * Caller may have got the slot using heap_multi_insert_next_free_slot, + * filled it and passed. So, skip copying in such a case. + */ + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0) + { + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + } + else + Assert(dstslot == slot); + + mistate->cur_slots++; + + /* + * When passed-in slot is already materialized, memory allocated in slot's + * memory context is a close approximation for us to track the required + * space for the tuple in slot. + * + * For non-materialized slots, the flushing decision happens solely on the + * number of tuples stored in the buffer. 
+ */ + if (TTS_SHOULDFREE(slot)) + mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false); + + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 && + (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)) + heap_multi_insert_flush(state); +} + +/* + * Return pointer to multi-insert buffered slots array and number of currently + * occupied slots. + */ +TupleTableSlot ** +heap_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + HeapMultiInsertState *mistate; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + *num_slots = mistate->cur_slots; + + return mistate->slots; +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +void +heap_multi_insert_flush(TableInsertState * state) +{ + HeapMultiInsertState *mistate; + BulkInsertState bistate = NULL; + MemoryContext oldcontext; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->insert_flags, bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->context); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + +/* + * Clean up state used to insert a single or multiple tuples into a heap. 
+ */ +void +heap_insert_end(TableInsertState * state) +{ + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL) + { + HeapMultiInsertState *mistate = + ((HeapInsertState *) state->am_data)->mistate; + + /* Insert remaining tuples from multi-insert buffers */ + if (mistate->cur_slots > 0 || mistate->cur_size > 0) + heap_multi_insert_flush(state); + + MemoryContextDelete(mistate->context); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + pfree(mistate); + ((HeapInsertState *) state->am_data)->mistate = NULL; + } + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + FreeBulkInsertState(((HeapInsertState *) state->am_data)->bistate); + + pfree(state->am_data); + state->am_data = NULL; + pfree(state); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 680a50bf8b..84793f324e 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2562,6 +2562,15 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot, + .tuple_multi_insert_v2 = heap_multi_insert_v2, + .tuple_multi_insert_slots = heap_multi_insert_slots, + .tuple_multi_insert_flush = heap_multi_insert_flush, + .tuple_insert_end = heap_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 4b133f6859..053be18110 100644 --- a/src/include/access/heapam.h +++ 
b/src/include/access/heapam.h @@ -225,6 +225,40 @@ htsv_get_valid_status(int status) return (HTSV_Result) status; } +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. For instance, increasing this can cause + * quadratic growth in memory requirements during copies into partitioned + * tables with a large number of partitions. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Memory context to use for flushing multi-insert buffers */ + MemoryContext context; + + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of slots that multi-insert buffers currently hold */ + int cur_slots; + + /* Size of all tuples that multi-insert buffers currently hold */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState * heap_insert_begin(Relation rel, + CommandId cid, + int am_flags, + int insert_flags); +extern void heap_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state); +extern void heap_multi_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state, + int *num_slots); +extern void heap_multi_insert_flush(TableInsertState * state); +extern void 
heap_insert_end(TableInsertState * state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5f8474871d..834de15b9b 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */ +#define TABLEAM_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002 + +/* + * Skip flushing buffered tuples automatically. Responsibility lies with the + * caller to flush the buffered tuples. + */ +#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004 + + +/* Holds table insert state. */ +typedef struct TableInsertState +{ + /* Table AM-agnostic data starts here */ + + Relation rel; /* Target relation */ + + /* + * Command ID for this insertion. If required, change this for each pass + * of insert functions. 
+ */ + CommandId cid; + + /* Table AM options (TABLEAM_XXX macros) */ + int am_flags; + + /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */ + int insert_flags; + + /* Table AM specific data starts here */ + + void *am_data; +} TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -522,6 +559,20 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState *(*tuple_insert_begin) (Relation rel, + CommandId cid, + int am_flags, + int insert_flags); + void (*tuple_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + void (*tuple_multi_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state); + TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state, + int *num_slots); + void (*tuple_multi_insert_flush) (TableInsertState * state); + void (*tuple_insert_end) (TableInsertState * state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1456,6 +1507,93 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState * +table_insert_begin(Relation rel, CommandId cid, int am_flags, + int insert_flags) +{ + if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin) + return rel->rd_tableam->tuple_insert_begin(rel, cid, am_flags, + insert_flags); + else + { + elog(ERROR, "table_insert_begin access method is not implemented for relation \"%s\"", + RelationGetRelationName(rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline void +table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + 
state->rel->rd_tableam->tuple_insert_v2) + state->rel->rd_tableam->tuple_insert_v2(state, slot); + else + elog(ERROR, "table_tuple_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline void +table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_v2) + state->rel->rd_tableam->tuple_multi_insert_v2(state, slot); + else + elog(ERROR, "table_multi_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline TupleTableSlot * +table_multi_insert_next_free_slot(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_next_free_slot) + return state->rel->rd_tableam->tuple_multi_insert_next_free_slot(state); + else + { + elog(ERROR, "table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline TupleTableSlot ** +table_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_slots) + return state->rel->rd_tableam->tuple_multi_insert_slots(state, num_slots); + else + { + elog(ERROR, "table_multi_insert_slots access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); + return NULL; /* keep compiler quiet */ + } +} + +static inline void +table_multi_insert_flush(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_flush) + state->rel->rd_tableam->tuple_multi_insert_flush(state); + else + elog(ERROR, "table_multi_insert_flush access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + +static inline void +table_insert_end(TableInsertState * state) +{ + if 
(state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_end) + state->rel->rd_tableam->tuple_insert_end(state); + else + elog(ERROR, "table_insert_end access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel)); +} + /* * Delete a tuple. * -- 2.34.1
From fd891115178bc33df87844417e35a724b359af96 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Fri, 8 Mar 2024 10:11:41 +0000 Subject: [PATCH v12 2/4] Optimize CTAS with multi inserts --- src/backend/commands/createas.c | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 62050f4dc5..7a4415c62f 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -53,9 +53,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -552,17 +550,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. */ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + myState->ti_state = table_insert_begin(intoRelationDesc, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM); else - myState->bistate = NULL; + myState->ti_state = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. @@ -590,11 +590,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. 
This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -612,10 +608,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_insert_end(myState->ti_state); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); -- 2.34.1
From 44caa58dc21e8e4634d214c074a88986b2311b41 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Fri, 8 Mar 2024 10:12:02 +0000 Subject: [PATCH v12 3/4] Optimize RMV with multi inserts --- src/backend/commands/matview.c | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 03373462f0..889a9a21f8 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -47,10 +47,7 @@ typedef struct DestReceiver pub; /* publicly-known function pointers */ Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ - Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -453,13 +450,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) transientrel = table_open(myState->transientoid, NoLock); - /* - * Fill private fields of myState for use by later routines - */ - myState->transientrel = transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + /* Fill private fields of myState for use by later routines */ + myState->ti_state = table_insert_begin(transientrel, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -484,12 +481,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. 
This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... */ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -503,14 +495,12 @@ static void transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; + Relation transientrel = myState->ti_state->rel; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_insert_end(myState->ti_state); /* close transientrel, but keep lock until commit */ - table_close(myState->transientrel, NoLock); - myState->transientrel = NULL; + table_close(transientrel, NoLock); } /* -- 2.34.1
From d53ee9b1b31b0e68858e673a618905d7bfdcf4de Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Fri, 8 Mar 2024 10:12:32 +0000 Subject: [PATCH v12 4/4] Use new multi insert TAM for COPY FROM --- src/backend/commands/copyfrom.c | 92 ++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 8908a440e1..c2a81d4df1 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -74,10 +74,9 @@ /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableInsertState *ti_state; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -220,14 +219,31 @@ limit_printout_length(const char *str) * ResultRelInfo. 
*/ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + int num_slots; + + buffer->ti_state = table_insert_begin(rri->ri_RelationDesc, + miinfo->mycid, + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY | + TABLEAM_SKIP_MULTI_INSERTS_FLUSH, + miinfo->ti_options); + buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots); + } + else + { + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->ti_state = NULL; + } + buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -242,7 +258,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -319,8 +335,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -392,13 +406,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -406,18 +415,7 @@ 
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + table_multi_insert_flush(buffer->ti_state); for (i = 0; i < nused; i++) { @@ -432,7 +430,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, + slots[i], estate, false, false, NULL, NIL, false); ExecARInsertTriggers(estate, resultRelInfo, slots[i], recheckIndexes, @@ -490,20 +488,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, resultRelInfo->ri_CopyMultiInsertBuffer = NULL; if (resultRelInfo->ri_FdwRoutine == NULL) - { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); - } + table_insert_end(buffer->ti_state); else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. 
*/ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -590,13 +583,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + slot = table_multi_insert_next_free_slot(buffer->ti_state); + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -612,6 +617,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, Assert(buffer != NULL); Assert(slot == buffer->slots[buffer->nused]); + if (rri->ri_FdwRoutine == NULL) + table_multi_insert_v2(buffer->ti_state, slot); + /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; -- 2.34.1