22.06.2020 17:11, Ashutosh Bapat wrote:
On Wed, 17 Jun 2020 at 11:54, Andrey V. Lepikhov
<a.lepik...@postgrespro.ru> wrote:
On 6/15/20 10:26 AM, Ashutosh Bapat wrote:
> Thanks Andrey for the patch. I am glad that the patch has taken care
> of some corner cases already but there exist still more.
>
> COPY command constructed doesn't take care of dropped columns. There
> is code in deparseAnalyzeSql which constructs list of columns for a
> given foreign relation. 0002 patch attached here, moves that code to a
> separate function and reuses it for COPY. If you find that code change
> useful please include it in the main patch.
Thanks, I included it.
> 2. In the same case, if the foreign table declared locally didn't have
> any non-dropped columns but the relation that it referred to on the
> foreign server had some non-dropped columns, COPY command fails. I
> added a test case for this in 0002 but haven't fixed it.
I fixed it.
This is a very special corner case. The problem is that COPY FROM does
not support semantics like "INSERT INTO .. DEFAULT VALUES". To
simplify the solution, I switched off bulk copying for this case.
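
A minimal standalone sketch of that rule follows. It is not the patch code: ColumnDef, append_str and build_copy_sql are hypothetical names, and identifier quoting is left out. It builds the remote COPY statement from the non-dropped columns and reports how many columns it used, so that the caller can fall back to row-by-row INSERT when the count is zero.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for one attribute of a tuple descriptor. */
typedef struct ColumnDef
{
	const char *name;
	bool		dropped;
} ColumnDef;

/* Append s to out if it fits (including the terminating NUL). */
static bool
append_str(char *out, size_t outsz, size_t *used, const char *s)
{
	size_t		len = strlen(s);

	if (*used + len >= outsz)
		return false;
	memcpy(out + *used, s, len + 1);
	*used += len;
	return true;
}

/*
 * Build "COPY rel(col, ...) FROM STDIN" into out, skipping dropped columns.
 *
 * Returns the number of live columns. Returns 0 (and leaves out empty) when
 * there are none, which is the signal to avoid the bulk COPY path. Returns -1
 * (and leaves out empty) if the statement does not fit into the buffer.
 */
int
build_copy_sql(char *out, size_t outsz, const char *relname,
			   const ColumnDef *cols, int ncols)
{
	size_t		used = 0;
	int			live = 0;
	bool		first = true;
	bool		ok;

	assert(out != NULL && outsz > 0);
	out[0] = '\0';

	for (int i = 0; i < ncols; i++)
		if (!cols[i].dropped)
			live++;

	/* COPY has no equivalent of INSERT ... DEFAULT VALUES. */
	if (live == 0)
		return 0;

	ok = append_str(out, outsz, &used, "COPY ") &&
		append_str(out, outsz, &used, relname);

	for (int i = 0; ok && i < ncols; i++)
	{
		if (cols[i].dropped)
			continue;
		ok = append_str(out, outsz, &used, first ? "(" : ", ") &&
			append_str(out, outsz, &used, cols[i].name);
		first = false;
	}

	ok = ok && append_str(out, outsz, &used, ") FROM STDIN");
	if (!ok)
	{
		out[0] = '\0';
		return -1;
	}
	return live;
}
```

A caller would treat a return value of 0 the same way the patch treats an empty attribute list, i.e. use the single-row insert method instead of buffering.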
> I think this work is useful. Please add it to the next commitfest so
> that it's tracked.
Ok.
It looks like we call BeginForeignInsert and EndForeignInsert even
though actual copy is performed using BeginForeignCopy, ExecForeignCopy
and EndForeignCopy. BeginForeignInsert constructs the INSERT query which
looks unnecessary. Also some of the other PgFdwModifyState members are
initialized unnecessarily. It also gives an impression that we are using
INSERT underneath the copy. Instead a better way would be to
call BeginForeignCopy instead of BeginForeignInsert and EndForeignCopy
instead of EndForeignInsert, if we are going to use COPY protocol to
copy data to the foreign server. Corresponding postgres_fdw
implementations need to change in order to do that.
Fixed.
I renamed the CopyIn FDW API routines. Also, the partition routing
initializer now calls either BeginForeignInsert or BeginForeignCopyIn,
depending on the value of ResultRelInfo::UseBulkModifying.
I introduced this field because foreign partitions can be placed on
foreign servers that use different foreign data wrappers, and not all
wrappers support the CopyIn API.
Also, I ran the Tomas Vondra benchmark. On my laptop the results are:
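
The selection logic can be sketched in isolation as below. This is only an illustration under stated assumptions: all Sketch* types and sketch_* functions are hypothetical, cut-down analogues of FdwRoutine and ResultRelInfo, not the structures from the patch. A partition inherits the bulk flag from the root only if its wrapper provides the copy-in callback; otherwise the insert callback is used.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct SketchRel SketchRel;

/* Hypothetical analogue of FdwRoutine: a NULL callback means "not supported". */
typedef struct SketchFdwRoutine
{
	void		(*begin_insert) (SketchRel *rel);
	void		(*begin_copy_in) (SketchRel *rel);
} SketchFdwRoutine;

/* Hypothetical analogue of ResultRelInfo. */
struct SketchRel
{
	const SketchFdwRoutine *routine;	/* NULL for a non-foreign relation */
	bool		use_bulk;		/* plays the role of UseBulkModifying */
	int			began_insert;	/* counters, only for demonstration */
	int			began_copy_in;
};

typedef enum SketchPath
{
	PATH_NONE,
	PATH_INSERT,
	PATH_COPY_IN
} SketchPath;

/* Sample callbacks that just record that they were called. */
void
sketch_note_insert(SketchRel *rel)
{
	rel->began_insert++;
}

void
sketch_note_copy_in(SketchRel *rel)
{
	rel->began_copy_in++;
}

/* A partition may use the bulk path only if its own wrapper supports it. */
bool
sketch_partition_use_bulk(bool root_use_bulk, const SketchFdwRoutine *routine)
{
	return root_use_bulk && routine != NULL && routine->begin_copy_in != NULL;
}

/* Call the matching "begin" callback and report which path was taken. */
SketchPath
sketch_begin_foreign(SketchRel *rel)
{
	if (rel->routine == NULL)
		return PATH_NONE;

	if (rel->use_bulk && rel->routine->begin_copy_in != NULL)
	{
		rel->routine->begin_copy_in(rel);
		return PATH_COPY_IN;
	}

	if (rel->routine->begin_insert != NULL)
	{
		rel->routine->begin_insert(rel);
		return PATH_INSERT;
	}

	return PATH_NONE;
}
```

Keeping the decision in a per-relation flag means the matching "end" callback can be chosen later from the same flag, without asking the wrapper again.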
* regular: 5000 ms.
* Tomas buffering patch: 11000 ms.
* This CopyIn patch: 8000 ms.
--
regards,
Andrey Lepikhov
Postgres Professional
From ac43384af911acd0a07b3fae0ab25a9131a4504c Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Thu, 9 Jul 2020 11:16:56 +0500
Subject: [PATCH] Fast COPY FROM into the foreign or sharded table.
This feature enables bulk COPY into a foreign table when multi-inserts
are possible and the foreign table has a non-zero number of columns.
---
contrib/postgres_fdw/deparse.c | 60 ++++-
.../postgres_fdw/expected/postgres_fdw.out | 33 ++-
contrib/postgres_fdw/postgres_fdw.c | 130 ++++++++++
contrib/postgres_fdw/postgres_fdw.h | 1 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 28 ++
src/backend/commands/copy.c | 239 ++++++++++++------
src/backend/executor/execMain.c | 1 +
src/backend/executor/execPartition.c | 34 ++-
src/include/commands/copy.h | 5 +
src/include/foreign/fdwapi.h | 15 ++
src/include/nodes/execnodes.h | 8 +
11 files changed, 456 insertions(+), 98 deletions(-)
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ad37a74221..a37981ff66 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -184,6 +184,8 @@ static void appendAggOrderBy(List *orderList, List *targetList,
static void appendFunctionName(Oid funcid, deparse_expr_cxt *context);
static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno,
deparse_expr_cxt *context);
+static List *deparseRelColumnList(StringInfo buf, Relation rel,
+ bool enclose_in_parens);
/*
* Helper functions
@@ -1758,6 +1760,20 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
withCheckOptionList, returningList, retrieved_attrs);
}
+/*
+ * Deparse COPY FROM into given buf.
+ * We need to use list of parameters at each query.
+ */
+void
+deparseCopyFromSql(StringInfo buf, Relation rel)
+{
+ appendStringInfoString(buf, "COPY ");
+ deparseRelation(buf, rel);
+ (void) deparseRelColumnList(buf, rel, true);
+
+ appendStringInfoString(buf, " FROM STDIN ");
+}
+
/*
* deparse remote UPDATE statement
*
@@ -2061,6 +2077,30 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
*/
void
deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
+{
+ appendStringInfoString(buf, "SELECT ");
+ *retrieved_attrs = deparseRelColumnList(buf, rel, false);
+
+ /* Don't generate bad syntax for zero-column relation. */
+ if (list_length(*retrieved_attrs) == 0)
+ appendStringInfoString(buf, "NULL");
+
+ /*
+ * Construct FROM clause
+ */
+ appendStringInfoString(buf, " FROM ");
+ deparseRelation(buf, rel);
+}
+
+/*
+ * Construct the list of columns of given foreign relation in the order they
+ * appear in the tuple descriptor of the relation. Ignore any dropped columns.
+ * Use column names on the foreign server instead of local names.
+ *
+ * Optionally enclose the list in parentheses.
+ */
+static List *
+deparseRelColumnList(StringInfo buf, Relation rel, bool enclose_in_parens)
{
Oid relid = RelationGetRelid(rel);
TupleDesc tupdesc = RelationGetDescr(rel);
@@ -2069,10 +2109,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
List *options;
ListCell *lc;
bool first = true;
+ List *retrieved_attrs = NIL;
- *retrieved_attrs = NIL;
-
- appendStringInfoString(buf, "SELECT ");
for (i = 0; i < tupdesc->natts; i++)
{
/* Ignore dropped columns. */
@@ -2081,6 +2119,9 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
if (!first)
appendStringInfoString(buf, ", ");
+ else if (enclose_in_parens)
+ appendStringInfoChar(buf, '(');
+
first = false;
/* Use attribute name or column_name option. */
@@ -2100,18 +2141,13 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
appendStringInfoString(buf, quote_identifier(colname));
- *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
+ retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
}
- /* Don't generate bad syntax for zero-column relation. */
- if (first)
- appendStringInfoString(buf, "NULL");
+ if (enclose_in_parens && list_length(retrieved_attrs) > 0)
+ appendStringInfoChar(buf, ')');
- /*
- * Construct FROM clause
- */
- appendStringInfoString(buf, " FROM ");
- deparseRelation(buf, rel);
+ return retrieved_attrs;
}
/*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 82fc1290ef..3a3cca5047 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8063,8 +8063,9 @@ copy rem2 from stdin;
copy rem2 from stdin; -- ERROR
ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive"
DETAIL: Failing row contains (-1, xyzzy).
-CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
-COPY rem2, line 1: "-1 xyzzy"
+CONTEXT: COPY loc2, line 1: "-1 xyzzy"
+remote SQL command: COPY public.loc2(f1, f2) FROM STDIN
+COPY rem2, line 2
select * from rem2;
f1 | f2
----+-----
@@ -8183,6 +8184,34 @@ drop trigger rem2_trig_row_before on rem2;
drop trigger rem2_trig_row_after on rem2;
drop trigger loc2_trig_row_before_insert on loc2;
delete from rem2;
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+ERROR: column "f1" of relation "loc2" does not exist
+CONTEXT: remote SQL command: COPY public.loc2(f1, f2) FROM STDIN
+COPY rem2, line 3
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+ f1 | f2
+----+----
+(0 rows)
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(2 rows)
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(4 rows)
+
-- test COPY FROM with foreign table created in the same transaction
create table loc3 (f1 int, f2 text);
begin;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..0db8d74320 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -18,6 +18,7 @@
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/pg_class.h"
+#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
@@ -190,6 +191,7 @@ typedef struct PgFdwModifyState
/* for update row movement if subplan result rel */
struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
* created */
+ CopyState cstate; /* foreign COPY state, if used */
} PgFdwModifyState;
/*
@@ -356,6 +358,10 @@ static void postgresBeginForeignInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo);
static void postgresEndForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo);
+static void postgresBeginForeignCopyIn(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo);
+static void postgresEndForeignCopyIn(EState *estate, ResultRelInfo *resultRelInfo);
+static TupleTableSlot *postgresExecForeignCopyIn(ResultRelInfo *resultRelInfo, TupleTableSlot **slots, int nslots);
static int postgresIsForeignRelUpdatable(Relation rel);
static bool postgresPlanDirectModify(PlannerInfo *root,
ModifyTable *plan,
@@ -533,6 +539,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->EndForeignModify = postgresEndForeignModify;
routine->BeginForeignInsert = postgresBeginForeignInsert;
routine->EndForeignInsert = postgresEndForeignInsert;
+ routine->BeginForeignCopyIn = postgresBeginForeignCopyIn;
+ routine->EndForeignCopyIn = postgresEndForeignCopyIn;
+ routine->ExecForeignCopyIn = postgresExecForeignCopyIn;
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
routine->PlanDirectModify = postgresPlanDirectModify;
routine->BeginDirectModify = postgresBeginDirectModify;
@@ -1847,6 +1856,9 @@ postgresExecForeignInsert(EState *estate,
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
TupleTableSlot *rslot;
+ Assert(!resultRelInfo->UseBulkModifying ||
+ resultRelInfo->ri_FdwRoutine->BeginForeignCopyIn == NULL);
+
/*
* If the fmstate has aux_fmstate set, use the aux_fmstate (see
* postgresBeginForeignInsert())
@@ -2051,6 +2063,124 @@ postgresEndForeignInsert(EState *estate,
finish_foreign_modify(fmstate);
}
+/*
+ *
+ * postgresBeginForeignCopyIn
+ * Begin a COPY operation on a foreign table
+ */
+static void
+postgresBeginForeignCopyIn(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo)
+{
+ PgFdwModifyState *fmstate;
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ StringInfoData sql;
+ RangeTblEntry *rte;
+
+ rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state);
+ initStringInfo(&sql);
+ deparseCopyFromSql(&sql, rel);
+
+ fmstate = create_foreign_modify(mtstate->ps.state,
+ rte,
+ resultRelInfo,
+ CMD_INSERT,
+ NULL,
+ sql.data,
+ NIL,
+ false,
+ NIL);
+ fmstate->cstate = BeginForeignCopyTo(resultRelInfo->ri_RelationDesc);
+ resultRelInfo->ri_FdwState = fmstate;
+}
+
+/*
+ * postgresEndForeignCopyIn
+ * Finish a COPY operation on a foreign table
+ */
+static void
+postgresEndForeignCopyIn(EState *estate, ResultRelInfo *resultRelInfo)
+{
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+
+ /* Check correct use of CopyIn FDW API. */
+ Assert(fmstate->cstate != NULL);
+
+ EndForeignCopyTo(fmstate->cstate);
+ pfree(fmstate->cstate);
+ fmstate->cstate = NULL;
+ finish_foreign_modify(fmstate);
+}
+
+/*
+ *
+ * postgresExecForeignCopyIn
+ * Send a number of tuples to the foreign relation.
+ */
+static TupleTableSlot *
+postgresExecForeignCopyIn(ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots, int nslots)
+{
+ PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState;
+ PGresult *res;
+ PGconn *conn = fmstate->conn;
+ bool status = false;
+ int i;
+
+ /* Check correct use of CopyIn FDW API. */
+ Assert(fmstate->cstate != NULL);
+
+ res = PQexec(conn, fmstate->query);
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
+ PQclear(res);
+
+ PG_TRY();
+ {
+ for (i = 0; i < nslots; i++)
+ {
+ char *buf = NextForeignCopyRow(fmstate->cstate, slots[i]);
+
+ if (PQputCopyData(conn, buf, strlen(buf)) <= 0)
+ {
+ res = PQgetResult(conn);
+ pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
+ }
+ }
+
+ status = true;
+ }
+ PG_FINALLY();
+ {
+ /* Finish the COPY IN protocol. This must be done both after a successful
+ * copy and after an error.
+ */
+ if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 ||
+ PQflush(conn))
+ ereport(ERROR,
+ (errmsg("error returned by PQputCopyEnd: %s",
+ PQerrorMessage(conn))));
+
+ /* After successfully sending an EOF signal, check command status. */
+ res = PQgetResult(conn);
+ if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) ||
+ (status && PQresultStatus(res) != PGRES_COMMAND_OK))
+ pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+
+ PQclear(res);
+ /* Do this to ensure we've pumped libpq back to idle state */
+ if (PQgetResult(conn) != NULL)
+ ereport(ERROR,
+ (errmsg("unexpected extra results during COPY of table: %s",
+ PQerrorMessage(conn))));
+
+ if (!status)
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ return NULL;
+}
+
/*
* postgresIsForeignRelUpdatable
* Determine whether a foreign table supports INSERT, UPDATE and/or
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..8fc5ff018f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
List *targetAttrs, bool doNothing,
List *withCheckOptionList, List *returningList,
List **retrieved_attrs);
+extern void deparseCopyFromSql(StringInfo buf, Relation rel);
extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 83971665e3..73f98a3152 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2293,6 +2293,34 @@ drop trigger loc2_trig_row_before_insert on loc2;
delete from rem2;
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+1 foo
+2 bar
+\.
+
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
-- test COPY FROM with foreign table created in the same transaction
create table loc3 (f1 int, f2 text);
begin;
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3e199bdfd0..7338c63fe5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -133,6 +133,7 @@ typedef struct CopyStateData
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool is_program; /* is 'filename' a program to popen? */
copy_data_source_cb data_source_cb; /* function for reading data */
+ copy_data_dest_cb data_dest_cb;
bool binary; /* binary format? */
bool freeze; /* freeze rows on loading? */
bool csv_mode; /* Comma Separated Value format? */
@@ -358,8 +359,11 @@ static void EndCopy(CopyState cstate);
static void ClosePipeToProgram(CopyState cstate);
static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
Oid queryRelId, const char *filename, bool is_program,
- List *attnamelist, List *options);
+ copy_data_dest_cb data_dest_cb, List *attnamelist,
+ List *options);
static void EndCopyTo(CopyState cstate);
+static void CopyToStart(CopyState cstate);
+static void CopyToFinish(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
@@ -586,7 +590,9 @@ CopySendEndOfRow(CopyState cstate)
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
case COPY_CALLBACK:
- Assert(false); /* Not yet supported. */
+ CopySendChar(cstate, '\n');
+ CopySendChar(cstate, '\0');
+ cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
break;
}
@@ -1074,7 +1080,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
else
{
cstate = BeginCopyTo(pstate, rel, query, relid,
- stmt->filename, stmt->is_program,
+ stmt->filename, stmt->is_program, NULL,
stmt->attlist, stmt->options);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
@@ -1814,6 +1820,32 @@ EndCopy(CopyState cstate)
pfree(cstate);
}
+static char *buf = NULL;
+static void
+data_dest_cb(void *outbuf, int len)
+{
+ buf = (char *) palloc(len);
+ memcpy(buf, (char *) outbuf, len);
+}
+
+CopyState
+BeginForeignCopyTo(Relation rel)
+{
+ CopyState cstate;
+
+ cstate = BeginCopy(NULL, false, rel, NULL, InvalidOid, NIL, NIL);
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ CopyToStart(cstate);
+ return cstate;
+}
+
+void
+EndForeignCopyTo(CopyState cstate)
+{
+ CopyToFinish(cstate);
+}
+
/*
* Setup CopyState to read tuples from a table or a query for COPY TO.
*/
@@ -1824,6 +1856,7 @@ BeginCopyTo(ParseState *pstate,
Oid queryRelId,
const char *filename,
bool is_program,
+ copy_data_dest_cb data_dest_cb,
List *attnamelist,
List *options)
{
@@ -1879,6 +1912,11 @@ BeginCopyTo(ParseState *pstate,
if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
}
+ else if (data_dest_cb)
+ {
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ }
else
{
cstate->filename = pstrdup(filename);
@@ -1949,6 +1987,13 @@ BeginCopyTo(ParseState *pstate,
return cstate;
}
+char *
+NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot)
+{
+ CopyOneRowTo(cstate, slot);
+ return buf;
+}
+
/*
* This intermediate routine exists mainly to localize the effects of setjmp
* so we don't need to plaster a lot of variables with "volatile".
@@ -1965,7 +2010,9 @@ DoCopyTo(CopyState cstate)
if (fe_copy)
SendCopyBegin(cstate);
+ CopyToStart(cstate);
processed = CopyTo(cstate);
+ CopyToFinish(cstate);
if (fe_copy)
SendCopyEnd(cstate);
@@ -2004,16 +2051,12 @@ EndCopyTo(CopyState cstate)
EndCopy(cstate);
}
-/*
- * Copy from relation or query TO file.
- */
-static uint64
-CopyTo(CopyState cstate)
+static void
+CopyToStart(CopyState cstate)
{
TupleDesc tupDesc;
int num_phys_attrs;
ListCell *cur;
- uint64 processed;
if (cstate->rel)
tupDesc = RelationGetDescr(cstate->rel);
@@ -2103,6 +2146,29 @@ CopyTo(CopyState cstate)
CopySendEndOfRow(cstate);
}
}
+}
+
+static void
+CopyToFinish(CopyState cstate)
+{
+ if (cstate->binary)
+ {
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+ }
+
+ MemoryContextDelete(cstate->rowcontext);
+}
+
+/*
+ * Copy from relation or query TO file.
+ */
+static uint64
+CopyTo(CopyState cstate)
+{
+ uint64 processed;
if (cstate->rel)
{
@@ -2134,17 +2200,6 @@ CopyTo(CopyState cstate)
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
-
- if (cstate->binary)
- {
- /* Generate trailer for a binary copy */
- CopySendInt16(cstate, -1);
- /* Need to flush out the trailer */
- CopySendEndOfRow(cstate);
- }
-
- MemoryContextDelete(cstate->rowcontext);
-
return processed;
}
@@ -2444,53 +2499,64 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
cstate->line_buf_valid = false;
save_cur_lineno = cstate->cur_lineno;
- /*
- * table_multi_insert may leak memory, so switch to short-lived memory
- * context before calling it.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- table_multi_insert(resultRelInfo->ri_RelationDesc,
- slots,
- nused,
- mycid,
- ti_options,
- buffer->bistate);
- MemoryContextSwitchTo(oldcontext);
-
- for (i = 0; i < nused; i++)
+ if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ {
+ /* Flush into foreign table or partition */
+ resultRelInfo->ri_FdwRoutine->ExecForeignCopyIn(resultRelInfo,
+ slots,
+ nused);
+ }
+ else
{
/*
- * If there are any indexes, update them for all the inserted tuples,
- * and run AFTER ROW INSERT triggers.
+ * table_multi_insert may leak memory, so switch to short-lived memory
+ * context before calling it.
*/
- if (resultRelInfo->ri_NumIndices > 0)
- {
- List *recheckIndexes;
-
- cstate->cur_lineno = buffer->linenos[i];
- recheckIndexes =
- ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
- NIL);
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], recheckIndexes,
- cstate->transition_capture);
- list_free(recheckIndexes);
- }
+ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+ table_multi_insert(resultRelInfo->ri_RelationDesc,
+ slots,
+ nused,
+ mycid,
+ ti_options,
+ buffer->bistate);
+ MemoryContextSwitchTo(oldcontext);
- /*
- * There's no indexes, but see if we need to run AFTER ROW INSERT
- * triggers anyway.
- */
- else if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ for (i = 0; i < nused; i++)
{
- cstate->cur_lineno = buffer->linenos[i];
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], NIL, cstate->transition_capture);
- }
+ /*
+ * If there are any indexes, update them for all the inserted tuples,
+ * and run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = buffer->linenos[i];
+ recheckIndexes =
+ ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
+ NIL);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], recheckIndexes,
+ cstate->transition_capture);
+ list_free(recheckIndexes);
+ }
+
+ /*
+ * There's no indexes, but see if we need to run AFTER ROW INSERT
+ * triggers anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ cstate->cur_lineno = buffer->linenos[i];
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], NIL, cstate->transition_capture);
+ }
- ExecClearTuple(slots[i]);
+ ExecClearTuple(slots[i]);
+ }
}
/* Mark that all slots are free */
@@ -2800,11 +2866,6 @@ CopyFrom(CopyState cstate)
mtstate->operation = CMD_INSERT;
mtstate->resultRelInfo = estate->es_result_relations;
- if (resultRelInfo->ri_FdwRoutine != NULL &&
- resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
- resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
- resultRelInfo);
-
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@@ -2863,14 +2924,13 @@ CopyFrom(CopyState cstate)
*/
insertMethod = CIM_SINGLE;
}
- else if (resultRelInfo->ri_FdwRoutine != NULL ||
- cstate->volatile_defexprs)
+ else if (cstate->volatile_defexprs || list_length(cstate->attnumlist) == 0)
{
/*
- * Can't support multi-inserts to foreign tables or if there are any
- * volatile default expressions in the table. Similarly to the
- * trigger case above, such expressions may query the table we're
- * inserting into.
+ * Can't support bufferization of copy into foreign tables without any
+ * defined columns or if there are any volatile default expressions in the
+ * table. Similarly to the trigger case above, such expressions may query
+ * the table we're inserting into.
*
* Note: It does not matter if any partitions have any volatile
* default expressions as we use the defaults from the target of the
@@ -2910,6 +2970,24 @@ CopyFrom(CopyState cstate)
estate, mycid, ti_options);
}
+ if (insertMethod != CIM_SINGLE)
+ resultRelInfo->UseBulkModifying = true;
+
+ /*
+ * Init COPY into foreign table. Initialization of copying into foreign
+ * partitions will be done later.
+ */
+ if (target_resultRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (target_resultRelInfo->UseBulkModifying &&
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignCopyIn != NULL)
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignCopyIn(mtstate,
+ resultRelInfo);
+ else if (target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
+ resultRelInfo);
+ }
+
/*
* If not using batch mode (which allocates slots as needed) set up a
* tuple slot too. When inserting into a partitioned table, we also need
@@ -3033,7 +3111,7 @@ CopyFrom(CopyState cstate)
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
!has_before_insert_row_trig &&
!has_instead_insert_row_trig &&
- resultRelInfo->ri_FdwRoutine == NULL;
+ (resultRelInfo->ri_FdwRoutine == NULL || resultRelInfo->UseBulkModifying);
/* Set the multi-insert buffer to use for this partition. */
if (leafpart_use_multi_insert)
@@ -3292,10 +3370,17 @@ CopyFrom(CopyState cstate)
ExecResetTupleTable(estate->es_tupleTable, false);
/* Allow the FDW to shut down */
- if (target_resultRelInfo->ri_FdwRoutine != NULL &&
- target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
- target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
- target_resultRelInfo);
+ if (target_resultRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (target_resultRelInfo->UseBulkModifying &&
+ target_resultRelInfo->ri_FdwRoutine->EndForeignCopyIn != NULL)
+ target_resultRelInfo->ri_FdwRoutine->EndForeignCopyIn(estate,
+ target_resultRelInfo);
+ else if (target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+ target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
+ target_resultRelInfo);
+ target_resultRelInfo->UseBulkModifying = false;
+ }
/* Tear down the multi-insert buffer data */
if (insertMethod != CIM_SINGLE)
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4fdffad6f3..d3e8f1c720 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1345,6 +1345,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_PartitionRoot = partition_root;
resultRelInfo->ri_PartitionInfo = NULL; /* may be set later */
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+ resultRelInfo->UseBulkModifying = false;
}
/*
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index fb6ce49056..c216296811 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -526,6 +526,11 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
rootrel,
estate->es_instrument);
+ if (rootResultRelInfo->UseBulkModifying &&
+ leaf_part_rri->ri_FdwRoutine != NULL &&
+ leaf_part_rri->ri_FdwRoutine->BeginForeignCopyIn != NULL)
+ leaf_part_rri->UseBulkModifying = true;
+
/*
* Verify result relation is a valid target for an INSERT. An UPDATE of a
* partition-key becomes a DELETE+INSERT operation, so this check is still
@@ -937,9 +942,16 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
* If the partition is a foreign table, let the FDW init itself for
* routing tuples to the partition.
*/
- if (partRelInfo->ri_FdwRoutine != NULL &&
- partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
- partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
+ if (partRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (partRelInfo->UseBulkModifying)
+ {
+ Assert(partRelInfo->ri_FdwRoutine->BeginForeignCopyIn != NULL);
+ partRelInfo->ri_FdwRoutine->BeginForeignCopyIn(mtstate, partRelInfo);
+ }
+ else if (partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
+ partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
+ }
partRelInfo->ri_PartitionInfo = partrouteinfo;
partRelInfo->ri_CopyMultiInsertBuffer = NULL;
@@ -1121,10 +1133,18 @@ ExecCleanupTupleRouting(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo = proute->partitions[i];
/* Allow any FDWs to shut down */
- if (resultRelInfo->ri_FdwRoutine != NULL &&
- resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
- resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
- resultRelInfo);
+ if (resultRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (resultRelInfo->UseBulkModifying)
+ {
+ Assert(resultRelInfo->ri_FdwRoutine->EndForeignCopyIn != NULL);
+ resultRelInfo->ri_FdwRoutine->EndForeignCopyIn(mtstate->ps.state,
+ resultRelInfo);
+ }
+ else if (resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+ resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
+ resultRelInfo);
+ }
/*
* Check if this result rel is one belonging to the node's subplans,
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..ef119a761a 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -22,6 +22,7 @@
/* CopyStateData is private in commands/copy.c */
typedef struct CopyStateData *CopyState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *outbuf, int len);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -41,4 +42,8 @@ extern uint64 CopyFrom(CopyState cstate);
extern DestReceiver *CreateCopyDestReceiver(void);
+extern CopyState BeginForeignCopyTo(Relation rel);
+extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot);
+extern void EndForeignCopyTo(CopyState cstate);
+
#endif /* COPY_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..073eeee2ca 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -104,6 +104,16 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
typedef void (*EndForeignInsert_function) (EState *estate,
ResultRelInfo *rinfo);
+typedef void (*BeginForeignCopyIn_function) (ModifyTableState *mtstate,
+ ResultRelInfo *rinfo);
+
+typedef void (*EndForeignCopyIn_function) (EState *estate,
+ ResultRelInfo *rinfo);
+
+typedef TupleTableSlot *(*ExecForeignCopyIn_function) (ResultRelInfo *rinfo,
+ TupleTableSlot **slots,
+ int nslots);
+
typedef int (*IsForeignRelUpdatable_function) (Relation rel);
typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
@@ -220,6 +230,11 @@ typedef struct FdwRoutine
IterateDirectModify_function IterateDirectModify;
EndDirectModify_function EndDirectModify;
+ /* COPY a bulk of tuples into a foreign relation */
+ BeginForeignCopyIn_function BeginForeignCopyIn;
+ EndForeignCopyIn_function EndForeignCopyIn;
+ ExecForeignCopyIn_function ExecForeignCopyIn;
+
/* Functions for SELECT FOR UPDATE/SHARE row locking */
GetForeignRowMarkType_function GetForeignRowMarkType;
RefetchForeignRow_function RefetchForeignRow;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 0187989fd1..8ac366a659 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -491,6 +491,14 @@ typedef struct ResultRelInfo
/* For use by copy.c when performing multi-inserts */
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
+
+ /*
+ * For use by copy.c:
+ * for partitioned relation "true" means that child relations are allowed for
+ * using bulk modify operations; for foreign relation (or foreign partition
+ * of) "true" value means that modify operations must use bulk FDW API.
+ */
+ bool UseBulkModifying;
} ResultRelInfo;
/* ----------------
--
2.17.1