Hi, thank you for reviewing this patch!

On Sat Jan 3, 2026 at 9:45 AM -03, Dewei Dai wrote:
> 1 - in function `execute_foreign_modify_using_copy`
>   The `res` object obtained from the first call to `pgfdw_get_result`
>   is not freed, maybe you can use `PQclear` to release it
>
Fixed.

> 2 - in function `execute_foreign_modify_using_copy`
>   After using `copy_data`,it appears it can be released by
>     calling `destroyStringInfo`.
>
I'm wondering if we should call destroyStringInfo(&copy_data) or
pfree(copy_data.data). Other functions use the pfree version, so I
decided to use the same.

(I actually tried to use destroyStringInfo() but the postgres_fdw tests
kept running for longer than usual, so I think that using the pfree is
correct)

> 3 - in function `convert_slot_to_copy_text`
>   The value returned by the OutputFunctionCall function can be
>     freed by calling pfree
>   
We have other calls to OutputFunctionCall() in postgres_fdw.c and I'm
not seeing a subsequent call to pfree. IIUC the returned value will be
allocated in the current memory context, which will be freed at the end
of query execution, so I don't think that a pfree here is necessary. Or
am I missing something?

> 4 - in function `execute_foreign_modify_using_copy`
> ```Send initial COPY data if the buffer reach the limit to avoid large
> ```
> Typo: reach ->  reaches 
>
Fixed

--
Matheus Alcantara
EDB: https://www.enterprisedb.com

From f4dcd9d836137589c8345b74cb24ab3e7dc18eeb Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Wed, 26 Nov 2025 16:34:46 -0300
Subject: [PATCH v9] postgres_fdw: speed up batch inserts using COPY

This commit includes a new foreign table/server option
"batch_with_copy_threshold" that enables the usage of the COPY command to
speed up batch inserts when a COPY FROM or an insert into a table
partition that is a foreign table is executed. In both cases the
BeginForeignInsert fdw routine is called, so this new option is
retrieved only in this routine. The other cases, which use the
ForeignModify routines, still use INSERT as the remote SQL.

Note that the COPY will only be used for batch inserts and only if the
current number of rows being inserted in the batch operation is >=
batch_with_copy_threshold. If batch_size=100, batch_with_copy_threshold=50
and the number of rows being inserted is 120, the first 100 rows will be
inserted using the COPY command and the remaining 20 rows will be
inserted using an INSERT statement because they do not reach the copy
threshold.
---
 contrib/postgres_fdw/deparse.c                |  35 +++
 .../postgres_fdw/expected/postgres_fdw.out    |  26 +++
 contrib/postgres_fdw/option.c                 |   6 +-
 contrib/postgres_fdw/postgres_fdw.c           | 215 +++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  23 ++
 6 files changed, 303 insertions(+), 3 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ebe2c3a596a..78335db1889 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2236,6 +2236,41 @@ rebuildInsertSql(StringInfo buf, Relation rel,
        appendStringInfoString(buf, orig_query + values_end_len);
 }
 
