From bb5eb539af928ab3fead7ee34c07e770fb8d944e Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Mon, 8 Aug 2022 09:47:26 +0530
Subject: [PATCH v44 1/2] Check and log a warning if the publisher has
 subscribed to the same table from some other publisher.

Checks and log a warning if 'copy_data = true' and 'origin = none' but the
publication tables were also replicated from other publishers.
-------------------------------------------------------------------------------
The steps below help to demonstrate how the new warning is useful:

The initial copy phase has no way to know the origin of the row data,
so if 'copy_data = true' in step 4 below, log a warning to notify the user
that potentially non-local data might have been copied.

e.g.
step 1:
node1=# CREATE PUBLICATION pub_node1 FOR TABLE t1;
CREATE PUBLICATION

step 2:
node2=# CREATE PUBLICATION pub_node2 FOR TABLE t1;
CREATE PUBLICATION

step 3:
node1=# CREATE SUBSCRIPTION sub_node1_node2 CONNECTION '<node2 details>'
node1-# PUBLICATION pub_node2 WITH (copy_data = true, origin = none);
CREATE SUBSCRIPTION

step 4:
node1=# CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '<node1 details>'
node1-# PUBLICATION pub_node1 WITH (copy_data = true, origin = none);
WARNING:  subscription "sub_node2_node1" requested origin=NONE but might copy data that had a different origin
DETAIL:  Subscribed publication "pub_node1" is subscribing to other publications.
HINT:  Verify that initial data copied from the publisher tables did not come from other origins. Some corrective action may be necessary.
NOTICE:  created replication slot "sub_node2_node1" on publisher
CREATE SUBSCRIPTION

Author: Vignesh C
Reviewed-By: Peter Smith, Amit Kapila, Jonathan Katz, Shi yu, Wang wei
Discussion: https://www.postgresql.org/message-id/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com
---
 doc/src/sgml/logical-replication.sgml     |  32 +++++
 doc/src/sgml/ref/alter_subscription.sgml  |   5 +
 doc/src/sgml/ref/create_subscription.sgml |  10 ++
 src/backend/commands/subscriptioncmds.c   | 158 +++++++++++++++++++++-
 src/test/subscription/t/030_origin.pl     | 114 ++++++++++++----
 5 files changed, 290 insertions(+), 29 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index bdf1e7b727..2b50b600c9 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -514,6 +514,38 @@ test_sub=# SELECT * FROM t3;
 
  </sect1>
 
+ <sect1 id="specifying-origins-for-subscription">
+  <title>Specifying origins for subscription</title>
+
+  <para>
+   When using a subscription parameter combination of
+   <literal>copy_data=true</literal> and <literal>origin=NONE</literal>, the
+   initial sync table data is copied directly from the publisher, meaning that
+   knowledge of the true origin of that data is not possible. If the publisher
+   also has subscriptions then the copied table data might have originated from
+   further upstream. This scenario is detected and a WARNING is logged to the
+   user, but the warning is only an indication of a potential problem; it is
+   the user reponsibility to make the necessary checks to ensure the copied
+   data origins are really as wanted or not. It is recommended to create the
+   subscription using <literal>enabled=false</literal>, so that if the origin
+   WARNING occurs no copy has happened yet. Otherwise some corrective steps
+   might be needed to remove any unwanted data that got copied.
+   To find which tables might potentially include non-local origins (due to
+   other subscriptions created on the publisher) try this SQL query by
+   specifying the publications in IN condition:
+<programlisting>
+SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename
+FROM pg_publication P,
+     LATERAL pg_get_publication_tables(P.pubname) GPT
+     LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),
+     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+WHERE C.oid = GPT.relid AND PS.srrelid IS NOT NULL AND
+      P.pubname IN (...);
+</programlisting>
+  </para>
+
+ </sect1>
+
  <sect1 id="logical-replication-row-filter">
   <title>Row Filters</title>
 
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 64efc21f53..055cc607c4 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -168,6 +168,11 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
           that are being subscribed to when the replication starts.
           The default is <literal>true</literal>.
          </para>
+         <para>
+          Refer to the <xref linkend="specifying-origins-for-subscription"/> about how
+          <literal>copy_data = true</literal> can interact with the
+          <literal>origin</literal> parameter.
+         </para>
          <para>
           Previously subscribed tables are not copied, even if a table's row
           filter <literal>WHERE</literal> clause has since been modified.
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 7390c715bc..240465c812 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -213,6 +213,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           will affect what data is copied. Refer to the
           <xref linkend="sql-createsubscription-notes" /> for details.
          </para>
