Hi Andrey,

Thanks for this work.  I have been reading through your patch and
here's what I understand it does and how:

The patch aims to fix the restriction that COPYing into a foreign
table can't use the multi-insert buffer mechanism effectively.  That's
because copy.c currently uses the ExecForeignInsert() FDW API which
can be passed only 1 row at a time.  postgres_fdw's implementation
issues an `INSERT INTO remote_table VALUES (...)` statement to the
remote side for each row, which is pretty inefficient for bulk loads.
The patch introduces a new FDW API ExecForeignCopyIn() that can
receive multiple rows and copy.c now calls it every time it flushes
the multi-insert buffer so that all the flushed rows can be sent to
the remote side in one go.  postgres_fdw now issues a `COPY
remote_table FROM STDIN` to the remote server and
postgresExecForeignCopyIn() funnels the tuples flushed by the local
copy to the server side waiting for tuples on the COPY protocol.

Here are some comments on the patch.

* Why the "In" in these API names?

+   /* COPY a bulk of tuples into a foreign relation */
+   BeginForeignCopyIn_function BeginForeignCopyIn;
+   EndForeignCopyIn_function EndForeignCopyIn;
+   ExecForeignCopyIn_function ExecForeignCopyIn;

* fdwhandler.sgml should be updated with the description of these new APIs.

* As far as I can tell, the following copy.h additions are for an FDW
to use copy.c to obtain an external representation (char string) to
send to the remote side of the individual rows that are passed to
ExecForeignCopyIn():

+typedef void (*copy_data_dest_cb) (void *outbuf, int len);
+extern CopyState BeginForeignCopyTo(Relation rel);
+extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot);
+extern void EndForeignCopyTo(CopyState cstate);

So, an FDW's ExecForeignCopyIn() calls copy.c: NextForeignCopyRow()
which in turn calls copy.c: CopyOneRowTo() which fills
CopyState.fe_msgbuf.  The data_dest_cb() callback that runs after
fe_msgbuf contains the full row simply copies it into a palloc'd char
buffer whose pointer is returned back to ExecForeignCopyIn().  I
wonder why not let FDWs implement the callback and pass it to copy.c
through BeginForeignCopyTo()?  For example, you could implement a
pgfdw_copy_data_dest_cb() in postgres_fdw.c which gets a direct
pointer of fe_msgbuf to send it to the remote server.

Do you think all FDWs would want to use copy.c like above?  If not,
maybe the above APIs are really postgres_fdw-specific?  Anyway, adding
comments above the definitions of these functions would be helpful.

* I see that the remote copy is performed from scratch on every call
of postgresExecForeignCopyIn(), but wouldn't it be more efficient to
send the `COPY remote_table FROM STDIN` in
postgresBeginForeignCopyIn() and end it in postgresEndForeignCopyIn()
when there are no errors during the copy?

I tried implementing these two changes -- pgfdw_copy_data_dest_cb()
and sending `COPY remote_table FROM STDIN` only once instead of on
every flush -- and I see significant speedup.  Please check the
attached patch that applies on top of yours.  One problem I spotted
when trying my patch but didn't spend much time debugging is that
local COPY cannot be interrupted by Ctrl+C anymore, but that should be
fixable by adjusting PG_TRY() blocks.

* ResultRelInfo.UseBulkModifying should be ri_usesBulkModify for consistency.

--
Amit Langote
EnterpriseDB: http://www.enterprisedb.com
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 0db8d74..5668977 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -2063,6 +2063,21 @@ postgresEndForeignInsert(EState *estate,
 	finish_foreign_modify(fmstate);
 }
 
+static PgFdwModifyState *copy_fmstate = NULL;
+
+static void
+pgfdw_copy_dest_cb(void *buf, int len)
+{
+	PGconn *conn = copy_fmstate->conn;
+
+	if (PQputCopyData(conn, (char *) buf, len) <= 0)
+	{
+		PGresult *res = PQgetResult(conn);
+
+		pgfdw_report_error(ERROR, res, conn, true, copy_fmstate->query);
+	}
+}
+
 /*
  *
  * postgresBeginForeignCopyIn
@@ -2076,6 +2091,8 @@ postgresBeginForeignCopyIn(ModifyTableState *mtstate,
 	Relation	rel = resultRelInfo->ri_RelationDesc;
 	StringInfoData sql;
 	RangeTblEntry *rte;
+	PGconn *conn;
+	PGresult *res;
 
 	rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state);
 	initStringInfo(&sql);
@@ -2090,8 +2107,16 @@ postgresBeginForeignCopyIn(ModifyTableState *mtstate,
 									NIL,
 									false,
 									NIL);
-	fmstate->cstate = BeginForeignCopyTo(resultRelInfo->ri_RelationDesc);
+	fmstate->cstate = BeginForeignCopyTo(rel, pgfdw_copy_dest_cb);
 	resultRelInfo->ri_FdwState = fmstate;
+
+	conn = fmstate->conn;
+	res = PQexec(conn, fmstate->query);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
+	PQclear(res);
+
+	copy_fmstate = fmstate;
 }
 
 /*
@@ -2102,14 +2127,40 @@ static void
 postgresEndForeignCopyIn(EState *estate, ResultRelInfo *resultRelInfo)
 {
 	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+	PGconn *conn = fmstate->conn;
+	PGresult *res;
 
 	/* Check correct use of CopyIn FDW API. */
 	Assert(fmstate->cstate != NULL);
 
