From 15678a86b82207bd91aa0b596e112ebaed1be915 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 26 Jun 2025 11:11:48 +0530
Subject: [PATCH v3 1/2] Support tables via pg_createsubscriber

This patch adds support for specifying tables to be included in logical
replication publications via pg_createsubscriber. Users can now pass multiple
'--database' and '--table' options to define which tables should be published
and subscribed for each database.

Features:
1. Supports per-database table mapping using multiple '--database'/'--table'
pairs.
2. Allows optional column lists and row filters.
3. If '--table' is omitted for a database, a 'FOR ALL TABLES' publication is
created.
4. Adds TAP tests to validate combinations of database and table arguments.

This improves fine-grained control over logical replication setup and aligns
pg_createsubscriber CLI design with other tools like vacuumdb and pg_restore.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  57 +++++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 236 +++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              |  83 ++++++
 3 files changed, 366 insertions(+), 10 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..ddc8777e138 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -39,6 +39,10 @@ PostgreSQL documentation
      <arg choice="plain"><option>--publisher-server</option></arg>
     </group>
     <replaceable>connstr</replaceable>
+    <group choice="req">
+     <arg choice="plain"><option>--table</option></arg>
+    </group>
+    <replaceable>table-name</replaceable>
    </group>
   </cmdsynopsis>
  </refsynopsisdiv>
@@ -321,6 +325,59 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>--table=<replaceable class="parameter">table</replaceable></option></term>
+     <listitem>
+      <para>
+       Adds one or more specific tables to the publication for the most recently
+       specified <option>--database</option>. This option can be given multiple
+       times to include additional tables.
+      </para>
+
+      <para>
+       The argument must be a fully qualified table name in one of the
+       following forms:
+        <itemizedlist><listitem><para><literal>schema.table</literal></para></listitem>
+        <listitem><para><literal>db.schema.table</literal></para></listitem></itemizedlist>
+       If the database name is provided, it must match the most recent
+       <option>--database</option> argument.
+      </para>
+
+      <para>
+       A table specification may also include an optional column list and/or
+       row filter:
+       <itemizedlist>
+       <listitem>
+         <para>
+          <literal>schema.table(col1, col2, ...)</literal> &mdash; publishes
+          only the specified columns.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>schema.table WHERE (predicate)</literal> &mdash; publishes
+          only rows that satisfy the given condition.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          Both forms can be combined, e.g.
+          <literal>schema.table(col1, col2) WHERE (id &gt; 100)</literal>.
+         </para>
+        </listitem>
+       </itemizedlist>
+      </para>
+
+      <para>
+       When <option>--table</option> is specified, only the listed tables are
+       included in the publication. It cannot be combined with
+       <option>--all</option> (which publishes all databases and all tables).
+       Within a database, if no <option>--table</option> options are given, all
+       tables are included by default.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-V</option></term>
      <term><option>--version</option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 3986882f042..0017a740b72 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -31,6 +31,21 @@
 #define	DEFAULT_SUB_PORT	"50432"
 #define	OBJECTTYPE_PUBLICATIONS  0x0001
 
+typedef struct TableSpec
+{
+	char	   *spec;
+	char	   *dbname;
+	char	   *pattern_regex;
+	char	   *pattern_part1_regex;
+	char	   *pattern_part2_regex;
+	char	   *pattern_part3_regex;
+	struct TableSpec *next;
+}			TableSpec;
+
+static TableSpec * table_list_head = NULL;
+static TableSpec * table_list_tail = NULL;
+static char *current_dbname = NULL;
+
 /* Command-line options */
 struct CreateSubscriberOptions
 {
@@ -61,6 +76,7 @@ struct LogicalRepInfo
 
 	bool		made_replslot;	/* replication slot was created */
 	bool		made_publication;	/* publication was created */
+	TableSpec  *tables;			/* list of tables to be subscribed */
 };
 
 /*
@@ -161,7 +177,6 @@ enum WaitPMResult
 	POSTMASTER_STILL_STARTING
 };
 
-
 /*
  * Cleanup objects that were created by pg_createsubscriber if there is an
  * error.
@@ -265,6 +280,7 @@ usage(void)
 	printf(_("      --publication=NAME          publication name\n"));
 	printf(_("      --replication-slot=NAME     replication slot name\n"));
 	printf(_("      --subscription=NAME         subscription name\n"));
+	printf(_("      --table                     table to subscribe to; can be specified multiple times\n"));
 	printf(_("  -V, --version                   output version information, then exit\n"));
 	printf(_("  -?, --help                      show this help, then exit\n"));
 	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
@@ -505,6 +521,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		else
 			dbinfo[i].subname = NULL;
 		/* Other fields will be filled later */
+		dbinfo[i].tables = NULL;
 
 		pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
 					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
@@ -525,6 +542,40 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		i++;
 	}
 