+/*
+ *  Build a COPY FROM STDIN statement using the TEXT format
+ */
+void
+deparseCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+       bool            first = true;
+       int                     nattrs = list_length(target_attrs);
+
+       appendStringInfo(buf, "COPY ");
+       deparseRelation(buf, rel);
+       if (nattrs > 0)
+               appendStringInfo(buf, "(");
+
+       foreach_int(attnum, target_attrs)
+       {
+               Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoString(buf, ", ");
+
+               first = false;
+
+               appendStringInfoString(buf, 
quote_identifier(NameStr(attr->attname)));
+       }
+       if (nattrs > 0)
+               appendStringInfoString(buf, ") FROM STDIN");
+       else
+               appendStringInfoString(buf, " FROM STDIN");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out 
b/contrib/postgres_fdw/expected/postgres_fdw.out
index 6066510c7c0..8bbc27b7b3b 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -9215,6 +9215,19 @@ with result as (insert into itrtest values (1, 'test1'), 
(2, 'test2') returning
 
 drop trigger loct1_br_insert_trigger on loct1;
 drop trigger loct2_br_insert_trigger on loct2;
+-- Test batch insert using COPY with batch_with_copy_threshold
+delete from itrtest;
+alter server loopback options (add batch_with_copy_threshold '2', batch_size 
'3');
+insert into itrtest values (1, 'test1'), (2, 'test2'), (2, 'test3');
+select * from itrtest;
+ a |   b   
+---+-------
+ 1 | test1
+ 2 | test2
+ 2 | test3
+(3 rows)
+
+alter server loopback options (drop batch_with_copy_threshold, drop 
batch_size);
 drop table itrtest;
 drop table loct1;
 drop table loct2;
@@ -9524,6 +9537,19 @@ select * from rem2;
   2 | bar
 (2 rows)
 
+delete from rem2;
+-- Test COPY with batch_with_copy_threshold
+alter foreign table rem2 options (add batch_with_copy_threshold '2');
+-- Insert 3 rows so that the third row fallback to normal INSERT statement path
+copy rem2 from stdin;
+select * from rem2;
+ f1 | f2  
+----+-----
+  1 | foo
+  2 | bar
+  3 | baz
+(3 rows)
+
 delete from rem2;
 -- Test check constraints
 alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index b0bd72d1e58..4545c2f9ba1 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -157,7 +157,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
                        (void) ExtractExtensionList(defGetString(def), true);
                }
                else if (strcmp(def->defname, "fetch_size") == 0 ||
-                                strcmp(def->defname, "batch_size") == 0)
+                                strcmp(def->defname, "batch_size") == 0 ||
+                                strcmp(def->defname, 
"batch_with_copy_threshold") == 0)
                {
                        char       *value;
                        int                     int_val;
@@ -263,6 +264,9 @@ InitPgFdwOptions(void)
                /* batch_size is available on both server and table */
                {"batch_size", ForeignServerRelationId, false},
                {"batch_size", ForeignTableRelationId, false},
+               /* batch_with_copy_threshold is available on both server and 
table */
+               {"batch_with_copy_threshold", ForeignServerRelationId, false},
+               {"batch_with_copy_threshold", ForeignTableRelationId, false},
                /* async_capable is available on both server and table */
                {"async_capable", ForeignServerRelationId, false},
                {"async_capable", ForeignTableRelationId, false},
diff --git a/contrib/postgres_fdw/postgres_fdw.c 
b/contrib/postgres_fdw/postgres_fdw.c
index 3572689e33b..2fb95167a1c 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Buffer size to send COPY IN data */
+#define COPYBUFSIZ 8192
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -198,6 +201,10 @@ typedef struct PgFdwModifyState
        bool            has_returning;  /* is there a RETURNING clause? */
        List       *retrieved_attrs;    /* attr numbers retrieved by RETURNING 
*/
 
+       /* COPY usage stuff */
+       int                     batch_with_copy_threshold;      /* value of FDW 
option */
+       char       *cmd_copy;           /* COPY statement */
+
        /* info about parameters for prepared statement */
        AttrNumber      ctidAttno;              /* attnum of input resjunk ctid 
column */
        int                     p_nums;                 /* number of parameters 
to transmit */
@@ -545,6 +552,10 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
                                                          const 
PgFdwRelationInfo *fpinfo_o,
                                                          const 
PgFdwRelationInfo *fpinfo_i);
 static int     get_batch_size_option(Relation rel);
+static int     get_batch_with_copy_threshold(Relation rel);
+static TupleTableSlot **execute_foreign_modify_using_copy(PgFdwModifyState 
*fmstate,
+                                                                               
                                  TupleTableSlot **slots,
+                                                                               
                                  int *numSlots);
 
 
 /*
@@ -2013,8 +2024,30 @@ postgresExecForeignBatchInsert(EState *estate,
         */
        if (fmstate->aux_fmstate)
                resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
