On Mon, Aug 25, 2025 at 5:05 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> A few comments on 0001:
>

Some more comments:
1.
+ /*
+ * Return false if the leader apply worker has stopped retaining
+ * information for detecting conflicts. This implies that update_deleted
+ * can no longer be reliably detected.
+ */
+ if (!retention_active)
+ return false;
+
  /*
  * For conflict detection, we use the conflict slot's xmin value instead
  * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
@@ -3254,7 +3315,15 @@ FindDeletedTupleInLocalRel(Relation localrel,
Oid localidxoid,
  oldestxmin = slot->data.xmin;
  SpinLockRelease(&slot->mutex);

- Assert(TransactionIdIsValid(oldestxmin));
+ /*
+ * Return false if the conflict detection slot.xmin is set to
+ * InvalidTransactionId. This situation arises if the current worker is
+ * either a table synchronization or parallel apply worker, and the leader
+ * stopped retention immediately after checking the
+ * oldest_nonremovable_xid above.
+ */
+ if (!TransactionIdIsValid(oldestxmin))
+ return false;

If the current worker is tablesync or parallel_apply, it should have
exited from the above check of retention_active as we get the leader's
oldest_nonremovable_xid to decide that. What am, I missing? This made
me wonder whether we need to use slot's xmin after we have fetched
leader's oldest_nonremovable_xid to find deleted tuple?

2.
- * The interval is reset to a minimum value of 100ms once there is some
- * activity on the node.
+ * The interval is reset to the lesser of 100ms and
+ * max_conflict_retention_duration once there is some activities on the node.
AFAICS, this is not adhered in the code because you are using it when
there is no activity aka when new_xid_found is false. IS the comment
wrong or code needs some updation?

3.
+
+ /* Ensure the wait time remains within the maximum limit */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
+ MySubscription->maxconflretention);
Can't we combine it with calculation of max_interval few lines above
this change? And also adjust comments atop
adjust_xid_advance_interval() accordingly?

4.
  if (am_leader_apply_worker() &&
- MySubscription->retaindeadtuples &&
+ MySubscription->retaindeadtuples && MySubscription->retentionactive &&
  !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))

I think this code can look neat if you have one condition per line.

Apart from above comments, I have tried to improve some code comments
in the attached.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index 29d0c9a6e45..3df1828e755 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4546,8 +4546,8 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
                return;
 
        /*
-        * Stop retaining conflict information if required (See
-        * should_stop_conflict_info_retention() for details).
+        * We don't need to maintain oldest_nonremovable_xid if we decide
+        * to stop retaining conflict information for this worker.
         */
        if (should_stop_conflict_info_retention(rdt_data))
                return;
@@ -4650,8 +4650,8 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
        }
 
        /*
-        * Stop retaining conflict information if required (See
-        * should_stop_conflict_info_retention() for details).
+        * We don't need to maintain oldest_nonremovable_xid if we decide
+        * to stop retaining conflict information for this worker.
         */
        if (should_stop_conflict_info_retention(rdt_data))
                return;
@@ -4771,16 +4771,16 @@ reset_retention_data_fields(RetainDeadTuplesData 
*rdt_data)
 }
 
 /*
- * Check whether conflict information retention should be stopped because the
- * wait time has exceeded the maximum limit (max_conflict_retention_duration).
+ * Check whether conflict information retention should be stopped due to
+ * exceeding the maximum wait time (max_conflict_retention_duration).
  *
- * If retention should be stopped, proceed to the
+ * If retention should be stopped, transition to the
  * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
  * false.
  *
- * Currently, the retention will not resume automatically unless user manually
- * disables retain_dead_tuples and re-enables it after confirming that the
- * replication slot has been dropped.
+ * Note: Retention won't be resumed automatically. The user must manually
+ * disable retain_dead_tuples and re-enable it after confirming that the
+ * replication slot maintained by the launcher has been dropped.
  */
 static bool
 should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4802,9 +4802,10 @@ should_stop_conflict_info_retention(RetainDeadTuplesData 
*rdt_data)
        now = rdt_data->last_recv_time ? rdt_data->last_recv_time : 
GetCurrentTimestamp();
 
        /*
-        * Return if the wait time has not exceeded the maximum limit
-        * (max_conflict_retention_duration). The time spent waiting for table
-        * synchronization is not counted, as it's an infrequent operation.
+        * Return early if the wait time has not exceeded the configured maximum
+        * (max_conflict_retention_duration). Time spent waiting for table
+        * synchronization is excluded from this calculation, as it occurs
+        * infrequently.
         */
        if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
                                                                        
MySubscription->maxconflretention +
diff --git a/src/include/replication/worker_internal.h 
b/src/include/replication/worker_internal.h
index b86c759394f..62ea1a00580 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -95,8 +95,8 @@ typedef struct LogicalRepWorker
         * named "pg_conflict_detection". It asynchronously collects this ID to
         * decide when to advance the xmin value of the slot.
         *
-        * This ID would be set to InvalidTransactionId if the apply worker has
-        * stopped retaining information useful for conflict detection.
+        * This ID is set to InvalidTransactionId when the apply worker stops
+        * retaining information needed for conflict detection.
         */
        TransactionId oldest_nonremovable_xid;
 

Reply via email to