+	for (i = 0; i < num_dbs; i++)
+	{
+		TableSpec  *prev = NULL;
+		TableSpec  *cur = table_list_head;
+		TableSpec  *filtered_head = NULL;
+		TableSpec  *filtered_tail = NULL;
+
+		while (cur != NULL)
+		{
+			TableSpec  *next = cur->next;
+
+			if (strcmp(cur->dbname, dbinfo[i].dbname) == 0)
+			{
+				if (prev)
+					prev->next = next;
+				else
+					table_list_head = next;
+
+				cur->next = NULL;
+				if (!filtered_head)
+					filtered_head = filtered_tail = cur;
+				else
+				{
+					filtered_tail->next = cur;
+					filtered_tail = cur;
+				}
+			}
+			else
+				prev = cur;
+			cur = next;
+		}
+		dbinfo[i].tables = filtered_head;
+	}
+
 	return dbinfo;
 }
 
@@ -1615,6 +1666,7 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	PGresult   *res;
 	char	   *ipubname_esc;
 	char	   *spubname_esc;
+	bool		first_table = true;
 
 	Assert(conn != NULL);
 
@@ -1654,12 +1706,80 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	pg_log_info("creating publication \"%s\" in database \"%s\"",
 				dbinfo->pubname, dbinfo->dbname);
 
-	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
-					  ipubname_esc);
+	if (dbinfo->tables == NULL)
+		appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", ipubname_esc);
+	else
+	{
+		appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR TABLE ", ipubname_esc);
+
+		for (TableSpec * tbl = dbinfo->tables; tbl != NULL; tbl = tbl->next)
+		{
+			const char *params[2] = {
+				tbl->pattern_part2_regex,
+				tbl->pattern_part3_regex
+			};
+
+			PGresult   *tres = PQexecParams(conn, "SELECT n.nspname, c.relname "
+											"FROM pg_class c "
+											"JOIN pg_namespace n ON n.oid = c.relnamespace "
+											"WHERE n.nspname ~ $1 "
+											"AND c.relname ~ $2 "
+											"AND c.relkind IN ('r','p') "
+											"ORDER BY 1, 2",
+											2, NULL, params, NULL, NULL, 0);
+
+			if (PQresultStatus(tres) != PGRES_TUPLES_OK)
+				pg_fatal("could not fetch tables for pattern \"%s\": %s",
+						 tbl->spec, PQerrorMessage(conn));
+
+			if (PQntuples(tres) == 0)
+				pg_fatal("no matching tables found for pattern \"%s\"", tbl->spec);
+
+			for (int i = 0; i < PQntuples(tres); i++)
+			{
+				char	   *sch = PQgetvalue(tres, i, 0);
+				char	   *relname = PQgetvalue(tres, i, 1);
+				char	   *escaped_schema = PQescapeIdentifier(conn, sch, strlen(sch));
+				char	   *escaped_table = PQescapeIdentifier(conn, relname, strlen(relname));
+
+				appendPQExpBuffer(str, "%s%s.%s",
+								  first_table ? "" : ", ",
+								  escaped_schema, escaped_table);
+
+				first_table = false;
+
+				PQfreemem(escaped_schema);
+				PQfreemem(escaped_table);
+			}
+			PQclear(tres);
+		}
+	}
 
 	pg_log_debug("command is: %s", str->data);
 
-	if (!dry_run)
+	if (dry_run)
+	{
+		res = PQexec(conn, "BEGIN");
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not begin transaction: %s", PQerrorMessage(conn));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+
+		res = PQexec(conn, str->data);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
+						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+		PQclear(res);
+
+		res = PQexec(conn, "ROLLBACK");
+		PQclear(res);
+	}
+	else
 	{
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -2047,6 +2167,7 @@ main(int argc, char **argv)
 		{"replication-slot", required_argument, NULL, 3},
 		{"subscription", required_argument, NULL, 4},
 		{"clean", required_argument, NULL, 5},
+		{"table", required_argument, NULL, 6},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2127,14 +2248,20 @@ main(int argc, char **argv)
 				opt.all_dbs = true;
 				break;
 			case 'd':
-				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
-					simple_string_list_append(&opt.database_names, optarg);
-					num_dbs++;
+					if (current_dbname)
+						pg_free(current_dbname);
+					current_dbname = pg_strdup(optarg);
+
+					if (!simple_string_list_member(&opt.database_names, optarg))
+					{
+						simple_string_list_append(&opt.database_names, optarg);
+						num_dbs++;
+					}
+					else
+						pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
+					break;
 				}
-				else
-					pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
-				break;
 			case 'D':
 				subscriber_dir = pg_strdup(optarg);
 				canonicalize_path(subscriber_dir);
@@ -2200,6 +2327,95 @@ main(int argc, char **argv)
 				else
 					pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
 				break;
