Thank you for the review.

On 03/03/26 16:47, Masahiko Sawada wrote:
Thank you for updating the patch! Here are some review comments:

+                Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+
+                if (attr->attgenerated)
+                        continue;
+
+                if (!first)
+                        appendStringInfoString(buf, ", ");
+
+                first = false;
+
+                appendStringInfoString(buf,
quote_identifier(NameStr(attr->attname)));

We need to take care of the 'column_name' option here. If it's set we
should not use attr->attname as it is.


Fixed

--
+        }
+        if (nattrs > 0)
+                appendStringInfoString(buf, ") FROM STDIN");
+        else
+                appendStringInfoString(buf, " FROM STDIN");
+}

It might be better to explicitly specify the format 'text'.


Fixed

---
+        if (useCopy)
+        {
+                deparseCopySql(&sql, rel, targetAttrs);
+                values_end_len = 0;            /* Keep compiler quiet */
+        }
+        else
+                deparseInsertSql(&sql, rte, resultRelation, rel,
targetAttrs, doNothing,
+
resultRelInfo->ri_WithCheckOptions,
+
resultRelInfo->ri_returningList,
+                                                 &retrieved_attrs,
&values_end_len);

I think we should consider whether it's okay to use the COPY command
even if resultRelInfo->ri_WithCheckOptions is non-NULL. As far as I
researched, it's okay as we currently don't support COPY to a view but
please consider it as well. We might want to explain it too in the
comment.


Good point, fixed.

How about initializing values_end_len with 0 at its declaration?


Fixed

---
+-- test that fdw also use COPY FROM as a remote sql
+set client_min_messages to 'log';
+
+create function insert_or_copy() returns trigger as $$
+declare query text;
+begin
+    query := current_query();
+    if query ~* '^COPY' then
+        raise notice 'COPY command';
+    elsif query ~* '^INSERT' then
+        raise notice 'INSERT command';
+    end if;
+return new;
+end;
+$$ language plpgsql;

On second thoughts, it might be okay to output the current_query() as it is.


Fixed

---
+copy grem1 from stdin;
+3
+\.

I think it would be good to have more tests, for example, checking if
the COPY command method can work properly with batch_size and
column_name options.


I've added new test cases for these cases. To test the batch_size case I've added an elog(DEBUG1) because using the trigger with current_query() log an entry for each row that we send for the foreign server, with the elog(DEBUG1) we can expect one log message for each batch operation. Please let me know if there is better ways to do this.

Please see the new attached version.

--
Matheus Alcantara
EDB: https://www.enterprisedb.com
From ebefff4fae344e4ca92ebd0aefd54a909f7e85c2 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Wed, 28 Jan 2026 19:55:48 -0300
Subject: [PATCH v12] postgres_fdw: Use COPY as remote SQL when possible

Previously when an user execute a COPY on a foreign table, postgres_fdw
send a INSERT as a remote SQL to the foreign server. This commit
introduce the ability to use the COPY command instead.

The COPY command will only be used when an user execute a COPY on a
foreign table and also the foreign table should not have triggers
because remote triggers might modify the inserted row and since COPY
does not support a RETURNING clause, we cannot synchronize the local
TupleTableSlot with those changes for use in local AFTER triggers, so if
the foreign table has any trigger INSERT will be used.

Author: Matheus Alcantara <[email protected]>
Reviewed-By: Tomas Vondra <[email protected]>
Reviewed-By: Jakub Wartak <[email protected]>
Reviewed-By: jian he <[email protected]>
Reviewed-By: Dewei Dai <[email protected]>
Reviewed-By: Masahiko Sawada <[email protected]>

Discussion: 
https://www.postgresql.org/message-id/flat/DDIZJ217OUDK.2R5WE4OGL5PTY%40gmail.com
---
 contrib/postgres_fdw/deparse.c                |  56 ++++++
 .../postgres_fdw/expected/postgres_fdw.out    |  40 +++-
 contrib/postgres_fdw/postgres_fdw.c           | 185 +++++++++++++++++-
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  49 +++++
 5 files changed, 322 insertions(+), 9 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ebe2c3a596a..c7d53a98525 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -2177,6 +2177,62 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
                                                 withCheckOptionList, 
