On Mon, Aug 25, 2025 at 5:05 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> A few comments on 0001:
>

Some more comments:

1.
+ /*
+  * Return false if the leader apply worker has stopped retaining
+  * information for detecting conflicts. This implies that update_deleted
+  * can no longer be reliably detected.
+  */
+ if (!retention_active)
+     return false;
+
  /*
   * For conflict detection, we use the conflict slot's xmin value instead
   * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
@@ -3254,7 +3315,15 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
  oldestxmin = slot->data.xmin;
  SpinLockRelease(&slot->mutex);

- Assert(TransactionIdIsValid(oldestxmin));
+ /*
+  * Return false if the conflict detection slot.xmin is set to
+  * InvalidTransactionId. This situation arises if the current worker is
+  * either a table synchronization or parallel apply worker, and the leader
+  * stopped retention immediately after checking the
+  * oldest_nonremovable_xid above.
+  */
+ if (!TransactionIdIsValid(oldestxmin))
+     return false;

If the current worker is tablesync or parallel_apply, it should have
exited at the retention_active check above, since we use the leader's
oldest_nonremovable_xid to decide that. What am I missing? This made
me wonder whether we need to use the slot's xmin at all to find the
deleted tuple once we have fetched the leader's
oldest_nonremovable_xid.

2.
- * The interval is reset to a minimum value of 100ms once there is some
- * activity on the node.
+ * The interval is reset to the lesser of 100ms and
+ * max_conflict_retention_duration once there is some activities on the node.

AFAICS, the code does not adhere to this, because you are using it when
there is no activity, i.e., when new_xid_found is false. Is the comment
wrong, or does the code need updating?

3.
+
+ /* Ensure the wait time remains within the maximum limit */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
+                                      MySubscription->maxconflretention);

Can't we combine this with the calculation of max_interval a few lines
above this change? And also adjust the comments atop
adjust_xid_advance_interval() accordingly?

4.
  if (am_leader_apply_worker() &&
- MySubscription->retaindeadtuples &&
+ MySubscription->retaindeadtuples && MySubscription->retentionactive &&
  !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))

I think this code would look neater with one condition per line.

Apart from the above comments, I have tried to improve some code
comments in the attached patch.

--
With Regards,
Amit Kapila.
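
For illustration only, the suggestion in comment 3 could look roughly like the standalone sketch below. It is not the patch code: the function name, the constants, and the doubling back-off while idle are assumptions made for the example, and max_retention_ms stands in for MySubscription->maxconflretention, with 0 assumed to mean "no limit".

```c
/* Hypothetical constants, in milliseconds; not taken from the patch. */
#define MIN_INTERVAL_MS 100
#define MAX_INTERVAL_MS 180000

static int
min_int(int a, int b)
{
	return (a < b) ? a : b;
}

/*
 * Compute the next wait interval. max_retention_ms stands in for
 * MySubscription->maxconflretention; 0 is assumed to mean "no limit".
 */
static int
next_xid_advance_interval(int cur_ms, int new_xid_found, int max_retention_ms)
{
	int			max_interval = MAX_INTERVAL_MS;
	int			next;

	/* Fold the retention limit into the cap once, up front. */
	if (max_retention_ms > 0)
		max_interval = min_int(max_interval, max_retention_ms);

	if (!new_xid_found && cur_ms > 0)
		next = cur_ms * 2;		/* assumed: back off by doubling while idle */
	else
		next = MIN_INTERVAL_MS; /* activity seen, or not yet initialized */

	/* A single clamp now covers both the idle and the active case. */
	return min_int(next, max_interval);
}
```

With the cap computed in one place, the reset path is also limited by the retention duration, which would make the comment quoted in point 2 accurate for both branches.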
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 29d0c9a6e45..3df1828e755 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4546,8 +4546,8 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
         return;
 
     /*
-     * Stop retaining conflict information if required (See
-     * should_stop_conflict_info_retention() for details).
+     * We don't need to maintain oldest_nonremovable_xid if we decide
+     * to stop retaining conflict information for this worker.
      */
     if (should_stop_conflict_info_retention(rdt_data))
         return;
@@ -4650,8 +4650,8 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
     }
 
     /*
-     * Stop retaining conflict information if required (See
-     * should_stop_conflict_info_retention() for details).
+     * We don't need to maintain oldest_nonremovable_xid if we decide
+     * to stop retaining conflict information for this worker.
      */
     if (should_stop_conflict_info_retention(rdt_data))
         return;
@@ -4771,16 +4771,16 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
 }
 
 /*
- * Check whether conflict information retention should be stopped because the
- * wait time has exceeded the maximum limit (max_conflict_retention_duration).
+ * Check whether conflict information retention should be stopped due to
+ * exceeding the maximum wait time (max_conflict_retention_duration).
  *
- * If retention should be stopped, proceed to the
+ * If retention should be stopped, transition to the
  * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
  * false.
  *
- * Currently, the retention will not resume automatically unless user manually
- * disables retain_dead_tuples and re-enables it after confirming that the
- * replication slot has been dropped.
+ * Note: Retention won't be resumed automatically. The user must manually
+ * disable retain_dead_tuples and re-enable it after confirming that the
+ * replication slot maintained by the launcher has been dropped.
  */
 static bool
 should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4802,9 +4802,10 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
     now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
 
     /*
-     * Return if the wait time has not exceeded the maximum limit
-     * (max_conflict_retention_duration). The time spent waiting for table
-     * synchronization is not counted, as it's an infrequent operation.
+     * Return early if the wait time has not exceeded the configured maximum
+     * (max_conflict_retention_duration). Time spent waiting for table
+     * synchronization is excluded from this calculation, as it occurs
+     * infrequently.
      */
     if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
                                     MySubscription->maxconflretention +
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b86c759394f..62ea1a00580 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -95,8 +95,8 @@ typedef struct LogicalRepWorker
      * named "pg_conflict_detection". It asynchronously collects this ID to
      * decide when to advance the xmin value of the slot.
      *
-     * This ID would be set to InvalidTransactionId if the apply worker has
-     * stopped retaining information useful for conflict detection.
+     * This ID is set to InvalidTransactionId when the apply worker stops
+     * retaining information needed for conflict detection.
      */
     TransactionId oldest_nonremovable_xid;
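
As a rough model of the check that the reworded comment in should_stop_conflict_info_retention() describes, a standalone sketch might look like the following. All names here are hypothetical and the boundary behaviour is a choice made for the example. The hunk above is cut off after `MySubscription->maxconflretention +`, so the extra term is only assumed, from the comment, to be the time spent waiting for table synchronization.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Standalone model of the timeout check; all names are hypothetical.
 * Times are in milliseconds.
 *
 * excluded_wait_ms models the time that should not count toward the
 * limit (assumed here to be the table synchronization wait).
 */
static bool
retention_wait_exceeded(int64_t wait_start_ms, int64_t now_ms,
						int64_t max_retention_ms, int64_t excluded_wait_ms)
{
	/* Extending the limit by the excluded time leaves it uncounted. */
	return (now_ms - wait_start_ms) >= (max_retention_ms + excluded_wait_ms);
}
```

Adding the excluded time to the limit, rather than subtracting it from the elapsed time, keeps the comparison in the same shape as the call in the hunk.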