Hi,

On 2021-04-07 17:10:37 -0700, Andres Freund wrote:
> I think this can be solved in two different ways:
>
> 1) Hold ReplicationSlotAllocationLock with LW_SHARED across most of
>    InvalidateObsoleteReplicationSlots(). That way nobody could re-create a new
>    slot in the to-be-obsoleted-slot's place.
>
> 2) Atomically check whether the slot needs to be invalidated and try to
>    acquire if needed. Don't release ReplicationSlotControlLock between those
>    two steps. Signal the owner to release the slot iff we couldn't acquire the
>    slot. In the latter case wait and then recheck if the slot still needs to
>    be dropped.
>
> To me 2) seems better, because we then can also be sure that the slot still
> needs to be obsoleted, rather than potentially doing so unnecessarily.
>
>
> It looks to me like several of the problems here stem from trying to reuse
> code from ReplicationSlotAcquireInternal() (which before this was just named
> ReplicationSlotAcquire()).  I don't think that makes sense, because cases like
> this want to check if a condition is true, and acquire it only if so.
>
> IOW, I think this basically needs to look like ReplicationSlotsDropDBSlots(),
> except that a different condition is checked, and the if (active_pid) case
> needs to prepare a condition variable, signal the owner and then wait on the
> condition variable, to restart after.

I'm also confused by the use of ConditionVariableTimedSleep(timeout =
10). Why do we need a timed sleep here in the first place? And why with
such a short sleep?

I also noticed that the code is careful to use CHECK_FOR_INTERRUPTS(); -
but is aware it's running in checkpointer. I don't think CFI does much
there? If we are worried about needing to check for interrupts, more
work is needed.


Sketch for a fix attached. I did leave the odd
ConditionVariableTimedSleep(10ms) in, because I wasn't sure why it's
there...

After this I don't see a reason to have SAB_Inquire - as far as I can
tell it's practically impossible to use without race conditions? Except
for raising an error - which is "builtin"...

Greetings,

Andres Freund
>From bb2c2c4ecca51d2675f2e5c4d6ac3490995be2b0 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 7 Apr 2021 19:01:03 -0700
Subject: [PATCH 1/2] WIP: Sketch for a fix for
 InvalidateObsoleteReplicationSlots().

---
 src/backend/replication/slot.c | 226 ++++++++++++++++++++-------------
 1 file changed, 139 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 75a087c2f9d..5864b9b0139 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1153,6 +1153,140 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots. Returns whether