returningList, retrieved_attrs);
 }
 
+/*
+ *  Build a COPY FROM STDIN statement using the TEXT format
+ */
+void
+deparseCopySql(StringInfo buf, Relation rel, List *target_attrs)
+{
+       Oid                     relid = RelationGetRelid(rel);
+       TupleDesc       tupdesc = RelationGetDescr(rel);
+       bool            first = true;
+       int                     nattrs = list_length(target_attrs);
+
+       appendStringInfo(buf, "COPY ");
+       deparseRelation(buf, rel);
+       if (nattrs > 0)
+               appendStringInfo(buf, "(");
+
+       foreach_int(attnum, target_attrs)
+       {
+               Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+               char       *colname;
+               List       *options;
+               ListCell   *lc;
+
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoString(buf, ", ");
+
+               first = false;
+
+               /* Use attribute name or column_name option. */
+               colname = NameStr(attr->attname);
+               options = GetForeignColumnOptions(relid, attnum);
+               foreach(lc, options)
+               {
+                       DefElem    *def = (DefElem *) lfirst(lc);
+
+                       if (strcmp(def->defname, "column_name") == 0)
+                       {
+                               colname = defGetString(def);
+                               break;
+                       }
+               }
+
+               appendStringInfoString(buf, quote_identifier(colname));
+       }
+       if (nattrs > 0)
+               appendStringInfoString(buf, ") FROM STDIN");
+       else
+               appendStringInfoString(buf, " FROM STDIN");
+
+       appendStringInfoString(buf, " (FORMAT TEXT)");
+}
+
+
 /*
  * rebuild remote INSERT statement
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out 
b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2ccb72c539a..4a946a76afe 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -7603,6 +7603,28 @@ select * from grem1;
 (2 rows)
 
 delete from grem1;
+-- test that fdw also use COPY FROM as a remote sql
+set client_min_messages to 'log';
+create function insert_or_copy() returns trigger as $$
+declare query text;
+begin
+    query := current_query();
+    raise notice '%', query;
+return new;
+end;
+$$ language plpgsql;
+CREATE TRIGGER trig_row_before
+BEFORE INSERT OR UPDATE OR DELETE ON gloc1
+FOR EACH ROW EXECUTE PROCEDURE insert_or_copy();
+copy grem1 from stdin;
+LOG:  received message via remote connection: NOTICE:  COPY public.gloc1(a) 
FROM STDIN (FORMAT TEXT)
+drop trigger trig_row_before on gloc1;
+reset client_min_messages;
+-- test that copy does not fail with column_name alias
+create table gloc2(xxx int);
+create foreign table grem2(a int) server loopback options(table_name 'gloc2');
+alter foreign table grem2 alter column a options (column_name 'xxx');
+copy grem2 from stdin;
 -- test batch insert
 alter server loopback options (add batch_size '10');
 explain (verbose, costs off)
@@ -7620,16 +7642,18 @@ insert into grem1 (a) values (1), (2);
 select * from gloc1;
  a | b | c 
 ---+---+---
+ 3 | 6 |  
  1 | 2 |  
  2 | 4 |  
-(2 rows)
+(3 rows)
 
 select * from grem1;
  a | b | c 
 ---+---+---
+ 3 | 6 | 9
  1 | 2 | 3
  2 | 4 | 6
-(2 rows)
+(3 rows)
 
 delete from grem1;
 -- batch insert with foreign partitions.
@@ -7654,6 +7678,12 @@ select count(*) from tab_batch_sharded;
 drop table tab_batch_local;
 drop table tab_batch_sharded;
 drop table tab_batch_sharded_p1_remote;
+-- test batch insert using copy
+set client_min_messages to 'debug1';
+copy grem1 from stdin;
+DEBUG:  foreign modify with COPY batch_size: 10
+DEBUG:  foreign modify with COPY batch_size: 10
+reset client_min_messages;
 alter server loopback options (drop batch_size);
 -- ===================================================================
 -- test local triggers
@@ -9544,7 +9574,8 @@ copy rem2 from stdin;
 copy rem2 from stdin; -- ERROR
 ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
 DETAIL:  Failing row contains (-1, xyzzy).
-CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
+CONTEXT:  COPY loc2, line 1: "-1       xyzzy"
+remote SQL command: COPY public.loc2(f1, f2) FROM STDIN (FORMAT TEXT)
 COPY rem2, line 1: "-1 xyzzy"
 select * from rem2;
  f1 | f2  
@@ -9701,7 +9732,8 @@ copy rem2 from stdin;
 copy rem2 from stdin; -- ERROR
 ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
 DETAIL:  Failing row contains (-1, xyzzy).
-CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
+CONTEXT:  COPY loc2, line 1: "-1       xyzzy"
+remote SQL command: COPY public.loc2(f1, f2) FROM STDIN (FORMAT TEXT)
 COPY rem2
 select * from rem2;
  f1 | f2  
diff --git a/contrib/postgres_fdw/postgres_fdw.c 
b/contrib/postgres_fdw/postgres_fdw.c
index 60d90329a65..8602f67b7dc 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -63,6 +63,9 @@ PG_MODULE_MAGIC_EXT(
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Buffer size to send COPY IN data*/
+#define COPYBUFSIZ 8192
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -198,6 +201,8 @@ typedef struct PgFdwModifyState
        bool            has_returning;  /* is there a RETURNING clause? */
        List       *retrieved_attrs;    /* attr numbers retrieved by RETURNING 
*/
 
