Hi,

On Sun, Apr 6, 2025 at 8:55 PM Jingtang Zhang <mrdrivingd...@gmail.com> wrote:
>
> It was quite a while since I last looked at the patch. I've tested it again,
> and still get regression on patched version where a table has many columns.
> And it is totally CPU-bounded on tts_virtual_copyslot.
>
> Unpatched version:
> 1 col:
> Time: 8909.714 ms (00:08.910)
> Time: 8803.579 ms (00:08.804)
> Time: 8600.415 ms (00:08.600)
> 32 cols:
> Time: 12911.699 ms (00:12.912)
> Time: 13543.491 ms (00:13.543)
> Time: 13325.368 ms (00:13.325)
>
> Patched version:
> 1 col:
> Time: 3532.841 ms (00:03.533)
> Time: 3598.223 ms (00:03.598)
> Time: 3515.858 ms (00:03.516)
> 32 cols:
> Time: 35647.724 ms (00:35.648)
> Time: 35596.233 ms (00:35.596)
> Time: 35669.106 ms (00:35.669)
>
Hm, maybe I didn't choose the best way to measure performance. Can you please share how you do it?

> I've tested your patch with tuplestore and found the regression does not exist
> anymore, but I haven't looked deep inside it.
>
> Patched version (with tuplestore):
> 1 col:
> Time: 3500.502 ms (00:03.501)
> Time: 3486.886 ms (00:03.487)
> Time: 3514.233 ms (00:03.514)
> 32 cols:
> Time: 10375.391 ms (00:10.375)
> Time: 10248.256 ms (00:10.248)
> Time: 10248.289 ms (00:10.248)
>
> It seems to be a good idea if there is no other issue with your patch.

As far as I understand, the use of multi-inserts for queries like "INSERT INTO ... SELECT FROM" is no longer discussed here, because in such cases we would have to take volatile functions and ROW triggers into account. I've been thinking about this for a while and made a patch as an experiment. The principles the patch works on are listed below.

1) Since performance decreases for single INSERTs (within a multi-insert mechanism), I designed this feature as a table option. Thus, if users know that they will perform a lot of inserts into a table, they can specify "WITH (append_optimized=true)".

2) The presence of volatile functions is tracked during the construction of the subtree for a ModifyTable node. I'm not that familiar with the query plan construction mechanism, but it seems to me that this way we can track any occurrence of volatile functions. Of course, most volatile functions don't stop us from using multi-inserts, but checking each such function would take a very long time, so the mere presence of a volatile function is enough for us to abandon multi-inserts.

3) Default expressions of the target table are also checked for volatile functions. The same rules apply to them as in (2). As an exception, I allowed the use of SERIAL as the column data type, since this is a fairly common use case.

4) If the target table contains any ROW triggers, we don't use multi-inserts.
5) The patch also contains a regression test. This is a "sandbox" where you can do some experiments with append-optimized tables.

I hope that the patch (targeted at the 'master' branch, 2c7bd2ba507e273f2d7fe1b2f6d30775ed4f3c09) will be useful for this thread.

