Hi Andrey, Thanks for this work. I have been reading through your patch and here's what I understand it does and how:
The patch aims to fix the restriction that COPYing into a foreign table can't use the multi-insert buffer mechanism effectively. That's because copy.c currently uses the ExecForeignInsert() FDW API, which can be passed only 1 row at a time. postgres_fdw's implementation issues an `INSERT INTO remote_table VALUES (...)` statement to the remote side for each row, which is pretty inefficient for bulk loads. The patch introduces a new FDW API ExecForeignCopyIn() that can receive multiple rows, and copy.c now calls it every time it flushes the multi-insert buffer so that all the flushed rows can be sent to the remote side in one go. postgres_fdw now issues a `COPY remote_table FROM STDIN` to the remote server, and postgresExecForeignCopyIn() funnels the tuples flushed by the local copy to the remote server, which is waiting for tuples on the COPY protocol. Here are some comments on the patch. * Why the "In" in these API names? + /* COPY a bulk of tuples into a foreign relation */ + BeginForeignCopyIn_function BeginForeignCopyIn; + EndForeignCopyIn_function EndForeignCopyIn; + ExecForeignCopyIn_function ExecForeignCopyIn; * fdwhandler.sgml should be updated with the description of these new APIs. * As far as I can tell, the following copy.h additions are for an FDW to use copy.c to obtain an external representation (char string) of the individual rows that are passed to ExecForeignCopyIn(), so that it can send them to the remote side: +typedef void (*copy_data_dest_cb) (void *outbuf, int len); +extern CopyState BeginForeignCopyTo(Relation rel); +extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot); +extern void EndForeignCopyTo(CopyState cstate); So, an FDW's ExecForeignCopyIn() calls copy.c: NextForeignCopyRow() which in turn calls copy.c: CopyOneRowTo() which fills CopyState.fe_msgbuf. The data_dest_cb() callback that runs after fe_msgbuf contains the full row simply copies it into a palloc'd char buffer whose pointer is returned to ExecForeignCopyIn(). 
I wonder, why not let FDWs implement the callback and pass it to copy.c through BeginForeignCopyTo()? For example, you could implement a pgfdw_copy_data_dest_cb() in postgres_fdw.c which gets a direct pointer to fe_msgbuf to send it to the remote server. Do you think all FDWs would want to use copy.c like above? If not, maybe the above APIs are really postgres_fdw-specific? Anyway, adding comments above the definitions of these functions would be helpful. * I see that the remote copy is performed from scratch on every call of postgresExecForeignCopyIn(), but wouldn't it be more efficient to send the `COPY remote_table FROM STDIN` in postgresBeginForeignCopyIn() and end it in postgresEndForeignCopyIn() when there are no errors during the copy? I tried implementing these two changes -- pgfdw_copy_data_dest_cb() and sending `COPY remote_table FROM STDIN` only once instead of on every flush -- and I see a significant speedup. Please check the attached patch that applies on top of yours. One problem I spotted when trying my patch, but didn't spend much time debugging, is that the local COPY cannot be interrupted by Ctrl+C anymore, but that should be fixable by adjusting the PG_TRY() blocks. * ResultRelInfo.UseBulkModifying should be ri_usesBulkModify for consistency. -- Amit Langote EnterpriseDB: http://www.enterprisedb.com
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 0db8d74..5668977 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2063,6 +2063,21 @@ postgresEndForeignInsert(EState *estate, finish_foreign_modify(fmstate); } +static PgFdwModifyState *copy_fmstate = NULL; + +static void +pgfdw_copy_dest_cb(void *buf, int len) +{ + PGconn *conn = copy_fmstate->conn; + + if (PQputCopyData(conn, (char *) buf, len) <= 0) + { + PGresult *res = PQgetResult(conn); + + pgfdw_report_error(ERROR, res, conn, true, copy_fmstate->query); + } +} + /* * * postgresBeginForeignCopyIn @@ -2076,6 +2091,8 @@ postgresBeginForeignCopyIn(ModifyTableState *mtstate, Relation rel = resultRelInfo->ri_RelationDesc; StringInfoData sql; RangeTblEntry *rte; + PGconn *conn; + PGresult *res; rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state); initStringInfo(&sql); @@ -2090,8 +2107,16 @@ postgresBeginForeignCopyIn(ModifyTableState *mtstate, NIL, false, NIL); - fmstate->cstate = BeginForeignCopyTo(resultRelInfo->ri_RelationDesc); + fmstate->cstate = BeginForeignCopyTo(rel, pgfdw_copy_dest_cb); resultRelInfo->ri_FdwState = fmstate; + + conn = fmstate->conn; + res = PQexec(conn, fmstate->query); + if (PQresultStatus(res) != PGRES_COPY_IN) + pgfdw_report_error(ERROR, res, conn, true, fmstate->query); + PQclear(res); + + copy_fmstate = fmstate; } /* @@ -2102,14 +2127,40 @@ static void postgresEndForeignCopyIn(EState *estate, ResultRelInfo *resultRelInfo) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + PGconn *conn = fmstate->conn; + PGresult *res; /* Check correct use of CopyIn FDW API. */ Assert(fmstate->cstate != NULL); + /* + * Finish COPY IN protocol. It is needed to do after successful copy or + * after an error. 
+ */ + if (PQputCopyEnd(conn, NULL) <= 0 || + PQflush(conn)) + ereport(ERROR, + (errmsg("error returned by PQputCopyEnd: %s", + PQerrorMessage(conn)))); + + /* After successfully sending an EOF signal, check command status. */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + PQclear(res); + + /* Do this to ensure we've pumped libpq back to idle state */ + if (PQgetResult(conn) != NULL) + ereport(ERROR, + (errmsg("unexpected extra results during COPY of table: %s", + PQerrorMessage(conn)))); + EndForeignCopyTo(fmstate->cstate); pfree(fmstate->cstate); fmstate->cstate = NULL; finish_foreign_modify(fmstate); + + copy_fmstate = NULL; } /* @@ -2122,58 +2173,21 @@ postgresExecForeignCopyIn(ResultRelInfo *resultRelInfo, TupleTableSlot **slots, int nslots) { PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState; - PGresult *res; - PGconn *conn = fmstate->conn; bool status = false; int i; /* Check correct use of CopyIn FDW API. */ Assert(fmstate->cstate != NULL); - res = PQexec(conn, fmstate->query); - if (PQresultStatus(res) != PGRES_COPY_IN) - pgfdw_report_error(ERROR, res, conn, true, fmstate->query); - PQclear(res); - PG_TRY(); { for (i = 0; i < nslots; i++) - { - char *buf = NextForeignCopyRow(fmstate->cstate, slots[i]); - - if (PQputCopyData(conn, buf, strlen(buf)) <= 0) - { - res = PQgetResult(conn); - pgfdw_report_error(ERROR, res, conn, true, fmstate->query); - } - } + NextForeignCopyRow(fmstate->cstate, slots[i]); status = true; } PG_FINALLY(); { - /* Finish COPY IN protocol. It is needed to do after successful copy or - * after an error. - */ - if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 || - PQflush(conn)) - ereport(ERROR, - (errmsg("error returned by PQputCopyEnd: %s", - PQerrorMessage(conn)))); - - /* After successfully sending an EOF signal, check command status. 
*/ - res = PQgetResult(conn); - if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) || - (status && PQresultStatus(res) != PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); - - PQclear(res); - /* Do this to ensure we've pumped libpq back to idle state */ - if (PQgetResult(conn) != NULL) - ereport(ERROR, - (errmsg("unexpected extra results during COPY of table: %s", - PQerrorMessage(conn)))); - if (!status) PG_RE_THROW(); } diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index f5f1d40..51b7233 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -594,7 +594,6 @@ CopySendEndOfRow(CopyState cstate) break; case COPY_CALLBACK: CopySendChar(cstate, '\n'); - CopySendChar(cstate, '\0'); cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -1823,16 +1822,8 @@ EndCopy(CopyState cstate) pfree(cstate); } -static char *buf = NULL; -static void -data_dest_cb(void *outbuf, int len) -{ - buf = (char *) palloc(len); - memcpy(buf, (char *) outbuf, len); -} - CopyState -BeginForeignCopyTo(Relation rel) +BeginForeignCopyTo(Relation rel, copy_data_dest_cb data_dest_cb) { CopyState cstate; @@ -1990,11 +1981,10 @@ BeginCopyTo(ParseState *pstate, return cstate; } -char * +void NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot) { CopyOneRowTo(cstate, slot); - return buf; } /* diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index ef119a7..f31ed13 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -42,8 +42,8 @@ extern uint64 CopyFrom(CopyState cstate); extern DestReceiver *CreateCopyDestReceiver(void); -extern CopyState BeginForeignCopyTo(Relation rel); -extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot); +extern CopyState BeginForeignCopyTo(Relation rel, copy_data_dest_cb data_dest_cb); +extern void NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot); extern void EndForeignCopyTo(CopyState cstate); 
#endif /* COPY_H */