+       bool            use_copy;
+
        /* info about parameters for prepared statement */
        AttrNumber      ctidAttno;              /* attnum of input resjunk ctid 
column */
        int                     p_nums;                 /* number of parameters 
to transmit */
@@ -545,6 +550,12 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
                                                          const 
PgFdwRelationInfo *fpinfo_o,
                                                          const 
PgFdwRelationInfo *fpinfo_i);
 static int     get_batch_size_option(Relation rel);
+static TupleTableSlot **execute_foreign_modify_using_copy(PgFdwModifyState 
*fmstate,
+                                                                               
                                  TupleTableSlot **slots,
+                                                                               
                                  int *numSlots);
+static void convert_slot_to_copy_text(StringInfo buf,
+                                                                         
PgFdwModifyState *fmstate,
+                                                                         
TupleTableSlot *slot);
 
 
 /*
@@ -2165,11 +2176,12 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
        RangeTblEntry *rte;
        TupleDesc       tupdesc = RelationGetDescr(rel);
        int                     attnum;
-       int                     values_end_len;
+       int                     values_end_len = 0;
        StringInfoData sql;
        List       *targetAttrs = NIL;
        List       *retrieved_attrs = NIL;
        bool            doNothing = false;
+       bool            useCopy = false;
 
        /*
         * If the foreign table we are about to insert routed rows into is also 
an
@@ -2247,11 +2259,43 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
                rte = exec_rt_fetch(resultRelation, estate);
        }
 
+       /*
+        * We can use COPY for remote inserts only if all the following 
conditions
+        * are met:
+        *
+        * Direct Execution: The command is a COPY FROM on the foreign table
+        * itself, not part of a partitioned table's tuple routing. (
+        * resultRelInfo->ri_RootResultRelInfo == NULL)
+        *
+        * No Check Options: There are no WITH CHECK OPTION constraints or
+        * Row-Level Security policies that need to be enforced locally
+        * (resultRelInfo->ri_WithCheckOptions == NIL).
+        *
+        * No Local AFTER Triggers: There are no AFTER ROW triggers defined
+        * locally on the foreign table.
+        *
+        * Remote triggers might modify the inserted row. Because the COPY
+        * protocol does not support a RETURNING clause, we cannot retrieve 
those
+        * changes to synchronize the local TupleTableSlot required by local 
AFTER
+        * triggers.
+        */
+       if (resultRelInfo->ri_RootResultRelInfo == NULL && 
resultRelInfo->ri_WithCheckOptions == NIL)
+       {
+               /* There is no RETURNING clause on COPY */
+               Assert(resultRelInfo->ri_returningList == NIL);
+
+               useCopy = (resultRelInfo->ri_TrigDesc == NULL ||
+                                  
!resultRelInfo->ri_TrigDesc->trig_insert_after_row);
+       }
+
        /* Construct the SQL command string. */