--
Best regards,
Daniil Davydov
From 224378c11d270aabe28bdd32efacd37ed1984bd1 Mon Sep 17 00:00:00 2001 From: Daniil Davidov <d.davy...@postgrespro.ru> Date: Mon, 7 Apr 2025 12:55:50 +0700 Subject: [PATCH v1] Meet append optimized tables --- src/backend/access/common/reloptions.c | 11 + src/backend/access/heap/heapam.c | 205 ++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 5 + src/backend/access/table/tableamapi.c | 5 + src/backend/commands/explain.c | 5 +- src/backend/executor/execExpr.c | 17 +- src/backend/executor/execProcnode.c | 9 + src/backend/executor/nodeModifyTable.c | 194 ++++++++++++++++- src/backend/optimizer/plan/createplan.c | 1 + src/backend/optimizer/util/clauses.c | 28 ++- src/include/access/heapam.h | 41 ++++ src/include/access/tableam.h | 84 +++++++ src/include/nodes/execnodes.h | 6 + src/include/nodes/plannodes.h | 2 + src/include/optimizer/optimizer.h | 3 + src/include/utils/rel.h | 10 + .../regress/expected/append_optimized.out | 161 ++++++++++++++ src/test/regress/parallel_schedule | 2 + src/test/regress/sql/append_optimized.sql | 105 +++++++++ 19 files changed, 879 insertions(+), 15 deletions(-) create mode 100644 src/test/regress/expected/append_optimized.out create mode 100644 src/test/regress/sql/append_optimized.sql diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 46c1dce222d..9652cf4179b 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -166,6 +166,15 @@ static relopt_bool boolRelOpts[] = }, true }, + { + { + "append_optimized", + "Enables using batching for insertion algorithm whenever it possible", + RELOPT_KIND_HEAP, + AccessExclusiveLock + }, + false + }, /* list terminator */ {{NULL}} }; @@ -1905,6 +1914,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind) offsetof(StdRdOptions, vacuum_index_cleanup)}, {"vacuum_truncate", RELOPT_TYPE_BOOL, offsetof(StdRdOptions, vacuum_truncate), offsetof(StdRdOptions, vacuum_truncate_set)}, + 
{"append_optimized", RELOPT_TYPE_BOOL, + offsetof(StdRdOptions, append_optimized)}, {"vacuum_max_eager_freeze_failure_rate", RELOPT_TYPE_REAL, offsetof(StdRdOptions, vacuum_max_eager_freeze_failure_rate)} }; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ed2e3021799..415eef4c35d 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -51,6 +51,7 @@ #include "utils/datum.h" #include "utils/injection_point.h" #include "utils/inval.h" +#include "utils/memutils.h" #include "utils/spccache.h" #include "utils/syscache.h" @@ -106,6 +107,7 @@ static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy); +static void heap_modify_insert_end(TableModifyState *state); /* * Each tuple lock mode has a corresponding heavyweight lock, and one or two @@ -2674,6 +2676,209 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize heap modify state. + */ +TableModifyState * +heap_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx) +{ + TableModifyState *state; + MemoryContext context; + MemoryContext oldcontext; + + Assert(RelationIsAppendOptimized(rel)); + context = AllocSetContextCreate(TopTransactionContext, + "heap_modify memory context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + state = palloc(sizeof(TableModifyState)); + state->rel = rel; + state->cid = cid; + state->options = options; + state->mem_ctx = context; + state->buffer_flush_cb = buffer_flush_cb; + state->buffer_flush_ctx = buffer_flush_ctx; + state->data = NULL; /* To be set lazily */ + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * Store passed-in tuple into in-memory buffered slots. 
When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot) +{ + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + Assert(RelationIsAppendOptimized(state->rel)); + oldcontext = MemoryContextSwitchTo(state->mem_ctx); + + /* First time through, initialize heap insert state */ + if (state->data == NULL) + { + istate = (HeapInsertState *) palloc(sizeof(HeapInsertState)); + istate->bistate = NULL; + istate->mistate = NULL; + state->data = istate; + mistate = + (HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState)); + mistate->slots = + (TupleTableSlot **) palloc0(sizeof(void *) * HEAP_MAX_BUFFERED_SLOTS); + mistate->tstore = tuplestore_begin_heap(false, false, work_mem); + mistate->nused = 0; + istate->mistate = mistate; + + /* + * heap_multi_insert() can leak memory. So switch to this memory + * context before every heap_multi_insert() call and reset when + * finished. + */ + mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert memory context", + ALLOCSET_DEFAULT_SIZES); + istate->bistate = GetBulkInsertState(); + } + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + + tuplestore_puttupleslot(mistate->tstore, slot); + mistate->nused += 1; + + if (mistate->nused >= HEAP_MAX_BUFFERED_SLOTS) + heap_modify_buffer_flush(state); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. 
+ */ +void +heap_modify_buffer_flush(TableModifyState *state) +{ + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + TupleDesc tupdesc; + + Assert(RelationIsAppendOptimized(state->rel)); + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + tupdesc = RelationGetDescr(state->rel); + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + + /* Quick exit if we have flushed already */ + if (mistate->nused == 0) + return; + + for (int i = 0; i < mistate->nused; i++) + { + bool ok; + + if (istate->mistate->slots[i] == NULL) + { + istate->mistate->slots[i] = + MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple); + } + ok = tuplestore_gettupleslot(mistate->tstore, true, false, + istate->mistate->slots[i]); + Assert(ok); + } + + /* + * heap_multi_insert() can leak memory, so switch to short-lived memory + * context before calling it. + */ + oldcontext = MemoryContextSwitchTo(mistate->mem_ctx); + heap_multi_insert(state->rel, + mistate->slots, + mistate->nused, + state->cid, + state->options, + istate->bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->mem_ctx); + + /* + * Invoke caller-supplied buffer flush callback after inserting rows from + * the buffers to heap. + */ + if (state->buffer_flush_cb != NULL) + { + for (int i = 0; i < mistate->nused; i++) + { + state->buffer_flush_cb(state->buffer_flush_ctx, + mistate->slots[i]); + } + } + + tuplestore_clear(mistate->tstore); + mistate->nused = 0; +} + +/* + * Heap insert specific function used for performing work at the end like + * flushing remaining buffered tuples, cleaning up the insert state and tuple + * table slots used for buffered tuples etc. 
+ */ +static void +heap_modify_insert_end(TableModifyState *state) +{ + HeapInsertState *istate; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + + if (istate->mistate != NULL) + { + HeapMultiInsertState *mistate = istate->mistate; + + heap_modify_buffer_flush(state); + + Assert(mistate->nused == 0); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + tuplestore_end(mistate->tstore); + MemoryContextDelete(mistate->mem_ctx); + } + + if (istate->bistate != NULL) + FreeBulkInsertState(istate->bistate); +} + +/* + * Clean heap modify state. + */ +void +heap_modify_end(TableModifyState *state) +{ + heap_modify_insert_end(state); + MemoryContextDelete(state->mem_ctx); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index ac082fefa77..56880165ed0 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2643,6 +2643,11 @@ static const TableAmRoutine heapam_methods = { .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, + .tuple_modify_begin = heap_modify_begin, + .tuple_modify_buffer_insert = heap_modify_buffer_insert, + .tuple_modify_buffer_flush = heap_modify_buffer_flush, + .tuple_modify_end = heap_modify_end, + .tuple_fetch_row_version = heapam_fetch_row_version, .tuple_get_latest_tid = heap_get_latest_tid, .tuple_tid_valid = heapam_tuple_tid_valid, diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index 476663b66aa..ae30c5a21a8 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -94,6 +94,11 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->scan_sample_next_block != NULL); Assert(routine->scan_sample_next_tuple != NULL); + 
Assert(routine->tuple_modify_begin != NULL); + Assert(routine->tuple_modify_buffer_insert != NULL); + Assert(routine->tuple_modify_buffer_flush != NULL); + Assert(routine->tuple_modify_end != NULL); + return routine; } diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index ef8aa489af8..31ce1fa7acb 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1399,7 +1399,10 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (((ModifyTable *) plan)->operation) { case CMD_INSERT: - pname = operation = "Insert"; + if (((ModifyTable *) plan)->canUseBatching) + pname = operation = "MultiInsert"; + else + pname = operation = "Insert"; break; case CMD_UPDATE: pname = operation = "Update"; diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index f1569879b52..f2d3a236fbc 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -103,7 +103,11 @@ static void ExecInitJsonCoercion(ExprState *state, JsonReturning *returning, ErrorSaveContext *escontext, bool omit_quotes, bool exists_coerce, Datum *resv, bool *resnull); - +/* + * Every time we find a volatile function during expression evaluation, we + * must set this flag, so higher-level code can process it appropriately. 
+ */ +static bool volatile_func_flag = false; /* * ExecInitExpr: prepare an expression tree for execution @@ -264,6 +268,9 @@ ExecInitQual(List *qual, PlanState *parent) scratch.resvalue = &state->resvalue; scratch.resnull = &state->resnull; + /* Reset flag indicating the presence of volatile functions in qual */ + volatile_func_flag = false; + foreach_ptr(Expr, node, qual) { /* first evaluate expression */ @@ -276,6 +283,10 @@ ExecInitQual(List *qual, PlanState *parent) state->steps_len - 1); } + /* Possibly update information about batch-insert-capability */ + if (parent && !parent->has_volatile) + parent->has_volatile = volatile_func_flag; + /* adjust jump targets */ foreach_int(jump, adjust_jumps) { @@ -1193,6 +1204,10 @@ ExecInitExprRec(Expr *node, ExprState *state, { FuncExpr *func = (FuncExpr *) node; + /* Higher level code will handle it */ + if (func_volatile(func->funcid)) + volatile_func_flag = true; + ExecInitFunc(&scratch, node, func->args, func->funcid, func->inputcollid, state); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index f5f9cfbeead..2383ef7ea4b 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -416,6 +416,15 @@ ExecInitNode(Plan *node, EState *estate, int eflags) result->instrument = InstrAlloc(1, estate->es_instrument, result->async_capable); + /* Check whether some nodes below has volatile functions */ + if ((outerPlanState(result) != NULL && + outerPlanState(result)->has_volatile) || + (innerPlanState(result) != NULL && + innerPlanState(result)->has_volatile)) + { + result->has_volatile = true; + } + return result; } diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 309e27f8b5f..bbaf91bcbac 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -55,6 +55,7 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" +#include 
"catalog/pg_proc.h" #include "commands/trigger.h" #include "executor/execPartition.h" #include "executor/executor.h" @@ -67,6 +68,8 @@ #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/datum.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/snapmgr.h" @@ -130,6 +133,18 @@ typedef struct UpdateContext LockTupleMode lockmode; } UpdateContext; +typedef struct InsertModifyBufferFlushContext +{ + ResultRelInfo *resultRelInfo; + EState *estate; + ModifyTableState *mtstate; +} InsertModifyBufferFlushContext; + +static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL; +static TableModifyState *table_modify_state = NULL; + +static void InsertModifyBufferFlushCallback(void *context, + TupleTableSlot *slot); static void ExecBatchInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, @@ -174,6 +189,8 @@ static TupleTableSlot *ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, bool canSetTag); +static bool ContainVolatileFunctionsChecker(Oid func_id, void *context); +static bool IsMultiInsertCapable(ModifyTableState *mtstate); /* * Verify that the tuples to be produced by INSERT match the @@ -806,6 +823,31 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo, return ExecProject(newProj); } +static void +InsertModifyBufferFlushCallback(void *context, TupleTableSlot *slot) +{ + InsertModifyBufferFlushContext *ctx = (InsertModifyBufferFlushContext *) context; + ResultRelInfo *resultRelInfo = ctx->resultRelInfo; + EState *estate = ctx->estate; + + /* Caller must take care of opening and closing the indices */ + + /* + * If there are any indexes, update them for all the inserted tuples, and + * run AFTER ROW INSERT triggers. 
+ */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, + false, NULL, NIL, false); + list_free(recheckIndexes); + } +} + /* ---------------------------------------------------------------- * ExecInsert * @@ -1209,17 +1251,22 @@ ExecInsert(ModifyTableContext *context, } else { - /* insert the tuple normally */ - table_tuple_insert(resultRelationDesc, slot, - estate->es_output_cid, - 0, NULL); - - /* insert index entries for tuple */ - if (resultRelInfo->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, false, - false, NULL, NIL, - false); + if (table_modify_state != NULL) + table_modify_buffer_insert(table_modify_state, slot); + else + { + /* insert the tuple normally */ + table_tuple_insert(resultRelationDesc, slot, + estate->es_output_cid, + 0, NULL); + + /* insert index entries for tuple */ + if (resultRelInfo->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, + false, NULL, NIL, + false); + } } } @@ -4586,6 +4633,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->mt_mergeActionLists = mergeActionLists; mtstate->mt_mergeJoinConditions = mergeJoinConditions; + /* + * Previous ModifyTable node execution (if any) should have released + * these resources. + */ + Assert(insert_modify_buffer_flush_context == NULL && + table_modify_state == NULL); + /*---------- * Resolve the target relation. This is the same as: * @@ -4999,6 +5053,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) * * We only do this for INSERT, so that for UPDATE/DELETE the batch size * remains set to 0. + * + * Also determine whether we can use batching for this INSERT command. 
*/ if (operation == CMD_INSERT) { @@ -5016,6 +5072,27 @@ } else resultRelInfo->ri_BatchSize = 1; + + if (IsMultiInsertCapable(mtstate)) + { + insert_modify_buffer_flush_context = + (InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext)); + insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo; + insert_modify_buffer_flush_context->estate = estate; + insert_modify_buffer_flush_context->mtstate = mtstate; + + Assert(estate->es_output_cid != InvalidCommandId); + + table_modify_state = + table_modify_begin(resultRelInfo->ri_RelationDesc, + estate->es_output_cid, + 0, + InsertModifyBufferFlushCallback, + insert_modify_buffer_flush_context); + + /* For more accurate EXPLAIN output */ + node->canUseBatching = true; + } } /* @@ -5034,6 +5111,90 @@ return mtstate; } +/* + * Returns true if batch insert can be performed on the table within the current + * query. We impose the following rules: + * 1) Batching is supported only for ordinary tables without ROW triggers + * and with the append_optimized option set. + * 2) Batching is not supported for queries containing a RETURNING clause. + * 3) Batching is not supported for queries containing any volatile + * functions in the plan tree. + * 4) Batching is supported only for tables that have no volatile default + * expressions. + */ +static bool +IsMultiInsertCapable(ModifyTableState *mtstate) +{ + ResultRelInfo *relinfo = mtstate->resultRelInfo; + TupleDesc tdesc = RelationGetDescr(relinfo->ri_RelationDesc); + bool has_row_triggers; + + Assert(mtstate->operation == CMD_INSERT); + + has_row_triggers = + (relinfo->ri_TrigDesc != NULL && + (relinfo->ri_TrigDesc->trig_insert_after_row || + relinfo->ri_TrigDesc->trig_insert_before_row || + relinfo->ri_TrigDesc->trig_insert_instead_row)); + + /* Check conditions (1) - (3). */ + if (!RelationIsAppendOptimized(relinfo->ri_RelationDesc) || + relinfo->ri_projectReturning || + has_row_triggers) + { + return false; + } + + /* Check the last condition. */ + + /* + * By default, this variable is calculated at the end of ExecInitNode + * processing, but we need it now. + */ + if ((outerPlanState(mtstate) != NULL && + outerPlanState(mtstate)->has_volatile) || + (innerPlanState(mtstate) != NULL && + innerPlanState(mtstate)->has_volatile)) + { + mtstate->ps.has_volatile = true; + return false; + } + + for (AttrNumber i = 0; i < tdesc->natts; i++) + { + Node *defexpr; + if (!TupleDescAttr(tdesc, i)->atthasdef) + continue; + + defexpr = TupleDescGetDefault(tdesc, i + 1); + if (contain_volatile_functions_extended(defexpr, + ContainVolatileFunctionsChecker)) + { + return false; + } + } + + /* All conditions are met - we can perform batch inserts on the table. */ + return true; +} + +/* + * Support function for IsMultiInsertCapable. + * + * To decide whether we can use batching, we should iterate across all default + * expressions in the target table and check if they contain any volatile + * functions. + * + * But not all functions are considered dangerous in terms of batching. We can + * allow some volatile functions to appear in default expressions. For now, we + * only allow nextval (in order not to dismiss batching if the target table + * has a SERIAL field). 
+ */ +static bool ContainVolatileFunctionsChecker(Oid func_id, void *context) +{ + return (func_volatile(func_id) == PROVOLATILE_VOLATILE && + func_id != F_NEXTVAL); +} + /* ---------------------------------------------------------------- * ExecEndModifyTable * @@ -5047,6 +5208,17 @@ ExecEndModifyTable(ModifyTableState *node) { int i; + if (table_modify_state != NULL) + { + Assert(node->operation == CMD_INSERT); + + table_modify_end(table_modify_state); + table_modify_state = NULL; + + pfree(insert_modify_buffer_flush_context); + insert_modify_buffer_flush_context = NULL; + } + /* * Allow any FDWs to shut down */ diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index a8f22a8c154..7bf13de1e93 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -7133,6 +7133,7 @@ make_modifytable(PlannerInfo *root, Plan *subplan, node->operation = operation; node->canSetTag = canSetTag; + node->canUseBatching = false; node->nominalRelation = nominalRelation; node->rootRelation = rootRelation; node->partColsUpdated = partColsUpdated; diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 26a3e050086..91ee85e9157 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -157,6 +157,14 @@ static Node *substitute_actual_srf_parameters_mutator(Node *node, substitute_actual_srf_parameters_context *context); static bool pull_paramids_walker(Node *node, Bitmapset **context); +/* + * Allow user to supply specific checker for "contain_volatile_functions" call. + * In general it is not used, but for example append-optimized tables needs to + * ignore some types of volatile functions during default expressions check. 
+ */ + +static bool contain_volatile_functions_checker(Oid func_id, void *context); +static check_function_callback checker = contain_volatile_functions_checker; /***************************************************************************** * Aggregate-function clause manipulation @@ -541,6 +549,23 @@ contain_volatile_functions(Node *clause) return contain_volatile_functions_walker(clause, NULL); } +/* + * Same as above, but allows to specify user-defined check_function_callback. + */ +bool +contain_volatile_functions_extended(Node *clause, + check_function_callback ud_checker) +{ + bool res; + check_function_callback prev_checker = checker; + + checker = ud_checker; + res = contain_volatile_functions_walker(clause, NULL); + checker = prev_checker; + + return res; +} + static bool contain_volatile_functions_checker(Oid func_id, void *context) { @@ -553,8 +578,7 @@ contain_volatile_functions_walker(Node *node, void *context) if (node == NULL) return false; /* Check for volatile functions in node itself */ - if (check_functions_in_node(node, contain_volatile_functions_checker, - context)) + if (check_functions_in_node(node, checker, context)) return true; if (IsA(node, NextValueExpr)) diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index e48fe434cd3..96b9e925e66 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -30,6 +30,7 @@ #include "storage/shm_toc.h" #include "utils/relcache.h" #include "utils/snapshot.h" +#include "utils/tuplestore.h" /* "options" flag bits for heap_insert */ @@ -270,6 +271,35 @@ typedef enum PRUNE_VACUUM_CLEANUP, /* VACUUM 2nd heap pass */ } PruneReason; +/* + * Maximum number of slots that multi-insert buffers can hold. + * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. 
+ */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +typedef struct HeapMultiInsertState +{ + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Holds the tuple set */ + Tuplestorestate *tstore; + + /* Number of buffered tuples currently held */ + int nused; + + /* Memory context for dealing with multi inserts */ + MemoryContext mem_ctx; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -320,6 +350,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableModifyState *heap_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx); +extern void heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot); +extern void heap_modify_buffer_flush(TableModifyState *state); +extern void heap_modify_end(TableModifyState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 8713e12cbfb..3942463b715 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -248,12 +248,44 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +struct TableModifyState; + +/* Callback invoked upon flushing each buffered tuple */ +typedef void (*TableModifyBufferFlushCb) (void *context, + TupleTableSlot *slot); + +/* Holds table modify state */ +typedef struct TableModifyState +{ + /* These fields are used for inserts for now */ + + Relation rel; /* Relation to insert to */ + CommandId cid; /* Command ID for insert 
*/ + int options; /* TABLE_INSERT options */ + + /* Memory context for dealing with modify state variables */ + MemoryContext mem_ctx; + + /* Flush callback and its context used for multi inserts */ + TableModifyBufferFlushCb buffer_flush_cb; + void *buffer_flush_ctx; + + /* Table AM specific data */ + void *data; +} TableModifyState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 #define TABLE_INSERT_FROZEN 0x0004 #define TABLE_INSERT_NO_LOGICAL 0x0008 +/* + * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for + * HEAP_INSERT_SPECULATIVE. + */ +#define TABLE_INSERT_BAS_BULKWRITE 0x0020 + /* flag bits for table_tuple_lock */ /* Follow tuples whose update is in progress if lock modes don't conflict */ #define TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS (1 << 0) @@ -571,6 +603,21 @@ typedef struct TableAmRoutine void (*finish_bulk_insert) (Relation rel, int options); + /* ------------------------------------------------------------------------ + * Table Modify related functions. + * ------------------------------------------------------------------------ + */ + TableModifyState *(*tuple_modify_begin) (Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx); + void (*tuple_modify_buffer_insert) (TableModifyState *state, + TupleTableSlot *slot); + void (*tuple_modify_buffer_flush) (TableModifyState *state); + void (*tuple_modify_end) (TableModifyState *state); + + /* ------------------------------------------------------------------------ * DDL related functionality. * ------------------------------------------------------------------------ @@ -1560,6 +1607,43 @@ table_finish_bulk_insert(Relation rel, int options) } +/* ------------------------------------------------------------------------ + * Table Modify related functions. 
+ * ------------------------------------------------------------------------ + */ +static inline TableModifyState * +table_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx) +{ + return rel->rd_tableam->tuple_modify_begin(rel, + cid, + options, + buffer_flush_cb, + buffer_flush_ctx); +} + +static inline void +table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot) +{ + state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot); +} + +static inline void +table_modify_buffer_flush(TableModifyState *state) +{ + state->rel->rd_tableam->tuple_modify_buffer_flush(state); +} + +static inline void +table_modify_end(TableModifyState *state) +{ + state->rel->rd_tableam->tuple_modify_end(state); +} + + /* ------------------------------------------------------------------------ * DDL related functionality. * ------------------------------------------------------------------------ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 5b6cadb5a6c..cbd798187eb 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1200,6 +1200,12 @@ typedef struct PlanState bool async_capable; /* true if node is async-capable */ + /* + * Qual of current node or any qual of nodes lower down the plan tree has + * at least one volatile function. + */ + bool has_volatile; + /* * Scanslot's descriptor if known. This is a bit of a hack, but otherwise * it's hard for expression compilation to optimize based on the diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 658d76225e4..3a38040d991 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -292,6 +292,8 @@ typedef struct ModifyTable CmdType operation; /* do we set the command tag/es_processed? */ bool canSetTag; + /* do we use batching during INSERT? 
*/ + bool canUseBatching; /* Parent RT index for use of EXPLAIN */ Index nominalRelation; /* Root RT index, if partitioned/inherited */ diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h index 546828b54bd..9bda34d21bc 100644 --- a/src/include/optimizer/optimizer.h +++ b/src/include/optimizer/optimizer.h @@ -22,6 +22,7 @@ #ifndef OPTIMIZER_H #define OPTIMIZER_H +#include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" /* @@ -142,6 +143,8 @@ extern Expr *canonicalize_qual(Expr *qual, bool is_check); extern bool contain_mutable_functions(Node *clause); extern bool contain_mutable_functions_after_planning(Expr *expr); extern bool contain_volatile_functions(Node *clause); +extern bool contain_volatile_functions_extended(Node *clause, + check_function_callback ud_checker); extern bool contain_volatile_functions_after_planning(Expr *expr); extern bool contain_volatile_functions_not_nextval(Node *clause); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index b552359915f..e548954d81d 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -348,6 +348,7 @@ typedef struct StdRdOptions StdRdOptIndexCleanup vacuum_index_cleanup; /* controls index vacuuming */ bool vacuum_truncate; /* enables vacuum to truncate a relation */ bool vacuum_truncate_set; /* whether vacuum_truncate is set */ + bool append_optimized; /* use optimized insertion algorithm */ /* * Fraction of pages in a relation that vacuum can eagerly scan and fail @@ -367,6 +368,15 @@ typedef struct StdRdOptions ((relation)->rd_options ? \ ((StdRdOptions *) (relation)->rd_options)->toast_tuple_target : (defaulttarg)) +/* + * RelationIsAppendOptimized + * Check whether relation can use batching for insertion + */ +#define RelationIsAppendOptimized(relation) \ + (AssertMacro(RelationIsValid(relation)), \ + (relation)->rd_options ? 
\ + ((StdRdOptions *) (relation)->rd_options)->append_optimized : false) + /* * RelationGetFillFactor * Returns the relation's fillfactor. Note multiple eval of argument! diff --git a/src/test/regress/expected/append_optimized.out b/src/test/regress/expected/append_optimized.out new file mode 100644 index 00000000000..57b45a20e61 --- /dev/null +++ b/src/test/regress/expected/append_optimized.out @@ -0,0 +1,161 @@ +-- Not all INSERT queries are suitable for using batching. All conditions are +-- listed in nodeModifyTable.c +-- In this test we want to check whether an append_optimized table correctly +-- determines when to use batching. +CREATE TABLE optimized_tbl ( + int_data INT DEFAULT random() +) WITH (append_optimized=true); +CREATE TABLE rows_source (int_data INT); +INSERT INTO rows_source SELECT generate_series(1, 10); +-- Must not use batching here, because optimized_tbl has a volatile function +-- within its default expression. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + QUERY PLAN +--------------------------------------------------------------------- + Insert on optimized_tbl (cost=0.00..35.50 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4) +(2 rows) + +-- Now the default expression does not prevent us from using batching. +ALTER TABLE optimized_tbl ALTER COLUMN int_data SET DEFAULT 0; +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + QUERY PLAN +--------------------------------------------------------------------- + MultiInsert on optimized_tbl (cost=0.00..35.50 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4) +(2 rows) + +-- Must not use batching here, because the WHERE clause contains a volatile function. 
+EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source + WHERE int_data > random(); + QUERY PLAN +-------------------------------------------------------------------- + Insert on optimized_tbl (cost=0.00..54.63 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..54.63 rows=850 width=4) + Filter: ((int_data)::double precision > random()) +(3 rows) + +-- Now the WHERE clause does not prevent us from using batching. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source + WHERE int_data > 2; + QUERY PLAN +-------------------------------------------------------------------- + MultiInsert on optimized_tbl (cost=0.00..41.88 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..41.88 rows=850 width=4) + Filter: (int_data > 2) +(3 rows) + +-- Create a ROW trigger on optimized_tbl. +CREATE OR REPLACE FUNCTION my_trigger_function() +RETURNS TRIGGER AS $$ +BEGIN + NEW.int_data := NEW.int_data * 10; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +CREATE TRIGGER my_row_trigger +BEFORE INSERT ON optimized_tbl +FOR EACH ROW +EXECUTE FUNCTION my_trigger_function(); +-- Must not use batching here, because optimized_tbl has a ROW trigger. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + QUERY PLAN +--------------------------------------------------------------------- + Insert on optimized_tbl (cost=0.00..35.50 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4) +(2 rows) + +DROP TRIGGER my_row_trigger ON optimized_tbl; +DROP FUNCTION my_trigger_function(); +-- Must not use batching here, because a RETURNING clause is specified. +EXPLAIN INSERT INTO optimized_tbl VALUES (100) RETURNING int_data; + QUERY PLAN +----------------------------------------------------------- + Insert on optimized_tbl (cost=0.00..0.01 rows=1 width=4) + -> Result (cost=0.00..0.01 rows=1 width=4) +(2 rows) + +-- Now RETURNING does not prevent us from using batching. 
+EXPLAIN INSERT INTO optimized_tbl VALUES (100); + QUERY PLAN +---------------------------------------------------------------- + MultiInsert on optimized_tbl (cost=0.00..0.01 rows=0 width=0) + -> Result (cost=0.00..0.01 rows=1 width=4) +(2 rows) + +TRUNCATE optimized_tbl; +CREATE INDEX idx_test_int_data ON optimized_tbl (int_data); +-- Fill the source table with more data, so there will be several buffer flushes +-- during the INSERT operation. +INSERT INTO rows_source SELECT generate_series(11, 10000); +-- It is OK to use batching. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + QUERY PLAN +----------------------------------------------------------------------- + MultiInsert on optimized_tbl (cost=0.00..159.75 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..159.75 rows=11475 width=4) +(2 rows) + +INSERT INTO optimized_tbl +SELECT int_data FROM rows_source; +-- Check whether both the index and the table contain all inserted rows. +SELECT COUNT(*) FROM optimized_tbl; + count +------- + 10000 +(1 row) + +ANALYZE optimized_tbl; +SELECT c.relname, c.reltuples +FROM pg_class c +JOIN pg_index i ON c.oid = i.indexrelid +WHERE i.indrelid = 'optimized_tbl'::regclass; + relname | reltuples +-------------------+----------- + idx_test_int_data | 10000 +(1 row) + +-- We allow the use of SERIAL fields in an append_optimized table. Check whether +-- such fields behave correctly. 
+CREATE TABLE test_serial( + id SERIAL, + int_data INT +) WITH (append_optimized=true); +CREATE TABLE small_source(int_data INT); +INSERT INTO small_source SELECT generate_series(1, 10); +EXPLAIN INSERT INTO test_serial(int_data) + SELECT int_data FROM small_source; + QUERY PLAN +---------------------------------------------------------------------- + MultiInsert on test_serial (cost=0.00..48.25 rows=0 width=0) + -> Seq Scan on small_source (cost=0.00..48.25 rows=2550 width=8) +(2 rows) + +INSERT INTO test_serial(int_data) +SELECT int_data FROM small_source; +SELECT * FROM test_serial; + id | int_data +----+---------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 + 6 | 6 + 7 | 7 + 8 | 8 + 9 | 9 + 10 | 10 +(10 rows) + +-- Cleanup +DROP TABLE optimized_tbl; +DROP TABLE rows_source; +DROP TABLE test_serial; +DROP TABLE small_source; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 0a35f2f8f6a..0cda71a358d 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -136,3 +136,5 @@ test: fast_default # run tablespace test at the end because it drops the tablespace created during # setup that other tests may use. test: tablespace + +test: append_optimized diff --git a/src/test/regress/sql/append_optimized.sql b/src/test/regress/sql/append_optimized.sql new file mode 100644 index 00000000000..ce3ffab2d52 --- /dev/null +++ b/src/test/regress/sql/append_optimized.sql @@ -0,0 +1,105 @@ +-- Not all INSERT queries are suitable for using batching. All conditions are +-- listed in nodeModifyTable.c +-- In this test we want to check whether an append_optimized table correctly +-- determines when to use batching. + +CREATE TABLE optimized_tbl ( + int_data INT DEFAULT random() +) WITH (append_optimized=true); + +CREATE TABLE rows_source (int_data INT); +INSERT INTO rows_source SELECT generate_series(1, 10); + +-- Must not use batching here, because optimized_tbl has a volatile function +-- within its default expression. 
+EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + +-- Now the default expression does not prevent us from using batching. +ALTER TABLE optimized_tbl ALTER COLUMN int_data SET DEFAULT 0; +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + +-- Must not use batching here, because the WHERE clause contains a volatile function. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source + WHERE int_data > random(); + +-- Now the WHERE clause does not prevent us from using batching. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source + WHERE int_data > 2; + +-- Create a ROW trigger on optimized_tbl. +CREATE OR REPLACE FUNCTION my_trigger_function() +RETURNS TRIGGER AS $$ +BEGIN + NEW.int_data := NEW.int_data * 10; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER my_row_trigger +BEFORE INSERT ON optimized_tbl +FOR EACH ROW +EXECUTE FUNCTION my_trigger_function(); + +-- Must not use batching here, because optimized_tbl has a ROW trigger. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + +DROP TRIGGER my_row_trigger ON optimized_tbl; +DROP FUNCTION my_trigger_function(); + +-- Must not use batching here, because a RETURNING clause is specified. +EXPLAIN INSERT INTO optimized_tbl VALUES (100) RETURNING int_data; + +-- Now RETURNING does not prevent us from using batching. +EXPLAIN INSERT INTO optimized_tbl VALUES (100); + +TRUNCATE optimized_tbl; +CREATE INDEX idx_test_int_data ON optimized_tbl (int_data); + +-- Fill the source table with more data, so there will be several buffer flushes +-- during the INSERT operation. +INSERT INTO rows_source SELECT generate_series(11, 10000); + +-- It is OK to use batching. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + +INSERT INTO optimized_tbl +SELECT int_data FROM rows_source; + +-- Check whether both the index and the table contain all inserted rows. 
+SELECT COUNT(*) FROM optimized_tbl; +ANALYZE optimized_tbl; + +SELECT c.relname, c.reltuples +FROM pg_class c +JOIN pg_index i ON c.oid = i.indexrelid +WHERE i.indrelid = 'optimized_tbl'::regclass; + +-- We allow the use of SERIAL fields in an append_optimized table. Check whether +-- such fields behave correctly. +CREATE TABLE test_serial( + id SERIAL, + int_data INT +) WITH (append_optimized=true); + +CREATE TABLE small_source(int_data INT); +INSERT INTO small_source SELECT generate_series(1, 10); + +EXPLAIN INSERT INTO test_serial(int_data) + SELECT int_data FROM small_source; + +INSERT INTO test_serial(int_data) +SELECT int_data FROM small_source; + +SELECT * FROM test_serial; + +-- Cleanup +DROP TABLE optimized_tbl; +DROP TABLE rows_source; +DROP TABLE test_serial; +DROP TABLE small_source; -- 2.43.0