I was trying to extract some preparatory work from the remaining patches and came up with the attached patch. This is part of your patch 0003, but it is also relevant for patch 0004. The problem was that COPY (SELECT *) is not sufficient when the table has generated columns, so we need to build the column list explicitly.

Thoughts?

--
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From c52672ff1de4d7c55c4b50eb7986f45801ea4c60 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Mon, 16 Mar 2020 13:27:51 +0100
Subject: [PATCH] Prepare to support non-tables in publications

This by itself doesn't change any functionality but prepares the way
for having relations other than base tables in publications.

Make arrangements for COPY to handle the initial table sync.  For
non-tables we have to use COPY (SELECT ...) instead of copying
directly from the table, but then we have to take care to omit
generated columns from the column list.

Also, remove a hardcoded reference to relkind = 'r' and rely on
pg_relation_is_publishable(), which matches what the publisher can
actually publish and is correct even in cross-version scenarios.

Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_ejmoo...@mail.gmail.com
---
 src/backend/replication/logical/tablesync.c | 43 ++++++++++++++++-----
 src/include/replication/logicalproto.h      |  1 +
 2 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c 
b/src/backend/replication/logical/tablesync.c
index 98825f01e9..358b0a3726 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -639,26 +639,27 @@ fetch_remote_table_info(char *nspname, char *relname,
        WalRcvExecResult *res;
        StringInfoData cmd;
        TupleTableSlot *slot;
-       Oid                     tableRow[2] = {OIDOID, CHAROID};
-       Oid                     attrRow[4] = {TEXTOID, OIDOID, INT4OID, 
BOOLOID};
+       Oid                     tableRow[] = {OIDOID, CHAROID, CHAROID, 
BOOLOID};
+       Oid                     attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
        bool            isnull;
        int                     natt;
+       bool            remote_is_publishable;
 
        lrel->nspname = nspname;
        lrel->relname = relname;
 
        /* First fetch Oid and replica identity. */
        initStringInfo(&cmd);
-       appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
+       appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind,"
+                                        "    pg_relation_is_publishable(c.oid)"
                                         "  FROM pg_catalog.pg_class c"
                                         "  INNER JOIN pg_catalog.pg_namespace 
n"
                                         "        ON (c.relnamespace = n.oid)"
                                         " WHERE n.nspname = %s"
-                                        "   AND c.relname = %s"
-                                        "   AND c.relkind = 'r'",
+                                        "   AND c.relname = %s",
                                         quote_literal_cstr(nspname),
                                         quote_literal_cstr(relname));
-       res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+       res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
@@ -675,6 +676,13 @@ fetch_remote_table_info(char *nspname, char *relname,
        Assert(!isnull);
        lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
        Assert(!isnull);
+       lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
+       Assert(!isnull);
+       remote_is_publishable = DatumGetBool(slot_getattr(slot, 4, &isnull));
+       if (isnull || !remote_is_publishable)
+               ereport(ERROR,
+                               (errmsg("table \"%s.%s\" on the publisher is 
not publishable",
+                                               nspname, relname)));
 
        ExecDropSingleTupleTableSlot(slot);
        walrcv_clear_result(res);
@@ -696,7 +704,7 @@ fetch_remote_table_info(char *nspname, char *relname,
                                         lrel->remoteid,
                                         (walrcv_server_version(wrconn) >= 
120000 ? "AND a.attgenerated = ''" : ""),
                                         lrel->remoteid);
-       res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
+       res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
@@ -765,8 +773,25 @@ copy_table(Relation rel)
 
        /* Start copy on the publisher. */
        initStringInfo(&cmd);
-       appendStringInfo(&cmd, "COPY %s TO STDOUT",
-                                        
quote_qualified_identifier(lrel.nspname, lrel.relname));
+       if (lrel.relkind == RELKIND_RELATION)
+               appendStringInfo(&cmd, "COPY %s TO STDOUT",
+                                                
quote_qualified_identifier(lrel.nspname, lrel.relname));
+       else
+       {
+               /*
+                * For non-tables, we need to do COPY (SELECT ...), but we 
can't just
+                * do SELECT * because we need to not copy generated columns.
+                */
+               appendStringInfo(&cmd, "COPY (SELECT ");
+               for (int i = 0; i < lrel.natts; i++)
+               {
+                       appendStringInfoString(&cmd, 
quote_identifier(lrel.attnames[i]));
+                       if (i < lrel.natts - 1)
+                               appendStringInfoString(&cmd, ", ");
+               }
+               appendStringInfo(&cmd, " FROM %s) TO STDOUT",
+                                                
quote_qualified_identifier(lrel.nspname, lrel.relname));
+       }
        res = walrcv_exec(wrconn, cmd.data, 0, NULL);
        pfree(cmd.data);
        if (res->status != WALRCV_OK_COPY_OUT)
diff --git a/src/include/replication/logicalproto.h 
b/src/include/replication/logicalproto.h
index 2cc2dc4db3..4860561be9 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -49,6 +49,7 @@ typedef struct LogicalRepRelation
        char      **attnames;           /* column names */
        Oid                *atttyps;            /* column types */
        char            replident;              /* replica identity */
+       char            relkind;                /* remote relation kind */
        Bitmapset  *attkeys;            /* Bitmap of key columns */
 } LogicalRepRelation;
 
-- 
2.25.0

Reply via email to