From 5cb30d2b2af6064684247d75e00b414f537a21b9 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 26 Jun 2025 11:11:48 +0530
Subject: [PATCH v2] Support tables via pg_createsubscriber

This patch adds support for specifying tables to be included in logical
replication publications via pg_createsubscriber. Users can now pass multiple
'--database' and '--table' options to define which tables should be published
and subscribed for each database.

Features:
1. Supports per-database table mapping using multiple '--database'/'--table'
pairs.
2. Allows optional column lists and row filters.
3. If '--table' is omitted for a database, a 'FOR ALL TABLES' publication is
created.
4. Adds TAP tests to validate combinations of database and table arguments.

This improves fine-grained control over logical replication setup and aligns
pg_createsubscriber CLI design with other tools like vacuumdb and pg_restore.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  11 ++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 173 +++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              |  80 ++++++++
 3 files changed, 262 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..2b70fc8851f 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -321,6 +321,17 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>--table=<replaceable class="parameter">table</replaceable></option></term>
+     <listitem>
+      <para>
+       Adds a table to be included in the publication for the most recently
+       specified database. Can be repeated multiple times. The syntax
+       supports optional column lists and WHERE clauses.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-V</option></term>
      <term><option>--version</option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 3986882f042..7fe71ece6e9 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -31,6 +31,27 @@
 #define	DEFAULT_SUB_PORT	"50432"
 #define	OBJECTTYPE_PUBLICATIONS  0x0001
 
+typedef struct TableSpec
+{
+	char	   *spec;
+	char	   *pattern_regex;
+	char	   *pattern_db_regex;
+	char	   *pattern_schema_regex;
+	char	   *pattern_table_regex;
+	struct TableSpec *next;
+}			TableSpec;
+
+typedef struct TableListPerDB
+{
+	char	   *dbname;
+	TableSpec  *tables;
+	struct TableListPerDB *next;
+}			TableListPerDB;
+
+static TableListPerDB * dblist_head = NULL;
+static TableListPerDB * dblist_tail = NULL;
+static TableListPerDB * dblist_cur = NULL;
+
 /* Command-line options */
 struct CreateSubscriberOptions
 {
@@ -61,6 +82,7 @@ struct LogicalRepInfo
 
 	bool		made_replslot;	/* replication slot was created */
 	bool		made_publication;	/* publication was created */
+	TableSpec  *tables;			/* list of tables to be subscribed */
 };
 
 /*
@@ -265,6 +287,7 @@ usage(void)
 	printf(_("      --publication=NAME          publication name\n"));
 	printf(_("      --replication-slot=NAME     replication slot name\n"));
 	printf(_("      --subscription=NAME         subscription name\n"));
+	printf(_("      --table                     table to subscribe to; can be specified multiple times\n"));
 	printf(_("  -V, --version                   output version information, then exit\n"));
 	printf(_("  -?, --help                      show this help, then exit\n"));
 	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
@@ -505,6 +528,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		else
 			dbinfo[i].subname = NULL;
 		/* Other fields will be filled later */
+		dbinfo[i].tables = NULL;
 
 		pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
 					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
@@ -525,6 +549,20 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		i++;
 	}
 
+	for (int j = 0; j < num_dbs; j++)
+	{
+		const char *dbname = dbinfo[j].dbname;
+
+		for (TableListPerDB * cur = dblist_head; cur != NULL; cur = cur->next)
+		{
+			if (strcmp(cur->dbname, dbname) == 0)
+			{
+				dbinfo[j].tables = cur->tables;
+				break;
+			}
+		}
+	}
+
 	return dbinfo;
 }
 
@@ -1654,11 +1692,79 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	pg_log_info("creating publication \"%s\" in database \"%s\"",
 				dbinfo->pubname, dbinfo->dbname);
 