+         <para>
+          Refer to the <xref linkend="specifying-origins-for-subscription"/>
+          about how <literal>copy_data = true</literal> can interact with the
+          <literal>origin</literal> parameter.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -315,6 +320,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           to <literal>any</literal> means that the publisher sends changes
           regardless of their origin. The default is <literal>any</literal>.
          </para>
+         <para>
+          Refer to the <xref linkend="specifying-origins-for-subscription"/>
+          about how <literal>copy_data = true</literal> can interact with the
+          <literal>origin</literal> parameter.
+         </para>
         </listitem>
        </varlistentry>
       </variablelist></para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f87796e5af..f22cd37439 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -92,6 +92,10 @@ typedef struct SubOpts
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_publications_origin(WalReceiverConn *wrconn,
+									  List *publications, bool copydata,
+									  char *origin, Oid *subrel_local_oids,
+									  int subrel_count, char *subname);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		PG_TRY();
 		{
 			check_publications(wrconn, publications);
+			check_publications_origin(wrconn, publications, opts.copy_data,
+									  opts.origin, NULL, 0, stmt->subname);
 
 			/*
 			 * Set sync state based on if we were asked to do data copy or
@@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	ListCell   *lc;
 	int			off;
 	int			remove_rel_len;
+	int			subrel_count;
 	Relation	rel = NULL;
 	typedef struct SubRemoveRels
 	{
@@ -815,13 +822,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 		/* Get local table list. */
 		subrel_states = GetSubscriptionRelations(sub->oid, false);
+		subrel_count = list_length(subrel_states);
 
 		/*
 		 * Build qsorted array of local table oids for faster lookup. This can
 		 * potentially contain all tables in the database so speed of lookup
 		 * is important.
 		 */
-		subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+		subrel_local_oids = palloc(subrel_count * sizeof(Oid));
 		off = 0;
 		foreach(lc, subrel_states)
 		{
@@ -829,14 +837,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 			subrel_local_oids[off++] = relstate->relid;
 		}
-		qsort(subrel_local_oids, list_length(subrel_states),
+		qsort(subrel_local_oids, subrel_count,
 			  sizeof(Oid), oid_cmp);
 
+		check_publications_origin(wrconn, sub->publications, copy_data,
+								  sub->origin, subrel_local_oids,
+								  subrel_count, sub->name);
+
 		/*
 		 * Rels that we want to remove from subscription and drop any slots
 		 * and origins corresponding to them.
 		 */
-		sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+		sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
 
 		/*
 		 * Walk over the remote tables and try to match them to locally known
@@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			pubrel_local_oids[off++] = relid;
 
 			if (!bsearch(&relid, subrel_local_oids,
-						 list_length(subrel_states), sizeof(Oid), oid_cmp))
+						 subrel_count, sizeof(Oid), oid_cmp))
 			{
 				AddSubscriptionRelState(sub->oid, relid,
 										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			  sizeof(Oid), oid_cmp);
 
 		remove_rel_len = 0;
-		for (off = 0; off < list_length(subrel_states); off++)
+		for (off = 0; off < subrel_count; off++)
 		{
 			Oid			relid = subrel_local_oids[off];
 
@@ -1784,6 +1796,142 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 	table_close(rel, RowExclusiveLock);
 }
 
+/*
+ * Check and log a warning if the publisher has subscribed to the same table
+ * from some other publisher. This check is required only if "copy_data = true"
+ * and "origin = none" for CREATE SUBSCRIPTION and
+ * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
+ * having origin might have been copied.
+ *
+ * This check need not be performed on the tables that are already added
+ * because incremental sync for those tables will happen through WAL and the
+ * origin of the data can be identified from the WAL records.
+ *
+ * subrel_local_oids contains the list of relation oids that are already
+ * present on the subscriber.
+ */
+static void
+check_publications_origin(WalReceiverConn *wrconn, List *publications,
+						   bool copydata, char *origin, Oid *subrel_local_oids,
+						   int subrel_count, char *subname)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, TEXTOID};
+	List		*publist = NIL;
+
+	if (!copydata || !origin ||
+		(pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+		return;
+
+	initStringInfo(&cmd);
+	appendStringInfoString(&cmd,
+						   "SELECT DISTINCT N.nspname AS schemaname,\n"
+						   "				C.relname AS tablename,\n"
+						   "				P.pubname AS pubname\n"
+						   "FROM pg_publication P,\n"
+						   "	 LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   "	 LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
+						   "	 pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						   "WHERE C.oid = GPT.relid AND PS.srrelid IS NOT NULL AND P.pubname IN (");
+	get_publications_str(publications, &cmd, true);
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 3, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not receive list of replicated tables from the publisher: %s",
+						res->err)));
+
+	/* Process tables. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *nspname;
+		char	   *relname;
+		char	   *pubname;
+		bool		isnull;
+		bool		isnewtable = true;
+
+		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+		pubname = TextDatumGetCString(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		/* Skip already added tables */
+		if (subrel_count)
+		{
+			RangeVar   *rv;
+			Oid			relid;
+
+			rv = makeRangeVar(nspname, relname, -1);
+			relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+			/* Check for supported relkind. */
+			CheckSubscriptionRelkind(get_rel_relkind(relid),
+									 rv->schemaname, rv->relname);
+
+			if (bsearch(&relid, subrel_local_oids,
+						subrel_count, sizeof(Oid), oid_cmp))
+				isnewtable = false;
+		}
+
+		ExecClearTuple(slot);
+
+		if (!isnewtable)
+		{
+			pfree(nspname);
+			pfree(relname);
+			pfree(pubname);
+			continue;
+		}
+
+		publist = list_append_unique(publist, makeString(pubname));
+		pfree(nspname);
+		pfree(relname);
+	}
+
+	/*
+	 * Log a warning if the publisher has subscribed to the same table
+	 * from some other publisher. We cannot know the origin of data during
+	 * the initial sync. Data origins can be found only from the WAL by
+	 * looking at the origin id.
+	 *
+	 * XXX: For simplicity, we don't check whether the table has any data
+	 * or not. If the table doesn't have any data then we don't need to
+	 * distinguish between data having origin and data not having origin so
+	 * we can avoid logging a warning in that case.
+	 */
+	if (list_length(publist))
+	{
+		StringInfo	pubnames = makeStringInfo();
+
+		/* Prepare the list of publication(s) for warning message. */
+		get_publications_str(publist, pubnames, false);
+		ereport(WARNING,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("subscription \"%s\" requested origin=NONE but might copy data that had a different origin",
+						subname),
+				errdetail_plural("Subscribed publication %s is subscribing to other publications.",
+								 "Subscribed publications %s are subscribing to other publications.",
+								 list_length(publist), pubnames->data);
+				errhint("Verify that initial data copied from the publisher tables did not come from other origins. Some corrective action may be necessary."));
+		list_free_deep(publist);
+		pfree(pubnames->data);
+		pfree(pubnames);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+}
+
 /*
  * Get the list of tables which belong to specified publications on the
  * publisher connection.
diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl
index b297a51f7c..453b43a10b 100644
--- a/src/test/subscription/t/030_origin.pl
+++ b/src/test/subscription/t/030_origin.pl
@@ -1,13 +1,23 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test the CREATE SUBSCRIPTION 'origin' parameter.
+# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with
+# 'copy_data' parameter.
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+my $subname_AB = 'tap_sub_A_B';
+my $subname_AB2 = 'tap_sub_A_B_2';
+my $subname_BA = 'tap_sub_B_A';
+my $subname_BC = 'tap_sub_B_C';
+
+my $result;
+my $stdout;
+my $stderr;
+
 ###############################################################################
 # Setup a bidirectional logical replication between node_A & node_B
 ###############################################################################
@@ -32,33 +42,29 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
 # node_A (pub) -> node_B (sub)
 my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
 $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
-my $appname_B1 = 'tap_sub_B1';
 $node_B->safe_psql(
 	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_B1
-	CONNECTION '$node_A_connstr application_name=$appname_B1'
+	CREATE SUBSCRIPTION $subname_BA
+	CONNECTION '$node_A_connstr application_name=$subname_BA'
 	PUBLICATION tap_pub_A
 	WITH (origin = none)");
 
 # node_B (pub) -> node_A (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
 $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
-my $appname_A = 'tap_sub_A';
 $node_A->safe_psql(
 	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_A
-	CONNECTION '$node_B_connstr application_name=$appname_A'
+	CREATE SUBSCRIPTION $subname_AB
+	CONNECTION '$node_B_connstr application_name=$subname_AB'
 	PUBLICATION tap_pub_B
 	WITH (origin = none, copy_data = off)");
 
 # Wait for initial table sync to finish
-$node_A->wait_for_subscription_sync($node_B, $appname_A);
-$node_B->wait_for_subscription_sync($node_A, $appname_B1);
+$node_A->wait_for_subscription_sync($node_B, $subname_AB);
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
 
 is(1, 1, 'Bidirectional replication setup is complete');
 
-my $result;
-
 ###############################################################################
 # Check that bidirectional logical replication setup does not cause infinite
 # recursive insertion.
@@ -68,8 +74,8 @@ my $result;
 $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
 $node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");
 
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
 
 # check that transaction was committed on subscriber(s)
 $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
@@ -85,8 +91,8 @@ is( $result, qq(11
 
 $node_A->safe_psql('postgres', "DELETE FROM tab;");
 
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
 
 ###############################################################################
 # Check that remote data of node_B (that originated from node_C) is not
@@ -109,23 +115,20 @@ $node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
 # node_C (pub) -> node_B (sub)
 my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
 $node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
-
-my $appname_B2 = 'tap_sub_B2';
 $node_B->safe_psql(
 	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_B2
-	CONNECTION '$node_C_connstr application_name=$appname_B2'
+	CREATE SUBSCRIPTION $subname_BC
+	CONNECTION '$node_C_connstr application_name=$subname_BC'
 	PUBLICATION tap_pub_C
 	WITH (origin = none)");
-
-$node_B->wait_for_subscription_sync($node_C, $appname_B2);
+$node_B->wait_for_subscription_sync($node_C, $subname_BC);
 
 # insert a record
 $node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
 
-$node_C->wait_for_catchup($appname_B2);
-$node_B->wait_for_catchup($appname_A);
-$node_A->wait_for_catchup($appname_B1);
+$node_C->wait_for_catchup($subname_BC);
+$node_B->wait_for_catchup($subname_AB);
+$node_A->wait_for_catchup($subname_BA);
 
 $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
 is($result, qq(32), 'The node_C data replicated to node_B');
@@ -136,6 +139,69 @@ is($result, qq(),
 	'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
 );
 
+###############################################################################
+# Specifying origin=NONE indicates that the publisher should only replicate the
+# changes that are generated locally from node_B, but in this case since the
+# node_B is also subscribing data from node_A, node_B can have remotely
+# originated data from node_A. We log a warning, in this case, to draw
+# attention to there being possible remote data.
+###############################################################################
+($result, $stdout, $stderr) = $node_A->psql(
+	'postgres', "
+        CREATE SUBSCRIPTION $subname_AB2
+        CONNECTION '$node_B_connstr application_name=$subname_AB2'
+        PUBLICATION tap_pub_B
+        WITH (origin = none, copy_data = on)");
+like(
+	$stderr,
+	qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested origin=NONE but might copy data that had a different origin/,
+	"Create subscription with origin = none and copy_data when the publisher has subscribed same table"
+);
+
+$node_A->wait_for_subscription_sync($node_B, $subname_AB2);
+
+# Alter subscription ... refresh publication should be successful when no new
+# table is added
+$node_A->safe_psql(
+	'postgres', "
+        ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");
+
+# Check Alter subscription ... refresh publication when there is a new
+# table that is subscribing data from a different publication
+$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
+$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
+
+# add a new table to the publication
+$node_A->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_A ADD TABLE tab_new");
+$node_B->safe_psql(
+	'postgres', "
+        ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION");
+
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
+
+# add a new table to the publication
+$node_B->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_B ADD TABLE tab_new");
+
+# Alter subscription ... refresh publication should log a warning when a new
+# table in the publisher is subscribing data from a different publication
+($result, $stdout, $stderr) = $node_A->psql(
+	'postgres', "
+        ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");
+like(
+	$stderr,
+	qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested origin=NONE but might copy data that had a different origin/,
+	"Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin=none"
+);
+
+$node_A->wait_for_subscription_sync($node_B, $subname_AB2);
+
+# clear the operations done by this test
+$node_A->safe_psql('postgres', "DROP TABLE tab_new");
+$node_B->safe_psql('postgres', "DROP TABLE tab_new");
+$node_A->safe_psql('postgres', "DROP SUBSCRIPTION $subname_AB2");
+
 # shutdown
 $node_B->stop('fast');
 $node_A->stop('fast');
-- 
2.32.0

