From 9f5b7c7363a506a2b7cd922a1184df66a988c534 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Fri, 17 Oct 2025 16:37:09 +0530
Subject: [PATCH v17] Support existing publications in pg_createsubscriber

Allow pg_createsubscriber to reuse existing publications instead of failing
when they already exist on the publisher.

Previously, pg_createsubscriber would fail if any specified publication already
existed. Now, existing publications are reused as-is with their current
configuration, and non-existing publications are created automatically with
FOR ALL TABLES.

This change provides flexibility when working with mixed scenarios of existing
and new publications. Users should verify that existing publications have the
desired configuration before reusing them, and can use --dry-run to see which
publications will be reused and which will be created.

Only publications created by pg_createsubscriber are cleaned up during error
cleanup operations. Pre-existing publications are preserved unless
'--clean=publications' is explicitly specified, which drops all publications.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  11 ++
 src/bin/pg_basebackup/pg_createsubscriber.c   |  80 +++++++--
 .../t/040_pg_createsubscriber.pl              | 162 ++++++++++++++++++
 3 files changed, 237 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..75e2edc83fd 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -285,6 +285,17 @@ PostgreSQL documentation
        a generated name is assigned to the publication name. This option cannot
        be used together with <option>--all</option>.
       </para>
+      <para>
+       If a publication with the specified name already exists on the publisher,
+       it will be reused with its current configuration, including its table
+       list, row filters, and column filters.
+       If a publication does not exist, it will be created automatically with
+       <literal>FOR ALL TABLES</literal>.
+      </para>
+      <para>
+       Use <option>--dry-run</option> to safely preview which publications will
+       be reused and which will be created.
+      </para>
      </listitem>
     </varlistentry>
 
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index f59c293d875..ab15ddea337 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -115,6 +115,7 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool check_publication_exists(PGconn *conn, const char *pubname, const char *dbname);
 static void drop_publication(PGconn *conn, const char *pubname,
 							 const char *dbname, bool *made_publication);
 static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -769,6 +770,31 @@ generate_object_name(PGconn *conn)
 	return objname;
 }
 