-       rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
-                                                                  slots, 
planSlots, numSlots);
+
+       /*
+        * Check if "batch_with_copy_threshold" is enabled (> 0) and if the COPY
+        * can be used based on the number of rows being inserted on this batch.
+        * The original query also should not have a RETURNING clause.
+        */
+       if (fmstate->batch_with_copy_threshold > 0 &&
+               fmstate->batch_with_copy_threshold <= *numSlots &&
+               !fmstate->has_returning)
+       {
+               if (fmstate->cmd_copy == NULL)
+               {
+                       StringInfoData sql;
+
+                       initStringInfo(&sql);
+                       deparseCopySql(&sql, fmstate->rel, 
fmstate->target_attrs);
+                       fmstate->cmd_copy = sql.data;
+               }
+
+               rslot = execute_foreign_modify_using_copy(fmstate, slots, 
numSlots);
+       }
+       else
+               rslot = execute_foreign_modify(estate, resultRelInfo, 
CMD_INSERT,
+                                                                          
slots, planSlots, numSlots);
        /* Revert that change */
        if (fmstate->aux_fmstate)
                resultRelInfo->ri_FdwState = fmstate;
@@ -2265,6 +2298,16 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
                                                                        
retrieved_attrs != NIL,
                                                                        
retrieved_attrs);
 
+
+       /*
+        * Set batch_with_copy_threshold from foreign server/table options. We 
do
+        * this outside of create_foreign_modify() because we only want to use
+        * COPY as a remote SQL when a COPY FROM on a foreign table is executed 
or
+        * an insert is being performed on a table partition. In both cases the
+        * BeginForeignInsert fdw routine is called.
+        */
+       fmstate->batch_with_copy_threshold = get_batch_with_copy_threshold(rel);
+
        /*
         * If the given resultRelInfo already has PgFdwModifyState set, it means
         * the foreign table is an UPDATE subplan result rel; in which case, 
store
@@ -4066,6 +4109,50 @@ create_foreign_modify(EState *estate,
        return fmstate;
 }
 
+/*
+ *  Write the values of fmstate's target attributes from slot into buf, as
+ *  one text-format line to be sent as COPY FROM STDIN data
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+                                                 PgFdwModifyState *fmstate,
+                                                 TupleTableSlot *slot)
+{
+       TupleDesc       tupdesc = RelationGetDescr(fmstate->rel);
+       bool            first = true;
+       int                     i = 0;
+
+       foreach_int(attnum, fmstate->target_attrs)
+       {
+               CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 
1);
+               Datum           datum;
+               bool            isnull;
+
+               /* Ignore generated columns; they are set to DEFAULT */
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoCharMacro(buf, '\t');
+               first = false;
+
+               datum = slot_getattr(slot, attnum, &isnull);
+
+               if (isnull)
+                       appendStringInfoString(buf, "\\N");
+               else
+               {
+                       const char *value = 
OutputFunctionCall(&fmstate->p_flinfo[i],
+                                                                               
                   datum);
+
+                       appendStringInfoString(buf, value);
+               }
+               i++;
+       }
+
+       appendStringInfoCharMacro(buf, '\n');
+}
+
 /*
  * execute_foreign_modify
  *             Perform foreign-table modification as required, and fetch 
RETURNING
@@ -7886,3 +7973,127 @@ get_batch_size_option(Relation rel)
 
        return batch_size;
 }
+
+/*
+ * Determine COPY usage threshold for batching inserts for a given foreign
+ * table. The option specified for a table has precedence.
+ */
+static int
+get_batch_with_copy_threshold(Relation rel)
+{
+       Oid                     foreigntableid = RelationGetRelid(rel);
+       List       *options = NIL;
+       ListCell   *lc;
+       ForeignTable *table;
+       ForeignServer *server;
+
+       /*
+        * We use 0 as default, which means that COPY will not be used by 
default
+        * for batching insert.
+        */
+       int                     copy_for_batch_insert_threshold = 0;
+
+       /*
+        * Load options for table and server. We append server options after 
table
+        * options, because table options take precedence.
+        */
+       table = GetForeignTable(foreigntableid);
+       server = GetForeignServer(table->serverid);
+
+       options = list_concat(options, table->options);
+       options = list_concat(options, server->options);
+
+       /* See if either table or server specifies batch_with_copy_threshold. */
+       foreach(lc, options)
+       {
+               DefElem    *def = (DefElem *) lfirst(lc);
+
+               if (strcmp(def->defname, "batch_with_copy_threshold") == 0)
+               {
+                       (void) parse_int(defGetString(def), 
&copy_for_batch_insert_threshold, 0, NULL);
+                       break;
+               }
+       }
+       return copy_for_batch_insert_threshold;
+}
+
+/*
+ * execute_foreign_modify_using_copy
+ *             Perform foreign-table modification using the COPY command.
+ */
+static TupleTableSlot **
+execute_foreign_modify_using_copy(PgFdwModifyState *fmstate,
+                                                                 
TupleTableSlot **slots,
+                                                                 int *numSlots)
+{
+       PGresult   *res;
+       StringInfoData copy_data;
+       int                     n_rows;
+       int                     i;
+
+       Assert(fmstate->cmd_copy != NULL);
+
+       /* Send COPY command */
+       if (!PQsendQuery(fmstate->conn, fmstate->cmd_copy))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->cmd_copy);
+
+       /* get the COPY result */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pgfdw_report_error(res, fmstate->conn, fmstate->cmd_copy);
+
+       /* Clean up the COPY command result */
+       PQclear(res);
+
+       /* Convert the TupleTableSlot data into a TEXT-formatted line */
+       initStringInfo(&copy_data);
+       for (i = 0; i < *numSlots; i++)
+       {
+               convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+               /*
+                * Send initial COPY data if the buffer reaches the limit to 
avoid
+                * large memory usage.
+                */
+               if (copy_data.len >= COPYBUFSIZ)
+               {
+                       if (PQputCopyData(fmstate->conn, copy_data.data, 
copy_data.len) <= 0)
+                               pgfdw_report_error(NULL, fmstate->conn, 
fmstate->cmd_copy);
+                       resetStringInfo(&copy_data);
+               }
+       }
+
+       /* Send the remaining COPY data */
+       if (copy_data.len > 0)
+       {
+               if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) 
<= 0)
+                       pgfdw_report_error(NULL, fmstate->conn, 
fmstate->cmd_copy);
+       }
+
+       pfree(copy_data.data);
+
+       /* End the COPY operation */
+       if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->cmd_copy);
+
+       /*
+        * Get the result, and check for success.
+        */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pgfdw_report_error(res, fmstate->conn, fmstate->cmd_copy);
+
+       n_rows = atoi(PQcmdTuples(res));
+
+       /* And clean up */
+       PQclear(res);
+
+       MemoryContextReset(fmstate->temp_cxt);
+
+       *numSlots = n_rows;
+
+       /*
+        * Return NULL if nothing was inserted on the remote end
+        */
+       return (n_rows > 0) ? slots : NULL;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h 
b/contrib/postgres_fdw/postgres_fdw.h
index a2bb1ff352c..fc6922ddd4f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
                                                         char *orig_query, List 