-	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
-					  ipubname_esc);
+	if (dbinfo->tables == NULL)
+		appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", ipubname_esc);
+	else
+	{
+		bool		first = true;
+
+		appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR TABLE ", ipubname_esc);
+		for (TableSpec * tbl = dbinfo->tables; tbl != NULL; tbl = tbl->next)
+		{
+			const char *params[2] = {
+				tbl->pattern_schema_regex,
+				tbl->pattern_table_regex
+			};
+
+			PGresult   *tres = PQexecParams(conn, "SELECT n.nspname, c.relname "
+											"FROM pg_class c "
+											"JOIN pg_namespace n ON n.oid = c.relnamespace "
+											"WHERE n.nspname ~ $1 "
+											"AND c.relname ~ $2 "
+											"AND c.relkind IN ('r','p') "
+											"ORDER BY 1, 2",
+											2, NULL, params, NULL, NULL, 0);
+
+			if (PQresultStatus(tres) != PGRES_TUPLES_OK)
+				pg_fatal("could not fetch tables for pattern \"%s\": %s",
+						 tbl->spec, PQerrorMessage(conn));
+
+			if (PQntuples(tres) == 0)
+				pg_fatal("no matching tables found for pattern \"%s\"", tbl->spec);
+
+			for (int i = 0; i < PQntuples(tres); i++)
+			{
+				char	   *escaped_schema = PQescapeIdentifier(conn, PQgetvalue(tres, i, 0),
+																strlen(PQgetvalue(tres, i, 0)));
+				char	   *escaped_table = PQescapeIdentifier(conn, PQgetvalue(tres, i, 1),
+															   strlen(PQgetvalue(tres, i, 1)));
+
+				appendPQExpBuffer(str, "%s%s.%s", first ? "" : ", ",
+								  escaped_schema, escaped_table);
+
+				PQfreemem(escaped_schema);
+				PQfreemem(escaped_table);
+				first = false;
+			}
+			PQclear(tres);
+		}
+	}
 
 	pg_log_debug("command is: %s", str->data);
 
+	if (dry_run)
+	{
+		res = PQexec(conn, "BEGIN");
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not begin transaction: %s", PQerrorMessage(conn));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
+						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+
+		res = PQexec(conn, "ROLLBACK");
+		PQclear(res);
+	}
+
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
@@ -2047,6 +2153,7 @@ main(int argc, char **argv)
 		{"replication-slot", required_argument, NULL, 3},
 		{"subscription", required_argument, NULL, 4},
 		{"clean", required_argument, NULL, 5},
+		{"table", required_argument, NULL, 6},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2127,6 +2234,7 @@ main(int argc, char **argv)
 				opt.all_dbs = true;
 				break;
 			case 'd':
+				TableListPerDB * newdb;
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
 					simple_string_list_append(&opt.database_names, optarg);
@@ -2134,6 +2242,18 @@ main(int argc, char **argv)
 				}
 				else
 					pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
+
+				newdb = pg_malloc0(sizeof(TableListPerDB));
+				newdb->dbname = pg_strdup(optarg);
+				newdb->tables = NULL;
+				newdb->next = NULL;
+				if (dblist_tail)
+					dblist_tail->next = newdb;
+				else
+					dblist_head = newdb;
+
+				dblist_tail = newdb;
+				dblist_cur = newdb;
 				break;
 			case 'D':
 				subscriber_dir = pg_strdup(optarg);
@@ -2200,6 +2320,55 @@ main(int argc, char **argv)
 				else
 					pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
 				break;