+			case 6:
+				{
+					char	   *copy_arg;
+					char	   *first_dot;
+					char	   *second_dot;
+					char	   *dbname_arg = NULL;
+					char	   *schema_table_part;
+					TableSpec  *ts;
+					PQExpBuffer dbbuf;
+					PQExpBuffer schemabuf;
+					PQExpBuffer namebuf;
+					int			encoding;
+					int			dotcnt;
+
+					if (!current_dbname)
+						pg_fatal("--table specified without a preceding --database");
+
+					copy_arg = pg_strdup(optarg);
+
+					first_dot = strchr(copy_arg, '.');
+					if (first_dot != NULL)
+						second_dot = strchr(first_dot + 1, '.');
+					else
+						second_dot = NULL;
+
+					if (second_dot != NULL)
+					{
+						*first_dot = '\0';
+						dbname_arg = copy_arg;
+						schema_table_part = first_dot + 1;
+					}
+					else
+					{
+						dbname_arg = NULL;
+						schema_table_part = copy_arg;
+					}
+
+					if (dbname_arg != NULL && strcmp(dbname_arg, current_dbname) != 0)
+						pg_fatal("database name in --table argument \"%s\" does not match most recent --database \"%s\"",
+								 dbname_arg, current_dbname);
+
+					ts = pg_malloc0(sizeof(TableSpec));
+					dbbuf = createPQExpBuffer();
+					schemabuf = createPQExpBuffer();
+					namebuf = createPQExpBuffer();
+					encoding = pg_get_encoding_from_locale(NULL, false);
+					dotcnt = 0;
+
+					ts->spec = pg_strdup(optarg);
+					ts->dbname = pg_strdup(current_dbname);
+
+					patternToSQLRegex(encoding, dbbuf, schemabuf, namebuf,
+									  schema_table_part, false, false, &dotcnt);
+
+					if (dbname_arg != NULL)
+						dotcnt++;
+
+					if (dotcnt == 2)
+					{
+						ts->pattern_part1_regex = pg_strdup(dbbuf->data);
+						ts->pattern_part2_regex = pg_strdup(schemabuf->data);
+						ts->pattern_part3_regex = namebuf->len > 0 ? pg_strdup(namebuf->data) : NULL;
+					}
+					else if (dotcnt == 1)
+					{
+						ts->pattern_part1_regex = NULL;
+						ts->pattern_part2_regex = pg_strdup(dbbuf->data);
+						ts->pattern_part3_regex = NULL;
+					}
+					else
+						pg_fatal("invalid table specification \"%s\"", optarg);
+
+					destroyPQExpBuffer(dbbuf);
+					destroyPQExpBuffer(schemabuf);
+					destroyPQExpBuffer(namebuf);
+					pg_free(copy_arg);
+
+					ts->next = NULL;
+
+					if (!table_list_head)
+						table_list_head = table_list_tail = ts;
+					else
+						table_list_tail->next = ts;
+
+					table_list_tail = ts;
+
+					break;
+				}
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 229fef5b3b5..62e75af4bb5 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,92 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Declare database names
+my $db3 = 'db3';
+my $db4 = 'db4';
+
+# Create databases
+$node_p->safe_psql('postgres', "CREATE DATABASE $db3");
+$node_p->safe_psql('postgres', "CREATE DATABASE $db4");
+
+# Create additional schemas
+$node_p->safe_psql($db3, "CREATE SCHEMA myschema");
+$node_p->safe_psql($db4, "CREATE SCHEMA otherschema");
+
+# Test: Table-level publication creation
+$node_p->safe_psql($db3, "CREATE TABLE public.t1 (id int, val text)");
+$node_p->safe_psql($db3, "CREATE TABLE public.t2 (id int, val text)");
+$node_p->safe_psql($db3, "CREATE TABLE myschema.t4 (id int, val text)");
+
+$node_p->safe_psql($db4,
+	"CREATE TABLE public.t3 (id int, val text, extra int)");
+$node_p->safe_psql($db4,
+	"CREATE TABLE otherschema.t5 (id serial primary key, info text)");
+
+# Create explicit publications
+my $pubname1 = 'pub1';
+my $pubname2 = 'pub2';
+
+# Initialize node_s2 as a fresh standby of node_p for table-level
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+# Run pg_createsubscriber with table-level options
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db3),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--publication' => $pubname1,
+		'--publication' => $pubname2,
+		'--database' => $db3,
+		'--table' => "$db3.public.t1",
+		'--table' => "$db3.public.t2",
+		'--table' => "$db3.myschema.t4",
+		'--database' => $db4,
+		'--table' => "$db4.public.t3",
+		'--table' => "$db4.otherschema.t5",
+	],
+	'pg_createsubscriber runs with table-level publication (existing nodes)');
+
+# Check publication tables for db3 with public schema first
+my $actual1 = $node_p->safe_psql(
+	$db3, qq(
+        SELECT pubname || '|' || schemaname || '|' || tablename
+        FROM pg_publication_tables
+        WHERE pubname = '$pubname1'
+        ORDER BY schemaname, tablename
+    )
+);
+is( $actual1,
+	"$pubname1|myschema|t4\n$pubname1|public|t1\n$pubname1|public|t2",
+	'publication includes tables in public and myschema schemas on db3');
+
+# Check publication tables for db4, with public schema first
+my $actual2 = $node_p->safe_psql(
+	$db4, qq(
+        SELECT pubname || '|' || schemaname || '|' || tablename
+        FROM pg_publication_tables
+        WHERE pubname = '$pubname2'
+        ORDER BY schemaname, tablename
+    )
+);
+is( $actual2,
+	"$pubname2|otherschema|t5\n$pubname2|public|t3",
+	'publication includes tables in public and otherschema schemas on db4');
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.41.0.windows.3

