On Wed, Apr 3, 2024 at 1:10 AM Jeff Davis <pg...@j-davis.com> wrote:
>
> On Sun, 2024-03-31 at 21:18 +0530, Bharath Rupireddy wrote:
> > if (table_modify_buffer_insert() is defined)
> >     table_modify_buffer_insert(...);
> > else
> > {
> >     myState->bistate = GetBulkInsertState();
> >     table_tuple_insert(...);
> > }
>
> We can't alloc/free the bulk insert state for every insert call. I see
> two options:
>
> * Each caller needs to support two code paths: if the buffered insert
> APIs are defined, then use those; otherwise the caller needs to manage
> the bulk insert state itself and call the plain insert API.
>
> * Have default implementations for the new API methods, so that the
> default for the begin method would allocate the bulk insert state, and
> the default for the buffered insert method would be to call plain
> insert using the bulk insert state.
>
> I'd prefer the latter, at least in the long term. But I haven't really
> thought through the details, so perhaps we'd need to use the former.
I too prefer the latter so that the caller doesn't have to maintain two
code paths. The new API can just transparently fall back to single
inserts. I've implemented that in the attached v17 patch. I also tested
the default APIs manually, but I'll see if I can add some tests for the
default API.

> > > After we have these new APIs fully in place and used by COPY, what
> > > will happen to those other APIs? Will they be deprecated or will
> > > there be a reason to keep them?
> >
> > Deprecated perhaps?
>
> Including Alexander on this thread, because he's making changes to the
> multi-insert API. We need some consensus on where we are going with
> these APIs before we make more changes, and what incremental steps make
> sense in v17.
>
> Here's where I think this API should go:
>
> 1. Have table_modify_begin/end and table_modify_buffer_insert, like
> those that are implemented in your patch.
>
> 2. Add some kind of flush callback that will be called either while
> the tuples are being flushed or after the tuples are flushed (but
> before they are freed by the AM). (Aside: do we need to call it while
> the tuples are being flushed to get the right visibility semantics for
> after-row triggers?)
>
> 3. Add table_modify_buffer_{update|delete} APIs.
>
> 4. Some kind of API tweaks to help manage memory when modifying
> partitioned tables, so that the buffering doesn't get out of control.
> Perhaps just reporting memory usage and allowing the caller to force
> flushes would be enough.
>
> 5. Use these new methods for CREATE/REFRESH MATERIALIZED VIEW. This is
> fairly straightforward, I believe, and handled by your patch. Indexes
> are (re)built afterward, and no triggers are possible.
>
> 6. Use these new methods for CREATE TABLE ... AS. This is fairly
> straightforward, I believe, and handled by your patch. No indexes or
> triggers are possible.
>
> 7. Use these new methods for COPY.
> We have to be careful to avoid regressions for the heap method,
> because it's already managing its own buffers. If the AM manages the
> buffering, then it may require additional copying of slots, which
> could be a disadvantage. To solve this, we may need some minor API
> tweaks to avoid copying when the caller guarantees that the memory
> will not be freed too early, or perhaps expose the AM's memory context
> to copyfrom.c. Another thing to consider is that the buffering in
> copyfrom.c is also used for FDWs, so that buffering code path needs to
> be preserved in copyfrom.c even if not used for AMs.
>
> 8. Use these new methods for INSERT INTO ... SELECT. One potential
> challenge here is that execution nodes are not always run to
> completion, so we need to be sure that the flush isn't forgotten in
> that case.
>
> 9. Use these new methods for DELETE, UPDATE, and MERGE. MERGE can use
> the buffer_insert/update/delete APIs; we don't need a separate merge
> method. This probably requires that the AM maintain 3 separate buffers
> to distinguish different kinds of changes at flush time (obviously
> these can be initialized lazily to avoid overhead when not being
> used).
>
> 10. Use these new methods for logical apply.
>
> 11. Deprecate the multi_insert API.
>
> Thoughts on this plan? Does your patch make sense in v17 as a stepping
> stone, or should we try to make all of these API changes together in
> v18?

I'd like to see the new multi insert API (as proposed in the v17
patches) in PG17 if possible. The basic idea with these new APIs is to
let AM implementers choose the right buffered insert strategy (one can
choose an AM-specific slot type for buffering the tuples, make
AM-specific memory and flushing decisions, etc.). Another advantage of
the new multi insert API is that the CREATE MATERIALIZED VIEW, REFRESH
MATERIALIZED VIEW, and CREATE TABLE AS commands for the heap AM got
faster by 62.54%, 68.87%, and 74.31% (2.67x, 3.21x, and 3.89x speedups)
respectively.
The performance improvement in REFRESH MATERIALIZED VIEW can benefit
customers running analytical workloads on Postgres. I'm fine if we
gradually add more infrastructure to support COPY, INSERT INTO ...
SELECT, logical replication apply, and table rewrites in future
releases. I'm sure that requires a lot more thought and time.

