I rebased this patch to resolve a trivial 1 line conflict from c5b7ba4e6. -- Justin
>From 0987ca4f62fb8c9b43a3fe142d955d8a9cb6f36f Mon Sep 17 00:00:00 2001 From: Takayuki Tsunakawa <tsunakawa.ta...@fujitsu.com> Date: Tue, 9 Feb 2021 12:50:00 +0900 Subject: [PATCH] Fast COPY FROM into the foreign or sharded table.
This feature enables bulk COPY into a foreign table when multi-insert is possible and the foreign table has a non-zero number of columns. The following routines are added to the FDW interface: * BeginForeignCopy * ExecForeignCopy * EndForeignCopy BeginForeignCopy and EndForeignCopy initialize and free the CopyState of bulk COPY. The ExecForeignCopy routine runs the 'COPY ... FROM STDIN' command on the foreign server, iteratively sending tuples using the CopyTo() machinery. Code that constructs a list of columns for a given foreign relation in the deparseAnalyzeSql() routine is split into deparseRelColumnList(). It is reused in deparseCopyFromSql(). Added regression tests for specific corner cases of the COPY FROM STDIN operation. By analogy with CopyFrom(), the CopyState structure was extended with a data_dest_cb callback. It is used to send the text representation of a tuple to a custom destination. The PgFdwModifyState structure is extended with the cstate field. It is needed to avoid repeated initialization of CopyState. For the same reason, the CopyTo() routine is split into the set of routines CopyToStart()/CopyTo()/CopyToFinish(). When 0d5f05cde introduced support for using multi-insert mode when copying into partitioned tables, it introduced a single variable of enum type CopyInsertMethod shared across all potential target relations (partitions) that, along with some target relation properties, dictated whether to engage multi-insert mode for a given target relation. Change that decision logic to the combination of ExecMultiInsertAllowed() and its caller. The former encapsulates the common criteria to allow multi-insert. The latter uses additional criteria and sets the new boolean field ri_usesMultiInsert of ResultRelInfo. That prevents repeated computation of the same information in some cases, especially for partitions, and the new arrangement is slightly more readable. Enum CopyInsertMethod is removed. 
Authors: Andrey Lepikhov, Ashutosh Bapat, Amit Langote, Takayuki Tsunakawa Reviewed-by: Ashutosh Bapat, Amit Langote, Takayuki Tsunakawa Discussion: https://www.postgresql.org/message-id/flat/3d0909dc-3691-a576-208a-90986e55489f%40postgrespro.ru --- contrib/postgres_fdw/deparse.c | 63 +++- .../postgres_fdw/expected/postgres_fdw.out | 46 ++- contrib/postgres_fdw/postgres_fdw.c | 141 +++++++++ contrib/postgres_fdw/postgres_fdw.h | 1 + contrib/postgres_fdw/sql/postgres_fdw.sql | 45 +++ doc/src/sgml/fdwhandler.sgml | 71 ++++- src/backend/commands/copy.c | 2 +- src/backend/commands/copyfrom.c | 271 ++++++++---------- src/backend/commands/copyto.c | 88 ++++-- src/backend/executor/execMain.c | 44 +++ src/backend/executor/execPartition.c | 37 ++- src/include/commands/copy.h | 5 + src/include/commands/copyfrom_internal.h | 10 - src/include/executor/executor.h | 1 + src/include/foreign/fdwapi.h | 15 + src/include/nodes/execnodes.h | 8 +- 16 files changed, 637 insertions(+), 211 deletions(-) diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index bdc4c3620d..bf93c1d091 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -185,6 +185,8 @@ static void appendAggOrderBy(List *orderList, List *targetList, static void appendFunctionName(Oid funcid, deparse_expr_cxt *context); static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno, deparse_expr_cxt *context); +static List *deparseRelColumnList(StringInfo buf, Relation rel, + bool enclose_in_parens); /* * Helper functions @@ -1859,6 +1861,23 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, withCheckOptionList, returningList, retrieved_attrs); } +/* + * Deparse remote COPY FROM statement + * + * Note that this explicitly specifies the list of COPY's target columns + * to account for the fact that the remote table's columns may not match + * exactly with the columns declared in the local definition. 
+ */ +void +deparseCopyFromSql(StringInfo buf, Relation rel) +{ + appendStringInfoString(buf, "COPY "); + deparseRelation(buf, rel); + (void) deparseRelColumnList(buf, rel, true); + + appendStringInfoString(buf, " FROM STDIN "); +} + /* * deparse remote UPDATE statement * @@ -2120,6 +2139,30 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel) */ void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) +{ + appendStringInfoString(buf, "SELECT "); + *retrieved_attrs = deparseRelColumnList(buf, rel, false); + + /* Don't generate bad syntax for zero-column relation. */ + if (list_length(*retrieved_attrs) == 0) + appendStringInfoString(buf, "NULL"); + + /* + * Construct FROM clause + */ + appendStringInfoString(buf, " FROM "); + deparseRelation(buf, rel); +} + +/* + * Construct the list of columns of the given foreign relation in the order they + * appear in the tuple descriptor of the relation. Ignore any dropped columns. + * Use column names on the foreign server instead of local names. + * + * Optionally enclose the list in parentheses. + */ +static List * +deparseRelColumnList(StringInfo buf, Relation rel, bool enclose_in_parens) { Oid relid = RelationGetRelid(rel); TupleDesc tupdesc = RelationGetDescr(rel); @@ -2128,10 +2171,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) List *options; ListCell *lc; bool first = true; + List *retrieved_attrs = NIL; - *retrieved_attrs = NIL; - - appendStringInfoString(buf, "SELECT "); for (i = 0; i < tupdesc->natts; i++) { /* Ignore dropped columns. */ @@ -2140,6 +2181,9 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) if (!first) appendStringInfoString(buf, ", "); + else if (enclose_in_parens) + appendStringInfoChar(buf, '('); + first = false; /* Use attribute name or column_name option. 
*/ @@ -2159,18 +2203,13 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs) appendStringInfoString(buf, quote_identifier(colname)); - *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1); + retrieved_attrs = lappend_int(retrieved_attrs, i + 1); } - /* Don't generate bad syntax for zero-column relation. */ - if (first) - appendStringInfoString(buf, "NULL"); + if (enclose_in_parens && list_length(retrieved_attrs) > 0) + appendStringInfoChar(buf, ')'); - /* - * Construct FROM clause - */ - appendStringInfoString(buf, " FROM "); - deparseRelation(buf, rel); + return retrieved_attrs; } /* diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 7f69fa0054..b214395a78 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8078,8 +8078,9 @@ copy rem2 from stdin; copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). 
-CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) -COPY rem2, line 1: "-1 xyzzy" +CONTEXT: COPY loc2, line 1: "-1 xyzzy" +remote SQL command: COPY public.loc2(f1, f2) FROM STDIN +COPY rem2, line 2 select * from rem2; f1 | f2 ----+----- @@ -8090,6 +8091,19 @@ select * from rem2; alter foreign table rem2 drop constraint rem2_f1positive; alter table loc2 drop constraint loc2_f1positive; delete from rem2; +create table foo (a int) partition by list (a); +create table foo1 (like foo); +create foreign table ffoo1 partition of foo for values in (1) + server loopback options (table_name 'foo1'); +create table foo2 (like foo); +create foreign table ffoo2 partition of foo for values in (2) + server loopback options (table_name 'foo2'); +create function print_new_row() returns trigger language plpgsql as $$ + begin raise notice '%', new; return new; end; $$; +create trigger ffoo1_br_trig before insert on ffoo1 + for each row execute function print_new_row(); +copy foo from stdin; +NOTICE: (1) -- Test local triggers create trigger trig_stmt_before before insert on rem2 for each statement execute procedure trigger_func(); @@ -8198,6 +8212,34 @@ drop trigger rem2_trig_row_before on rem2; drop trigger rem2_trig_row_after on rem2; drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +ERROR: column "f1" of relation "loc2" does not exist +CONTEXT: remote SQL command: COPY public.loc2(f1, f2) FROM STDIN +COPY rem2, line 3 +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + f1 | f2 +----+---- +(0 rows) + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(2 rows) + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(4 rows) + 
-- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index c590f374c6..c615cafd8f 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -18,6 +18,7 @@ #include "access/sysattr.h" #include "access/table.h" #include "catalog/pg_class.h" +#include "commands/copy.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" @@ -209,6 +210,7 @@ typedef struct PgFdwModifyState /* for update row movement if subplan result rel */ struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if * created */ + CopyToState cstate; /* foreign COPY state, if used */ } PgFdwModifyState; /* @@ -383,6 +385,13 @@ static void postgresBeginForeignInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo); static void postgresEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo); +static void postgresBeginForeignCopy(EState *estate, + ResultRelInfo *resultRelInfo); +static void postgresEndForeignCopy(EState *estate, + ResultRelInfo *resultRelInfo); +static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + int nslots); static int postgresIsForeignRelUpdatable(Relation rel); static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, @@ -579,6 +588,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->EndForeignModify = postgresEndForeignModify; routine->BeginForeignInsert = postgresBeginForeignInsert; routine->EndForeignInsert = postgresEndForeignInsert; + routine->BeginForeignCopy = postgresBeginForeignCopy; + routine->ExecForeignCopy = postgresExecForeignCopy; + routine->EndForeignCopy = postgresEndForeignCopy; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; routine->PlanDirectModify = postgresPlanDirectModify; routine->BeginDirectModify = postgresBeginDirectModify; @@ -2209,6 +2221,135 @@ 
postgresEndForeignInsert(EState *estate, finish_foreign_modify(fmstate); } +static PgFdwModifyState *copy_fmstate = NULL; + +static void +pgfdw_copy_dest_cb(void *buf, int len) +{ + PGconn *conn = copy_fmstate->conn; + + if (PQputCopyData(conn, (char *) buf, len) <= 0) + pgfdw_report_error(ERROR, NULL, conn, false, copy_fmstate->query); +} + +/* + * postgresBeginForeignCopy + * Begin a COPY operation on a foreign table + */ +static void +postgresBeginForeignCopy(EState *estate, + ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate; + StringInfoData sql; + RangeTblEntry *rte; + Relation rel = resultRelInfo->ri_RelationDesc; + + if (resultRelInfo->ri_RangeTableIndex == 0) + { + ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo; + + Assert(rootResultRelInfo != NULL); + rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate); + rte = copyObject(rte); + rte->relid = RelationGetRelid(rel); + rte->relkind = RELKIND_FOREIGN_TABLE; + } + else + rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, estate); + + initStringInfo(&sql); + deparseCopyFromSql(&sql, rel); + + fmstate = create_foreign_modify(estate, + rte, + resultRelInfo, + CMD_INSERT, + NULL, + sql.data, + NIL, + -1, + false, + NIL); + + fmstate->cstate = BeginCopyTo(NULL, rel, NULL, + InvalidOid, NULL, false, pgfdw_copy_dest_cb, + NIL, NIL); + CopyToStart(fmstate->cstate); + resultRelInfo->ri_FdwState = fmstate; +} + +/* + * postgresExecForeignCopy + * Send a number of tuples to the foreign relation. + */ +static void +postgresExecForeignCopy(ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, int nslots) +{ + PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState; + PGresult *res; + PGconn *conn = fmstate->conn; + bool OK = false; + int i; + + /* Check correct use of CopyIn FDW API. 
*/ + Assert(fmstate->cstate != NULL); + + res = PQexec(conn, fmstate->query); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(ERROR, res, conn, true, fmstate->query); + PQclear(res); + + PG_TRY(); + { + copy_fmstate = fmstate; + for (i = 0; i < nslots; i++) + CopyOneRowTo(fmstate->cstate, slots[i]); + + OK = true; + } + PG_FINALLY(); + { + /* + * Finish the COPY IN protocol. This must be done after a successful copy + * as well as after an error. + */ + if (PQputCopyEnd(conn, OK ? NULL : "canceled by server") <= 0) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* After successfully sending an EOF signal, check command OK. */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + PQclear(res); + /* Do this to ensure we have not gotten extra results */ + if (PQgetResult(conn) != NULL) + ereport(ERROR, + (errmsg("unexpected extra results during COPY of table: %s", + PQerrorMessage(conn)))); + } + PG_END_TRY(); +} + +/* + * postgresEndForeignCopy + * Finish a COPY operation on a foreign table + */ +static void +postgresEndForeignCopy(EState *estate, ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + + /* Check correct use of CopyIn FDW API. 
*/ + Assert(fmstate->cstate != NULL); + CopyToFinish(fmstate->cstate); + pfree(fmstate->cstate); + fmstate->cstate = NULL; + finish_foreign_modify(fmstate); +} + /* * postgresIsForeignRelUpdatable * Determine whether a foreign table supports INSERT, UPDATE and/or diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 5d44b75314..10392f6ec2 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -179,6 +179,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte, extern void rebuildInsertSql(StringInfo buf, char *orig_query, int values_end_len, int num_cols, int num_rows); +extern void deparseCopyFromSql(StringInfo buf, Relation rel); extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 7487096eac..32062b4a55 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2237,6 +2237,23 @@ alter table loc2 drop constraint loc2_f1positive; delete from rem2; +create table foo (a int) partition by list (a); +create table foo1 (like foo); +create foreign table ffoo1 partition of foo for values in (1) + server loopback options (table_name 'foo1'); +create table foo2 (like foo); +create foreign table ffoo2 partition of foo for values in (2) + server loopback options (table_name 'foo2'); +create function print_new_row() returns trigger language plpgsql as $$ + begin raise notice '%', new; return new; end; $$; +create trigger ffoo1_br_trig before insert on ffoo1 + for each row execute function print_new_row(); + +copy foo from stdin; +1 +2 +\. 
+ -- Test local triggers create trigger trig_stmt_before before insert on rem2 for each statement execute procedure trigger_func(); @@ -2337,6 +2354,34 @@ drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +1 foo +2 bar +\. + +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 98882ddab8..fad2ff6161 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -822,8 +822,9 @@ BeginForeignInsert(ModifyTableState *mtstate, Begin executing an insert operation on a foreign table. This routine is called right before the first tuple is inserted into the foreign table - in both cases when it is the partition chosen for tuple routing and the - target specified in a <command>COPY FROM</command> command. It should + target specified in a <command>COPY FROM</command> command, or when + the foreign table is the partition chosen for tuple routing of a + partitioned table. It should perform any initialization needed prior to the actual insertion. Subsequently, <function>ExecForeignInsert</function> or <function>ExecForeignBatchInsert</function> will be called for @@ -1137,6 +1138,72 @@ ExecForeignTruncate(List *rels, List *rels_extra, <para> <programlisting> +void +BeginForeignCopy(EState *estate, + ResultRelInfo *rinfo); +</programlisting> + + Begin executing a copy operation on a foreign table. 
This routine is + called right before the first call of the <function>ExecForeignCopy</function> + routine for the foreign table. It should perform any initialization needed + prior to the actual <command>COPY FROM</command> operation. + Subsequently, <function>ExecForeignCopy</function> will be called for + each batch of tuples to be copied into the foreign table. + </para> + + <para> + <literal>estate</literal> is global execution state for the query. + <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing + the target foreign table. (The <structfield>ri_FdwState</structfield> field of + <structname>ResultRelInfo</structname> is available for the FDW to store any + private state it needs for this operation.) + </para> + + <para> + If the <function>BeginForeignCopy</function> pointer is set to + <literal>NULL</literal>, no action is taken for the initialization. + </para> + + <para> +<programlisting> +void +ExecForeignCopy(ResultRelInfo *rinfo, + TupleTableSlot **slots, + int nslots); +</programlisting> + + Copy a batch of tuples into the foreign table. + <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing + the target foreign table. + <literal>slots</literal> contains the tuples to be inserted; they will match the + row-type definition of the foreign table. + <literal>nslots</literal> is the number of tuples in the <literal>slots</literal> array. + </para> + + <para> + If the <function>ExecForeignCopy</function> pointer is set to + <literal>NULL</literal>, the <function>ExecForeignInsert</function> routine will be used to run <command>COPY</command> on the foreign table. + </para> + + <para> +<programlisting> +void +EndForeignCopy(EState *estate, + ResultRelInfo *rinfo); +</programlisting> + + End the copy operation and release resources. It is normally not important + to release palloc'd memory, but for example open files and connections + to remote servers should be cleaned up. 
+ </para> + + <para> + If the <function>EndForeignCopy</function> pointer is set to + <literal>NULL</literal>, no action is taken for the termination. + </para> + + <para> +<programlisting> RowMarkType GetForeignRowMarkType(RangeTblEntry *rte, LockClauseStrength strength); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 8265b981eb..f646770767 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -304,7 +304,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, CopyToState cstate; cstate = BeginCopyTo(pstate, rel, query, relid, - stmt->filename, stmt->is_program, + stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 20e7d57d41..b486ffd641 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -317,54 +317,64 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->line_buf_valid = false; save_cur_lineno = cstate->cur_lineno; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); - - for (i = 0; i < nused; i++) + if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + { + /* Flush into foreign table or partition */ + resultRelInfo->ri_FdwRoutine->ExecForeignCopy(resultRelInfo, + slots, + nused); + } + else { /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. + * table_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. 
*/ - if (resultRelInfo->ri_NumIndices > 0) - { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, false, - NULL, NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); - } + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + table_multi_insert(resultRelInfo->ri_RelationDesc, + slots, + nused, + mycid, + ti_options, + buffer->bistate); + MemoryContextSwitchTo(oldcontext); - /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. - */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + for (i = 0; i < nused; i++) { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); - } + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[i]; + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + buffer->slots[i], estate, false, false, + NULL, NIL); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + cstate->transition_capture); + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. 
+ */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[i]; + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, cstate->transition_capture); + } - ExecClearTuple(slots[i]); + ExecClearTuple(slots[i]); + } } /* Mark that all slots are free */ @@ -538,13 +548,11 @@ CopyFrom(CopyFromState cstate) CommandId mycid = GetCurrentCommandId(true); int ti_options = 0; /* start with default options for insert */ BulkInsertState bistate = NULL; - CopyInsertMethod insertMethod; CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ int64 processed = 0; int64 excluded = 0; bool has_before_insert_row_trig; bool has_instead_insert_row_trig; - bool leafpart_use_multi_insert = false; Assert(cstate->rel); Assert(list_length(cstate->range_table) == 1); @@ -654,6 +662,33 @@ CopyFrom(CopyFromState cstate) resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo); ExecInitResultRelation(estate, resultRelInfo, 1); + Assert(!target_resultRelInfo->ri_usesMultiInsert); + + /* + * It's generally more efficient to prepare a bunch of tuples for + * insertion, and insert them in bulk, for example, with one + * table_multi_insert() call than to call table_tuple_insert() separately for + * every tuple. However, there are a number of reasons why we might not be + * able to do this. For example, if there are any volatile expressions in the + * table's default values or in the statement's WHERE clause, which may + * query the table we are inserting into, buffering tuples might produce + * wrong results. Also, the relation we are trying to insert into itself + * may not be amenable to buffered inserts. + * + * Note: For partitions, this flag is set considering the target table's + * flag that is being set here and the partition's own properties which are + * checked by calling ExecMultiInsertAllowed(). 
It does not matter + * whether partitions have any volatile default expressions as we use the + * defaults from the target of the COPY command. + * Also, the COPY command requires a non-zero input list of attributes. + * Therefore, the length of the attribute list is checked here. + */ + if (!cstate->volatile_defexprs && + list_length(cstate->attnumlist) > 0 && + !contain_volatile_functions(cstate->whereClause)) + target_resultRelInfo->ri_usesMultiInsert = + ExecMultiInsertAllowed(target_resultRelInfo); + /* Verify the named relation is a valid target for INSERT */ CheckValidResultRel(resultRelInfo, CMD_INSERT); @@ -671,10 +706,22 @@ CopyFrom(CopyFromState cstate) mtstate->resultRelInfo = resultRelInfo; mtstate->rootResultRelInfo = resultRelInfo; - if (resultRelInfo->ri_FdwRoutine != NULL && - resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) - resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, - resultRelInfo); + /* + * Init copying process into foreign table. Initialization of copying into + * foreign partitions will be done later. + */ + if (target_resultRelInfo->ri_FdwRoutine != NULL) + { + if (target_resultRelInfo->ri_usesMultiInsert) + { + if (target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy != NULL) + target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy(estate, + resultRelInfo); + } + else if (target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) + target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, + resultRelInfo); + } /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); @@ -703,83 +750,9 @@ CopyFrom(CopyFromState cstate) cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause), &mtstate->ps); - /* - * It's generally more efficient to prepare a bunch of tuples for - * insertion, and insert them in one table_multi_insert() call, than call - * table_tuple_insert() separately for every tuple. However, there are a - * number of reasons why we might not be able to do this. 
These are - * explained below. - */ - if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_before_row || - resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) - { - /* - * Can't support multi-inserts when there are any BEFORE/INSTEAD OF - * triggers on the table. Such triggers might query the table we're - * inserting into and act differently if the tuples that have already - * been processed and prepared for insertion are not there. - */ - insertMethod = CIM_SINGLE; - } - else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && - resultRelInfo->ri_TrigDesc->trig_insert_new_table) - { - /* - * For partitioned tables we can't support multi-inserts when there - * are any statement level insert triggers. It might be possible to - * allow partitioned tables with such triggers in the future, but for - * now, CopyMultiInsertInfoFlush expects that any before row insert - * and statement level insert triggers are on the same relation. - */ - insertMethod = CIM_SINGLE; - } - else if (resultRelInfo->ri_FdwRoutine != NULL || - cstate->volatile_defexprs) - { - /* - * Can't support multi-inserts to foreign tables or if there are any - * volatile default expressions in the table. Similarly to the - * trigger case above, such expressions may query the table we're - * inserting into. - * - * Note: It does not matter if any partitions have any volatile - * default expressions as we use the defaults from the target of the - * COPY command. - */ - insertMethod = CIM_SINGLE; - } - else if (contain_volatile_functions(cstate->whereClause)) - { - /* - * Can't support multi-inserts if there are any volatile function - * expressions in WHERE clause. Similarly to the trigger case above, - * such expressions may query the table we're inserting into. - */ - insertMethod = CIM_SINGLE; - } - else - { - /* - * For partitioned tables, we may still be able to perform bulk - * inserts. 
However, the possibility of this depends on which types - * of triggers exist on the partition. We must disable bulk inserts - * if the partition is a foreign table or it has any before row insert - * or insert instead triggers (same as we checked above for the parent - * table). Since the partition's resultRelInfos are initialized only - * when we actually need to insert the first tuple into them, we must - * have the intermediate insert method of CIM_MULTI_CONDITIONAL to - * flag that we must later determine if we can use bulk-inserts for - * the partition being inserted into. - */ - if (proute) - insertMethod = CIM_MULTI_CONDITIONAL; - else - insertMethod = CIM_MULTI; - + if (resultRelInfo->ri_usesMultiInsert) CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, estate, mycid, ti_options); - } /* * If not using batch mode (which allocates slots as needed) set up a @@ -787,7 +760,7 @@ CopyFrom(CopyFromState cstate) * one, even if we might batch insert, to read the tuple in the root * partition's form. 
*/ - if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL) + if (!resultRelInfo->ri_usesMultiInsert || proute) { singleslot = table_slot_create(resultRelInfo->ri_RelationDesc, &estate->es_tupleTable); @@ -830,7 +803,7 @@ CopyFrom(CopyFromState cstate) ResetPerTupleExprContext(estate); /* select slot to (initially) load row into */ - if (insertMethod == CIM_SINGLE || proute) + if (!target_resultRelInfo->ri_usesMultiInsert || proute) { myslot = singleslot; Assert(myslot != NULL); @@ -838,7 +811,6 @@ CopyFrom(CopyFromState cstate) else { Assert(resultRelInfo == target_resultRelInfo); - Assert(insertMethod == CIM_MULTI); myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, resultRelInfo); @@ -905,24 +877,14 @@ CopyFrom(CopyFromState cstate) has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_instead_row); - /* - * Disable multi-inserts when the partition has BEFORE/INSTEAD - * OF triggers, or if the partition is a foreign partition. - */ - leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && - !has_before_insert_row_trig && - !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; - /* Set the multi-insert buffer to use for this partition. */ - if (leafpart_use_multi_insert) + if (resultRelInfo->ri_usesMultiInsert) { if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, resultRelInfo); } - else if (insertMethod == CIM_MULTI_CONDITIONAL && - !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + else if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) { /* * Flush pending inserts if this partition can't use @@ -952,7 +914,7 @@ CopyFrom(CopyFromState cstate) * rowtype. 
*/ map = resultRelInfo->ri_RootToPartitionMap; - if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert) + if (!resultRelInfo->ri_usesMultiInsert) { /* non batch insert */ if (map != NULL) @@ -971,9 +933,6 @@ CopyFrom(CopyFromState cstate) */ TupleTableSlot *batchslot; - /* no other path available for partitioned table */ - Assert(insertMethod == CIM_MULTI_CONDITIONAL); - batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, resultRelInfo); @@ -1045,7 +1004,7 @@ CopyFrom(CopyFromState cstate) ExecPartitionCheck(resultRelInfo, myslot, estate, true); /* Store the slot in the multi-insert buffer, when enabled. */ - if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) + if (resultRelInfo->ri_usesMultiInsert) { /* * The slot previously might point into the per-tuple @@ -1124,11 +1083,8 @@ CopyFrom(CopyFromState cstate) } /* Flush any remaining buffered tuples */ - if (insertMethod != CIM_SINGLE) - { - if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); - } + if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); /* Done, clean up */ error_context_stack = errcallback.previous; @@ -1147,14 +1103,21 @@ CopyFrom(CopyFromState cstate) ExecResetTupleTable(estate->es_tupleTable, false); /* Allow the FDW to shut down */ - if (target_resultRelInfo->ri_FdwRoutine != NULL && - target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) - target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, - target_resultRelInfo); + if (target_resultRelInfo->ri_FdwRoutine != NULL) + { + if (target_resultRelInfo->ri_usesMultiInsert) + { + if (target_resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL) + target_resultRelInfo->ri_FdwRoutine->EndForeignCopy(estate, + target_resultRelInfo); + } + else if (target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) + target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, + target_resultRelInfo); + } /* Tear down 
the multi-insert buffer data */ - if (insertMethod != CIM_SINGLE) - CopyMultiInsertInfoCleanup(&multiInsertInfo); + CopyMultiInsertInfoCleanup(&multiInsertInfo); /* Close all the partitioned tables, leaf partitions, and their indices */ if (proute) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 7257a54e93..378233655d 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -51,6 +51,7 @@ typedef enum CopyDest { COPY_FILE, /* to file (or a piped program) */ COPY_FRONTEND, /* to frontend */ + COPY_CALLBACK /* to callback function */ } CopyDest; /* @@ -86,6 +87,7 @@ typedef struct CopyToStateData char *filename; /* filename, or NULL for STDOUT */ bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ CopyFormatOptions opts; Node *whereClause; /* WHERE condition (or NULL) */ @@ -115,7 +117,6 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ static void EndCopy(CopyToState cstate); static void ClosePipeToProgram(CopyToState cstate); -static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot); static void CopyAttributeOutText(CopyToState cstate, char *string); static void CopyAttributeOutCSV(CopyToState cstate, char *string, bool use_quote, bool single_attr); @@ -248,6 +249,15 @@ CopySendEndOfRow(CopyToState cstate) /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; + case COPY_CALLBACK: + Assert(!cstate->opts.binary); +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); + break; } /* Update the progress */ @@ -345,11 +355,12 @@ BeginCopyTo(ParseState *pstate, Oid queryRelId, const char *filename, bool is_program, + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options) { CopyToState cstate; - bool pipe = 
(filename == NULL); + bool pipe = (filename == NULL) && (data_dest_cb == NULL); TupleDesc tupDesc; int num_phys_attrs; MemoryContext oldcontext; @@ -362,7 +373,13 @@ BeginCopyTo(ParseState *pstate, 0 }; - if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) + /* + * Check whether we support copying data out of the specified relation, + * unless the caller also passed a non-NULL data_dest_cb, in which case, + * the callback will take care of it + */ + if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION && + data_dest_cb == NULL) { if (rel->rd_rel->relkind == RELKIND_VIEW) ereport(ERROR, @@ -673,6 +690,11 @@ BeginCopyTo(ParseState *pstate, if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } + else if (data_dest_cb) + { + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + } else { cstate->filename = pstrdup(filename); @@ -773,20 +795,17 @@ EndCopyTo(CopyToState cstate) } /* - * Copy from relation or query TO file. + * Start COPY TO operation. + * Separate from the main routine to prevent duplicate operations in + * manual mode, where tuples are copied to the destination one by one, by calling + * the CopyOneRowTo() routine. */ -uint64 -DoCopyTo(CopyToState cstate) +void +CopyToStart(CopyToState cstate) { - bool pipe = (cstate->filename == NULL); - bool fe_copy = (pipe && whereToSendOutput == DestRemote); TupleDesc tupDesc; int num_phys_attrs; ListCell *cur; - uint64 processed; - - if (fe_copy) - SendCopyBegin(cstate); if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); @@ -876,6 +895,39 @@ DoCopyTo(CopyToState cstate) CopySendEndOfRow(cstate); } } +} + +/* + * Finish COPY TO operation. + */ +void +CopyToFinish(CopyToState cstate) +{ + if (cstate->opts.binary) + { + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); + } + + MemoryContextDelete(cstate->rowcontext); +} + +/* + * Copy from relation or query TO file. 
+ */ +uint64 +DoCopyTo(CopyToState cstate) +{ + bool pipe = (cstate->filename == NULL) && (cstate->data_dest_cb == NULL); + bool fe_copy = (pipe && whereToSendOutput == DestRemote); + uint64 processed; + + if (fe_copy) + SendCopyBegin(cstate); + + CopyToStart(cstate); if (cstate->rel) { @@ -914,15 +966,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - - MemoryContextDelete(cstate->rowcontext); + CopyToFinish(cstate); if (fe_copy) SendCopyEnd(cstate); @@ -933,7 +977,7 @@ DoCopyTo(CopyToState cstate) /* * Emit one row during DoCopyTo(). */ -static void +void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { bool need_delim = false; diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b2e2df8773..f9049cfae4 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1254,9 +1254,53 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_PartitionTupleSlot = NULL; /* ditto */ resultRelInfo->ri_ChildToRootMap = NULL; resultRelInfo->ri_ChildToRootMapValid = false; + resultRelInfo->ri_usesMultiInsert = false; resultRelInfo->ri_CopyMultiInsertBuffer = NULL; } +/* + * ExecMultiInsertAllowed + * Does this relation allow caller to use multi-insert mode when + * inserting rows into it? + */ +bool +ExecMultiInsertAllowed(const ResultRelInfo *rri) +{ + /* + * Can't support multi-inserts when there are any BEFORE/INSTEAD OF + * triggers on the table. Such triggers might query the table we're + * inserting into and act differently if the tuples that have already + * been processed and prepared for insertion are not there. 
+ */ + if (rri->ri_TrigDesc != NULL && + (rri->ri_TrigDesc->trig_insert_before_row || + rri->ri_TrigDesc->trig_insert_instead_row)) + return false; + + /* + * For partitioned tables we can't support multi-inserts when there are + * any statement level insert triggers. It might be possible to allow + * partitioned tables with such triggers in the future, but for now, + * CopyMultiInsertInfoFlush expects that any before row insert and + * statement level insert triggers are on the same relation. + */ + if (rri->ri_RelationDesc->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + rri->ri_TrigDesc != NULL && + rri->ri_TrigDesc->trig_insert_new_table) + return false; + + if (rri->ri_FdwRoutine != NULL && + rri->ri_FdwRoutine->ExecForeignCopy == NULL) + /* + * Foreign tables don't support multi-inserts, unless their FDW + * provides the necessary COPY interface. + */ + return false; + + /* OK, caller can use multi-insert on this relation. */ + return true; +} + /* * ExecGetTriggerResultRel * Get a ResultRelInfo for a trigger target relation. diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 99780ebb96..f402e13b9b 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -514,6 +514,14 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, rootResultRelInfo, estate->es_instrument); + /* + * A partition may use multi-insert only if its root parent does and the + * partition itself allows it. + */ + if (rootResultRelInfo->ri_usesMultiInsert) + leaf_part_rri->ri_usesMultiInsert = + ExecMultiInsertAllowed(leaf_part_rri); + /* * Verify result relation is a valid target for an INSERT. An UPDATE of a * partition-key becomes a DELETE+INSERT operation, so this check is still @@ -907,9 +915,16 @@ ExecInitRoutingInfo(ModifyTableState *mtstate, * If the partition is a foreign table, let the FDW init itself for * routing tuples to the partition. 
*/ - if (partRelInfo->ri_FdwRoutine != NULL && - partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) - partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo); + if (partRelInfo->ri_FdwRoutine != NULL) + { + if (partRelInfo->ri_usesMultiInsert) + { + if (partRelInfo->ri_FdwRoutine->BeginForeignCopy != NULL) + partRelInfo->ri_FdwRoutine->BeginForeignCopy(estate, partRelInfo); + } + else if (partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) + partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo); + } /* * Determine if the FDW supports batch insert and determine the batch @@ -1146,10 +1161,18 @@ ExecCleanupTupleRouting(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo = proute->partitions[i]; /* Allow any FDWs to shut down */ - if (resultRelInfo->ri_FdwRoutine != NULL && - resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) - resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state, - resultRelInfo); + if (resultRelInfo->ri_FdwRoutine != NULL) + { + if (resultRelInfo->ri_usesMultiInsert) + { + if (resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL) + resultRelInfo->ri_FdwRoutine->EndForeignCopy(mtstate->ps.state, + resultRelInfo); + } + else if (resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) + resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state, + resultRelInfo); + } /* * Close it if it's not one of the result relations borrowed from the diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 8c4748e33d..3d9d187765 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -55,6 +55,7 @@ typedef struct CopyFromStateData *CopyFromState; typedef struct CopyToStateData *CopyToState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); +typedef void (*copy_data_dest_cb) (void *outbuf, int len); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, @@ -80,10 +81,14 @@ extern DestReceiver 
*CreateCopyDestReceiver(void); */ extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options); extern void EndCopyTo(CopyToState cstate); extern uint64 DoCopyTo(CopyToState cstate); extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); +extern void CopyToStart(CopyToState cstate); +extern void CopyToFinish(CopyToState cstate); +extern void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot); #endif /* COPY_H */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 858af7a717..8f61ff3d4d 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -39,16 +39,6 @@ typedef enum EolType EOL_CRNL } EolType; -/* - * Represents the heap insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or fdw routine */ - CIM_MULTI, /* always use table_multi_insert */ - CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ -} CopyInsertMethod; - /* * This struct contains all the state variables used throughout a COPY FROM * operation. 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 6eae134c08..beb8e8fcd0 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -203,6 +203,7 @@ extern void InitResultRelInfo(ResultRelInfo *resultRelInfo, Index resultRelationIndex, ResultRelInfo *partition_root_rri, int instrument_options); +extern bool ExecMultiInsertAllowed(const ResultRelInfo *rri); extern ResultRelInfo *ExecGetTriggerResultRel(EState *estate, Oid relid); extern void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate); diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 4ebbca6de9..74fe6bdf5c 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -127,6 +127,16 @@ typedef TupleTableSlot *(*IterateDirectModify_function) (ForeignScanState *node) typedef void (*EndDirectModify_function) (ForeignScanState *node); +typedef void (*BeginForeignCopy_function) (EState *estate, + ResultRelInfo *rinfo); + +typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo, + TupleTableSlot **slots, + int nslots); + +typedef void (*EndForeignCopy_function) (EState *estate, + ResultRelInfo *rinfo); + typedef RowMarkType (*GetForeignRowMarkType_function) (RangeTblEntry *rte, LockClauseStrength strength); @@ -244,6 +254,11 @@ typedef struct FdwRoutine IterateDirectModify_function IterateDirectModify; EndDirectModify_function EndDirectModify; + /* Support functions for COPY into foreign tables */ + BeginForeignCopy_function BeginForeignCopy; + ExecForeignCopy_function ExecForeignCopy; + EndForeignCopy_function EndForeignCopy; + /* Functions for SELECT FOR UPDATE/SHARE row locking */ GetForeignRowMarkType_function GetForeignRowMarkType; RefetchForeignRow_function RefetchForeignRow; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index e7ae21c023..8f13c92726 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ 
-521,7 +521,13 @@ typedef struct ResultRelInfo TupleConversionMap *ri_ChildToRootMap; bool ri_ChildToRootMapValid; - /* for use by copyfrom.c when performing multi-inserts */ + /* + * The following fields are currently only relevant to copyfrom.c. + * ri_usesMultiInsert: true if okay to use multi-insert on this relation. + */ + bool ri_usesMultiInsert; + + /* Buffer allocated to this relation when using multi-insert mode */ struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer; } ResultRelInfo; -- 2.17.0