From 078d3a9bdc32a4e781877572d4c70c928587154b Mon Sep 17 00:00:00 2001
From: Shi Yu <shiy.fnst@fujitsu.com>
Date: Mon, 9 Jan 2023 14:24:34 +0800
Subject: [PATCH v2 1/2] Ignore dropped columns and generated columns when
 checking the column list in logical replication

There's a restriction that different column lists for same table can't be used
in the publications of single subscription. But in the check, the dropped
columns and generated columns are included when no column list is specified.
This might lead to unexpected errors in some cases.

Fix it by modifying function pg_get_publication_tables() to contain all
supported columns if no column list is specified, and excluding dropped columns
and generated columns in the check of walsender.

Bump catalog version bacause the query of system view pg_publication_tables is
changed.
---
 src/backend/catalog/pg_publication.c        | 30 +++++++++++++++++++++
 src/backend/catalog/system_views.sql        |  5 ++--
 src/backend/replication/pgoutput/pgoutput.c | 17 +++++++++++-
 src/test/regress/expected/rules.out         |  2 +-
 4 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9571c669f7..de5f6cefc9 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1153,6 +1153,36 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 			nulls[2] = true;
 		}
 
+		/* If column list attribute is null, show all columns. */
+		if (nulls[1] == true)
+		{
+			Relation	rel = table_open(relid, AccessShareLock);
+			int			nattnums = 0;
+			int16	   *attnums;
+			TupleDesc	desc = RelationGetDescr(rel);
+			int			i;
+
+			attnums = (int16 *) palloc(desc->natts * sizeof(int16));
+
+			for (i = 0; i < desc->natts; i++)
+			{
+				Form_pg_attribute att = TupleDescAttr(desc, i);
+
+				if (att->attisdropped || att->attgenerated)
+					continue;
+
+				attnums[nattnums++] = att->attnum;
+			}
+
+			if (nattnums > 0)
+			{
+				values[1] = PointerGetDatum(buildint2vector(attnums, nattnums));
+				nulls[1] = false;
+			}
+
+			table_close(rel, AccessShareLock);
+		}
+
 		rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
 
 		SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 447c9b970f..d2a8c82900 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -371,9 +371,8 @@ CREATE VIEW pg_publication_tables AS
         C.relname AS tablename,
         ( SELECT array_agg(a.attname ORDER BY a.attnum)
           FROM pg_attribute a
-          WHERE a.attrelid = GPT.relid AND a.attnum > 0 AND
-                NOT a.attisdropped AND
-                (a.attnum = ANY(GPT.attrs) OR GPT.attrs IS NULL)
+          WHERE a.attrelid = GPT.relid AND
+                a.attnum = ANY(GPT.attrs)
         ) AS attnames,
         pg_get_expr(GPT.qual, GPT.relid) AS rowfilter
     FROM pg_publication P,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 19c10c028f..1a80d67bb9 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1058,16 +1058,31 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
 				/* Build the column list bitmap in the per-entry context. */
 				if (!pub_no_list)	/* when not null */
 				{
+					int			i;
+					int			nliveatts = 0;
+					TupleDesc	desc = RelationGetDescr(relation);
+
 					pgoutput_ensure_entry_cxt(data, entry);
 
 					cols = pub_collist_to_bitmapset(cols, cfdatum,
 													entry->entry_cxt);
 
+					/* Get the number of live attributes. */
+					for (i = 0; i < desc->natts; i++)
+					{
+						Form_pg_attribute att = TupleDescAttr(desc, i);
+
+						if (att->attisdropped || att->attgenerated)
+							continue;
+
+						nliveatts++;
+					}
+
 					/*
 					 * If column list includes all the columns of the table,
 					 * set it to NULL.
 					 */
-					if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation))
+					if (bms_num_members(cols) == nliveatts)
 					{
 						bms_free(cols);
 						cols = NULL;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6a21ce90ac..30ff45c238 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1446,7 +1446,7 @@ pg_publication_tables| SELECT p.pubname,
     c.relname AS tablename,
     ( SELECT array_agg(a.attname ORDER BY a.attnum) AS array_agg
            FROM pg_attribute a
-          WHERE ((a.attrelid = gpt.relid) AND (a.attnum > 0) AND (NOT a.attisdropped) AND ((a.attnum = ANY ((gpt.attrs)::smallint[])) OR (gpt.attrs IS NULL)))) AS attnames,
+          WHERE ((a.attrelid = gpt.relid) AND (a.attnum = ANY ((gpt.attrs)::smallint[])))) AS attnames,
     pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
    FROM pg_publication p,
     LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid, attrs, qual),
-- 
2.31.1