> Also, a sample AM code would be a huge benefit here. Writing a real
> AM is hard, but perhaps we can at least have an example one to
> demonstrate how to use these APIs?

The heap AM implements this new API. Also, there's a default
implementation for the new API that falls back to single inserts.
Aren't these sufficient to help AM implementers come up with their own
implementations?

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 4e349a0d877a48ff4068f776e65dcfec49e96356 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Wed, 3 Apr 2024 08:36:50 +0000 Subject: [PATCH v17 1/2] Introduce new table modify access methods --- src/backend/access/heap/heapam.c | 189 ++++++++++++++++++++++- src/backend/access/heap/heapam_handler.c | 5 + src/backend/access/table/tableam.c | 86 +++++++++++ src/backend/access/table/tableamapi.c | 8 + src/include/access/heapam.h | 41 +++++ src/include/access/tableam.h | 106 +++++++++++++ src/tools/pgindent/typedefs.list | 3 + 7 files changed, 437 insertions(+), 1 deletion(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index b661d9811e..69f8c597d8 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -64,6 +64,7 @@ #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" +#include "utils/memutils.h" #include "utils/relcache.h" #include "utils/snapmgr.h" #include "utils/spccache.h" @@ -107,7 +108,8 @@ static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate); static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy); - +static void heap_modify_buffer_flush(TableModifyState *state); +static void heap_modify_insert_end(TableModifyState *state); /* * Each tuple lock mode has a corresponding heavyweight lock, and one or two @@ -2441,6 +2443,191 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, *insert_indexes = true; } +/* + * Initialize heap modify state. 
+ */ +TableModifyState * +heap_modify_begin(Relation rel, int modify_flags, CommandId cid, + int options) +{ + TableModifyState *state; + MemoryContext context; + MemoryContext oldcontext; + + context = AllocSetContextCreate(CurrentMemoryContext, + "heap_modify memory context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + state = palloc0(sizeof(TableModifyState)); + state->rel = rel; + state->modify_flags = modify_flags; + state->mctx = context; + state->cid = cid; + state->options = options; + state->insert_indexes = false; + state->modify_end_cb = NULL; /* To be installed lazily */ + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot) +{ + TupleTableSlot *dstslot; + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(state->mctx); + + /* First time through, initialize heap insert state */ + if (state->data == NULL) + { + istate = (HeapInsertState *) palloc0(sizeof(HeapInsertState)); + istate->bistate = NULL; + istate->mistate = NULL; + state->data = istate; + + if ((state->modify_flags & TM_FLAG_MULTI_INSERTS) != 0) + { + mistate = (HeapMultiInsertState *) palloc0(sizeof(HeapMultiInsertState)); + mistate->slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * HEAP_MAX_BUFFERED_SLOTS); + istate->mistate = mistate; + } + + if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0) + istate->bistate = GetBulkInsertState(); + + state->modify_end_cb = heap_modify_insert_end; + } + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + Assert(istate->bistate != NULL); + + dstslot = mistate->slots[mistate->cur_slots]; + if (dstslot == NULL) + { + /* + * We use virtual tuple slots buffered 
slots for leveraging the + * optimization it provides to minimize physical data copying. The + * virtual slot gets materialized when we copy (via below + * ExecCopySlot) the tuples from the source slot which can be of any + * type. This way, it is ensured that the tuple storage doesn't depend + * on external memory, because all the datums that aren't passed by + * value are copied into the slot's memory context. + */ + dstslot = MakeTupleTableSlot(RelationGetDescr(state->rel), + &TTSOpsVirtual); + mistate->slots[mistate->cur_slots] = dstslot; + } + + ExecClearTuple(dstslot); + ExecCopySlot(dstslot, slot); + + mistate->cur_slots++; + + /* + * Memory allocated for the whole tuple is in slot's memory context, so + * use it keep track of the total space occupied by all buffered tuples. + */ + if (TTS_SHOULDFREE(dstslot)) + mistate->cur_size += MemoryContextMemAllocated(dstslot->tts_mcxt, false); + + if (mistate->cur_slots >= HEAP_MAX_BUFFERED_SLOTS || + mistate->cur_size >= HEAP_MAX_BUFFERED_BYTES) + heap_modify_buffer_flush(state); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. 
+ */ +static void +heap_modify_buffer_flush(TableModifyState *state) +{ + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + Assert(istate->bistate != NULL); + + if (mistate->cur_slots == 0) + return; + + oldcontext = MemoryContextSwitchTo(state->mctx); + + heap_multi_insert(state->rel, mistate->slots, mistate->cur_slots, + state->cid, state->options, istate->bistate, + &state->insert_indexes); + + mistate->cur_slots = 0; + mistate->cur_size = 0; + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Heap insert specific callback used for performing work at the end like + * flushing buffered tuples if any, cleaning up the insert state and buffered + * slots. + */ +static void +heap_modify_insert_end(TableModifyState *state) +{ + HeapInsertState *istate; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + + if (istate->mistate != NULL) + { + HeapMultiInsertState *mistate = istate->mistate; + + heap_modify_buffer_flush(state); + + Assert(mistate->cur_slots == 0 && + mistate->cur_size == 0); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + } + + if (istate->bistate != NULL) + FreeBulkInsertState(istate->bistate); +} + +/* + * Clean heap modify state. 
+ */ +void +heap_modify_end(TableModifyState *state) +{ + if (state->modify_end_cb != NULL) + state->modify_end_cb(state); + + MemoryContextDelete(state->mctx); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index c86000d245..f3aa29851d 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2638,6 +2638,11 @@ static const TableAmRoutine heapam_methods = { .tuple_insert_speculative = heapam_tuple_insert_speculative, .tuple_complete_speculative = heapam_tuple_complete_speculative, .multi_insert = heap_multi_insert, + + .tuple_modify_begin = heap_modify_begin, + .tuple_modify_buffer_insert = heap_modify_buffer_insert, + .tuple_modify_end = heap_modify_end, + .tuple_delete = heapam_tuple_delete, .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 805d222ceb..4c7b5433ec 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -21,6 +21,7 @@ #include <math.h> +#include "access/heapam.h" /* just for BulkInsertState */ #include "access/syncscan.h" #include "access/tableam.h" #include "access/xact.h" @@ -29,6 +30,7 @@ #include "storage/bufmgr.h" #include "storage/shmem.h" #include "storage/smgr.h" +#include "utils/memutils.h" /* * Constants to control the behavior of block allocation to parallel workers @@ -48,6 +50,7 @@ char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD; bool synchronize_seqscans = true; +static void default_table_modify_insert_end(TableModifyState *state); /* ---------------------------------------------------------------------------- * Slot functions. @@ -772,3 +775,86 @@ table_block_relation_estimate_size(Relation rel, int32 *attr_widths, else *allvisfrac = (double) relallvisible / curpages; } + +/* + * Initialize default table modify state. 
+ */ +TableModifyState * +default_table_modify_begin(Relation rel, int modify_flags, CommandId cid, + int options) +{ + TableModifyState *state; + MemoryContext context; + MemoryContext oldcontext; + + context = AllocSetContextCreate(CurrentMemoryContext, + "default_table_modify memory context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + state = palloc0(sizeof(TableModifyState)); + state->rel = rel; + state->modify_flags = modify_flags; + state->mctx = context; + state->cid = cid; + state->options = options; + state->insert_indexes = false; + state->modify_end_cb = NULL; /* To be installed lazily */ + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * Default table modify implementation for inserts. + */ +void +default_table_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot) +{ + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(state->mctx); + + /* First time through, initialize default table modify state */ + if (state->data == NULL) + { + if ((state->modify_flags & TM_FLAG_BAS_BULKWRITE) != 0) + state->data = (BulkInsertState) GetBulkInsertState(); + + state->modify_end_cb = default_table_modify_insert_end; + } + + /* Fallback to table AM single insert routine */ + table_tuple_insert(state->rel, + slot, + state->cid, + state->options, + (BulkInsertState) state->data, + &state->insert_indexes); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Default table modify insert specific callback used for performing work at + * the end like cleaning up the bulk insert state. + */ +static void +default_table_modify_insert_end(TableModifyState *state) +{ + if (state->data != NULL) + FreeBulkInsertState((BulkInsertState) state->data); +} + +/* + * Clean default table modify state. 
+ */ +void +default_table_modify_end(TableModifyState *state) +{ + if (state->modify_end_cb != NULL) + state->modify_end_cb(state); + + MemoryContextDelete(state->mctx); +} diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index 55b8caeadf..9c095b93e7 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -95,6 +95,14 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->scan_sample_next_block != NULL); Assert(routine->scan_sample_next_tuple != NULL); + /* optional, but either all of them are defined or none. */ + Assert((routine->tuple_modify_begin == NULL && + routine->tuple_modify_buffer_insert == NULL && + routine->tuple_modify_end == NULL) || + (routine->tuple_modify_begin != NULL && + routine->tuple_modify_buffer_insert != NULL && + routine->tuple_modify_end != NULL)); + return routine; } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index b632fe953c..b35ba5509b 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -236,6 +236,36 @@ htsv_get_valid_status(int status) return (HTSV_Result) status; } +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. 
+ */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +/* Maximum size of all tuples that multi-insert buffers can hold */ +#define HEAP_MAX_BUFFERED_BYTES 65535 + +typedef struct HeapMultiInsertState +{ + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Number of buffered slots currently held */ + int cur_slots; + + /* Approximate size of all tuples currently held in buffered slots */ + Size cur_size; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + + /* ---------------- * function prototypes for heap access method * @@ -286,6 +316,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate, bool *insert_indexes); + +extern TableModifyState *heap_modify_begin(Relation rel, + int modify_flags, + CommandId cid, + int options); + +extern void heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot); + +extern void heap_modify_end(TableModifyState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, int options, struct TM_FailureData *tmfd, bool changingPart, diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 2c1a540155..71b823af66 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -248,6 +248,35 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +/* Table modify flags */ + +/* Use multi inserts, i.e. 
buffer multiple tuples and insert them at once */ +#define TM_FLAG_MULTI_INSERTS 0x000001 + +/* Use BAS_BULKWRITE buffer access strategy */ +#define TM_FLAG_BAS_BULKWRITE 0x000002 + +struct TableModifyState; + +/* Table AM specific callback that gets called in table_modify_end() */ +typedef void (*TableModifyEndCP) (struct TableModifyState *state); + +/* Holds table modify state */ +typedef struct TableModifyState +{ + Relation rel; + int modify_flags; + MemoryContext mctx; + CommandId cid; + int options; + bool insert_indexes; + + /* Table AM specific data starts here */ + void *data; + + TableModifyEndCP modify_end_cb; +} TableModifyState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 @@ -584,6 +613,18 @@ typedef struct TableAmRoutine void (*finish_bulk_insert) (Relation rel, int options); + /* ------------------------------------------------------------------------ + * Table Modify related functions. + * ------------------------------------------------------------------------ + */ + TableModifyState *(*tuple_modify_begin) (Relation rel, + int modify_flags, + CommandId cid, + int options); + void (*tuple_modify_buffer_insert) (TableModifyState *state, + TupleTableSlot *slot); + void (*tuple_modify_end) (TableModifyState *state); + /* ------------------------------------------------------------------------ * DDL related functionality. * ------------------------------------------------------------------------ @@ -1604,6 +1645,71 @@ table_finish_bulk_insert(Relation rel, int options) rel->rd_tableam->finish_bulk_insert(rel, options); } +/* ------------------------------------------------------------------------ + * Table Modify related functions. 
+ * ------------------------------------------------------------------------ + */ +extern TableModifyState *default_table_modify_begin(Relation rel, int modify_flags, + CommandId cid, int options); +extern void default_table_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot); +extern void default_table_modify_end(TableModifyState *state); + +static inline TableModifyState * +table_modify_begin(Relation rel, int modify_flags, CommandId cid, int options) +{ + if (rel->rd_tableam && + rel->rd_tableam->tuple_modify_begin != NULL) + { + return rel->rd_tableam->tuple_modify_begin(rel, modify_flags, + cid, options); + } + else if (rel->rd_tableam && + rel->rd_tableam->tuple_modify_begin == NULL) + { + /* Fallback to a default implementation */ + return default_table_modify_begin(rel, modify_flags, + cid, options); + } + else + Assert(false); +} + +static inline void +table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_buffer_insert != NULL) + { + state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot); + } + else if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_buffer_insert == NULL) + { + /* Fallback to a default implementation */ + default_table_modify_buffer_insert(state, slot); + } + else + Assert(false); +} + +static inline void +table_modify_end(TableModifyState *state) +{ + if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_end != NULL) + { + state->rel->rd_tableam->tuple_modify_end(state); + } + else if (state->rel->rd_tableam && + state->rel->rd_tableam->tuple_modify_end == NULL) + { + /* Fallback to a default implementation */ + default_table_modify_end(state); + } + else + Assert(false); +} /* ------------------------------------------------------------------------ * DDL related functionality. 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 2b01a3081e..edaa4d26f0 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1123,6 +1123,8 @@ HeadlineJsonState HeadlineParsedText HeadlineWordEntry HeapCheckContext +HeapInsertState +HeapMultiInsertState HeapPageFreeze HeapScanDesc HeapTuple @@ -2814,6 +2816,7 @@ TableFuncScan TableFuncScanState TableInfo TableLikeClause +TableModifyState TableSampleClause TableScanDesc TableScanDescData -- 2.34.1
From 3560a49e67774f96fb2e712845c370a18f9c7a77 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Date: Wed, 3 Apr 2024 08:37:21 +0000 Subject: [PATCH v17 2/2] Optimize CTAS, CMV, RMV with multi inserts --- src/backend/commands/createas.c | 27 +++++++++------------------ src/backend/commands/matview.c | 26 +++++++++----------------- 2 files changed, 18 insertions(+), 35 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index afd3dace07..00c1271f93 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -53,9 +53,7 @@ typedef struct /* These fields are filled by intorel_startup: */ Relation rel; /* relation to write to */ ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableModifyState *mstate; /* table insert state */ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -552,17 +550,19 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ myState->rel = intoRelationDesc; myState->reladdr = intoRelationAddr; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = TABLE_INSERT_SKIP_FSM; /* * If WITH NO DATA is specified, there is no need to set up the state for * bulk inserts as there are no tuples to insert. */ if (!into->skipData) - myState->bistate = GetBulkInsertState(); + myState->mstate = table_modify_begin(intoRelationDesc, + TM_FLAG_MULTI_INSERTS | + TM_FLAG_BAS_BULKWRITE, + GetCurrentCommandId(true), + TABLE_INSERT_SKIP_FSM); else - myState->bistate = NULL; + myState->mstate = NULL; /* * Valid smgr_targblock implies something already wrote to the relation. 
@@ -578,7 +578,6 @@ static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self) { DR_intorel *myState = (DR_intorel *) self; - bool insertIndexes; /* Nothing to insert if WITH NO DATA is specified. */ if (!myState->into->skipData) @@ -591,12 +590,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * would not be cheap either. This also doesn't allow accessing per-AM * data (say a tuple's xmin), but since we don't do that here... */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate, - &insertIndexes); + table_modify_buffer_insert(myState->mstate, slot); } /* We know this is a newly created relation, so there are no indexes */ @@ -614,10 +608,7 @@ intorel_shutdown(DestReceiver *self) IntoClause *into = myState->into; if (!into->skipData) - { - FreeBulkInsertState(myState->bistate); - table_finish_bulk_insert(myState->rel, myState->ti_options); - } + table_modify_end(myState->mstate); /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 9ec13d0984..f03aa1cff3 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -48,9 +48,7 @@ typedef struct Oid transientoid; /* OID of new heap into which to store */ /* These fields are filled by transientrel_startup: */ Relation transientrel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int ti_options; /* table_tuple_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ + TableModifyState *mstate; /* table insert state */ } DR_transientrel; static int matview_maintenance_depth = 0; @@ -458,9 +456,12 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) * Fill private fields of myState for use by later routines */ myState->transientrel = transientrel; - myState->output_cid = GetCurrentCommandId(true); - myState->ti_options = 
TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; - myState->bistate = GetBulkInsertState(); + myState->mstate = table_modify_begin(transientrel, + TM_FLAG_MULTI_INSERTS | + TM_FLAG_BAS_BULKWRITE, + GetCurrentCommandId(true), + TABLE_INSERT_SKIP_FSM | + TABLE_INSERT_FROZEN); /* * Valid smgr_targblock implies something already wrote to the relation. @@ -476,7 +477,6 @@ static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; - bool insertIndexes; /* * Note that the input slot might not be of the type of the target @@ -486,13 +486,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) * cheap either. This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... */ - - table_tuple_insert(myState->transientrel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate, - &insertIndexes); + table_modify_buffer_insert(myState->mstate, slot); /* We know this is a newly created relation, so there are no indexes */ @@ -507,9 +501,7 @@ transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; - FreeBulkInsertState(myState->bistate); - - table_finish_bulk_insert(myState->transientrel, myState->ti_options); + table_modify_end(myState->mstate); /* close transientrel, but keep lock until commit */ table_close(myState->transientrel, NoLock); -- 2.34.1