On Mon, Nov 23, 2020 at 08:00:20PM -0600, Justin Pryzby wrote: > On Mon, Nov 02, 2020 at 12:45:51PM -0600, Justin Pryzby wrote: > > On Mon, Nov 02, 2020 at 07:53:45AM +0100, Luc Vlaming wrote: > > > On 30.10.20 05:51, Justin Pryzby wrote: > > > > On Thu, Oct 22, 2020 at 01:29:53PM +0100, Simon Riggs wrote: > > > > > On Fri, 16 Oct 2020 at 22:05, Justin Pryzby <pry...@telsasoft.com> > > > > > wrote: > > > > > > > > > > > > > I made this conditional on BEGIN BULK/SET bulk, so I'll solicit > > > > > > > > comments on that. > > > > > > > > > > I think it would be better if this was self-tuning. So that we don't > > > > > allocate a bulkinsert state until we've done say 100 (?) rows > > > > > inserted. > > > > > > > > I made it an optional, non-default behavior in response to the > > > > legitimate > > > > concern for performance regression for the cases where a loader needs > > > > to be as > > > > fast as possible - as compared with our case, where we want instead to > > > > optimize > > > > for our reports by making the loaders responsible for their own writes, > > > > rather > > > > than leaving behind many dirty pages, and clobbering the cache, too. > > > > > > > > Also, INSERT SELECT doesn't immediately help us (telsasoft), since we > > > > use > > > > INSERT .. VALUES () .. ON CONFLICT. This would handle that case, which > > > > is > > > > great, even though that wasn't a design goal. It could also be an > > > > integer GUC > > > > to allow configuring the size of the ring buffer. > > > > > > > > > You should also use table_multi_insert() since that will give further > > > > > performance gains by reducing block access overheads. Switching from > > > > > single row to multi-row should also only happen once we've loaded a > > > > > few rows, so we don't introduce overheads for smaller SQL statements. 
> > > > > > > > Good idea...multi_insert (which reduces the overhead of individual > > > > inserts) is > > > > mostly independent from BulkInsert state (which uses a ring-buffer to > > > > avoid > > > > dirtying the cache). I made this 0002. > > > > > > > > This makes INSERT SELECT several times faster, and not clobber the > > > > cache too. > > - Rebased on Heikki's copy.c split; > - Rename structures without "Copy" prefix; > - Move MultiInsert* from copyfrom.c to (tentatively) nodeModifyTable.h; > - Move cur_lineno and transition_capture into MultiInsertInfo; > > This switches to multi insert after a configurable number of tuples. > If set to -1, that provides the historic behavior that bulk inserts > can leave behind many dirty buffers. Perhaps that should be the default. > > I guess this shouldn't be in copy.h or in commands/* at all. > It'll be included by both: commands/copyfrom_internal.h and > executor/nodeModifyTable.h. Maybe it should go in util or lib... > I don't know how to do it without including executor.h, which seems > to be undesirable.
The attached patch resolves an issue with the FDW contrib module by including the MultiInsertInfo structure rather than a pointer to it, and makes the logic related to partitions/triggers more closely match copyfrom.c. I had made this conditional based on the concern that using a bulk insert state would cause a performance regression. But then it occurred to me that COPY uses a bulk insert state unconditionally. Should COPY be conditional, too? Or maybe that's ok, since COPY is assumed to be a bulk operation. -- Justin
>From 886709926523f480255b4897d5bb08984be26a29 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Fri, 8 May 2020 02:17:32 -0500 Subject: [PATCH v7 1/3] Allow INSERT SELECT to use a BulkInsertState --- src/backend/executor/nodeModifyTable.c | 22 ++++++++++++++++++++-- src/backend/parser/gram.y | 7 ++++++- src/backend/tcop/utility.c | 4 ++++ src/backend/utils/misc/guc.c | 11 +++++++++++ src/include/executor/nodeModifyTable.h | 2 ++ src/include/nodes/execnodes.h | 3 +++ src/include/parser/kwlist.h | 1 + src/test/regress/expected/insert.out | 23 +++++++++++++++++++++++ src/test/regress/sql/insert.sql | 13 +++++++++++++ 9 files changed, 83 insertions(+), 3 deletions(-) diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index e0f24283b8..f65ae2c0d6 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -72,6 +72,8 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate, ResultRelInfo *targetRelInfo, TupleTableSlot *slot, ResultRelInfo **partRelInfo); +/* guc */ +bool insert_in_bulk = false; /* * Verify that the tuples to be produced by INSERT or UPDATE match the @@ -594,7 +596,7 @@ ExecInsert(ModifyTableState *mtstate, table_tuple_insert_speculative(resultRelationDesc, slot, estate->es_output_cid, 0, - NULL, + NULL, /* Bulk insert not supported */ specToken); /* insert index entries for tuple */ @@ -631,10 +633,17 @@ ExecInsert(ModifyTableState *mtstate, } else { + if (proute && mtstate->prevResultRelInfo != resultRelInfo) + { + if (mtstate->bistate) + ReleaseBulkInsertStatePin(mtstate->bistate); + mtstate->prevResultRelInfo = resultRelInfo; + } + /* insert the tuple normally */ table_tuple_insert(resultRelationDesc, slot, estate->es_output_cid, - 0, NULL); + 0, mtstate->bistate); /* insert index entries for tuple */ if (resultRelInfo->ri_NumIndices > 0) @@ -2229,6 +2238,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) 
mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans); mtstate->mt_nplans = nplans; + mtstate->bistate = NULL; + if (operation == CMD_INSERT && insert_in_bulk) + mtstate->bistate = GetBulkInsertState(); /* set up epqstate with dummy subplan data for the moment */ EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam); @@ -2695,6 +2707,12 @@ ExecEndModifyTable(ModifyTableState *node) resultRelInfo); } + if (node->bistate) + { + FreeBulkInsertState(node->bistate); + table_finish_bulk_insert(node->rootResultRelInfo->ri_RelationDesc, 0); + } + /* * Close all the partitioned tables, leaf partitions, and their indices * and release the slot used for tuple routing, if set. diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index efc9c99754..5915c8c414 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -631,7 +631,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); ASSERTION ASSIGNMENT ASYMMETRIC AT ATTACH ATTRIBUTE AUTHORIZATION BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT - BOOLEAN_P BOTH BY + BOOLEAN_P BOTH BULK BY CACHE CALL CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE @@ -9886,6 +9886,9 @@ transaction_mode_item: | NOT DEFERRABLE { $$ = makeDefElem("transaction_deferrable", makeIntConst(false, @1), @1); } + | BULK + { $$ = makeDefElem("bulk", + makeIntConst(true, @1), @1); } ; /* Syntax with commas is SQL-spec, without commas is Postgres historical */ @@ -15157,6 +15160,7 @@ unreserved_keyword: | BACKWARD | BEFORE | BEGIN_P + | BULK | BY | CACHE | CALL @@ -15668,6 +15672,7 @@ bare_label_keyword: | BIT | BOOLEAN_P | BOTH + | BULK | BY | CACHE | CALL diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 81ac9b1cb2..a0a4034409 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -611,6 +611,10 @@ standard_ProcessUtility(PlannedStmt *pstmt, 
SetPGVariable("transaction_deferrable", list_make1(item->arg), true); + else if (strcmp(item->defname, "bulk") == 0) + SetPGVariable("bulk_insert", + list_make1(item->arg), + true); } } break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 245a3472bc..c470314134 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -54,6 +54,7 @@ #include "libpq/pqformat.h" #include "miscadmin.h" #include "optimizer/cost.h" +#include "executor/nodeModifyTable.h" #include "optimizer/geqo.h" #include "optimizer/optimizer.h" #include "optimizer/paths.h" @@ -2036,6 +2037,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"bulk_insert", PGC_USERSET, CLIENT_CONN_STATEMENT, + gettext_noop("Sets the transaction to bulk insert mode."), + gettext_noop("A ring buffer of limited size will be used."), + }, + &insert_in_bulk, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 46a2dc9511..09c312a052 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -15,6 +15,8 @@ #include "nodes/execnodes.h" +extern PGDLLIMPORT bool insert_in_bulk; + extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 61ba4c3666..024b0e4da4 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -14,6 +14,7 @@ #ifndef EXECNODES_H #define EXECNODES_H +#include "access/heapam.h" #include "access/tupconvert.h" #include "executor/instrument.h" #include "fmgr.h" @@ -1165,6 +1166,8 @@ typedef struct ModifyTableState List **mt_arowmarks; /* per-subplan ExecAuxRowMark lists */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ bool fireBSTriggers; 
/* do we need to fire stmt triggers? */ + BulkInsertState bistate; /* state for bulk insert like INSERT SELECT */ + ResultRelInfo *prevResultRelInfo; /* last child inserted with bistate */ /* * Slot for storing tuples in the root partitioned table's rowtype during diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 71dcdf2889..0991da11e7 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -60,6 +60,7 @@ PG_KEYWORD("binary", BINARY, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("bit", BIT, COL_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("boolean", BOOLEAN_P, COL_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("both", BOTH, RESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("bulk", BULK, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("by", BY, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("cache", CACHE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("call", CALL, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out index da50ee3b67..da0dae6240 100644 --- a/src/test/regress/expected/insert.out +++ b/src/test/regress/expected/insert.out @@ -462,6 +462,29 @@ Partitions: part_aa_bb FOR VALUES IN ('aa', 'bb'), part_xx_yy FOR VALUES IN ('xx', 'yy'), PARTITIONED, part_default DEFAULT, PARTITIONED +-- bulk inserts +truncate hash_parted; +begin bulk; +create index on hash_parted(a); +-- make sure small inserts are flushed +insert into hash_parted values(11); +insert into hpart0 values(12); +select * from hash_parted; + a +---- + 12 + 11 +(2 rows) + +-- exercise bulk insert to partitions +insert into hash_parted select generate_series(1,9999); +select count(1) from hash_parted; + count +------- + 10001 +(1 row) + +commit; -- cleanup drop table range_parted, list_parted; drop table hash_parted; diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql index 963faa1614..d3a94f053b 100644 --- a/src/test/regress/sql/insert.sql +++ b/src/test/regress/sql/insert.sql @@ -280,6 
+280,19 @@ from hash_parted order by part; -- partitions \d+ list_parted +-- bulk inserts +truncate hash_parted; +begin bulk; +create index on hash_parted(a); +-- make sure small inserts are flushed +insert into hash_parted values(11); +insert into hpart0 values(12); +select * from hash_parted; +-- exercise bulk insert to partitions +insert into hash_parted select generate_series(1,9999); +select count(1) from hash_parted; +commit; + -- cleanup drop table range_parted, list_parted; drop table hash_parted; -- 2.17.0
>From e7d23b52c67a8ffb471c9f21f38ab20816ac2309 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Sat, 24 Oct 2020 22:49:01 -0500 Subject: [PATCH v7 2/3] Make INSERT SELECT use multi_insert Renames structures; Move MultipleInsert functions from copyfrom.c to (tentatively) nodeModifyTable.h; Move into MultiInsertInfo: transition_capture and cur_lineno (via cstate->miinfo); See also: 86b85044e823a304d2a265abc030254d39efe7df --- src/backend/commands/copyfrom.c | 394 +---------------------- src/backend/commands/copyfromparse.c | 10 +- src/backend/executor/execMain.c | 2 +- src/backend/executor/execPartition.c | 2 +- src/backend/executor/nodeModifyTable.c | 156 +++++++-- src/include/commands/copyfrom_internal.h | 5 +- src/include/executor/nodeModifyTable.h | 367 +++++++++++++++++++++ src/include/nodes/execnodes.h | 14 +- src/test/regress/expected/insert.out | 16 + src/test/regress/sql/insert.sql | 5 + src/tools/pgindent/typedefs.list | 4 +- 11 files changed, 559 insertions(+), 416 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 1b14e9a6eb..c4fe75df8e 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -44,54 +44,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * No more than this many tuples per CopyMultiInsertBuffer - * - * Caution: Don't make this too big, as we could end up with this many - * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's - * multiInsertBuffers list. Increasing this can cause quadratic growth in - * memory requirements during copies into partitioned tables with a large - * number of partitions. - */ -#define MAX_BUFFERED_TUPLES 1000 - -/* - * Flush buffers if there are >= this many bytes, as counted by the input - * size, of tuples stored. 
- */ -#define MAX_BUFFERED_BYTES 65535 - -/* Trim the list of buffers back down to this number after flushing */ -#define MAX_PARTITION_BUFFERS 32 - -/* Stores multi-insert data related to a single relation in CopyFrom. */ -typedef struct CopyMultiInsertBuffer -{ - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ - ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel */ - int nused; /* number of 'slots' containing tuples */ - uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy - * stream */ -} CopyMultiInsertBuffer; - -/* - * Stores one or many CopyMultiInsertBuffers and details about the size and - * number of tuples which are stored in them. This allows multiple buffers to - * exist at once when COPYing into a partitioned table. - */ -typedef struct CopyMultiInsertInfo -{ - List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */ - int bufferedTuples; /* number of tuples buffered over all buffers */ - int bufferedBytes; /* number of bytes from all buffered tuples */ - CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */ - EState *estate; /* Executor state used for COPY */ - CommandId mycid; /* Command Id used for COPY */ - int ti_options; /* table insert options */ -} CopyMultiInsertInfo; - - /* non-export function prototypes */ static char *limit_printout_length(const char *str); @@ -109,7 +61,7 @@ CopyFromErrorCallback(void *arg) char curlineno_str[32]; snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT, - cstate->cur_lineno); + cstate->miinfo.cur_lineno); if (cstate->opts.binary) { @@ -204,317 +156,6 @@ limit_printout_length(const char *str) return res; } -/* - * Allocate memory and initialize a new CopyMultiInsertBuffer for this - * ResultRelInfo. 
- */ -static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) -{ - CopyMultiInsertBuffer *buffer; - - buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); - buffer->resultRelInfo = rri; - buffer->bistate = GetBulkInsertState(); - buffer->nused = 0; - - return buffer; -} - -/* - * Make a new buffer for this ResultRelInfo. - */ -static inline void -CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, - ResultRelInfo *rri) -{ - CopyMultiInsertBuffer *buffer; - - buffer = CopyMultiInsertBufferInit(rri); - - /* Setup back-link so we can easily find this buffer again */ - rri->ri_CopyMultiInsertBuffer = buffer; - /* Record that we're tracking this buffer */ - miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); -} - -/* - * Initialize an already allocated CopyMultiInsertInfo. - * - * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up - * for that table. - */ -static void -CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, - CopyFromState cstate, EState *estate, CommandId mycid, - int ti_options) -{ - miinfo->multiInsertBuffers = NIL; - miinfo->bufferedTuples = 0; - miinfo->bufferedBytes = 0; - miinfo->cstate = cstate; - miinfo->estate = estate; - miinfo->mycid = mycid; - miinfo->ti_options = ti_options; - - /* - * Only setup the buffer when not dealing with a partitioned table. - * Buffers for partitioned tables will just be setup when we need to send - * tuples their way for the first time. 
- */ - if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - CopyMultiInsertInfoSetupBuffer(miinfo, rri); -} - -/* - * Returns true if the buffers are full - */ -static inline bool -CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo) -{ - if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || - miinfo->bufferedBytes >= MAX_BUFFERED_BYTES) - return true; - return false; -} - -/* - * Returns true if we have no buffered tuples - */ -static inline bool -CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) -{ - return miinfo->bufferedTuples == 0; -} - -/* - * Write the tuples stored in 'buffer' out to the table. - */ -static inline void -CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, - CopyMultiInsertBuffer *buffer) -{ - MemoryContext oldcontext; - int i; - uint64 save_cur_lineno; - CopyFromState cstate = miinfo->cstate; - EState *estate = miinfo->estate; - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; - bool line_buf_valid = cstate->line_buf_valid; - int nused = buffer->nused; - ResultRelInfo *resultRelInfo = buffer->resultRelInfo; - TupleTableSlot **slots = buffer->slots; - - /* - * Print error context information correctly, if one of the operations - * below fail. - */ - cstate->line_buf_valid = false; - save_cur_lineno = cstate->cur_lineno; - - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); - - for (i = 0; i < nused; i++) - { - /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. 
- */ - if (resultRelInfo->ri_NumIndices > 0) - { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, NULL, - NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); - } - - /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. - */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) - { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); - } - - ExecClearTuple(slots[i]); - } - - /* Mark that all slots are free */ - buffer->nused = 0; - - /* reset cur_lineno and line_buf_valid to what they were */ - cstate->line_buf_valid = line_buf_valid; - cstate->cur_lineno = save_cur_lineno; -} - -/* - * Drop used slots and free member for this buffer. - * - * The buffer must be flushed before cleanup. - */ -static inline void -CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, - CopyMultiInsertBuffer *buffer) -{ - int i; - - /* Ensure buffer was flushed */ - Assert(buffer->nused == 0); - - /* Remove back-link to ourself */ - buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL; - - FreeBulkInsertState(buffer->bistate); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); - - table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc, - miinfo->ti_options); - - pfree(buffer); -} - -/* - * Write out all stored tuples in all buffers out to the tables. - * - * Once flushed we also trim the tracked buffers list down to size by removing - * the buffers created earliest first. 
- * - * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being - * used. When cleaning up old buffers we'll never remove the one for - * 'curr_rri'. - */ -static inline void -CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) -{ - ListCell *lc; - - foreach(lc, miinfo->multiInsertBuffers) - { - CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); - - CopyMultiInsertBufferFlush(miinfo, buffer); - } - - miinfo->bufferedTuples = 0; - miinfo->bufferedBytes = 0; - - /* - * Trim the list of tracked buffers down if it exceeds the limit. Here we - * remove buffers starting with the ones we created first. It seems less - * likely that these older ones will be needed than the ones that were - * just created. - */ - while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS) - { - CopyMultiInsertBuffer *buffer; - - buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); - - /* - * We never want to remove the buffer that's currently being used, so - * if we happen to find that then move it to the end of the list. - */ - if (buffer->resultRelInfo == curr_rri) - { - miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); - miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); - buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); - } - - CopyMultiInsertBufferCleanup(miinfo, buffer); - miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); - } -} - -/* - * Cleanup allocated buffers and free memory - */ -static inline void -CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo) -{ - ListCell *lc; - - foreach(lc, miinfo->multiInsertBuffers) - CopyMultiInsertBufferCleanup(miinfo, lfirst(lc)); - - list_free(miinfo->multiInsertBuffers); -} - -/* - * Get the next TupleTableSlot that the next tuple should be stored in. - * - * Callers must ensure that the buffer is not full. 
- * - * Note: 'miinfo' is unused but has been included for consistency with the - * other functions in this area. - */ -static inline TupleTableSlot * -CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, - ResultRelInfo *rri) -{ - CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; - int nused = buffer->nused; - - Assert(buffer != NULL); - Assert(nused < MAX_BUFFERED_TUPLES); - - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; -} - -/* - * Record the previously reserved TupleTableSlot that was reserved by - * CopyMultiInsertInfoNextFreeSlot as being consumed. - */ -static inline void -CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, - TupleTableSlot *slot, int tuplen, uint64 lineno) -{ - CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; - - Assert(buffer != NULL); - Assert(slot == buffer->slots[buffer->nused]); - - /* Store the line number so we can properly report any errors later */ - buffer->linenos[buffer->nused] = lineno; - - /* Record this slot as being used */ - buffer->nused++; - - /* Update how many tuples are stored and their size */ - miinfo->bufferedTuples++; - miinfo->bufferedBytes += tuplen; -} - /* * Copy FROM file to relation. */ @@ -536,7 +177,6 @@ CopyFrom(CopyFromState cstate) int ti_options = 0; /* start with default options for insert */ BulkInsertState bistate = NULL; CopyInsertMethod insertMethod; - CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ uint64 processed = 0; bool has_before_insert_row_trig; bool has_instead_insert_row_trig; @@ -723,7 +363,7 @@ CopyFrom(CopyFromState cstate) * For partitioned tables we can't support multi-inserts when there * are any statement level insert triggers. 
It might be possible to * allow partitioned tables with such triggers in the future, but for - * now, CopyMultiInsertInfoFlush expects that any before row insert + * now, MultiInsertInfoFlush expects that any before row insert * and statement level insert triggers are on the same relation. */ insertMethod = CIM_SINGLE; @@ -771,7 +411,8 @@ CopyFrom(CopyFromState cstate) else insertMethod = CIM_MULTI; - CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, + MultiInsertInfoInit(&cstate->miinfo, resultRelInfo, + cstate->transition_capture, estate, mycid, ti_options); } @@ -834,7 +475,7 @@ CopyFrom(CopyFromState cstate) Assert(resultRelInfo == target_resultRelInfo); Assert(insertMethod == CIM_MULTI); - myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, + myslot = MultiInsertInfoNextFreeSlot(&cstate->miinfo, resultRelInfo); } @@ -903,18 +544,18 @@ CopyFrom(CopyFromState cstate) /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) { - if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) - CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, + if (resultRelInfo->ri_MultiInsertBuffer == NULL) + MultiInsertInfoSetupBuffer(&cstate->miinfo, resultRelInfo); } else if (insertMethod == CIM_MULTI_CONDITIONAL && - !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + !MultiInsertInfoIsEmpty(&cstate->miinfo)) { /* * Flush pending inserts if this partition can't use * batching, so rows are visible to triggers etc. 
*/ - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + MultiInsertInfoFlush(&cstate->miinfo, resultRelInfo); } if (bistate != NULL) @@ -960,7 +601,7 @@ CopyFrom(CopyFromState cstate) /* no other path available for partitioned table */ Assert(insertMethod == CIM_MULTI_CONDITIONAL); - batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, + batchslot = MultiInsertInfoNextFreeSlot(&cstate->miinfo, resultRelInfo); if (map != NULL) @@ -1040,17 +681,17 @@ CopyFrom(CopyFromState cstate) ExecMaterializeSlot(myslot); /* Add this tuple to the tuple buffer */ - CopyMultiInsertInfoStore(&multiInsertInfo, + MultiInsertInfoStore(&cstate->miinfo, resultRelInfo, myslot, cstate->line_buf.len, - cstate->cur_lineno); + cstate->miinfo.cur_lineno); /* * If enough inserts have queued up, then flush all * buffers out to their tables. */ - if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + if (MultiInsertInfoIsFull(&cstate->miinfo)) + MultiInsertInfoFlush(&cstate->miinfo, resultRelInfo); } else { @@ -1109,8 +750,8 @@ CopyFrom(CopyFromState cstate) /* Flush any remaining buffered tuples */ if (insertMethod != CIM_SINGLE) { - if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); + if (!MultiInsertInfoIsEmpty(&cstate->miinfo)) + MultiInsertInfoFlush(&cstate->miinfo, NULL); } /* Done, clean up */ @@ -1144,7 +785,7 @@ CopyFrom(CopyFromState cstate) /* Tear down the multi-insert buffer data */ if (insertMethod != CIM_SINGLE) - CopyMultiInsertInfoCleanup(&multiInsertInfo); + MultiInsertInfoCleanup(&cstate->miinfo); /* Close all the partitioned tables, leaf partitions, and their indices */ if (proute) @@ -1323,7 +964,6 @@ BeginCopyFrom(ParseState *pstate, cstate->reached_eof = false; cstate->eol_type = EOL_UNKNOWN; cstate->cur_relname = RelationGetRelationName(cstate->rel); - cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; diff --git 
a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index b7a37bcdbd..6eb3c1be79 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -456,14 +456,14 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) Assert(!cstate->opts.binary); /* on input just throw the header line away */ - if (cstate->cur_lineno == 0 && cstate->opts.header_line) + if (cstate->miinfo.cur_lineno == 0 && cstate->opts.header_line) { - cstate->cur_lineno++; + cstate->miinfo.cur_lineno++; if (CopyReadLine(cstate)) return false; /* done */ } - cstate->cur_lineno++; + cstate->miinfo.cur_lineno++; /* Actually read the line into memory here */ done = CopyReadLine(cstate); @@ -605,7 +605,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, int16 fld_count; ListCell *cur; - cstate->cur_lineno++; + cstate->miinfo.cur_lineno++; if (!CopyGetInt16(cstate, &fld_count)) { @@ -913,7 +913,7 @@ CopyReadLineText(CopyFromState cstate) * at all --- is cur_lineno a physical or logical count?) */ if (in_quote && c == (cstate->eol_type == EOL_NL ? 
'\n' : '\r')) - cstate->cur_lineno++; + cstate->miinfo.cur_lineno++; } /* Process \r */ diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 7179f589f9..855a89b570 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1247,7 +1247,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, * ExecInitRoutingInfo */ resultRelInfo->ri_PartitionTupleSlot = NULL; /* ditto */ resultRelInfo->ri_ChildToRootMap = NULL; - resultRelInfo->ri_CopyMultiInsertBuffer = NULL; + resultRelInfo->ri_MultiInsertBuffer = NULL; } /* diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 86594bd056..1f8ba785db 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -994,7 +994,7 @@ ExecInitRoutingInfo(ModifyTableState *mtstate, partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo); - partRelInfo->ri_CopyMultiInsertBuffer = NULL; + partRelInfo->ri_MultiInsertBuffer = NULL; /* * Keep track of it in the PartitionTupleRouting->partitions array. 
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index f65ae2c0d6..05f70f140e 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -43,6 +43,7 @@ #include "access/xact.h" #include "catalog/catalog.h" #include "commands/trigger.h" +#include "commands/copy.h" #include "executor/execPartition.h" #include "executor/executor.h" #include "executor/nodeModifyTable.h" @@ -391,6 +392,8 @@ ExecInsert(ModifyTableState *mtstate, ModifyTable *node = (ModifyTable *) mtstate->ps.plan; OnConflictAction onconflict = node->onConflictAction; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; + TupleTableSlot *batchslot = NULL; + bool use_multi_insert = false; /* * If the input result relation is a partitioned table, find the leaf @@ -410,6 +413,37 @@ ExecInsert(ModifyTableState *mtstate, resultRelationDesc = resultRelInfo->ri_RelationDesc; + if (!mtstate->miinfo || + mtstate->operation != CMD_INSERT || onconflict != ONCONFLICT_NONE) + ; /* If multi-inserts aren't possible at all, don't check further .. */ + else if (proute == NULL) + use_multi_insert = true; + else + { + /* + * If a partitioned table itself allows multi-insert, and bistate + * indicates we've inserted the threshold number of tuples, check if + * the partition also supports it. + */ + + /* Determine which triggers exist on this partition */ + // XXX copyfrom.c only checks triggers when the partition changes, + // so maybe use_multi_insert should be in mtstate ? + bool has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row); + + bool has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_instead_row); + + /* + * Disable multi-inserts when the partition has BEFORE/INSTEAD + * OF triggers, or if the partition is a foreign partition. 
+ */ + use_multi_insert = !has_before_insert_row_trig && + !has_instead_insert_row_trig && + resultRelInfo->ri_FdwRoutine == NULL; + } + /* * BEFORE ROW INSERT Triggers. * @@ -631,6 +665,19 @@ ExecInsert(ModifyTableState *mtstate, /* Since there was no insertion conflict, we're done */ } + else if (use_multi_insert) + { + if (resultRelInfo->ri_MultiInsertBuffer == NULL) + MultiInsertInfoSetupBuffer(mtstate->miinfo, resultRelInfo); + + batchslot = MultiInsertInfoNextFreeSlot(mtstate->miinfo, resultRelInfo); + ExecCopySlot(batchslot, slot); + + MultiInsertInfoStore(mtstate->miinfo, resultRelInfo, batchslot, 0, 0); // XXX: tuplen/lineno + + if (MultiInsertInfoIsFull(mtstate->miinfo)) + MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo); + } else { if (proute && mtstate->prevResultRelInfo != resultRelInfo) @@ -640,6 +687,13 @@ ExecInsert(ModifyTableState *mtstate, mtstate->prevResultRelInfo = resultRelInfo; } + /* + * Flush any buffered inserts when this row can't be batched, so earlier + * rows are visible to triggers etc.; skipped when nothing is buffered. + */ + if (mtstate->miinfo && !MultiInsertInfoIsEmpty(mtstate->miinfo)) + MultiInsertInfoFlush(mtstate->miinfo, resultRelInfo); + /* insert the tuple normally */ table_tuple_insert(resultRelationDesc, slot, estate->es_output_cid, @@ -656,32 +710,36 @@ ExecInsert(ModifyTableState *mtstate, if (canSetTag) (estate->es_processed)++; - /* - * If this insert is the result of a partition key update that moved the - * tuple to a new partition, put this row into the transition NEW TABLE, - * if there is one. We need to do this separately for DELETE and INSERT - * because they happen on different tables. 
- */ - ar_insert_trig_tcs = mtstate->mt_transition_capture; - if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture - && mtstate->mt_transition_capture->tcs_update_new_table) + /* Triggers were already run in the batch insert case */ + if (batchslot == NULL) { - ExecARUpdateTriggers(estate, resultRelInfo, NULL, - NULL, - slot, - NULL, - mtstate->mt_transition_capture); - /* - * We've already captured the NEW TABLE row, so make sure any AR - * INSERT trigger fired below doesn't capture it again. + * If this insert is the result of a partition key update that moved the + * tuple to a new partition, put this row into the transition NEW TABLE, + * if there is one. We need to do this separately for DELETE and INSERT + * because they happen on different tables. */ - ar_insert_trig_tcs = NULL; - } + ar_insert_trig_tcs = mtstate->mt_transition_capture; + if (mtstate->operation == CMD_UPDATE && mtstate->mt_transition_capture + && mtstate->mt_transition_capture->tcs_update_new_table) + { + ExecARUpdateTriggers(estate, resultRelInfo, NULL, + NULL, + slot, + NULL, + mtstate->mt_transition_capture); - /* AFTER ROW INSERT Triggers */ - ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes, - ar_insert_trig_tcs); + /* + * We've already captured the NEW TABLE row, so make sure any AR + * INSERT trigger fired below doesn't capture it again. 
+ */ + ar_insert_trig_tcs = NULL; + } + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes, + ar_insert_trig_tcs); + } list_free(recheckIndexes); @@ -2238,9 +2296,52 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans); mtstate->mt_nplans = nplans; - mtstate->bistate = NULL; - if (operation == CMD_INSERT && insert_in_bulk) + + if (insert_in_bulk && operation == CMD_INSERT && + node->onConflictAction == ONCONFLICT_NONE) mtstate->bistate = GetBulkInsertState(); + else + mtstate->bistate = NULL; + + /* + * Set miinfo if it can support multi-insert. This is the equivalent of + * CIM_SINGLE et al in copyfrom.c + */ + + if (operation != CMD_INSERT || + node->onConflictAction != ONCONFLICT_NONE || + !insert_in_bulk) + mtstate->miinfo = NULL; + else if (mtstate->rootResultRelInfo->ri_TrigDesc != NULL && + (mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_before_row || + // mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_after_row || // XXX or any row level triggers at all? + mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_instead_row)) + /* + * Can't support multi-inserts when there are any BEFORE/INSTEAD OF + * triggers on the table. + */ + mtstate->miinfo = NULL; + else if (node->rootRelation > 0 && + mtstate->rootResultRelInfo->ri_TrigDesc != NULL && + mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_new_table) + + /* + * For partitioned tables we can't support multi-inserts when there + * are any statement level insert triggers. 
+ */ + mtstate->miinfo = NULL; + else if (mtstate->rootResultRelInfo->ri_FdwRoutine != NULL + /* || cstate->volatile_defexprs */ ) + // XXX contain_volatile_functions_not_nextval((Node *) defexpr); + /* Can't support multi-inserts to foreign tables (XXX: volatile default expressions are not checked yet) */ + mtstate->miinfo = NULL; + else + { + mtstate->miinfo = palloc0(sizeof(*mtstate->miinfo)); /* not calloc(): must belong to a memory context */ + MultiInsertInfoInit(mtstate->miinfo, mtstate->rootResultRelInfo, + mtstate->mt_transition_capture, + estate, GetCurrentCommandId(true), 0); + } /* set up epqstate with dummy subplan data for the moment */ EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam); @@ -2713,6 +2814,13 @@ ExecEndModifyTable(ModifyTableState *node) table_finish_bulk_insert(node->rootResultRelInfo->ri_RelationDesc, 0); } + if (node->miinfo) + { + if (!MultiInsertInfoIsEmpty(node->miinfo)) + MultiInsertInfoFlush(node->miinfo, node->resultRelInfo); // root ? + MultiInsertInfoCleanup(node->miinfo); + } + /* * Close all the partitioned tables, leaf partitions, and their indices * and release the slot used for tuple routing, if set. 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index c15ea803c3..c0603e13ea 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -15,6 +15,7 @@ #define COPYFROM_INTERNAL_H #include "commands/copy.h" +#include "executor/nodeModifyTable.h" #include "commands/trigger.h" /* @@ -92,10 +93,12 @@ typedef struct CopyFromStateData /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ const char *cur_attval; /* current att value for error messages */ + /* For bulk inserts and for error callback */ + MultiInsertInfo miinfo; + /* * Working state */ diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 09c312a052..ebe62c2e40 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -13,6 +13,8 @@ #ifndef NODEMODIFYTABLE_H #define NODEMODIFYTABLE_H +#include "commands/trigger.h" +#include "executor/executor.h" // XXX #include "nodes/execnodes.h" extern PGDLLIMPORT bool insert_in_bulk; @@ -25,4 +27,369 @@ extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, extern void ExecEndModifyTable(ModifyTableState *node); extern void ExecReScanModifyTable(ModifyTableState *node); +/* Bulk insert stuff which used to live in copy.c */ + +/* + * No more than this many tuples per MultiInsertBuffer + * + * Caution: Don't make this too big, as we could end up with this many + * MultiInsertBuffer items stored in MultiInsertInfo's + * multiInsertBuffers list. Increasing this can cause quadratic growth in + * memory requirements during copies into partitioned tables with a large + * number of partitions. 
+ */
+#define MAX_BUFFERED_TUPLES		1000
+
+/*
+ * Flush buffers if there are >= this many bytes, as counted by the input
+ * size, of tuples stored.
+ */
+#define MAX_BUFFERED_BYTES		65535
+
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS	32
+
+/* Stores multi-insert data for a single relation (COPY FROM or INSERT). */
+typedef struct MultiInsertBuffer
+{
+	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
+	BulkInsertState bistate;	/* BulkInsertState for this rel */
+	int			nused;			/* number of 'slots' containing tuples */
+	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
+												 * stream (INSERT stores 0) */
+} MultiInsertBuffer;
+
+/*
+ * Stores one or many MultiInsertBuffers and details about the size and
+ * number of tuples which are stored in them.  This allows multiple buffers to
+ * exist at once when COPY/INSERTing into a partitioned table.
+ */
+typedef struct MultiInsertInfo
+{
+	List	   *multiInsertBuffers; /* List of tracked MultiInsertBuffers */
+	int			bufferedTuples; /* number of tuples buffered over all buffers */
+	int			bufferedBytes;	/* number of bytes from all buffered tuples */
+	TransitionCaptureState *transition_capture; /* passed to AFTER ROW triggers at flush */
+	EState	   *estate;			/* Executor state */
+	CommandId	mycid;			/* Command Id */
+	int			ti_options;		/* table insert options */
+
+	/* Line number for errors in copyfrom.c */
+	uint64		cur_lineno;
+	bool		line_buf_valid; /* cleared during a buffer flush, then restored */
+} MultiInsertInfo;
+
+
+/*
+ * Allocate memory and initialize a new MultiInsertBuffer for this
+ * ResultRelInfo.
+ */
+static inline MultiInsertBuffer *
+MultiInsertBufferInit(ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer;
+
+	/*
+	 * Declared inline like the other helpers in this header, so that files
+	 * which include it without calling this function don't get an
+	 * unused-function warning.
+	 */
+	buffer = (MultiInsertBuffer *) palloc(sizeof(MultiInsertBuffer));
+
+	/* Slots are created on demand; NULL marks one as not yet created */
+	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+	buffer->resultRelInfo = rri;
+	buffer->bistate = GetBulkInsertState();
+	buffer->nused = 0;
+
+	return buffer;
+}
+
+/*
+ * Make a new buffer for this ResultRelInfo.
+ *
+ * The buffer is linked from rri->ri_MultiInsertBuffer and appended to the
+ * list of buffers tracked by 'miinfo'.
+ */
+static inline void
+MultiInsertInfoSetupBuffer(MultiInsertInfo *miinfo,
+						   ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer;
+
+	buffer = MultiInsertBufferInit(rri);
+
+	/* Setup back-link so we can easily find this buffer again */
+	rri->ri_MultiInsertBuffer = buffer;
+	/* Record that we're tracking this buffer */
+	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+}
+
+/*
+ * Initialize an already allocated MultiInsertInfo.
+ *
+ * If rri is a non-partitioned table then a MultiInsertBuffer is set up
+ * for that table.
+ */
+static inline void
+MultiInsertInfoInit(MultiInsertInfo *miinfo, ResultRelInfo *rri,
+					TransitionCaptureState *transition_capture,
+					EState *estate, CommandId mycid, int ti_options)
+{
+	miinfo->multiInsertBuffers = NIL;
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+	miinfo->transition_capture = transition_capture;
+	miinfo->estate = estate;
+	miinfo->mycid = mycid;
+	miinfo->ti_options = ti_options;
+	miinfo->cur_lineno = 0;
+	/* Don't rely on the caller having zeroed the struct */
+	miinfo->line_buf_valid = false;
+
+	/*
+	 * Only setup the buffer when not dealing with a partitioned table.
+	 * Buffers for partitioned tables will just be setup when we need to send
+	 * tuples their way for the first time.
+	 */
+	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+		MultiInsertInfoSetupBuffer(miinfo, rri);
+}
+
+/*
+ * Returns true if the buffers are full, i.e. either the tuple-count or the
+ * byte-count limit over all buffers has been reached.
+ */
+static inline bool
+MultiInsertInfoIsFull(MultiInsertInfo *miinfo)
+{
+	return miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
+		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES;
+}
+
+/*
+ * Returns true if we have no buffered tuples
+ */
+static inline bool
+MultiInsertInfoIsEmpty(MultiInsertInfo *miinfo)
+{
+	return miinfo->bufferedTuples == 0;
+}
+
+/*
+ * Write the tuples stored in 'buffer' out to the table.
+ *
+ * All tuples are inserted with one table_multi_insert() call.  Afterwards,
+ * for each tuple in turn, index entries are added and AFTER ROW INSERT
+ * triggers are fired, and the slot is cleared for reuse.  Does nothing when
+ * the buffer holds no tuples.
+ */
+static inline void
+MultiInsertBufferFlush(MultiInsertInfo *miinfo,
+					   MultiInsertBuffer *buffer)
+{
+	MemoryContext oldcontext;
+	int			i;
+	uint64		save_cur_lineno;
+	EState	   *estate = miinfo->estate;
+	CommandId	mycid = miinfo->mycid;
+	int			ti_options = miinfo->ti_options;
+	bool		line_buf_valid = miinfo->line_buf_valid;
+	int			nused = buffer->nused;
+	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
+	TupleTableSlot **slots = buffer->slots;
+
+	/*
+	 * MultiInsertInfoFlush visits every tracked buffer, including ones that
+	 * hold nothing; don't bother the table AM with a zero-tuple insert.
+	 */
+	if (nused == 0)
+		return;
+
+	/*
+	 * Print error context information correctly, if one of the operations
+	 * below fail.
+	 */
+	miinfo->line_buf_valid = false;
+	save_cur_lineno = miinfo->cur_lineno;
+
+	/*
+	 * table_multi_insert may leak memory, so switch to short-lived memory
+	 * context before calling it.
+	 */
+	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); // XXX requires executor.h
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   slots,
+					   nused,
+					   mycid,
+					   ti_options,
+					   buffer->bistate);
+	MemoryContextSwitchTo(oldcontext);
+
+	for (i = 0; i < nused; i++)
+	{
+		/*
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (resultRelInfo->ri_NumIndices > 0)
+		{
+			List	   *recheckIndexes;
+
+			miinfo->cur_lineno = buffer->linenos[i];
+			recheckIndexes =
+				ExecInsertIndexTuples(resultRelInfo,
+									  slots[i], estate, false, NULL,
+									  NIL);
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], recheckIndexes,
+								 miinfo->transition_capture);
+			list_free(recheckIndexes);
+		}
+
+		/*
+		 * There are no indexes, but see if we need to run AFTER ROW INSERT
+		 * triggers anyway.
+		 */
+		else if (resultRelInfo->ri_TrigDesc != NULL &&
+				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		{
+			miinfo->cur_lineno = buffer->linenos[i];
+			ExecARInsertTriggers(estate, resultRelInfo,
+								 slots[i], NIL, miinfo->transition_capture);
+		}
+
+		ExecClearTuple(slots[i]);
+	}
+
+	/* Mark that all slots are free */
+	buffer->nused = 0;
+
+	/* reset cur_lineno and line_buf_valid to what they were */
+	miinfo->line_buf_valid = line_buf_valid;
+	miinfo->cur_lineno = save_cur_lineno;
+}
+
+/*
+ * Drop used slots and free member for this buffer.
+ *
+ * The buffer must be flushed before cleanup.
+ */
+static inline void
+MultiInsertBufferCleanup(MultiInsertInfo *miinfo,
+						 MultiInsertBuffer *buffer)
+{
+	int			i;
+
+	/* Ensure buffer was flushed */
+	Assert(buffer->nused == 0);
+
+	/* Remove back-link to ourself */
+	buffer->resultRelInfo->ri_MultiInsertBuffer = NULL;
+
+	FreeBulkInsertState(buffer->bistate);
+
+	/* Since we only create slots on demand, just drop the non-null ones. */
+	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+
+	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
+							 miinfo->ti_options);
+
+	pfree(buffer);
+}
+
+/*
+ * Write out all stored tuples in all buffers out to the tables.
+ *
+ * Once flushed we also trim the tracked buffers list down to size by removing
+ * the buffers created earliest first.
+ *
+ * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
+ * used.  When cleaning up old buffers we'll never remove the one for
+ * 'curr_rri'.
+ */
+static inline void
+MultiInsertInfoFlush(MultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+	{
+		MultiInsertBuffer *buffer = (MultiInsertBuffer *) lfirst(lc);
+
+		MultiInsertBufferFlush(miinfo, buffer);
+	}
+
+	/* Every buffer has been written out, so nothing is pending any more */
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+
+	/*
+	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
+	 * remove buffers starting with the ones we created first.  It seems less
+	 * likely that these older ones will be needed than the ones that were
+	 * just created.
+	 */
+	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+	{
+		MultiInsertBuffer *buffer;
+
+		buffer = (MultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+
+		/*
+		 * We never want to remove the buffer that's currently being used, so
+		 * if we happen to find that then move it to the end of the list.
+		 */
+		if (buffer->resultRelInfo == curr_rri)
+		{
+			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
+			buffer = (MultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+		}
+
+		MultiInsertBufferCleanup(miinfo, buffer);
+		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
+	}
+}
+
+/*
+ * Cleanup allocated buffers and free memory
+ *
+ * Every buffer must already have been flushed; MultiInsertBufferCleanup
+ * asserts that.  'miinfo' itself is not freed, and is left with an empty
+ * buffer list.
+ */
+static inline void
+MultiInsertInfoCleanup(MultiInsertInfo *miinfo)
+{
+	ListCell   *lc;
+
+	foreach(lc, miinfo->multiInsertBuffers)
+		MultiInsertBufferCleanup(miinfo, (MultiInsertBuffer *) lfirst(lc));
+
+	list_free(miinfo->multiInsertBuffers);
+
+	/* Don't leave a pointer to the freed list behind */
+	miinfo->multiInsertBuffers = NIL;
+}
+
+/*
+ * Get the next TupleTableSlot that the next tuple should be stored in.
+ *
+ * Callers must ensure that the buffer is not full.
+ *
+ * Note: 'miinfo' is unused but has been included for consistency with the
+ * other functions in this area.
+ */
+static inline TupleTableSlot *
+MultiInsertInfoNextFreeSlot(MultiInsertInfo *miinfo,
+							ResultRelInfo *rri)
+{
+	MultiInsertBuffer *buffer = rri->ri_MultiInsertBuffer;
+	int			nused;
+
+	/* Check the buffer exists before reading from it, not after */
+	Assert(buffer != NULL);
+	nused = buffer->nused;
+	Assert(nused < MAX_BUFFERED_TUPLES);
+
+	/* Slots are created lazily and then reused by later batches */
+	if (buffer->slots[nused] == NULL)
+		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
+	return buffer->slots[nused];
+}
+
+/*
+ * Record the previously reserved TupleTableSlot that was reserved by
+ * MultiInsertInfoNextFreeSlot as being consumed.
+ *
+ * 'tuplen' is added to the running byte count that MultiInsertInfoIsFull
+ * checks; 'lineno' is remembered alongside the slot.
+ */
+static inline void
+MultiInsertInfoStore(MultiInsertInfo *miinfo, ResultRelInfo *rri,
+					 TupleTableSlot *slot, int tuplen, uint64 lineno)
+{
+	MultiInsertBuffer *buffer = rri->ri_MultiInsertBuffer;
+
+	Assert(buffer != NULL);
+	Assert(slot == buffer->slots[buffer->nused]);
+
+	/* Store the line number so we can properly report any errors later */
+	buffer->linenos[buffer->nused] = lineno;
+
+	/* Record this slot as being used */
+	buffer->nused++;
+
+	/* Update how many tuples are stored and their size */
+	miinfo->bufferedTuples++;
+	miinfo->bufferedBytes += tuplen;
+}
+
 #endif /* NODEMODIFYTABLE_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 024b0e4da4..ab7b8fb51b 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -33,6 +33,9 @@ #include "utils/tuplesort.h" #include "utils/tuplestore.h" +/* This would be a circular inclusion */ +// #include "executor/nodeModifyTable.h" + struct PlanState; /* forward references in this file */ struct ParallelHashJoinState; struct ExecRowMark; @@ -40,8 +43,8 @@ struct ExprState; struct ExprContext; struct RangeTblEntry; /* avoid including parsenodes.h here */ struct ExprEvalStep; /* avoid including execExpr.h everywhere */ -struct CopyMultiInsertBuffer; - +// struct MultiInsertBuffer; +// struct MultiInsertInfo; /*
---------------- * ExprState node @@ -499,8 +502,8 @@ typedef struct ResultRelInfo */ TupleConversionMap *ri_ChildToRootMap; - /* for use by copyfrom.c when performing multi-inserts */ - struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer; + /* for use by copyfrom.c/modifyTable when performing multi-inserts */ + struct MultiInsertBuffer *ri_MultiInsertBuffer; } ResultRelInfo; /* ---------------- @@ -1166,8 +1169,9 @@ typedef struct ModifyTableState List **mt_arowmarks; /* per-subplan ExecAuxRowMark lists */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ bool fireBSTriggers; /* do we need to fire stmt triggers? */ - BulkInsertState bistate; /* state for bulk insert like INSERT SELECT */ + BulkInsertState bistate; /* state for bulk insert like INSERT SELECT, when miinfo cannot be used */ ResultRelInfo *prevResultRelInfo; /* last child inserted with bistate */ + struct MultiInsertInfo *miinfo; /* * Slot for storing tuples in the root partitioned table's rowtype during diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out index da0dae6240..e0c83d7427 100644 --- a/src/test/regress/expected/insert.out +++ b/src/test/regress/expected/insert.out @@ -485,6 +485,22 @@ select count(1) from hash_parted; (1 row) commit; +-- test that index was updated +vacuum analyze hash_parted; +explain(costs off) +select * from hash_parted where a=13; + QUERY PLAN +---------------------------------------------------------- + Index Only Scan using hpart1_a_idx on hpart1 hash_parted + Index Cond: (a = 13) +(2 rows) + +select * from hash_parted where a=13; + a +---- + 13 +(1 row) + -- cleanup drop table range_parted, list_parted; drop table hash_parted; diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql index d3a94f053b..99ec18d9a2 100644 --- a/src/test/regress/sql/insert.sql +++ b/src/test/regress/sql/insert.sql @@ -292,6 +292,11 @@ select * from hash_parted; insert into hash_parted select 
generate_series(1,9999); select count(1) from hash_parted; commit; +-- test that index was updated +vacuum analyze hash_parted; +explain(costs off) +select * from hash_parted where a=13; +select * from hash_parted where a=13; -- cleanup drop table range_parted, list_parted; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b8ca8cffd9..42415c9c4c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -422,8 +422,6 @@ ConvertRowtypeExpr CookedConstraint CopyDest CopyInsertMethod -CopyMultiInsertBuffer -CopyMultiInsertInfo CopyState CopyStateData CopyStmt @@ -1388,6 +1386,8 @@ ModifyTableState MorphOpaque MsgType MultiAssignRef +MultiInsertBuffer +MultiInsertInfo MultiSortSupport MultiSortSupportData MultiXactId -- 2.17.0
>From 24d46071082aa4a87c39829ea4908c298799a940 Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Mon, 23 Nov 2020 17:57:24 -0600 Subject: [PATCH v7 3/3] Dynamically switch to multi-insert mode.. by popular request --- src/backend/executor/nodeModifyTable.c | 45 +++++++++++++++++++------- src/backend/tcop/utility.c | 4 --- src/backend/utils/misc/guc.c | 19 ++++++----- src/include/executor/nodeModifyTable.h | 3 +- src/include/nodes/execnodes.h | 1 + src/test/regress/expected/insert.out | 4 +++ src/test/regress/sql/insert.sql | 2 ++ 7 files changed, 51 insertions(+), 27 deletions(-) diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 05f70f140e..9b774d502a 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -74,7 +74,7 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate, TupleTableSlot *slot, ResultRelInfo **partRelInfo); /* guc */ -bool insert_in_bulk = false; +int bulk_insert_ntuples = 1000; /* * Verify that the tuples to be produced by INSERT or UPDATE match the @@ -413,11 +413,29 @@ ExecInsert(ModifyTableState *mtstate, resultRelationDesc = resultRelInfo->ri_RelationDesc; + /* Use bulk insert after a threshold number of tuples */ + // XXX: maybe this should only be done if it's not a partitioned table or + // if the partitions don't support miinfo, which uses its own bistates + mtstate->ntuples++; + if (mtstate->bistate == NULL && + mtstate->operation == CMD_INSERT && + onconflict == ONCONFLICT_NONE && + mtstate->ntuples > bulk_insert_ntuples && + bulk_insert_ntuples >= 0) + { + elog(DEBUG1, "enabling bulk insert"); + mtstate->bistate = GetBulkInsertState(); + } + if (!mtstate->miinfo || mtstate->operation != CMD_INSERT || onconflict != ONCONFLICT_NONE) ; /* If multi-inserts aren't possible at all, don't check further .. 
*/ else if (proute == NULL) - use_multi_insert = true; + { + if (mtstate->miinfo->ntuples++ >= bulk_insert_ntuples && + bulk_insert_ntuples >= 0) + use_multi_insert = true; + } else { /* @@ -438,12 +456,21 @@ ExecInsert(ModifyTableState *mtstate, /* * Disable multi-inserts when the partition has BEFORE/INSTEAD * OF triggers, or if the partition is a foreign partition. + * The number of tuples eligible for multi-insert is tracked separately + * from the total number of tuples in case it's not supported for some + * partitions. */ - use_multi_insert = !has_before_insert_row_trig && + if (!has_before_insert_row_trig && !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; + resultRelInfo->ri_FdwRoutine == NULL && + mtstate->miinfo->ntuples++ >= bulk_insert_ntuples && + bulk_insert_ntuples >= 0) + use_multi_insert = true; } + if (use_multi_insert && mtstate->miinfo->ntuples - 1 == bulk_insert_ntuples) + elog(DEBUG1, "enabling multi insert"); + /* * BEFORE ROW INSERT Triggers. * @@ -2296,12 +2323,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans); mtstate->mt_nplans = nplans; - - if (insert_in_bulk && operation == CMD_INSERT && - node->onConflictAction == ONCONFLICT_NONE) - mtstate->bistate = GetBulkInsertState(); - else - mtstate->bistate = NULL; + mtstate->bistate = NULL; /* * Set miinfo if it can support multi-insert. 
This is the equivalent of @@ -2309,8 +2331,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) */ if (operation != CMD_INSERT || - node->onConflictAction != ONCONFLICT_NONE || - !insert_in_bulk) + node->onConflictAction != ONCONFLICT_NONE) mtstate->miinfo = NULL; else if (mtstate->rootResultRelInfo->ri_TrigDesc != NULL && (mtstate->rootResultRelInfo->ri_TrigDesc->trig_insert_before_row || diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index a0a4034409..81ac9b1cb2 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -611,10 +611,6 @@ standard_ProcessUtility(PlannedStmt *pstmt, SetPGVariable("transaction_deferrable", list_make1(item->arg), true); - else if (strcmp(item->defname, "bulk") == 0) - SetPGVariable("bulk_insert", - list_make1(item->arg), - true); } } break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index c470314134..1126740021 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2037,16 +2037,6 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, - { - {"bulk_insert", PGC_USERSET, CLIENT_CONN_STATEMENT, - gettext_noop("Sets the transaction to bulk insert mode."), - gettext_noop("A ring buffer of limited size will be used."), - }, - &insert_in_bulk, - false, - NULL, NULL, NULL - }, - /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL @@ -3410,6 +3400,15 @@ static struct config_int ConfigureNamesInt[] = check_huge_page_size, NULL, NULL }, + { + {"bulk_insert_ntuples", PGC_USERSET, CLIENT_CONN_STATEMENT, + gettext_noop("Enable bulk insertions after this number of tuples."), + gettext_noop("A ring buffer of limited size will be used and updates done in batch"), + }, + &bulk_insert_ntuples, + 1000, -1, INT_MAX, + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/include/executor/nodeModifyTable.h 
b/src/include/executor/nodeModifyTable.h index ebe62c2e40..71de7cf80e 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -17,7 +17,7 @@ #include "executor/executor.h" // XXX #include "nodes/execnodes.h" -extern PGDLLIMPORT bool insert_in_bulk; +extern PGDLLIMPORT int bulk_insert_ntuples; extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, @@ -74,6 +74,7 @@ typedef struct MultiInsertInfo EState *estate; /* Executor state */ CommandId mycid; /* Command Id */ int ti_options; /* table insert options */ + size_t ntuples; /* Number of rows *eligible* for multi-insert */ /* Line number for errors in copyfrom.c */ uint64 cur_lineno; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index ab7b8fb51b..477b326d06 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1172,6 +1172,7 @@ typedef struct ModifyTableState BulkInsertState bistate; /* state for bulk insert like INSERT SELECT, when miinfo cannot be used */ ResultRelInfo *prevResultRelInfo; /* last child inserted with bistate */ struct MultiInsertInfo *miinfo; + size_t ntuples; /* Number of tuples inserted; */ /* * Slot for storing tuples in the root partitioned table's rowtype during diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out index e0c83d7427..b894180152 100644 --- a/src/test/regress/expected/insert.out +++ b/src/test/regress/expected/insert.out @@ -477,7 +477,11 @@ select * from hash_parted; (2 rows) -- exercise bulk insert to partitions +SET client_min_messages=debug; insert into hash_parted select generate_series(1,9999); +DEBUG: enabling bulk insert +DEBUG: enabling multi insert +RESET client_min_messages; select count(1) from hash_parted; count ------- diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql index 99ec18d9a2..ff9b57af5e 100644 --- a/src/test/regress/sql/insert.sql +++ 
b/src/test/regress/sql/insert.sql @@ -289,7 +289,9 @@ insert into hash_parted values(11); insert into hpart0 values(12); select * from hash_parted; -- exercise bulk insert to partitions +SET client_min_messages=debug; insert into hash_parted select generate_series(1,9999); +RESET client_min_messages; select count(1) from hash_parted; commit; -- test that index was updated -- 2.17.0