+			case 6:
+				TableSpec * ts = pg_malloc0(sizeof(TableSpec));
+				PQExpBuffer dbbuf;
+				PQExpBuffer schemabuf;
+				PQExpBuffer namebuf;
+				int			encoding;
+				int			dotcnt = 0;
+
+				if (!dblist_cur)
+					pg_fatal("--table specified without a preceding --database");
+
+				ts->spec = pg_strdup(optarg);
+				dbbuf = createPQExpBuffer();
+				schemabuf = createPQExpBuffer();
+				namebuf = createPQExpBuffer();
+				encoding = pg_get_encoding_from_locale(NULL, false);
+
+				patternToSQLRegex(encoding, dbbuf, schemabuf, namebuf, optarg,
+								  false, false, &dotcnt);
+				if (dotcnt == 2)
+				{
+					ts->pattern_db_regex = NULL;
+					ts->pattern_schema_regex = pg_strdup(schemabuf->data);
+					ts->pattern_table_regex = pg_strdup(namebuf->data);
+				}
+				else if (dotcnt == 1)
+				{
+					ts->pattern_db_regex = NULL;
+					ts->pattern_schema_regex = pg_strdup(dbbuf->data);
+					ts->pattern_table_regex = pg_strdup(schemabuf->data);
+				}
+				else
+					pg_fatal("invalid --table specification: %s", optarg);
+
+				destroyPQExpBuffer(dbbuf);
+				destroyPQExpBuffer(schemabuf);
+				destroyPQExpBuffer(namebuf);
+				ts->next = NULL;
+				if (!dblist_cur->tables)
+					dblist_cur->tables = ts;
+				else
+				{
+					TableSpec  *tail = dblist_cur->tables;
+
+					while (tail->next)
+						tail = tail->next;
+					tail->next = ts;
+				}
+				break;
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 229fef5b3b5..d80c3f1470f 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,89 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Declare database names
+my $db3 = 'db3';
+my $db4 = 'db4';
+
+# Create databases
+$node_p->safe_psql('postgres', "CREATE DATABASE $db3");
+$node_p->safe_psql('postgres', "CREATE DATABASE $db4");
+
+# Test: Table-level publication creation
+$node_p->safe_psql($db3, "CREATE TABLE public.t1 (id int, val text)");
+$node_p->safe_psql($db3, "CREATE TABLE public.t2 (id int, val text)");
+$node_p->safe_psql($db4,
+	"CREATE TABLE public.t3 (id int, val text, extra int)");
+
+# Initialize node_s2 as a fresh standby of node_p for table-level
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+# Run pg_createsubscriber with table-level options
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db3),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db3,
+		'--table' => "$db3.public.t1",
+		'--table' => "$db3.public.t2",
+		'--database' => $db4,
+		'--table' => "$db4.public.t3",
+	],
+	'pg_createsubscriber runs with table-level publication (existing nodes)');
+
+# Get the publication name created by pg_createsubscriber for db3
+my $pubname1 = $node_p->safe_psql(
+	$db3, qq(
+    SELECT pubname FROM pg_publication
+    WHERE pubname LIKE 'pg_createsubscriber_%'
+    ORDER BY pubname LIMIT 1
+));
+
+# Check publication tables for db3
+my $actual1 = $node_p->safe_psql(
+	$db3, qq(
+	SELECT pubname || '|public|' || tablename
+	FROM pg_publication_tables
+	WHERE pubname = '$pubname1'
+	ORDER BY tablename
+));
+is($actual1, "$pubname1|public|t1\n$pubname1|public|t2",
+	'single publication for both tables created successfully on database db3'
+);
+
+# Get the publication name created by pg_createsubscriber for db4
+my $pubname2 = $node_p->safe_psql(
+	$db4, qq(
+	SELECT pubname FROM pg_publication
+	WHERE pubname LIKE 'pg_createsubscriber_%'
+	ORDER BY pubname LIMIT 1
+));
+
+# Check publication tables for db4
+my $actual2 = $node_p->safe_psql(
+	$db4, qq(
+	SELECT pubname || '|public|' || tablename
+	FROM pg_publication_tables
+	WHERE pubname = '$pubname2'
+	ORDER BY tablename
+));
+is($actual2, "$pubname2|public|t3",
+	'single publication for t3 created successfully on database db4');
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.34.1