+/*
+ * Return whether a specified publication exists in the specified database.
+ */
+static bool
+check_publication_exists(PGconn *conn, const char *pubname, const char *dbname)
+{
+	PGresult   *res;
+	bool		exists;
+	char	   *query;
+
+	query = psprintf("SELECT 1 FROM pg_publication WHERE pubname = %s",
+					 PQescapeLiteral(conn, pubname, strlen(pubname)));
+	res = PQexec(conn, query);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("could not check for publication \"%s\" in database \"%s\": %s",
+				 pubname, dbname, PQerrorMessage(conn));
+
+	exists = (PQntuples(res) == 1);
+
+	PQclear(res);
+	pg_free(query);
+	return exists;
+}
+
 /*
  * Create the publications and replication slots in preparation for logical
  * replication. Returns the LSN from latest replication slot. It will be the
@@ -805,13 +831,24 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		if (num_replslots == 0)
 			dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
 
-		/*
-		 * Create publication on publisher. This step should be executed
-		 * *before* promoting the subscriber to avoid any transactions between
-		 * consistent LSN and the new publication rows (such transactions
-		 * wouldn't see the new publication rows resulting in an error).
-		 */
-		create_publication(conn, &dbinfo[i]);
+		if (check_publication_exists(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+		{
+			/* Reuse existing publication on publisher. */
+			pg_log_info("dry-run: would use existing publication \"%s\" in database \"%s\"",
+						dbinfo[i].pubname, dbinfo[i].dbname);
+			dbinfo[i].made_publication = false;
+		}
+		else
+		{
+			/*
+			 * Create publication on publisher. This step should be executed
+			 * *before* promoting the subscriber to avoid any transactions
+			 * between consistent LSN and the new publication rows (such
+			 * transactions wouldn't see the new publication rows resulting in
+			 * an error).
+			 */
+			create_publication(conn, &dbinfo[i]);
+		}
 
 		/* Create replication slot on publisher */
 		if (lsn)
@@ -1756,11 +1793,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
 /*
  * Retrieve and drop the publications.
  *
- * Since the publications were created before the consistent LSN, they
- * remain on the subscriber even after the physical replica is
- * promoted. Remove these publications from the subscriber because
- * they have no use. Additionally, if requested, drop all pre-existing
- * publications.
+ * Publications copied during physical replication remain on the subscriber
+ * after promotion. If --clean=publications is specified, drop all existing
+ * publications in the subscriber database. Otherwise, only drop publications
+ * that were created by pg_createsubscriber during this operation.
  */
 static void
 check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
@@ -1794,12 +1830,24 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	}
 
 	/*
-	 * In dry-run mode, we don't create publications, but we still try to drop
-	 * those to provide necessary information to the user.
+	 * Handle publication created by pg_createsubscriber.
+	 *
+	 * In dry-run mode, create_publication() and drop_publication() only log
+	 * actions without modifying the database. Importantly, since no
+	 * publication is actually created in dry-run, the above query for
+	 * existing publications won't find the "would-be" made publication. Thus,
+	 * we must call drop_publication() regardless of drop_all_pubs to ensure
+	 * the user sees the intended log message.
 	 */
 	if (!drop_all_pubs || dry_run)
-		drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
-						 &dbinfo->made_publication);
+	{
+		if (dbinfo->made_publication)
+			drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+							 &dbinfo->made_publication);
+		else if (!drop_all_pubs)
+			pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
+						dbinfo->pubname, dbinfo->dbname);
+	}
 }
 
 /*
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 3d6086dc489..9536771fcca 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,171 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 isnt($sysid_p, $sysid_s, 'system identifier was changed');
 
+# Create user-defined publications.
+$node_p->safe_psql($db1,
+	"CREATE PUBLICATION test_pub_existing FOR TABLE tbl1");
+
+# Initialize node_s2 and node_s3 as a fresh standby of node_p for existing/new
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+my $node_s3 = PostgreSQL::Test::Cluster->new('node_s3');
+$node_s3->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s3->start;
+$node_s3->stop;
+
+# Test publication reuse and creation behavior with --dry-run.
+# This should reuse existing 'test_pub_existing' and create new 'test_pub_new',
+# demonstrating mixed publication handling without actual changes.
+($stdout, $stderr) = run_command(
+	[
+		'pg_createsubscriber',
+		'--verbose',
+		'--dry-run',
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--publication' => 'test_pub_existing',
+		'--publication' => 'test_pub_new',
+	],
+	'run pg_createsubscriber --dry-run on node S2');
+
+like(
+	$stderr,
+	qr/dry-run: would use existing publication "test_pub_existing"/,
+	'dry-run logs reuse of existing publication');
+like(
+	$stderr,
+	qr/dry-run: would create publication "test_pub_new"/,
+	'dry-run logs creation of new publication');
+
+# Verify dry-run did not modify publisher state
+my $pub_names_db1 = $node_p->safe_psql($db1,
+	"SELECT pubname FROM pg_publication ORDER BY pubname");
+is( $pub_names_db1, qq(pub1
+test_pub1
+test_pub2
+test_pub_existing),
+	"existing publication remains unchanged after dry-run");
+
+my $pub_names_db2 = $node_p->safe_psql($db2,
+	"SELECT pubname FROM pg_publication ORDER BY pubname");
+is($pub_names_db2, 'pub2',
+	"dry-run did not actually create publications in db2");
+
+# Run pg_createsubscriber to test publication reuse and creation behavior.
+# This should reuse the existing 'test_pub_existing' publication in db1 and
+# create a new 'test_pub_new' publication in db2, demonstrating how the tool
+# handles mixed existing/new publication scenarios.
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--publication' => 'test_pub_existing',
+		'--publication' => 'test_pub_new',
+	],
+	'run pg_createsubscriber on node S2');
+
+# Start subscriber
+$node_s2->start;
+
+# Verify that test_pub_new was created in db2
+$result = $node_p->safe_psql($db2,
+	"SELECT COUNT(*) FROM pg_publication WHERE pubname = 'test_pub_new'");
+is($result, '1', 'test_pub_new publication was created in db2');
+
+# Insert rows on P
+$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('fourth row')");
+$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 2')");
+
+# Get subscription names and publications
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+    SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+));
+@subnames = split("\n", $result);
+
+# Check result in database $db1
+$result = $node_s2->safe_psql($db1, 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row
+fourth row),
+	"logical replication works in database $db1");
+
+# Check result in database $db2
+$result = $node_s2->safe_psql($db2, 'SELECT * FROM tbl2');
+is( $result, qq(row 1
+row 2),
+	"logical replication works in database $db2");
+
+# Verify that the correct publications are being used
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+        SELECT s.subpublications
+        FROM pg_subscription s
+        WHERE s.subname ~ '^pg_createsubscriber_'
+        ORDER BY s.subdbid
+    )
+);
+
+is( $result, qq({test_pub_existing}
+{test_pub_new}),
+	"subscriptions use the correct publications");
+
+# Run pg_createsubscriber with --clean=publications to test that ALL
+# publications (both pre-existing and pg_createsubscriber-created) are dropped
+# when the --clean=publications option is used.
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s3->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s3->host,
+		'--subscriber-port' => $node_s3->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--publication' => 'test_pub_existing',
+		'--publication' => 'test_pub_new',
+		'--clean' => 'publications',
+	],
+	'run pg_createsubscriber on node S3');
+
+# Start subscriber
+$node_s3->start;
+
+# Confirm ALL publications were removed by --clean=publications
+my $pub_count_db1 =
+  $node_s3->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication");
+is($pub_count_db1, '0',
+	'all publications were removed from db1 by --clean=publications');
+
+my $pub_count_db2 =
+  $node_s3->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication");
+is($pub_count_db2, '0',
+	'all publications were removed from db2 by --clean=publications');
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
+$node_s3->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.34.1

