From b3853307efcdbee2c9053688b3860f0b2347462b Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Apr 2024 06:18:23 +0000
Subject: [PATCH v8 2/4] Alter slot option two_phase only when altering "on" to
 "off"

Since the two_phase option is controlled by both the publisher (as a slot option)
and the subscriber (as a subscription option), the slot option must also be
modified.

Regarding the off->on case, the logical replication already has a mechanism for
it, so there is no need to do anything special for the on->off case; however,
we must connect to the publisher and expressly change the parameter. The
operation cannot be rolled back, and altering the parameter from "on" to "off"
within a transaction is prohibited.
---
 doc/src/sgml/ref/alter_subscription.sgml      |  2 +-
 src/backend/commands/subscriptioncmds.c       | 30 ++++--
 .../libpqwalreceiver/libpqwalreceiver.c       | 23 +++--
 src/include/replication/walreceiver.h         |  5 +-
 src/test/subscription/meson.build             |  1 +
 src/test/subscription/t/099_twophase_added.pl | 95 +++++++++++++++++++
 6 files changed, 140 insertions(+), 16 deletions(-)
 create mode 100644 src/test/subscription/t/099_twophase_added.pl

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 88e9a72147..0c2894a94e 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -70,7 +70,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
    <command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command>
    with <literal>refresh</literal> option as <literal>true</literal>,
    <command>ALTER SUBSCRIPTION ... SET (failover = on|off)</command> and
-   <command>ALTER SUBSCRIPTION ... SET (two_phase = on|off)</command>
+   <command>ALTER SUBSCRIPTION ... SET (two_phase = off)</command>
    cannot be executed inside a transaction block.
 
    These commands also cannot be executed when the subscription has
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 90d967eb7c..6b2cb71dac 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1097,6 +1097,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	Form_pg_subscription form;
 	bits32		supported_opts;
 	SubOpts		opts = {0};
+	bool		update_failover;
+	bool		update_two_phase;
 
 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -1186,10 +1188,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								 errhint("Resolve these transactions and try again")));
 
 					/*
-					 * The changed two_phase option of the slot can't be rolled
-					 * back.
+					 * Since the altering two_phase option of subscriptions
+					 * also leads to the change of slot option, this command
+					 * cannot be rolled back. So prevent we are in the
+					 * transaction block.
 					 */
-					PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (two_phase)");
+					if (!opts.twophase)
+						PreventInTransactionBlock(isTopLevel,
+												  "ALTER SUBSCRIPTION ... SET (two_phase = off)");
 
 					/* Change system catalog acoordingly */
 					values[Anum_pg_subscription_subtwophasestate - 1] =
@@ -1547,14 +1553,22 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	}
 
 	/*
-	 * Try to acquire the connection necessary for altering slot.
+	 * Check the need to alter the replication slot. Failover and two_phase
+	 * options are controlled by both the publisher (as a slot option) and the
+	 * subscriber (as a subscription option).
+	 */
+	update_failover = replaces[Anum_pg_subscription_subfailover - 1];
+	update_two_phase = (replaces[Anum_pg_subscription_subtwophasestate - 1] &&
+						!opts.twophase);
+
+	/*
+	 * Try to acquire the connection necessary for altering slot, if needed.
 	 *
 	 * This has to be at the end because otherwise if there is an error while
 	 * doing the database operations we won't be able to rollback altered
 	 * slot.
 	 */
-	if (replaces[Anum_pg_subscription_subfailover - 1] ||
-		replaces[Anum_pg_subscription_subtwophasestate - 1])
+	if (update_failover || update_two_phase)
 	{
 		bool		must_use_password;
 		char	   *err;
@@ -1574,7 +1588,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		PG_TRY();
 		{
-			walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase);
+			walrcv_alter_slot(wrconn, sub->slotname,
+							  update_failover ? &opts.failover : NULL,
+							  update_two_phase ? &opts.twophase : NULL);
 		}
 		PG_FINALLY();
 		{
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 998bbd517a..2800597f4c 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  CRSSnapshotAction snapshot_action,
 								  XLogRecPtr *lsn);
 static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-								bool failover, bool two_phase);