+ * ReplicationSlotControlLock was released.
+ */
+static bool
+InvalidateObsoleteReplicationSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+	int		last_signaled_pid = 0;
+	bool	released_lock = false;
+
+	while (true)
+	{
+		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
+		bool		slot_conflicts;
+		NameData	slotname;
+		int			active_pid = 0;
+
+		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+		CHECK_FOR_INTERRUPTS();
+
+		slot_conflicts = false;
+
+		if (!s->in_use)
+			continue;
+
+		/*
+		 * Check if the slot needs to be invalidated. If it needs to be
+		 * invalidated, and is not currently acquired, acquire it and mark it
+		 * as having been invalidated. We do all of this with the spinlock
+		 * held - otherwise there would be race conditions (e.g. the slot's
+		 * restart_lsn moving ahead, the slot concurrently being dropped after
+		 * we release ReplicationSlotControlLock, ...).
+		 */
+		SpinLockAcquire(&s->mutex);
+
+		restart_lsn = s->data.restart_lsn;
+
+		/* check if slot needs to be invalidated */
+		if (!XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < oldestLSN)
+		{
+			slot_conflicts = true;
+			slotname = s->data.name;
+			active_pid = s->active_pid;
+
+			/* check if we can acquire it */
+			if (active_pid == 0)
+			{
+				MyReplicationSlot = s;
+				s->active_pid = MyProcPid;
+				s->data.invalidated_at = s->data.restart_lsn;
+				s->data.restart_lsn = InvalidXLogRecPtr;
+			}
+		}
+
+		SpinLockRelease(&s->mutex);
+
+		if (!slot_conflicts)
+		{
+			Assert(active_pid == 0);
+
+			break;
+		}
+		else if (active_pid != 0)
+		{
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is the race condition where other process may own
+			 * the slot after the process using it was terminated and before
+			 * this process owns it. To handle this case, we signal again
+			 * if the PID of the owning process is changed than the last.
+			 *
+			 * XXX This logic assumes that the same PID is not reused
+			 * very quickly.
+			 */
+			if (last_signaled_pid != active_pid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+								active_pid, NameStr(slotname))));
+
+				(void) kill(active_pid, SIGTERM);
+				last_signaled_pid = active_pid;
+			}
+
+			/*
+			 * Wait until the slot is released.
+			 *
+			 * Will immediately return in the first iteration, so we can
+			 * recheck the condition before sleeping. That addresses the
+			 * otherwise possible race of the slot already having been
+			 * released.
+			 */
+			ConditionVariableTimedSleep(&s->active_cv, 10,
+										WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+			/* re-acquire for next loop iteration */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		}
+		else
+		{
+			/*
+			 * Don't want to hold ReplicationSlotControlLock across file
+			 * system operations. Now that we (temporarily) acquired the slot,
+			 * that's safe, as long as we afterwards restart the scan from
+			 * scratch.
+			 */
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+			ReplicationSlotRelease();
+
+			ereport(LOG,
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+							NameStr(slotname),
+							LSN_FORMAT_ARGS(restart_lsn))));
+
+			break;
+		}
+
+	}
+
+	Assert(!released_lock == LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+	return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1171,99 +1305,17 @@ restart:
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
-		NameData	slotname;
-		int		wspid;
-		int		last_signaled_pid = 0;
+
+		CHECK_FOR_INTERRUPTS();
 
 		if (!s->in_use)
 			continue;
 
-		SpinLockAcquire(&s->mutex);
-		slotname = s->data.name;
-		restart_lsn = s->data.restart_lsn;
-		SpinLockRelease(&s->mutex);
-
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-			continue;
-		LWLockRelease(ReplicationSlotControlLock);
-		CHECK_FOR_INTERRUPTS();
-
-		/* Get ready to sleep on the slot in case it is active */
-		ConditionVariablePrepareToSleep(&s->active_cv);
-
-		for (;;)
+		if (InvalidateObsoleteReplicationSlot(s, oldestLSN))
 		{
-			/*
-			 * Try to mark this slot as used by this process.
-			 *
-			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
-			 * should not cancel the prepared condition variable
-			 * if this slot is active in other process. Because in this case
-			 * we have to wait on that CV for the process owning
-			 * the slot to be terminated, later.
-			 */
-			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-			/*
-			 * Exit the loop if we successfully acquired the slot or
-			 * the slot was dropped during waiting for the owning process
-			 * to be terminated. For example, the latter case is likely to
-			 * happen when the slot is temporary because it's automatically
-			 * dropped by the termination of the owning process.
-			 */
-			if (wspid <= 0)
-				break;
-
-			/*
-			 * Signal to terminate the process that owns the slot.
-			 *
-			 * There is the race condition where other process may own
-			 * the slot after the process using it was terminated and before
-			 * this process owns it. To handle this case, we signal again
-			 * if the PID of the owning process is changed than the last.
-			 *
-			 * XXX This logic assumes that the same PID is not reused
-			 * very quickly.
-			 */
-			if (last_signaled_pid != wspid)
-			{
-				ereport(LOG,
-						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-								wspid, NameStr(slotname))));
-				(void) kill(wspid, SIGTERM);
-				last_signaled_pid = wspid;
-			}
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
-
-		/*
-		 * Do nothing here and start from scratch if the slot has
-		 * already been dropped.
-		 */
-		if (wspid == -1)
+			/* if the lock was released, we need to restart from scratch */
 			goto restart;
-
-		ereport(LOG,
-				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-						NameStr(slotname),
-						LSN_FORMAT_ARGS(restart_lsn))));
-
-		SpinLockAcquire(&s->mutex);
-		s->data.invalidated_at = s->data.restart_lsn;
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-
-		/* Make sure the invalidated state persists across server restart */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-		ReplicationSlotRelease();
-
-		/* if we did anything, start from scratch */
-		goto restart;
+		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 }
-- 
2.31.0.121.g9198c13e34

>From 9ad7751dadce53d1d442b2fe949a96652634a4ac Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 7 Apr 2021 19:03:05 -0700
Subject: [PATCH 2/2] WIP: Remove SlotAcquireBehavior again.

