Hi, here are some review comments for patch v18-0001.

======
doc/src/sgml/protocol.sgml

nitpick - Although it is no fault of your patch, IMO it would be nicer for
the TWO_PHASE description (of CREATE REPLICATION SLOT) to also be in the
same order as what you have (e.g. below FAILOVER). So I moved it.

======
src/backend/access/transam/twophase.c

LookupGXactBySubid:
nitpick - add a blank line before return

======
src/backend/commands/subscriptioncmds.c

CommonChecksForFailoverAndTwophase:
nitpick - added Assert for the generic-looking "option" parameter name
nitpick - modified comment about transaction block
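
A side note on that Assert: strstr() accepts any substring of the list, so
a value such as "phase" or an empty string would also pass. Below is a
minimal standalone sketch of an exact-match alternative. The names
substring_check(), is_checked_sub_option() and check_option_demo() are
hypothetical, chosen only for illustration; they are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* The substring test, in the same shape as the Assert in the nitpicks diff. */
static bool
substring_check(const char *option)
{
	return strstr("two_phase,failover", option) != NULL;
}

/* Hypothetical stricter variant: true only for an exact option name. */
static bool
is_checked_sub_option(const char *option)
{
	return strcmp(option, "two_phase") == 0 ||
		strcmp(option, "failover") == 0;
}

/* Hypothetical caller showing where such an assertion would sit. */
static void
check_option_demo(const char *option)
{
	assert(is_checked_sub_option(option));
	/* ... the real checks would follow here ... */
}
```

Either form is enough to document the expected values of the parameter; the
exact-match form just rejects partial names as well.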

~~~

1. AlterSubscription
+ * Workers may still survive even if the subscription has
+ * been disabled. They may read the pg_subscription
+ * catalog and detect that the twophase parameter is
+ * updated, which causes the assertion failure. Ensure
+ * workers have already been exited to avoid it.

"which causes the assertion failure" -- what assertion failure is that? The
comment is not very clear.

~

nitpick - in comment /twophase/two_phase/
nitpick - typo /acoordingly/accordingly/

======
src/backend/replication/logical/launcher.c

logicalrep_workers_find:
nitpick - /require_lock/acquire_lock/
nitpick - take the Assert out of the else.
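
The point of moving the Assert is that the lock must be held on both paths:
either we just acquired it, or the caller already holds it. A minimal
standalone sketch of that shape, using a toy lock instead of an LWLock
(ToyLock, toy_lock_acquire(), toy_lock_release() and count_matches() are
all hypothetical stand-ins, not PostgreSQL APIs):

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-in for an LWLock; it only tracks whether "we" hold it. */
typedef struct ToyLock
{
	bool		held_by_me;
} ToyLock;

static void
toy_lock_acquire(ToyLock *lock)
{
	assert(!lock->held_by_me);
	lock->held_by_me = true;
}

static void
toy_lock_release(ToyLock *lock)
{
	assert(lock->held_by_me);
	lock->held_by_me = false;
}

/* Count the items equal to "wanted", taking the lock only if asked to. */
static int
count_matches(ToyLock *lock, const int *items, int nitems, int wanted,
			  bool acquire_lock)
{
	int			n = 0;

	if (acquire_lock)
		toy_lock_acquire(lock);

	/* Holds on both paths, so it does not need to live in an else branch. */
	assert(lock->held_by_me);

	for (int i = 0; i < nitems; i++)
	{
		if (items[i] == wanted)
			n++;
	}

	if (acquire_lock)
		toy_lock_release(lock);

	return n;
}
```

A caller that already holds the lock passes acquire_lock = false and keeps
the lock afterwards; a caller that does not hold it passes true.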

======
src/backend/replication/slot.c

nitpick - refactor the code to check the (failover) pointer only once. See
the nitpicks attachment.

~

2. ParseAlterReplSlotOptions

nitpick -- IMO the ParseAlterReplSlotOptions() function does more harm
than good here by adding the unnecessary complexity of messing around with
multiple parameters that are passed by reference. All this would be simpler
if it was just coded inline in the AlterReplicationSlot() function, which
is the only caller. I've refactored all this to demonstrate (see nitpicks
attachment).
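
To illustrate the shape I mean, here is a minimal standalone sketch with
the parsing done inline: local "given" flags, and a NULL pointer passed for
any option that was not specified. ToyOption, ToySlot, toy_slot_alter() and
toy_alter_command() are hypothetical stand-ins for illustration only, and
errors are reported by return value rather than ereport().

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef struct ToyOption
{
	const char *name;
	bool		value;
} ToyOption;

typedef struct ToySlot
{
	bool		failover;
	bool		two_phase;
} ToySlot;

/* A NULL pointer means "option not given, leave the property unchanged". */
static void
toy_slot_alter(ToySlot *slot, const bool *failover, const bool *two_phase)
{
	if (failover)
		slot->failover = *failover;
	if (two_phase)
		slot->two_phase = *two_phase;
}

/* Parse the options inline; return false on duplicate or unknown option. */
static bool
toy_alter_command(ToySlot *slot, const ToyOption *opts, size_t nopts)
{
	bool		failover_given = false;
	bool		two_phase_given = false;
	bool		failover = false;
	bool		two_phase = false;

	for (size_t i = 0; i < nopts; i++)
	{
		if (strcmp(opts[i].name, "failover") == 0)
		{
			if (failover_given)
				return false;	/* conflicting or redundant options */
			failover_given = true;
			failover = opts[i].value;
		}
		else if (strcmp(opts[i].name, "two_phase") == 0)
		{
			if (two_phase_given)
				return false;	/* conflicting or redundant options */
			two_phase_given = true;
			two_phase = opts[i].value;
		}
		else
			return false;		/* unrecognized option */
	}

	toy_slot_alter(slot,
				   failover_given ? &failover : NULL,
				   two_phase_given ? &two_phase : NULL);
	return true;
}
```

With only one caller, keeping the flags and values as plain locals avoids
four out-parameters and the pointer dereferences that come with them.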

======
src/include/replication/worker_internal.h

nitpick - /require_lock/acquire_lock/

======
src/test/regress/sql/subscription.sql

nitpick - tweak comments

======
src/test/subscription/t/021_twophase.pl

nitpick - change comment style to indicate each test part better.

======
99.
Please also see the attached diffs patch, which implements the nitpicks
mentioned above.

======
Kind Regards,
Peter Smith.
Fujitsu Australia
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 3ac4a4b..cba6661 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2050,21 +2050,6 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
 
       <variablelist>
        <varlistentry>
-        <term><literal>TWO_PHASE [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
-        <listitem>
-         <para>
-          If true, this logical replication slot supports decoding of two-phase
-          commit. With this option, commands related to two-phase commit such as
-          <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
-          and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
-          The transaction will be decoded and transmitted at
-          <literal>PREPARE TRANSACTION</literal> time.
-          The default is false.
-         </para>
-        </listitem>
-       </varlistentry>
-
-       <varlistentry>
         <term><literal>RESERVE_WAL [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
         <listitem>
          <para>
@@ -2104,6 +2089,21 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>TWO_PHASE [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
+        <listitem>
+         <para>
+          If true, this logical replication slot supports decoding of two-phase
+          commit. With this option, commands related to two-phase commit such as
+          <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
+          and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
+          The transaction will be decoded and transmitted at
+          <literal>PREPARE TRANSACTION</literal> time.
+          The default is false.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
 
       <para>
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 35bce68..f3c6e1f 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2741,5 +2741,6 @@ LookupGXactBySubid(Oid subid)
                }
        }
        LWLockRelease(TwoPhaseStateLock);
+
        return found;
 }
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6995a62..3703cf6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1076,6 +1076,8 @@ CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel)
 {
        StringInfoData cmd;
 
+       Assert(strstr("two_phase,failover", option));
+
        if (!sub->slotname)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1098,8 +1100,8 @@ CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel)
        appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
 
        /*
-        * The changed option of the slot can't be rolled back: prevent we are in
-        * the transaction state.
+        * The changed option of the slot can't be rolled back, so disallow if we
+        * are in a transaction block.
         */
        PreventInTransactionBlock(isTopLevel, cmd.data);
 
@@ -1282,7 +1284,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                        /*
                                         * Workers may still survive even if the subscription has
                                         * been disabled. They may read the pg_subscription
-                                        * catalog and detect that the twophase parameter is
+                                        * catalog and detect that the two_phase parameter is
                                         * updated, which causes the assertion failure. Ensure
                                         * workers have already been exited to avoid it.
                                         */
@@ -1304,7 +1306,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                                                 errmsg("cannot disable two_phase when prepared transactions are present"),
                                                                 errhint("Resolve these transactions and try again.")));
 
-                                       /* Change system catalog acoordingly */
+                                       /* Change system catalog accordingly */
                                        
values[Anum_pg_subscription_subtwophasestate - 1] =
                                                CharGetDatum(opts.twophase ?
                                                                         
LOGICALREP_TWOPHASE_STATE_PENDING :
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 45744b7..c566d50 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -272,15 +272,15 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
  * the subscription, instead of just one.
  */
 List *
-logicalrep_workers_find(Oid subid, bool only_running, bool require_lock)
+logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
 {
        int                     i;
        List       *res = NIL;
 
-       if (require_lock)
+       if (acquire_lock)
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-       else
-               Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+       Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
        /* Search for attached worker for a given subscription id. */
        for (i = 0; i < max_logical_replication_workers; i++)
@@ -291,7 +291,7 @@ logicalrep_workers_find(Oid subid, bool only_running, bool require_lock)
                        res = lappend(res, w);
        }
 
-       if (require_lock)
+       if (acquire_lock)
                LWLockRelease(LogicalRepWorkerLock);
 
        return res;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2f167a2..e75f24b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -842,23 +842,25 @@ ReplicationSlotAlter(const char *name, bool *failover, bool *two_phase)
                                                   " on the standby"));
        }
 
-       /*
-        * Do not allow users to enable failover for temporary slots as we do not
-        * support syncing temporary slots to the standby.
-        */
-       if (failover && *failover &&
-               MyReplicationSlot->data.persistency == RS_TEMPORARY)
+       if (failover)
+       {
+               /*
+                * Do not allow users to enable failover for temporary slots as we do not
+                * support syncing temporary slots to the standby.
+                */
+               if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
                ereport(ERROR,
                                errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                errmsg("cannot enable failover for a temporary replication slot"));
 
-       if (failover && MyReplicationSlot->data.failover != *failover)
-       {
-               SpinLockAcquire(&MyReplicationSlot->mutex);
-               MyReplicationSlot->data.failover = *failover;
-               SpinLockRelease(&MyReplicationSlot->mutex);
+               if (MyReplicationSlot->data.failover != *failover)
+               {
+                       SpinLockAcquire(&MyReplicationSlot->mutex);
+                       MyReplicationSlot->data.failover = *failover;
+                       SpinLockRelease(&MyReplicationSlot->mutex);
 
-               update_slot = true;
+                       update_slot = true;
+               }
        }
 
        if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 04f65e0..af8e958 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1405,56 +1405,42 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
        ReplicationSlotDrop(cmd->slotname, !cmd->wait);
 }
 
+
 /*
- * Process extra options given to ALTER_REPLICATION_SLOT.
+ * Change the definition of a replication slot.
  */
 static void
-ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd,
-                                                 bool *failover_given, bool *failover,
-                                                 bool *two_phase_given, bool *two_phase)
+AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
 {
-       *failover_given = false;
-       *two_phase_given = false;
+       bool            failover_given = false;
+       bool            two_phase_given = false;
+       bool            failover;
+       bool            two_phase;
 
        /* Parse options */
        foreach_ptr(DefElem, defel, cmd->options)
        {
                if (strcmp(defel->defname, "failover") == 0)
                {
-                       if (*failover_given)
+                       if (failover_given)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
-                       *failover_given = true;
-                       *failover = defGetBoolean(defel);
+                       failover_given = true;
+                       failover = defGetBoolean(defel);
                }
                else if (strcmp(defel->defname, "two_phase") == 0)
                {
-                       if (*two_phase_given)
+                       if (two_phase_given)
                                ereport(ERROR,
                                                (errcode(ERRCODE_SYNTAX_ERROR),
                                                 errmsg("conflicting or redundant options")));
-                       *two_phase_given = true;
-                       *two_phase = defGetBoolean(defel);
+                       two_phase_given = true;
+                       two_phase = defGetBoolean(defel);
                }
                else
                        elog(ERROR, "unrecognized option: %s", defel->defname);
        }
-}
-
-/*
- * Change the definition of a replication slot.
- */
-static void
-AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
-{
-       bool            failover_given;
-       bool            two_phase_given;
-       bool            failover;
-       bool            two_phase;
-
-       ParseAlterReplSlotOptions(cmd, &failover_given, &failover,
-                                                         &two_phase_given, &two_phase);
 
        ReplicationSlotAlter(cmd->slotname,
                                                 failover_given ? &failover : NULL,
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 990f524..9646261 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -241,7 +241,7 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                                                                 bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running,
-                                                                        bool require_lock);
+                                                                        bool acquire_lock);
 extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                                                         Oid dbid, Oid subid, const char *subname,
                                                                         Oid userid, Oid relid,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 51fa4b9..40e1a07 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -377,7 +377,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
  regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
--- We can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase is enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
                                                                                
                                 List of subscriptions
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index a3886d7..b64f419 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -256,7 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 
 \dRs+
--- We can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase is enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
 \dRs+
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 4e8f627..66265c7 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -371,8 +371,8 @@ is($result, qq(2), 'replicated data in subscriber table');
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 
 ###############################
-# Disable the subscription and alter it to two_phase = false,
-# then verify that the altered subscription reflects the two_phase option.
+# Alter the subscription to two_phase = false.
+# Verify that the altered subscription reflects the two_phase option.
 ###############################
 
 # Alter subscription two_phase to false
@@ -395,7 +395,10 @@ $result = $node_subscriber->safe_psql('postgres',
 );
 is($result, qq(d), 'two-phase should be disabled');
 
-# Now do a prepare on the publisher and make sure that it is not replicated.
+###############################
+# Now do a prepare on the publisher.
+# Verify that it is not replicated.
+###############################
 $node_publisher->safe_psql(
        'postgres', qq{
     BEGIN;
@@ -411,7 +414,10 @@ $result = $node_subscriber->safe_psql('postgres',
        "SELECT count(*) FROM pg_prepared_xacts;");
 is($result, qq(0), 'should be no prepared transactions on subscriber');
 
-# Now commit the insert and verify that it is replicated
+###############################
+# Now commit the insert.
+# Verify that it is replicated.
+###############################
 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';");
 
 # Wait for the subscriber to catchup
@@ -422,7 +428,10 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
 is($result, qq(3), 'replicated data in subscriber table');
 
-# Alter subscription two_phase to true
+###############################
+# Alter the subscription to two_phase = true.
+# Verify that the altered subscription reflects the two_phase option.
+###############################
 $node_subscriber->safe_psql('postgres',
     "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
 $node_subscriber->poll_query_until('postgres',