*target_attrs,
                                                         int values_end_len, 
int num_params,
                                                         int num_rows);
+extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
                                                         Index rtindex, 
Relation rel,
                                                         List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql 
b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 4f7ab2ed0ac..840e97fed2f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2643,6 +2643,16 @@ with result as (insert into itrtest values (1, 'test1'), 
(2, 'test2') returning
 drop trigger loct1_br_insert_trigger on loct1;
 drop trigger loct2_br_insert_trigger on loct2;
 
+-- Test batch insert using COPY with batch_with_copy_threshold
+delete from itrtest;
+alter server loopback options (add batch_with_copy_threshold '2', batch_size 
'3');
+
+insert into itrtest values (1, 'test1'), (2, 'test2'), (2, 'test3');
+
+select * from itrtest;
+
+alter server loopback options (drop batch_with_copy_threshold, drop 
batch_size);
+
 drop table itrtest;
 drop table loct1;
 drop table loct2;
@@ -2815,6 +2825,19 @@ select * from rem2;
 
 delete from rem2;
 
+-- Test COPY with batch_with_copy_threshold
+alter foreign table rem2 options (add batch_with_copy_threshold '2');
+
+-- Insert 3 rows so that the third row fallback to normal INSERT statement path
+copy rem2 from stdin;
+1      foo
+2      bar
+3      baz
+\.
+select * from rem2;
+
+delete from rem2;
+
 -- Test check constraints
 alter table loc2 add constraint loc2_f1positive check (f1 >= 0);
 alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0);
-- 
2.51.2

Reply via email to