---
 src/include/replication/slot.h                | 10 +---
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/slot.c                | 49 +++++--------------
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +-
 5 files changed, 16 insertions(+), 51 deletions(-)

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50df..0443e02d2be 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -37,14 +37,6 @@ typedef enum ReplicationSlotPersistency
 	RS_TEMPORARY
 } ReplicationSlotPersistency;
 
-/* For ReplicationSlotAcquire, q.v. */
-typedef enum SlotAcquireBehavior
-{
-	SAB_Error,
-	SAB_Block,
-	SAB_Inquire
-} SlotAcquireBehavior;
-
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -208,7 +200,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern int	ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 01d354829b9..1f38c5b33ea 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-	(void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
+	ReplicationSlotAcquire(NameStr(*name), true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5864b9b0139..e0ad6830bb7 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
-static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
-										  const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -373,34 +371,16 @@ SearchNamedReplicationSlot(const char *name)
 /*
  * Find a previously created slot and mark it as used by this process.
  *
- * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, -1 if the slot no longer
- * exists, or the PID of the owning process otherwise.  If behavior is
- * SAB_Error, then trying to acquire an owned slot is an error.
- * If SAB_Block, we sleep until the slot is released by the owning process.
+ * An error is raised if nowait is true and the slot is currently in use. If
+ * nowait is false, we sleep until the slot is released by the owning process.
  */
-int
-ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
-{
-	return ReplicationSlotAcquireInternal(NULL, name, behavior);
-}
-
-/*
- * Mark the specified slot as used by this process.
- *
- * Only one of slot and name can be specified.
- * If slot == NULL, search for the slot with the given name.
- *
- * See comments about the return value in ReplicationSlotAcquire().
- */
-static int
-ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
-							   SlotAcquireBehavior behavior)
+void
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
 	ReplicationSlot *s;
 	int			active_pid;
 
-	AssertArg((slot == NULL) ^ (name == NULL));
+	AssertArg(name != NULL);
 
 retry:
 	Assert(MyReplicationSlot == NULL);
@@ -411,17 +391,15 @@ retry:
 	 * Search for the slot with the specified name if the slot to acquire is
 	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	s = slot ? slot : SearchNamedReplicationSlot(name);
+	s = SearchNamedReplicationSlot(name);
 	if (s == NULL || !s->in_use)
 	{
 		LWLockRelease(ReplicationSlotControlLock);
 
-		if (behavior == SAB_Inquire)
-			return -1;
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist",
-						name ? name : NameStr(slot->data.name))));
+						name)));
 	}
 
 	/*
@@ -435,7 +413,7 @@ retry:
 		 * (We may end up not sleeping, but we don't want to do this while
 		 * holding the spinlock.)
 		 */
-		if (behavior == SAB_Block)
+		if (!nowait)
 			ConditionVariablePrepareToSleep(&s->active_cv);
 
 		SpinLockAcquire(&s->mutex);
@@ -455,13 +433,11 @@ retry:
 	 */
 	if (active_pid != MyProcPid)
 	{
-		if (behavior == SAB_Error)
+		if (!nowait)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
 					 errmsg("replication slot \"%s\" is active for PID %d",
 							NameStr(s->data.name), active_pid)));
-		else if (behavior == SAB_Inquire)
-			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
 		ConditionVariableSleep(&s->active_cv,
@@ -469,7 +445,7 @@ retry:
 		ConditionVariableCancelSleep();
 		goto retry;
 	}
-	else if (behavior == SAB_Block)
+	else if (!nowait)
 		ConditionVariableCancelSleep();	/* no sleep needed after all */
 
 	/* Let everybody know we've modified this slot */
@@ -477,9 +453,6 @@ retry:
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
-
-	/* success */
-	return 0;
 }
 
 /*
@@ -587,7 +560,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	(void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
+	ReplicationSlotAcquire(name, nowait);
 
 	ReplicationSlotDropAcquired();
 }
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9d36879ed7..0fc422be82e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -639,7 +639,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
 
 	/* Acquire the slot so we "own" it */
-	(void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
+	ReplicationSlotAcquire(NameStr(*slotname), true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4bf8a18e01e..6483b57fc0b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -601,7 +601,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+		ReplicationSlotAcquire(cmd->slotname, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1137,7 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+	ReplicationSlotAcquire(cmd->slotname, true);
 
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
 		ereport(ERROR,
-- 
2.31.0.121.g9198c13e34

Reply via email to