Hi,

On 2021-04-07 17:10:37 -0700, Andres Freund wrote:
> I think this can be solved in two different ways:
>
> 1) Hold ReplicationSlotAllocationLock with LW_SHARED across most of
> InvalidateObsoleteReplicationSlots(). That way nobody could re-create a new
> slot in the to-be-obsoleted-slot's place.
>
> 2) Atomically check whether the slot needs to be invalidated and try to
> acquire if needed. Don't release ReplicationSlotControlLock between those
> two steps. Signal the owner to release the slot iff we couldn't acquire the
> slot. In the latter case wait and then recheck if the slot still needs to
> be dropped.
>
> To me 2) seems better, because we then can also be sure that the slot still
> needs to be obsoleted, rather than potentially doing so unnecessarily.
>
>
> It looks to me like several of the problems here stem from trying to reuse
> code from ReplicationSlotAcquireInternal() (which before this was just named
> ReplicationSlotAcquire()). I don't think that makes sense, because cases like
> this want to check if a condition is true, and acquire it only if so.
>
> IOW, I think this basically needs to look like ReplicationSlotsDropDBSlots(),
> except that a different condition is checked, and the if (active_pid) case
> needs to prepare a condition variable, signal the owner and then wait on the
> condition variable, to restart after.
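Approach 2) can be illustrated with a minimal sketch under simplifying assumptions: the `Slot` struct, `check_and_try_acquire()` and the `CheckResult` values are hypothetical stand-ins, a C11 `atomic_flag` plays the role of the slot's spinlock, and an LSN of 0 stands for InvalidXLogRecPtr. What it shows is that the "needs invalidation?" test and the acquisition happen inside one critical section, so the caller never acts on a stale answer.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for a replication slot. */
typedef struct Slot
{
    atomic_flag mutex;          /* plays the role of the slot's spinlock */
    bool        in_use;
    uint64_t    restart_lsn;    /* 0 stands for InvalidXLogRecPtr */
    uint64_t    invalidated_at;
    int         active_pid;     /* 0 = not acquired by anyone */
} Slot;

typedef enum
{
    SLOT_NO_CONFLICT,           /* slot does not (or no longer) need invalidating */
    SLOT_ACQUIRED,              /* we acquired it and marked it invalidated */
    SLOT_OWNED_BY_OTHER         /* caller must signal *owner_pid, wait, retry */
} CheckResult;

static void
spin_lock(atomic_flag *f)
{
    while (atomic_flag_test_and_set_explicit(f, memory_order_acquire))
        ;                       /* busy-wait until the flag was clear */
}

static void
spin_unlock(atomic_flag *f)
{
    atomic_flag_clear_explicit(f, memory_order_release);
}

static void
slot_init(Slot *s, uint64_t restart_lsn, int active_pid)
{
    atomic_flag_clear(&s->mutex);
    s->in_use = true;
    s->restart_lsn = restart_lsn;
    s->invalidated_at = 0;
    s->active_pid = active_pid;
}

/* Check whether invalidation is needed and, if possible, acquire: one critical section. */
static CheckResult
check_and_try_acquire(Slot *s, uint64_t oldest_lsn, int my_pid, int *owner_pid)
{
    CheckResult res = SLOT_NO_CONFLICT;

    spin_lock(&s->mutex);
    if (s->in_use && s->restart_lsn != 0 && s->restart_lsn < oldest_lsn)
    {
        if (s->active_pid == 0)
        {
            /* free: take it and mark it invalidated before anyone else can */
            s->active_pid = my_pid;
            s->invalidated_at = s->restart_lsn;
            s->restart_lsn = 0;
            res = SLOT_ACQUIRED;
        }
        else
        {
            /* owned: report the owner, change nothing */
            *owner_pid = s->active_pid;
            res = SLOT_OWNED_BY_OTHER;
        }
    }
    spin_unlock(&s->mutex);
    return res;
}
```

On SLOT_OWNED_BY_OTHER the caller would signal `*owner_pid`, wait, and then call the function again; because the check is repeated, a slot whose restart_lsn has meanwhile moved past `oldest_lsn` is left alone rather than invalidated unnecessarily.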
I'm also confused by the use of ConditionVariableTimedSleep(timeout =
10). Why do we need a timed sleep here in the first place? And why with
such a short timeout?

I also noticed that the code is careful to use CHECK_FOR_INTERRUPTS(),
but is also aware that it's running in the checkpointer. I don't think
CFI does much there? If we are worried about needing to check for
interrupts, more work is needed.


Sketch for a fix attached. I did leave the odd
ConditionVariableTimedSleep(10ms) in, because I wasn't sure why it's
there...

After this I don't see a reason to have SAB_Inquire: as far as I can
tell, it's practically impossible to use without race conditions, except
for raising an error, which is "builtin"...

Greetings,

Andres Freund
>From bb2c2c4ecca51d2675f2e5c4d6ac3490995be2b0 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 7 Apr 2021 19:01:03 -0700
Subject: [PATCH 1/2] WIP: Sketch for a fix for InvalidateObsoleteReplicationSlots().

---
 src/backend/replication/slot.c | 226 ++++++++++++++++++++-------------
 1 file changed, 139 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 75a087c2f9d..5864b9b0139 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1153,6 +1153,140 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots. Returns whether
+ * ReplicationSlotControlLock was released.
+ */
+static bool
+InvalidateObsoleteReplicationSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+	int			last_signaled_pid = 0;
+	bool		released_lock = false;
+
+	while (true)
+	{
+		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
+		bool		slot_conflicts;
+		NameData	slotname;
+		int			active_pid = 0;
+
+		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+		CHECK_FOR_INTERRUPTS();
+
+		slot_conflicts = false;
+
+		if (!s->in_use)
+			continue;
+
+		/*
+		 * Check if the slot needs to be invalidated. If it needs to be
+		 * invalidated, and is not currently acquired, acquire it and mark it
+		 * as having been invalidated. We do all of this with the spinlock
+		 * held - otherwise there would be race conditions (e.g. the slot's
+		 * restart_lsn moving ahead, the slot concurrently being dropped after
+		 * we release ReplicationSlotControlLock, ...).
+		 */
+		SpinLockAcquire(&s->mutex);
+
+		restart_lsn = s->data.restart_lsn;
+
+		/* check if slot needs to be invalidated */
+		if (!XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < oldestLSN)
+		{
+			slot_conflicts = true;
+			slotname = s->data.name;
+			active_pid = s->active_pid;
+
+			/* check if we can acquire it */
+			if (active_pid == 0)
+			{
+				MyReplicationSlot = s;
+				s->active_pid = MyProcPid;
+				s->data.invalidated_at = s->data.restart_lsn;
+				s->data.restart_lsn = InvalidXLogRecPtr;
+			}
+		}
+
+		SpinLockRelease(&s->mutex);
+
+		if (!slot_conflicts)
+		{
+			Assert(active_pid == 0);
+
+			break;
+		}
+		else if (active_pid != 0)
+		{
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is the race condition where other process may own
+			 * the slot after the process using it was terminated and before
+			 * this process owns it. To handle this case, we signal again
+			 * if the PID of the owning process is changed than the last.
+			 *
+			 * XXX This logic assumes that the same PID is not reused
+			 * very quickly.
+			 */
+			if (last_signaled_pid != active_pid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+								active_pid, NameStr(slotname))));
+
+				(void) kill(active_pid, SIGTERM);
+				last_signaled_pid = active_pid;
+			}
+
+			/*
+			 * Wait until the slot is released.
+			 *
+			 * Will immediately return in the first iteration, so we can
+			 * recheck the condition before sleeping. That addresses the
+			 * otherwise possible race of the slot already having been
+			 * released.
+			 */
+			ConditionVariableTimedSleep(&s->active_cv, 10,
+										WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+			/* re-acquire for next loop iteration */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		}
+		else
+		{
+			/*
+			 * Don't want to hold ReplicationSlotControlLock across file
+			 * system operations. Now that we (temporarily) acquired the slot,
+			 * that's safe, as long as we afterwards restart the scan from
+			 * scratch.
+			 */
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+			ReplicationSlotRelease();
+
+			ereport(LOG,
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+							NameStr(slotname),
+							LSN_FORMAT_ARGS(restart_lsn))));
+
+			break;
+		}
+
+	}
+
+	Assert(!released_lock == LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+	return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1171,99 +1305,17 @@ restart:
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
-		NameData	slotname;
-		int			wspid;
-		int			last_signaled_pid = 0;
+
+		CHECK_FOR_INTERRUPTS();
 
 		if (!s->in_use)
 			continue;
 
-		SpinLockAcquire(&s->mutex);
-		slotname = s->data.name;
-		restart_lsn = s->data.restart_lsn;
-		SpinLockRelease(&s->mutex);
-
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-			continue;
-		LWLockRelease(ReplicationSlotControlLock);
-		CHECK_FOR_INTERRUPTS();
-
-		/* Get ready to sleep on the slot in case it is active */
-		ConditionVariablePrepareToSleep(&s->active_cv);
-
-		for (;;)
+		if (InvalidateObsoleteReplicationSlot(s, oldestLSN))
 		{
-			/*
-			 * Try to mark this slot as used by this process.
-			 *
-			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
-			 * should not cancel the prepared condition variable
-			 * if this slot is active in other process. Because in this case
-			 * we have to wait on that CV for the process owning
-			 * the slot to be terminated, later.
-			 */
-			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-			/*
-			 * Exit the loop if we successfully acquired the slot or
-			 * the slot was dropped during waiting for the owning process
-			 * to be terminated. For example, the latter case is likely to
-			 * happen when the slot is temporary because it's automatically
-			 * dropped by the termination of the owning process.
-			 */
-			if (wspid <= 0)
-				break;
-
-			/*
-			 * Signal to terminate the process that owns the slot.
-			 *
-			 * There is the race condition where other process may own
-			 * the slot after the process using it was terminated and before
-			 * this process owns it. To handle this case, we signal again
-			 * if the PID of the owning process is changed than the last.
-			 *
-			 * XXX This logic assumes that the same PID is not reused
-			 * very quickly.
-			 */
-			if (last_signaled_pid != wspid)
-			{
-				ereport(LOG,
-						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-								wspid, NameStr(slotname))));
-				(void) kill(wspid, SIGTERM);
-				last_signaled_pid = wspid;
-			}
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
-
-		/*
-		 * Do nothing here and start from scratch if the slot has
-		 * already been dropped.
-		 */
-		if (wspid == -1)
+			/* if the lock was released, we need to restart from scratch */
 			goto restart;
-
-		ereport(LOG,
-				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-						NameStr(slotname),
-						LSN_FORMAT_ARGS(restart_lsn))));
-
-		SpinLockAcquire(&s->mutex);
-		s->data.invalidated_at = s->data.restart_lsn;
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-
-		/* Make sure the invalidated state persists across server restart */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-		ReplicationSlotRelease();
-
-		/* if we did anything, start from scratch */
-		goto restart;
+		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 }
-- 
2.31.0.121.g9198c13e34
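The `last_signaled_pid` logic in the helper above can be isolated as follows. This is a hypothetical sketch: the owner PIDs observed on successive passes of the wait loop are passed in as an array instead of being read from the slot, and the `send` callback stands in for `kill(active_pid, SIGTERM)`.

```c
#include <stddef.h>

/* Hypothetical stand-in for kill(pid, SIGTERM). */
typedef void (*signal_fn)(int pid, void *arg);

/*
 * observed[i] is the owner PID seen on the i-th pass of the wait loop;
 * 0 means the slot was free on that pass. Returns the number of signals sent.
 */
static size_t
signal_owners(const int *observed, size_t n, signal_fn send, void *arg)
{
    int    last_signaled_pid = 0;
    size_t sent = 0;

    for (size_t i = 0; i < n; i++)
    {
        int active_pid = observed[i];

        if (active_pid == 0)
            break;              /* slot released: the caller can acquire it */

        /* Signal only an owner that differs from the one signaled last. */
        if (active_pid != last_signaled_pid)
        {
            send(active_pid, arg);
            last_signaled_pid = active_pid;
            sent++;
        }
        /* the real loop sleeps on the slot's condition variable here */
    }
    return sent;
}
```

A slow-to-exit owner is therefore signaled once, while a different process that grabbed the slot in between is signaled too. As the XXX comment in the patch says, a new owner that happens to reuse the PID just signaled would not be signaled again.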
>From 9ad7751dadce53d1d442b2fe949a96652634a4ac Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Wed, 7 Apr 2021 19:03:05 -0700
Subject: [PATCH 2/2] WIP: Remove SlotAcquireBehavior again.

---
 src/include/replication/slot.h         | 10 +---
 .../replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/slot.c         | 49 +++++--------------
 src/backend/replication/slotfuncs.c    |  2 +-
 src/backend/replication/walsender.c    |  4 +-
 5 files changed, 16 insertions(+), 51 deletions(-)

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50df..0443e02d2be 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -37,14 +37,6 @@ typedef enum ReplicationSlotPersistency
 	RS_TEMPORARY
 } ReplicationSlotPersistency;
 
-/* For ReplicationSlotAcquire, q.v. */
-typedef enum SlotAcquireBehavior
-{
-	SAB_Error,
-	SAB_Block,
-	SAB_Inquire
-} SlotAcquireBehavior;
-
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -208,7 +200,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern int	ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 01d354829b9..1f38c5b33ea 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-	(void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
+	ReplicationSlotAcquire(NameStr(*name), true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5864b9b0139..e0ad6830bb7 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
-static int	ReplicationSlotAcquireInternal(ReplicationSlot *slot,
-										   const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -373,34 +371,16 @@ SearchNamedReplicationSlot(const char *name)
 /*
  * Find a previously created slot and mark it as used by this process.
  *
- * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, -1 if the slot no longer
- * exists, or the PID of the owning process otherwise. If behavior is
- * SAB_Error, then trying to acquire an owned slot is an error.
- * If SAB_Block, we sleep until the slot is released by the owning process.
+ * An error is raised if nowait is true and the slot is currently in use. If
+ * nowait is false, we sleep until the slot is released by the owning process.
  */
-int
-ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
-{
-	return ReplicationSlotAcquireInternal(NULL, name, behavior);
-}
-
-/*
- * Mark the specified slot as used by this process.
- *
- * Only one of slot and name can be specified.
- * If slot == NULL, search for the slot with the given name.
- *
- * See comments about the return value in ReplicationSlotAcquire().
- */
-static int
-ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
-							   SlotAcquireBehavior behavior)
+void
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
 	ReplicationSlot *s;
 	int			active_pid;
 
-	AssertArg((slot == NULL) ^ (name == NULL));
+	AssertArg(name != NULL);
 
 retry:
 	Assert(MyReplicationSlot == NULL);
@@ -411,17 +391,15 @@ retry:
 	 * Search for the slot with the specified name if the slot to acquire is
 	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	s = slot ? slot : SearchNamedReplicationSlot(name);
+	s = SearchNamedReplicationSlot(name);
 	if (s == NULL || !s->in_use)
 	{
 		LWLockRelease(ReplicationSlotControlLock);
 
-		if (behavior == SAB_Inquire)
-			return -1;
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist",
-						name ? name : NameStr(slot->data.name))));
+						name)));
 	}
 
 	/*
@@ -435,7 +413,7 @@ retry:
 	 * (We may end up not sleeping, but we don't want to do this while
 	 * holding the spinlock.)
 	 */
-	if (behavior == SAB_Block)
+	if (!nowait)
 		ConditionVariablePrepareToSleep(&s->active_cv);
 
 	SpinLockAcquire(&s->mutex);
@@ -455,13 +433,11 @@ retry:
 	 */
 	if (active_pid != MyProcPid)
 	{
-		if (behavior == SAB_Error)
+		if (nowait)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
 					 errmsg("replication slot \"%s\" is active for PID %d",
 							NameStr(s->data.name), active_pid)));
-		else if (behavior == SAB_Inquire)
-			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
 		ConditionVariableSleep(&s->active_cv,
@@ -469,7 +445,7 @@ retry:
 		ConditionVariableCancelSleep();
 		goto retry;
 	}
-	else if (behavior == SAB_Block)
+	else if (!nowait)
 		ConditionVariableCancelSleep();	/* no sleep needed after all */
 
 	/* Let everybody know we've modified this slot */
@@ -477,9 +453,6 @@ retry:
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
-
-	/* success */
-	return 0;
 }
 
 /*
@@ -587,7 +560,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	(void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
+	ReplicationSlotAcquire(name, nowait);
 	ReplicationSlotDropAcquired();
 }
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9d36879ed7..0fc422be82e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -639,7 +639,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
 
 	/* Acquire the slot so we "own" it */
-	(void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
+	ReplicationSlotAcquire(NameStr(*slotname), true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4bf8a18e01e..6483b57fc0b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -601,7 +601,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+		ReplicationSlotAcquire(cmd->slotname, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1137,7 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+	ReplicationSlotAcquire(cmd->slotname, true);
 
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
 		ereport(ERROR,
-- 
2.31.0.121.g9198c13e34
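With SAB_Inquire gone, the two remaining behaviors fit in a single `nowait` flag. The sketch below shows those semantics with POSIX threads; `Slot`, `slot_acquire()`, `slot_release()` and `ACQUIRE_IN_USE` are hypothetical, and returning an error code stands in for `ereport(ERROR)`. The blocking path follows the usual condition-variable protocol of rechecking the predicate under the mutex after every wakeup, which is why it needs no timeout.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for an acquirable slot. */
typedef struct Slot
{
    pthread_mutex_t mutex;      /* protects active_pid */
    pthread_cond_t  active_cv;  /* broadcast whenever the slot is released */
    int             active_pid; /* 0 = free */
} Slot;

typedef enum { ACQUIRE_OK, ACQUIRE_IN_USE } AcquireResult;

static void
slot_init(Slot *s)
{
    pthread_mutex_init(&s->mutex, NULL);
    pthread_cond_init(&s->active_cv, NULL);
    s->active_pid = 0;
}

/*
 * nowait = true:  fail immediately if someone owns the slot
 *                 (the analogue of the ERRCODE_OBJECT_IN_USE error).
 * nowait = false: sleep until the owner releases it, then take it.
 */
static AcquireResult
slot_acquire(Slot *s, int my_pid, bool nowait)
{
    pthread_mutex_lock(&s->mutex);
    while (s->active_pid != 0)
    {
        if (nowait)
        {
            pthread_mutex_unlock(&s->mutex);
            return ACQUIRE_IN_USE;
        }
        /* Atomically releases the mutex while sleeping; the loop rechecks
         * the predicate after each wakeup, so a release cannot be missed. */
        pthread_cond_wait(&s->active_cv, &s->mutex);
    }
    s->active_pid = my_pid;
    pthread_mutex_unlock(&s->mutex);
    return ACQUIRE_OK;
}

static void
slot_release(Slot *s)
{
    pthread_mutex_lock(&s->mutex);
    s->active_pid = 0;
    pthread_cond_broadcast(&s->active_cv);  /* wake every waiter */
    pthread_mutex_unlock(&s->mutex);
}
```

Callers that previously passed SAB_Error map to `nowait = true` and SAB_Block maps to `nowait = false`, matching the call-site changes in the patch.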