On Tue, Aug 1, 2023 at 10:32 PM Jacob Champion <jchamp...@timescale.com> wrote: > > On Tue, Aug 1, 2023 at 9:31 AM Bharath Rupireddy > <bharath.rupireddyforpostg...@gmail.com> wrote: > > Thanks. Finally, I started to spend time on this. Just curious - may > > I know the discussion in/for which this patch is referenced? What was > > the motive? Is it captured somewhere? > > It may not have been the only place, but we at least touched on it > during the unconference: > > > https://wiki.postgresql.org/wiki/PgCon_2023_Developer_Unconference#Table_AMs > > We discussed two related-but-separate ideas: > 1) bulk/batch operations and > 2) maintenance of TAM state across multiple related operations.
Thank you. I'm attaching the v8 patch-set here, which includes use of the new insert TAMs for COPY FROM. With this, Postgres will not only have the new TAM for inserts, but it can also make the following commands faster - CREATE TABLE AS, SELECT INTO, CREATE MATERIALIZED VIEW, REFRESH MATERIALIZED VIEW and COPY FROM. I'll perform some testing in the coming days and post the results here; until then, I appreciate any feedback on the patches. I've also added this proposal to CF - https://commitfest.postgresql.org/47/4777/. -- Bharath Rupireddy PostgreSQL Contributors Team RDS Open Source Databases Amazon Web Services: https://aws.amazon.com
From cbdf2935be360017c0d62479e879630d4fec8766 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Wed, 17 Jan 2024 16:44:19 +0000 Subject: [PATCH v8] New TAMs for inserts --- src/backend/access/heap/heapam.c | 224 +++++++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 9 + src/include/access/heapam.h | 49 +++++ src/include/access/tableam.h | 143 +++++++++++++++ 4 files changed, 425 insertions(+) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 707460a536..7df305380e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -68,6 +68,7 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -2446,6 +2447,229 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize state required for an insert a single tuple or multiple tuples + * into a heap. 
+ */ +TableInsertState * +heap_insert_begin(Relation rel, CommandId cid, int am_flags, int insert_flags) +{ + TableInsertState *tistate; + + tistate = palloc0(sizeof(TableInsertState)); + tistate->rel = rel; + tistate->cid = cid; + tistate->am_flags = am_flags; + tistate->insert_flags = insert_flags; + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0 || + (am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY)) + tistate->am_data = palloc0(sizeof(HeapInsertState)); + + if ((am_flags & TABLEAM_MULTI_INSERTS) != 0) + { + HeapMultiInsertState *mistate; + + mistate = palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + + mistate->context = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert_v2 memory context", + ALLOCSET_DEFAULT_SIZES); + + ((HeapInsertState *) tistate->am_data)->mistate = mistate; + } + + if ((am_flags & TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY) != 0) + ((HeapInsertState *) tistate->am_data)->bistate = GetBulkInsertState(); + + return tistate; +} + +/* + * Insert a single tuple into a heap. + */ +void +heap_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + BulkInsertState bistate = NULL; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate == NULL); + + /* Update tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(state->rel); + tuple->t_tableOid = slot->tts_tableOid; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + /* Perform insertion, and copy the resulting ItemPointer */ + heap_insert(state->rel, tuple, state->cid, state->insert_flags, + bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + pfree(tuple); +} + +/* + * Create/return next free slot from multi-insert buffered slots array. 
+ */ +TupleTableSlot * +heap_multi_insert_next_free_slot(TableInsertState * state) +{ + TupleTableSlot *slot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + slot = mistate->slots[mistate->cur_slots]; + + if (slot == NULL) + { + slot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = slot; + } + else + ExecClearTuple(slot); + + return slot; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapMultiInsertState *mistate; + + Assert(state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL); + + mistate = ((HeapInsertState *) state->am_data)->mistate; + dstslot = mistate->slots[mistate->cur_slots]; + + if (dstslot == NULL) + { + dstslot = table_slot_create(state->rel, NULL); + mistate->slots[mistate->cur_slots] = dstslot; + } + + /* + * Caller may have got the slot using heap_multi_insert_next_free_slot, + * filled it and passed. So, skip copying in such a case. + */ + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0) + { + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + } + else + Assert(dstslot == slot); + + mistate->cur_slots++; + + /* + * When passed-in slot is already materialized, memory allocated in slot's + * memory context is a close approximation for us to track the required + * space for the tuple in slot. + * + * For non-materialized slots, the flushing decision happens solely on the + * number of tuples stored in the buffer. 
+ */ + if (TTS_SHOULDFREE(slot)) + mistate->cur_size += MemoryContextMemAllocated(slot->tts_mcxt, false); + + if ((state->am_flags & TABLEAM_SKIP_MULTI_INSERTS_FLUSH) == 0 && + (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES)) + heap_multi_insert_flush(state); +} + +/* + * Return pointer to multi-insert buffered slots array and number of currently + * occupied slots. + */ +TupleTableSlot ** +heap_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + HeapMultiInsertState *mistate; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + *num_slots = mistate->cur_slots; + + return mistate->slots; +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +void +heap_multi_insert_flush(TableInsertState * state) +{ + HeapMultiInsertState *mistate; + BulkInsertState bistate = NULL; + MemoryContext oldcontext; + + mistate = ((HeapInsertState *) state->am_data)->mistate; + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + bistate = ((HeapInsertState *) state->am_data)->bistate; + + oldcontext = MemoryContextSwitchTo(mistate->context); + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->insert_flags, bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->context); + + mistate->cur_slots = 0; + mistate->cur_size = 0; +} + +/* + * Clean up state used to insert a single or multiple tuples into a heap. 
+ */ +void +heap_insert_end(TableInsertState * state) +{ + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->mistate != NULL) + { + HeapMultiInsertState *mistate = + ((HeapInsertState *) state->am_data)->mistate; + + /* Insert remaining tuples from multi-insert buffers */ + if (mistate->cur_slots > 0 || mistate->cur_size > 0) + heap_multi_insert_flush(state); + + MemoryContextDelete(mistate->context); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + pfree(mistate); + ((HeapInsertState *) state->am_data)->mistate = NULL; + } + + if (state->am_data != NULL && + ((HeapInsertState *) state->am_data)->bistate != NULL) + FreeBulkInsertState(((HeapInsertState *) state->am_data)->bistate); + + pfree(state->am_data); + state->am_data = NULL; + pfree(state); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d15a02b2be..795177812d 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2564,6 +2564,15 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_insert_begin = heap_insert_begin, + .tuple_insert_v2 = heap_insert_v2, + .tuple_multi_insert_next_free_slot = heap_multi_insert_next_free_slot, + .tuple_multi_insert_v2 = heap_multi_insert_v2, + .tuple_multi_insert_slots = heap_multi_insert_slots, + .tuple_multi_insert_flush = heap_multi_insert_flush, + .tuple_insert_end = heap_insert_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 932ec0d6f2..46dba5245c 100644 --- a/src/include/access/heapam.h +++ 
b/src/include/access/heapam.h @@ -225,6 +225,40 @@ htsv_get_valid_status(int status) return (HTSV_Result) status; } +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. For instance, increasing this can cause + * quadratic growth in memory requirements during copies into partitioned + * tables with a large number of partitions. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Memory context to use for flushing multi-insert buffers */ + MemoryContext context; + + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of slots that multi-insert buffers currently hold */ + int cur_slots; + + /* Size of all tuples that multi-insert buffers currently hold */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -275,6 +309,21 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableInsertState * heap_insert_begin(Relation rel, + CommandId cid, + int am_flags, + int insert_flags); +extern void heap_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot *heap_multi_insert_next_free_slot(TableInsertState * state); +extern void heap_multi_insert_v2(TableInsertState * state, + TupleTableSlot *slot); +extern TupleTableSlot **heap_multi_insert_slots(TableInsertState * state, + int *num_slots); +extern void heap_multi_insert_flush(TableInsertState * state); +extern void 
heap_insert_end(TableInsertState * state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5f8474871d..8fcaf6fe5a 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -247,6 +247,43 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Use multi inserts, i.e. buffer multiple tuples and insert them at once */ +#define TABLEAM_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY 0x000002 + +/* + * Skip flushing buffered tuples automatically. Responsibility lies with the + * caller to flush the buffered tuples. + */ +#define TABLEAM_SKIP_MULTI_INSERTS_FLUSH 0x000004 + + +/* Holds table insert state. */ +typedef struct TableInsertState +{ + /* Table AM-agnostic data starts here */ + + Relation rel; /* Target relation */ + + /* + * Command ID for this insertion. If required, change this for each pass + * of insert functions. 
+ */ + CommandId cid; + + /* Table AM options (TABLEAM_XXX macros) */ + int am_flags; + + /* table_tuple_insert performance options (TABLE_INSERT_XXX macros) */ + int insert_flags; + + /* Table AM specific data starts here */ + + void *am_data; +} TableInsertState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -522,6 +559,20 @@ typedef struct TableAmRoutine void (*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate); + TableInsertState *(*tuple_insert_begin) (Relation rel, + CommandId cid, + int am_flags, + int insert_flags); + void (*tuple_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + void (*tuple_multi_insert_v2) (TableInsertState * state, + TupleTableSlot *slot); + TupleTableSlot *(*tuple_multi_insert_next_free_slot) (TableInsertState * state); + TupleTableSlot **(*tuple_multi_insert_slots) (TableInsertState * state, + int *num_slots); + void (*tuple_multi_insert_flush) (TableInsertState * state); + void (*tuple_insert_end) (TableInsertState * state); + /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, @@ -1456,6 +1507,98 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, cid, options, bistate); } +static inline TableInsertState * +table_insert_begin(Relation rel, CommandId cid, int am_flags, + int insert_flags) +{ + if (rel->rd_tableam && rel->rd_tableam->tuple_insert_begin) + return rel->rd_tableam->tuple_insert_begin(rel, cid, am_flags, + insert_flags); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_begin access method is not implemented for relation \"%s\"", + RelationGetRelationName(rel))); +} + +static inline void +table_tuple_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && 
+ state->rel->rd_tableam->tuple_insert_v2) + state->rel->rd_tableam->tuple_insert_v2(state, slot); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_tuple_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_multi_insert_v2(TableInsertState * state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_v2) + state->rel->rd_tableam->tuple_multi_insert_v2(state, slot); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_v2 access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline TupleTableSlot * +table_multi_insert_next_free_slot(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_next_free_slot) + return state->rel->rd_tableam->tuple_multi_insert_next_free_slot(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_next_free_slot access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline TupleTableSlot ** +table_multi_insert_slots(TableInsertState * state, int *num_slots) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_slots) + return state->rel->rd_tableam->tuple_multi_insert_slots(state, num_slots); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_slots access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_multi_insert_flush(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_multi_insert_flush) + state->rel->rd_tableam->tuple_multi_insert_flush(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_multi_insert_flush 
access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + +static inline void +table_insert_end(TableInsertState * state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_insert_end) + state->rel->rd_tableam->tuple_insert_end(state); + else + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("table_insert_end access method is not implemented for relation \"%s\"", + RelationGetRelationName(state->rel))); +} + /* * Delete a tuple. * -- 2.34.1
From 4835495e675bb178ecb67d84e6b00de15751ce8b Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Wed, 17 Jan 2024 15:23:38 +0000 Subject: [PATCH v8] Optimize CTAS with multi inserts --- src/backend/commands/createas.c | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 16a2fe65e6..3a02ea9578 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -58,9 +58,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -557,17 +555,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. */ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + myState->ti_state = table_insert_begin(intoRelationDesc, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM); else - myState->bistate = NULL; + myState->ti_state = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. @@ -595,11 +595,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. 
This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -617,10 +613,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_insert_end(myState->ti_state); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); -- 2.34.1
From d5fd779aa51c624662eefee8349f2d3f6517c3c5 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Wed, 17 Jan 2024 15:27:37 +0000 Subject: [PATCH v8] Optimize RMV with multi inserts --- src/backend/commands/matview.c | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 1dcfbe879b..f84c79f5f0 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -52,10 +52,7 @@ typedef struct DestReceiver pub; /* publicly-known function pointers */ Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ - Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableInsertState *ti_state; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -457,13 +454,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) transientrel = table_open(myState->transientoid, NoLock); - /* - * Fill private fields of myState for use by later routines - */ - myState->transientrel = transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + /* Fill private fields of myState for use by later routines */ + myState->ti_state = table_insert_begin(transientrel, + GetCurrentCommandId(true), + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY, + TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -488,12 +485,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. 
This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... */ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + table_multi_insert_v2(myState->ti_state, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -507,14 +499,12 @@ static void transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; + Relation transientrel = myState->ti_state->rel; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_insert_end(myState->ti_state); /* close transientrel, but keep lock until commit */ - table_close(myState->transientrel, NoLock); - myState->transientrel = NULL; + table_close(transientrel, NoLock); } /* -- 2.34.1
From 24062422b0f213f188bad844b2191923ff258807 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Wed, 17 Jan 2024 16:49:52 +0000 Subject: [PATCH v8] Use new multi insert TAM for COPY FROM --- src/backend/commands/copyfrom.c | 92 ++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 4058b08134..a6c703a99e 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -77,10 +77,9 @@ /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableInsertState *ti_state; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -223,14 +222,31 @@ limit_printout_length(const char *str) * ResultRelInfo. 
*/ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + int num_slots; + + buffer->ti_state = table_insert_begin(rri->ri_RelationDesc, + miinfo->mycid, + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY | + TABLEAM_SKIP_MULTI_INSERTS_FLUSH, + miinfo->ti_options); + buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots); + } + else + { + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->ti_state = NULL; + } + buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -245,7 +261,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -322,8 +338,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -395,13 +409,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -409,18 +418,7 @@ 
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + table_multi_insert_flush(buffer->ti_state); for (i = 0; i < nused; i++) { @@ -435,7 +433,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, + slots[i], estate, false, false, NULL, NIL, false); ExecARInsertTriggers(estate, resultRelInfo, slots[i], recheckIndexes, @@ -493,20 +491,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, resultRelInfo->ri_CopyMultiInsertBuffer = NULL; if (resultRelInfo->ri_FdwRoutine == NULL) - { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); - } + table_insert_end(buffer->ti_state); else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. 
*/ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -593,13 +586,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + slot = table_multi_insert_next_free_slot(buffer->ti_state); + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -615,6 +620,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, Assert(buffer != NULL); Assert(slot == buffer->slots[buffer->nused]); + if (rri->ri_FdwRoutine == NULL) + table_multi_insert_v2(buffer->ti_state, slot); + /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; -- 2.34.1