On Thu, Jun 12, 2025 at 11:34 AM Zhijie Hou (Fujitsu)
<houzj.f...@fujitsu.com> wrote:
>

Few comments on v36 patches:
==========================
1. In advance_conflict_slot_xmin(), we first save the slot to disk,
then update its effective xmin, and then do the required xmin
computation. Now, if we don't save the slot every time, there is a
risk that its value can go backwards after a restart. But OTOH, for
physical slots maintained by walsender for physical replication, we
also don't save the physical slot. However, still the system works,
see discussion in email: [1].

As per my understanding, even if the conflict_slot's xmin moved back
after restart, it shouldn't cause any problem. Because it will anyway
be moved ahead in the next cycle, and there won't be any rows that
will get removed but are required for conflict detection. If this is
correct, then we don't need to save the slot in
advance_conflict_slot_xmin().

2.
+ *
+ * Issue a warning if track_commit_timestamp is not enabled when
+ * check_commit_ts is set to true.
+ *
+ * Issue a warning if the subscription is being disabled.
+ */
+void
+CheckSubConflictInfoRetention(bool retain_conflict_info, bool check_commit_ts,
+   bool disabling_sub)
+{
+ if (!retain_conflict_info)
+ return;
+
+ if (check_commit_ts && !track_commit_timestamp)
+ ereport(WARNING,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("commit timestamp and origin data required for detecting
conflicts won't be retained"),
+ errhint("Consider setting \"%s\" to true.",
+ "track_commit_timestamp"));
+
+ if (disabling_sub)
+ ereport(WARNING,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("deleted rows to detect conflicts would not be removed until
the subscription is enabled"),
+ errhint("Consider setting %s to false.",
+ "retain_conflict_info"));

The quoted comments atop this function just say what it is apparent
from the code. It is better if the comments explain why we allow to
proceed when the above conditions are not met.

I think we can probably add a check here that this option requires
wal_level = replica as the launcher needs to create a physical slot to
retain the required info.

3. Isn't the new check for logical slots in
check_new_cluster_subscription_configuration() somewhat redundant with
the previous check done in
check_new_cluster_logical_replication_slots()? Can't we combine both?

Apart from this, I have made a number of changes in the comments and a
few other cosmetic changes in the attached.

[1]: https://www.postgresql.org/message-id/28c8bf-68470780-3-51b29480%4089454035

-- 
With Regards,
Amit Kapila.
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index b8f9bf573ea..16310cf474f 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8088,8 +8088,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration 
count&gt;</replaceable>:<replaceable>&l
       </para>
       <para>
        If true, the information (e.g., dead tuples, commit timestamps, and
-       origins) on the subscriber that is still useful for conflict detection
-       is retained.
+       origins) on the subscriber that is useful for conflict detection is\
+       retained.
       </para></entry>
      </row>
 
diff --git a/doc/src/sgml/ref/create_subscription.sgml 
b/doc/src/sgml/ref/create_subscription.sgml
index be90088bcd0..0e49bf09eca 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -443,18 +443,18 @@ CREATE SUBSCRIPTION <replaceable 
class="parameter">subscription_name</replaceabl
         <listitem>
          <para>
           Specifies whether the information (e.g., dead tuples, commit
-          timestamps, and origins) on the subscriber that is still useful for
-          conflict detection is retained. The default is
-          <literal>false</literal>. If set to true, an additional replication
-          slot named <quote><literal>pg_conflict_detection</literal></quote>
+          timestamps, and origins) required for conflict detection on the
+          subscriber is retained. The default is <literal>false</literal>.
+          If set to true, a replication slot named
+          <quote><literal>pg_conflict_detection</literal></quote>
           will be created on the subscriber to prevent the conflict information
           from being removed.
          </para>
 
          <para>
           Note that the information useful for conflict detection is retained
-          only after the creation of the additional slot. You can verify the
-          existence of this slot by querying <link 
linkend="view-pg-replication-slots">pg_replication_slots</link>.
+          only after the creation of the slot. You can verify the existence of
+          this slot by querying <link 
linkend="view-pg-replication-slots">pg_replication_slots</link>.
           And even if multiple subscriptions on one node enable this option,
           only one replication slot will be created.
          </para>
@@ -468,7 +468,7 @@ CREATE SUBSCRIPTION <replaceable 
class="parameter">subscription_name</replaceabl
          </para>
 
          <para>
-          This option cannot be enabled if the publisher is also a physical 
standby.
+          This option cannot be enabled if the publisher is a physical standby.
          </para>
         </listitem>
        </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c
index 9938fbe3e57..396cfcce1f6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,7 +72,7 @@
 #define SUBOPT_PASSWORD_REQUIRED       0x00000800
 #define SUBOPT_RUN_AS_OWNER                    0x00001000
 #define SUBOPT_FAILOVER                                0x00002000
-#define SUBOPT_RETAIN_CONFLICT_INFO            0x00004000
+#define SUBOPT_RETAIN_CONFLICT_INFO    0x00004000
 #define SUBOPT_LSN                                     0x00008000
 #define SUBOPT_ORIGIN                          0x00010000
 
@@ -625,6 +625,7 @@ CreateSubscription(ParseState *pstate, 
CreateSubscriptionStmt *stmt,
                                 errmsg("password_required=false is 
superuser-only"),
                                 errhint("Subscriptions with the 
password_required option set to false may only be created or modified by the 
superuser.")));
 
+       /* Ensure that we can enable retainconflictinfo. */
        CheckSubConflictInfoRetention(opts.retainconflictinfo, true,
                                                                  
!opts.enabled);
 
@@ -1084,10 +1085,15 @@ CheckAlterSubOption(Subscription *sub, const char 
*option,
         * publisher cannot be modified if the slot is currently acquired by the
         * existing walsender.
         *
-        * Do not allow changing the retain_conflict_info option when the
-        * subscription is enabled or the apply worker is active, to prevent 
race
-        * conditions arising from the new option value being acknowledged
-        * asynchronously by the launcher and apply workers.
+        * Note that two_phase is enabled (aka changed from 'false' to 'true') 
on
+        * the publisher by the existing walsender, so we could have allowed 
that
+        * even when the subscription is enabled. But we kept this restriction 
for
+        * the sake of consistency and simplicity.
+        *
+        * Additionally, do not allow changing the retain_conflict_info option 
when
+        * the subscription is enabled to prevent race conditions arising from 
the
+        * new option value being acknowledged asynchronously by the launcher 
and
+        * apply workers.
         *
         * Without the restriction, a race condition may arise when a user
         * disables and immediately re-enables the retain_conflict_info option. 
In
@@ -1118,11 +1124,6 @@ CheckAlterSubOption(Subscription *sub, const char 
*option,
         * to avoid the race conditions described above, but we maintain the
         * restriction for both enable and disable operations for the sake of
         * consistency.
-        *
-        * Note that two_phase is enabled (aka changed from 'false' to 'true') 
on
-        * the publisher by the existing walsender, so we could have allowed 
that
-        * even when the subscription is enabled. But we kept this restriction 
for
-        * the sake of consistency and simplicity.
         */
        if (sub->enabled)
                ereport(ERROR,
@@ -1399,7 +1400,7 @@ AlterSubscription(ParseState *pstate, 
AlterSubscriptionStmt *stmt,
                                         * subscription has been disabled.
                                         *
                                         * Ensure workers have already been 
exited to avoid the
-                                        * race conditions as described in 
CheckAlterSubOption().
+                                        * race conditions described in 
CheckAlterSubOption().
                                         */
                                        if (logicalrep_workers_find(subid, 
true, true))
                                                ereport(ERROR,
@@ -1417,12 +1418,8 @@ AlterSubscription(ParseState *pstate, 
AlterSubscriptionStmt *stmt,
                                        Assert(!sub->enabled);
 
                                        /*
-                                        * Provide a notice if 
retain_conflict_info is enabled for
-                                        * a disabled subscription, reminding 
the user to enable
-                                        * the subscription to prevent the 
accumulation of dead
-                                        * tuples. A warning is not issued since
-                                        * retain_conflict_info can be altered 
only for disabled
-                                        * subscriptions.
+                                        * Remind the user that enabling 
subscription will prevent
+                                        * the accumulation of dead tuples.
                                         */
                                        if (opts.retainconflictinfo)
                                                ereport(NOTICE,
@@ -1461,10 +1458,7 @@ AlterSubscription(ParseState *pstate, 
AlterSubscriptionStmt *stmt,
                                                        
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                                         errmsg("cannot enable 
subscription that does not have a slot name")));
 
-                               /*
-                                * Perform the track_commit_timestamp check 
only when enabling
-                                * the subscription.
-                                */
+                               /* Check track_commit_timestamp only when 
enabling the subscription. */
                                
CheckSubConflictInfoRetention(sub->retainconflictinfo,
                                                                                
          opts.enabled, !opts.enabled);
 
@@ -2341,14 +2335,15 @@ check_publications_origin(WalReceiverConn *wrconn, List 
*publications,
 }
 
 /*
- * Check if the publisher's status permits enabling retain_conflict_info.
+ * Determine whether the retain_conflict_info can be enable based on the
+ * publisher's status.
  *
- * Enabling retain_conflict_info is not allowed if the publisher's version is
- * prior to PG18 or if the publisher is in recovery (operating as a standby
+ * This option is disallowed if the publisher is running a version earlier
+ * than the PG19, or if the publisher is in recovery (i.e., it is a standby
  * server).
  *
- * Refer to the comments atop maybe_advance_nonremovable_xid() for detailed
- * reasons.
+ * See comments atop maybe_advance_nonremovable_xid() for a detailed
+ * explanation.
  */
 static void
 check_pub_conflict_info_retention(WalReceiverConn *wrconn, bool 
retain_conflict_info)
diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index 799daa247e2..aa763fc5ef2 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1339,7 +1339,7 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker,
                /*
                 * Create a replication slot to retain information necessary for
                 * conflict detection such as dead tuples, commit timestamps, 
and
-                * origins if requested by any subscription.
+                * origins.
                 *
                 * The slot is created before starting the apply worker to 
prevent it
                 * from unnecessarily maintaining its oldest_nonremovable_xid.
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index f7434b0f6cd..10e1cb7ea45 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5257,18 +5257,20 @@ InitializeLogRepWorker(void)
        }
 
        /*
-        * Restart the worker if retain_conflict_info was enabled at startup. 
The
-        * replication slot for conflict detection may not be created yet, or
-        * might soon be dropped as the launcher sees retain_conflict_info as
-        * disabled. To prevent unnecessary maintenance of 
oldest_nonremovable_xid
-        * when the slot is absent or at risk of being dropped, a restart is
-        * initiated.
+        * Restart the worker if retain_conflict_info was enabled during 
startup.
+        *
+        * At this point, the replication slot used for conflict detection might
+        * not exist yet, or could be dropped soon if the launcher perceives
+        * retain_conflict_info as disabled. To avoid unnecessary tracking of
+        * oldest_nonremovable_xid when the slot is absent or at risk of being
+        * dropped, a restart is initiated.
         *
         * The oldest_nonremovable_xid should be initialized only when the
         * retain_conflict_info is enabled before launching the worker. See
         * logicalrep_worker_launch.
         */
-       if (am_leader_apply_worker() && MySubscription->retainconflictinfo &&
+       if (am_leader_apply_worker() &&
+               MySubscription->retainconflictinfo &&
                
!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
        {
                ereport(LOG,

Reply via email to