-       deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
-                                        resultRelInfo->ri_WithCheckOptions,
-                                        resultRelInfo->ri_returningList,
-                                        &retrieved_attrs, &values_end_len);
+       if (useCopy)
+               deparseCopySql(&sql, rel, targetAttrs);
+       else
+               deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, 
doNothing,
+                                                
resultRelInfo->ri_WithCheckOptions,
+                                                
resultRelInfo->ri_returningList,
+                                                &retrieved_attrs, 
&values_end_len);
 
        /* Construct an execution state. */
        fmstate = create_foreign_modify(mtstate->ps.state,
@@ -2264,6 +2308,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
                                                                        
values_end_len,
                                                                        
retrieved_attrs != NIL,
                                                                        
retrieved_attrs);
+       fmstate->use_copy = useCopy;
 
        /*
         * If the given resultRelInfo already has PgFdwModifyState set, it means
@@ -4093,6 +4138,9 @@ execute_foreign_modify(EState *estate,
                   operation == CMD_UPDATE ||
                   operation == CMD_DELETE);
 
+       if (fmstate->use_copy)
+               return execute_foreign_modify_using_copy(fmstate, slots, 
numSlots);
+
        /* First, process a pending asynchronous request, if any. */
        if (fmstate->conn_state->pendingAreq)
                process_pending_request(fmstate->conn_state->pendingAreq);
@@ -7886,3 +7934,130 @@ get_batch_size_option(Relation rel)
 
        return batch_size;
 }