+	/*
+	 * Finish COPY IN protocol. It is needed to do after successful copy or
+	 * after an error.
+	 */
+	if (PQputCopyEnd(conn, NULL) <= 0 ||
+		PQflush(conn))
+		ereport(ERROR,
+				(errmsg("error returned by PQputCopyEnd: %s",
+						PQerrorMessage(conn))));
+
+	/* After successfully  sending an EOF signal, check command status. */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+	PQclear(res);
+
+	/* Do this to ensure we've pumped libpq back to idle state */
+	if (PQgetResult(conn) != NULL)
+		ereport(ERROR,
+				(errmsg("unexpected extra results during COPY of table: %s",
+						PQerrorMessage(conn))));
+
 	EndForeignCopyTo(fmstate->cstate);
 	pfree(fmstate->cstate);
 	fmstate->cstate = NULL;
 	finish_foreign_modify(fmstate);
+
+	copy_fmstate = NULL;
 }
 
 /*
@@ -2122,58 +2173,21 @@ postgresExecForeignCopyIn(ResultRelInfo *resultRelInfo,
 						  TupleTableSlot **slots, int nslots)
 {
 	PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState;
-	PGresult *res;
-	PGconn *conn = fmstate->conn;
 	bool status = false;
 	int i;
 
 	/* Check correct use of CopyIn FDW API. */
 	Assert(fmstate->cstate != NULL);
 
-	res = PQexec(conn, fmstate->query);
-	if (PQresultStatus(res) != PGRES_COPY_IN)
-		pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
-	PQclear(res);
-
 	PG_TRY();
 	{
 		for (i = 0; i < nslots; i++)
-		{
-			char *buf = NextForeignCopyRow(fmstate->cstate, slots[i]);
-
-			if (PQputCopyData(conn, buf, strlen(buf)) <= 0)
-			{
-				res = PQgetResult(conn);
-				pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
-			}
-		}
+			NextForeignCopyRow(fmstate->cstate, slots[i]);
 
 		status = true;
 	}
 	PG_FINALLY();
 	{
-		/* Finish COPY IN protocol. It is needed to do after successful copy or
-		 * after an error.
-		 */
-		if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 ||
-			PQflush(conn))
-			ereport(ERROR,
-					(errmsg("error returned by PQputCopyEnd: %s",
-							PQerrorMessage(conn))));
-
-		/* After successfully  sending an EOF signal, check command status. */
-		res = PQgetResult(conn);
-		if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) ||
-			(status && PQresultStatus(res) != PGRES_COMMAND_OK))
-			pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
-
-		PQclear(res);
-		/* Do this to ensure we've pumped libpq back to idle state */
-		if (PQgetResult(conn) != NULL)
-			ereport(ERROR,
-					(errmsg("unexpected extra results during COPY of table: %s",
-							PQerrorMessage(conn))));
-
 		if (!status)
 			PG_RE_THROW();
 	}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f5f1d40..51b7233 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -594,7 +594,6 @@ CopySendEndOfRow(CopyState cstate)
 			break;
 		case COPY_CALLBACK:
 			CopySendChar(cstate, '\n');
-			CopySendChar(cstate, '\0');
 			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
 			break;
 	}
@@ -1823,16 +1822,8 @@ EndCopy(CopyState cstate)
 	pfree(cstate);
 }
 
-static char *buf = NULL;
-static void
-data_dest_cb(void *outbuf, int len)
-{
-	buf = (char *) palloc(len);
-	memcpy(buf, (char *) outbuf, len);
-}
-
 CopyState
-BeginForeignCopyTo(Relation rel)
+BeginForeignCopyTo(Relation rel, copy_data_dest_cb data_dest_cb)
 {
 	CopyState cstate;
 
@@ -1990,11 +1981,10 @@ BeginCopyTo(ParseState *pstate,
 	return cstate;
 }
 
-char *
+void
 NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot)
 {
 	CopyOneRowTo(cstate, slot);
-	return buf;
 }
 
 /*
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index ef119a7..f31ed13 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -42,8 +42,8 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
-extern CopyState BeginForeignCopyTo(Relation rel);
-extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot);
+extern CopyState BeginForeignCopyTo(Relation rel, copy_data_dest_cb data_dest_cb);
+extern void NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot);
 extern void EndForeignCopyTo(CopyState cstate);
 
 #endif							/* COPY_H */

Reply via email to