From 8cc50c4464f24da10c49e5218ddb6b0f38a58c02 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 19 Dec 2025 10:51:20 +0800
Subject: [PATCH v5HEAD] Fix a race condition when updating
 procArray->replication_slot_xmin.

Previously, ReplicationSlotsComputeRequiredXmin() computed the oldest
xmin across all slots while not holding ProcArrayLock if
already_locked was false, and acquired the ProcArrayLock just before
updating the replication slot xmin.

This could lead to a race condition: if a backend creates a new slot and
attempts to initialize the slot.xmin, but meanwhile, another backend invokes
ReplicationSlotsComputeRequiredXmin() with already_locked set to false, the
global slot xmin may be initially updated by the newly created slot, only to be
subsequently overwritten by the backend running
ReplicationSlotsComputeRequiredXmin() with an invalid or new xid value.

In the reported failure, a walsender for an apply worker computed
InvalidTransactionId as the oldest xmin and overwrote a valid
replication slot xmin value computed by a walsender for a tablesync
worker with this value. Then the walsender for the tablesync worker
ended up computing the transaction ID via
GetOldestSafeDecodingTransactionId() without considering the replication
slot xmin. That led to the error "cannot build an initial slot
snapshot as oldest safe xid %u follows snapshot's xmin %u", which was
an assertion failure prior to 240e0dbacd3.

To address the bug, we acquire the ReplicationSlotControlLock exclusively during
slot creation to do the initial update of the slot xmin. In
ReplicationSlotsComputeRequiredXmin(), we hold the ReplicationSlotControlLock in
shared mode until the global slot xmin is updated in
ProcArraySetReplicationSlotXmin(). This approach prevents concurrent
computations and updates of new xmin horizons by other backends during the
initial slot xmin update process, while it still permits concurrent calls to
ReplicationSlotsComputeRequiredXmin().
---
 src/backend/replication/logical/launcher.c |  2 ++
 src/backend/replication/logical/logical.c  | 12 ++++---
 src/backend/replication/logical/slotsync.c |  2 ++
 src/backend/replication/slot.c             | 42 +++++++++++++++++++---
 4 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3991e1495d4..b9780c7bc99 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1540,6 +1540,7 @@ init_conflict_slot_xmin(void)
 	Assert(MyReplicationSlot &&
 		   !TransactionIdIsValid(MyReplicationSlot->data.xmin));
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(false);
@@ -1552,6 +1553,7 @@ init_conflict_slot_xmin(void)
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	/* Write this slot to disk */
 	ReplicationSlotMarkDirty();
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1b11ed63dc6..eef95a5b4c4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -405,11 +405,11 @@ CreateInitDecodingContext(const char *plugin,
 	 * without further interlock its return value might immediately be out of
 	 * date.
 	 *
-	 * So we have to acquire the ProcArrayLock to prevent computation of new
-	 * xmin horizons by other backends, get the safe decoding xid, and inform
-	 * the slot machinery about the new limit. Once that's done the
-	 * ProcArrayLock can be released as the slot machinery now is
-	 * protecting against vacuum.
+	 * So we have to acquire both the ReplicationSlotControlLock and the
+	 * ProcArrayLock to prevent concurrent computation and update of new xmin
+	 * horizons by other backends, get the safe decoding xid, and inform the
+	 * slot machinery about the new limit. Once that's done, both locks
+	 * can be released as the slot machinery is now protecting against vacuum.
 	 *
 	 * Note that, temporarily, the data, not just the catalog, xmin has to be
 	 * reserved if a data snapshot is to be exported.  Otherwise the initial
@@ -422,6 +422,7 @@ CreateInitDecodingContext(const char *plugin,
 	 *
 	 * ----
 	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
@@ -436,6 +437,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bf50317b443..afcb1f3487d 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -857,6 +857,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 
 		reserve_wal_for_local_slot(remote_slot->restart_lsn);
 
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
 		SpinLockAcquire(&slot->mutex);
@@ -865,6 +866,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		SpinLockRelease(&slot->mutex);
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
+		LWLockRelease(ReplicationSlotControlLock);
 
 		/*
 		 * Make sure that concerned WAL is received and flushed before syncing
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 682eccd116c..d69be3fe701 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1171,8 +1171,14 @@ ReplicationSlotPersist(void)
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  *
- * If already_locked is true, ProcArrayLock has already been acquired
- * exclusively.
+ * If already_locked is true, both the ReplicationSlotControlLock and the
+ * ProcArrayLock have already been acquired exclusively. It is crucial that the
+ * caller first acquires the ReplicationSlotControlLock, followed by the
+ * ProcArrayLock, to prevent any undetectable deadlocks due to incorrect lock
+ * ordering.
+ *
+ * Note that the same lock ordering is followed below when already_locked is
+ * false and this function acquires the locks itself.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -1182,8 +1188,33 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 	TransactionId agg_catalog_xmin = InvalidTransactionId;
 
 	Assert(ReplicationSlotCtl != NULL);
+	Assert(!already_locked ||
+		   (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
+			LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
 
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	/*
+	 * Hold the ReplicationSlotControlLock until after updating the slot xmin
+	 * values, so no backend can update the initial xmin of a newly created
+	 * slot concurrently. A shared lock is used to minimize lock contention,
+	 * especially when many slots exist and advancements occur frequently.
+	 * This is safe since an exclusive lock is taken during initial slot xmin
+	 * update in slot creation.
+	 *
+	 * Concurrent invocation of this function may cause the computed slot xmin
+	 * to regress. However, this is harmless because tuples prior to the most
+	 * recent xmin are no longer useful once advancement occurs (see
+	 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
+	 * before updating the effective_xmin). Thus, such regression merely
+	 * prevents VACUUM from prematurely removing tuples without causing the
+	 * early deletion of required data.
+	 *
+	 * One might think that we can hold the ProcArrayLock exclusively and
+	 * update the slot xmin values, but it could increase lock contention on
+	 * the ProcArrayLock, which is not great since this function can be called
+	 * at non-negligible frequency.
+	 */
+	if (!already_locked)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -1218,9 +1249,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 			agg_catalog_xmin = effective_catalog_xmin;
 	}
 
-	LWLockRelease(ReplicationSlotControlLock);
-
 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
+
+	if (!already_locked)
+		LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*
-- 
2.51.1.windows.1