+
+/*
+ * execute_foreign_modify_using_copy
+ *             Perform foreign-table modification using the COPY command.
+ */
+static TupleTableSlot **
+execute_foreign_modify_using_copy(PgFdwModifyState *fmstate,
+                                                                 
TupleTableSlot **slots,
+                                                                 int *numSlots)
+{
+       PGresult   *res;
+       StringInfoData copy_data;
+       int                     n_rows;
+       int                     i;
+
+       Assert(fmstate->use_copy == true);
+
+       elog(DEBUG1, "foreign modify with COPY batch_size: %d", 
fmstate->batch_size);
+
+       /* Send COPY command */
+       if (!PQsendQuery(fmstate->conn, fmstate->query))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->query);
+
+       /* get the COPY result */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pgfdw_report_error(res, fmstate->conn, fmstate->query);
+
+       /* Clean up the COPY command result */
+       PQclear(res);
+
+       /* Convert the TupleTableSlot data into a TEXT-formatted line */
+       initStringInfo(&copy_data);
+       for (i = 0; i < *numSlots; i++)
+       {
+               convert_slot_to_copy_text(&copy_data, fmstate, slots[i]);
+
+               /*
+                * Send initial COPY data if the buffer reaches the limit to 
avoid
+                * large memory usage.
+                */
+               if (copy_data.len >= COPYBUFSIZ)
+               {
+                       if (PQputCopyData(fmstate->conn, copy_data.data, 
copy_data.len) <= 0)
+                               pgfdw_report_error(NULL, fmstate->conn, 
fmstate->query);
+                       resetStringInfo(&copy_data);
+               }
+       }
+
+       /* Send the remaining COPY data */
+       if (copy_data.len > 0)
+       {
+               if (PQputCopyData(fmstate->conn, copy_data.data, copy_data.len) 
<= 0)
+                       pgfdw_report_error(NULL, fmstate->conn, fmstate->query);
+       }
+
+       pfree(copy_data.data);
+
+       /* End the COPY operation */
+       if (PQputCopyEnd(fmstate->conn, NULL) < 0 || PQflush(fmstate->conn))
+               pgfdw_report_error(NULL, fmstate->conn, fmstate->query);
+
+       /*
+        * Get the result, and check for success.
+        */
+       res = pgfdw_get_result(fmstate->conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pgfdw_report_error(res, fmstate->conn, fmstate->query);
+
+       n_rows = atoi(PQcmdTuples(res));
+
+       /* And clean up */
+       PQclear(res);
+
+       MemoryContextReset(fmstate->temp_cxt);
+
+       *numSlots = n_rows;
+
+       /*
+        * Return NULL if nothing was inserted on the remote end
+        */
+       return (n_rows > 0) ? slots : NULL;
+}
+
+/*
+ *  Write target attribute values from fmstate into buf buffer to be sent as
+ *  COPY FROM STDIN data
+ */
+static void
+convert_slot_to_copy_text(StringInfo buf,
+                                                 PgFdwModifyState *fmstate,
+                                                 TupleTableSlot *slot)
+{
+       TupleDesc       tupdesc = RelationGetDescr(fmstate->rel);
+       bool            first = true;
+       int                     i = 0;
+
+       foreach_int(attnum, fmstate->target_attrs)
+       {
+               CompactAttribute *attr = TupleDescCompactAttr(tupdesc, attnum - 
1);
+               Datum           datum;
+               bool            isnull;
+
+               /* Ignore generated columns; they are set to DEFAULT */
+               if (attr->attgenerated)
+                       continue;
+
+               if (!first)
+                       appendStringInfoCharMacro(buf, '\t');
+               first = false;
+
+               datum = slot_getattr(slot, attnum, &isnull);
+
+               if (isnull)
+                       appendStringInfoString(buf, "\\N");
+               else
+               {
+                       const char *value = 
OutputFunctionCall(&fmstate->p_flinfo[i],
+                                                                               
                   datum);
+
+                       appendStringInfoString(buf, value);
+               }
+               i++;
+       }
+
+       appendStringInfoCharMacro(buf, '\n');
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h 
b/contrib/postgres_fdw/postgres_fdw.h
index a2bb1ff352c..fc6922ddd4f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -204,6 +204,7 @@ extern void rebuildInsertSql(StringInfo buf, Relation rel,
                                                         char *orig_query, List 
*target_attrs,
                                                         int values_end_len, 
int num_params,
                                                         int num_rows);
+extern void deparseCopySql(StringInfo buf, Relation rel, List *target_attrs);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
                                                         Index rtindex, 
Relation rel,
                                                         List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql 
b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 72d2d9c311b..1a1ecb043bf 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1929,6 +1929,37 @@ select * from gloc1;
 select * from grem1;
 delete from grem1;
 
+-- test that fdw also use COPY FROM as a remote sql
+set client_min_messages to 'log';
+
+create function insert_or_copy() returns trigger as $$
+declare query text;
+begin
+    query := current_query();
+    raise notice '%', query;
+return new;
+end;
+$$ language plpgsql;
+
+CREATE TRIGGER trig_row_before
+BEFORE INSERT OR UPDATE OR DELETE ON gloc1
+FOR EACH ROW EXECUTE PROCEDURE insert_or_copy();
+
+copy grem1 from stdin;
+3
+\.
+
+drop trigger trig_row_before on gloc1;
+reset client_min_messages;
+
+-- test that copy does not fail with column_name alias
+create table gloc2(xxx int);
+create foreign table grem2(a int) server loopback options(table_name 'gloc2');
+alter foreign table grem2 alter column a options (column_name 'xxx');
+copy grem2 from stdin;
+1
+\.
+
 -- test batch insert
 alter server loopback options (add batch_size '10');
 explain (verbose, costs off)
@@ -1955,6 +1986,24 @@ drop table tab_batch_local;
 drop table tab_batch_sharded;
 drop table tab_batch_sharded_p1_remote;
 
+-- test batch insert using copy
+set client_min_messages to 'debug1';
+copy grem1 from stdin;
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+\.
+reset client_min_messages;
+
 alter server loopback options (drop batch_size);
 
 -- ===================================================================
-- 
2.52.0

Reply via email to