+								const bool *failover, const bool *two_phase);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -1121,16 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
  */
 static void
 libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-					bool failover, bool two_phase)
+					const bool *failover, const bool *two_phase)
 {
 	StringInfoData cmd;
 	PGresult   *res;
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )",
-					 quote_identifier(slotname),
-					 failover ? "true" : "false",
-					 two_phase ? "true" : "false");
+	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
+					 quote_identifier(slotname));
+
+	if (failover)
+		appendStringInfo(&cmd, "FAILOVER %s",
+						 *failover ? "true" : "false");
+
+	if (failover && two_phase)
+		appendStringInfo(&cmd, ", ");
+
+	if (two_phase)
+		appendStringInfo(&cmd, "TWO_PHASE %s",
+						 *two_phase ? "true" : "false");
+
+	appendStringInfoString(&cmd, ");");
 
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
 	pfree(cmd.data);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 31fa1257ec..7ffa5a58b3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -377,8 +377,9 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
  */
 typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
 									  const char *slotname,
-									  bool failover,
-									  bool two_phase);
+									  const bool *failover,
+									  const bool *two_phase);
+
 
 /*
  * walrcv_get_backend_pid_fn
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index c591cd7d61..b4bd522c3d 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/031_column_list.pl',
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
+      't/099_twophase_added.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
new file mode 100644
index 0000000000..62f4acad27
--- /dev/null
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -0,0 +1,95 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Additional tests for altering two_phase option
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10
+	log_min_messages = debug1));
+$node_subscriber->start;
+
+# Define tables on both nodes
+$node_publisher->safe_psql('postgres',
+    "CREATE TABLE tab_full (a int PRIMARY KEY);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication, with two_phase = "off"
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES");
+
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_subscriber->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION regress_sub
+	CONNECTION '$publisher_connstr' PUBLICATION pub
+	WITH (two_phase = off, copy_data = off, failover = off)");
+
+# Verify the started worker recognized two_phase was disabled
+$node_subscriber->wait_for_log(
+	'logical replication apply worker for subscription "regress_sub" two_phase is DISABLED', $log_offset);
+
+# Check the case that prepared transactions exist on the publisher node.
+#
+# Since the two_phase is "off", then normally, this PREPARE will do nothing
+# until the COMMIT PREPARED, but in this test, we toggle the two_phase to "on"
+# again before the COMMIT PREPARED happens.
+
+# Prepare a transaction to insert some tuples into the table
+$node_publisher->safe_psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (generate_series(1, 5));
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup('regress_sub');
+
+# Verify the prepared transaction is not yet replicated to the subscriber
+# because two_phase is set to "off".
+my $result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(0), "transaction is not prepared on subscriber");
+
+$log_offset = -s $node_subscriber->logfile;
+
+# Toggle the two_phase to "on" *before* the COMMIT PREPARED. Since we are the
+# special path for the case where both two_phase and failover are altered, it
+# is also set to "on".
+$node_subscriber->safe_psql(
+    'postgres', "
+    ALTER SUBSCRIPTION regress_sub DISABLE;
+    ALTER SUBSCRIPTION regress_sub SET (two_phase = on, failover = on);
+    ALTER SUBSCRIPTION regress_sub ENABLE;");
+
+# Verify the started worker recognized two_phase was enabled
+$node_subscriber->wait_for_log(
+	'logical replication apply worker for subscription "regress_sub" two_phase is ENABLED', $log_offset);
+
+# And do COMMIT PREPARED the prepared transaction
+$node_publisher->safe_psql('postgres',
+    "COMMIT PREPARED 'test_prepared_tab_full';");
+$node_publisher->wait_for_catchup('regress_sub');
+
+# Verify inserted tuples are replicated
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM tab_full;");
+is($result, q(5),
+   "prepared transactions done before altering can be replicated");
+
+done_testing();
-- 
2.43.0

