On 2020/06/17 12:10, Kyotaro Horiguchi wrote:
At Tue, 16 Jun 2020 22:40:56 -0400, Alvaro Herrera <alvhe...@2ndquadrant.com> wrote in
On 2020-Jun-17, Fujii Masao wrote:
On 2020/06/17 3:50, Alvaro Herrera wrote:
So InvalidateObsoleteReplicationSlots() can terminate normal backends.
But do we want to do this? If we do, should we add a note about this
case to the docs? Otherwise users would be surprised when backends are
terminated because of max_slot_wal_keep_size. I guess that this rarely
happens, though.
Well, if we could distinguish a walsender from a non-walsender process,
then maybe it would make sense to leave backends alive. But do we want
that? I admit I don't know what would be the reason to have a
non-walsender process with an active slot, so I don't have a good
opinion on what to do in this case.
A non-walsender backend that holds the slot is actually doing replication
work, so shouldn't it be killed as well?
I have no better opinion about this. So I agree with leaving the logic as it
is, at least for now, i.e., we terminate the process owning the slot whatever
the type of process is.
+ /*
+ * Signal to terminate the process using the replication slot.
+ *
+ * Try to signal every 100ms until it succeeds.
+ */
+ if (!killed && kill(active_pid, SIGTERM) == 0)
+ killed = true;
+ ConditionVariableTimedSleep(&slot->active_cv, 100,
+ WAIT_EVENT_REPLICATION_SLOT_DROP);
+ } while (ReplicationSlotIsActive(slot, NULL));
Note that here you're signalling only once and then sleeping many times
in increments of 100ms -- you're not signalling every 100ms as the
comment claims -- unless the signal fails, but you don't really expect
that. On the contrary, I'd claim that the logic is reversed: if the
signal fails, *then* you should stop signalling.
You mean that, in this code path, signaling fails only when the target process
disappears just before signaling, so if it fails, slot->active_pid is
expected to become 0 even without signaling again. Right?
I guess kill() can also fail if the PID now belongs to a process owned
by a different user.
Yes. This case means that the PostgreSQL process using the slot disappeared
and the same PID was assigned to a non-PostgreSQL process. So if kill() fails
for this reason, we don't need to kill() again.
I think we've disregarded very quick reuse of
PIDs, so we needn't concern ourselves with it.
The first call to ConditionVariableTimedSleep doesn't actually
sleep, so the loop works as expected. But we may make an extra call
to kill(2). Calling ConditionVariablePrepareToSleep before the
loop would make it better.
Sorry, I failed to understand your point...
Anyway, the attached is the updated version of the patch. This fixes
all the issues in InvalidateObsoleteReplicationSlots() that I reported
upthread.
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..a065d41d76 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication
* slots */
+static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
+ const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -322,77 +325,104 @@ ReplicationSlotCreate(const char *name, bool db_specific,
}
/*
- * Find a previously created slot and mark it as used by this backend.
+ * Search for the named replication slot.
+ *
+ * Return the replication slot if found, otherwise NULL.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode.
+ */
+static ReplicationSlot *
+SearchNamedReplicationSlot(const char *name)
+{
+ int i;
+ ReplicationSlot *slot = NULL;
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ {
+ slot = s;
+ break;
+ }
+ }
+
+ return slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this process.
*
* The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, or the PID of the
- * owning process otherwise. If behavior is SAB_Error, then trying to
- * acquire an owned slot is an error. If SAB_Block, we sleep until the
- * slot is released by the owning process.
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
+ * exists, or the PID of the owning process otherwise. If behavior is
+ * SAB_Error, then trying to acquire an owned slot is an error.
+ * If SAB_Block, we sleep until the slot is released by the owning process.
*/
int
ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
{
- ReplicationSlot *slot;
+ return ReplicationSlotAcquireInternal(NULL, name, behavior);
+}
+
+/*
+ * Mark the specified slot as used by this process.
+ *
+ * If *slot == NULL, search for the slot with the given name.
+ *
+ * See comments about the return value in ReplicationSlotAcquire().
+ */
+static int
+ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
+ SlotAcquireBehavior behavior)
+{
+ ReplicationSlot *s;
int active_pid;
- int i;
retry:
Assert(MyReplicationSlot == NULL);
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
/*
- * Search for the named slot and mark it active if we find it. If the
- * slot is already active, we exit the loop with active_pid set to the PID
- * of the backend that owns it.
+ * Search for the slot with the specified name if the slot to
+ * acquire is not given. If the slot is not found, we either
+ * return -1 or error out.
*/
- active_pid = 0;
- slot = NULL;
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
+ if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)
{
- ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-
- if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ if (behavior == SAB_Inquire)
{
- /*
- * This is the slot we want; check if it's active under some other
- * process. In single user mode, we don't need this check.
- */
- if (IsUnderPostmaster)
- {
- /*
- * Get ready to sleep on it in case it is active. (We may end
- * up not sleeping, but we don't want to do this while holding
- * the spinlock.)
- */
- ConditionVariablePrepareToSleep(&s->active_cv);
-
- SpinLockAcquire(&s->mutex);
-
- active_pid = s->active_pid;
- if (active_pid == 0)
- active_pid = s->active_pid = MyProcPid;
-
- SpinLockRelease(&s->mutex);
- }
- else
- active_pid = MyProcPid;
- slot = s;
-
- break;
+ LWLockRelease(ReplicationSlotControlLock);
+ return -1;
}
- }
- LWLockRelease(ReplicationSlotControlLock);
-
- /* If we did not find the slot, error out. */
- if (slot == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", name)));
+ }
+
+ /*
+ * This is the slot we want; check if it's active under some other
+ * process. In single user mode, we don't need this check.
+ */
+ if (IsUnderPostmaster)
+ {
+ SpinLockAcquire(&s->mutex);
+ if (s->active_pid == 0)
+ s->active_pid = MyProcPid;
+ active_pid = s->active_pid;
+ SpinLockRelease(&s->mutex);
+ }
+ else
+ active_pid = MyProcPid;
+ LWLockRelease(ReplicationSlotControlLock);
/*
* If we found the slot but it's already active in another backend, we
- * either error out or retry after a short wait, as caller specified.
+ * either error out, return the PID of the owning process, or retry
+ * after a short wait, as caller specified.
*/
if (active_pid != MyProcPid)
{
@@ -405,19 +435,17 @@ retry:
return active_pid;
/* Wait here until we get signaled, and then restart */
- ConditionVariableSleep(&slot->active_cv,
+ ConditionVariableSleep(&s->active_cv,
WAIT_EVENT_REPLICATION_SLOT_DROP);
ConditionVariableCancelSleep();
goto retry;
}
- else
- ConditionVariableCancelSleep(); /* no sleep needed after all */
/* Let everybody know we've modified this slot */
- ConditionVariableBroadcast(&slot->active_cv);
+ ConditionVariableBroadcast(&s->active_cv);
/* We made this slot active, so it's ours now. */
- MyReplicationSlot = slot;
+ MyReplicationSlot = s;
/* success */
return 0;
@@ -1100,43 +1128,71 @@ restart:
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn = InvalidXLogRecPtr;
NameData slotname;
+ int wspid;
+ int last_signaled_pid = 0;
if (!s->in_use)
continue;
SpinLockAcquire(&s->mutex);
- if (s->data.restart_lsn == InvalidXLogRecPtr ||
- s->data.restart_lsn >= oldestLSN)
- {
- SpinLockRelease(&s->mutex);
- continue;
- }
-
slotname = s->data.name;
restart_lsn = s->data.restart_lsn;
-
SpinLockRelease(&s->mutex);
+
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+ continue;
LWLockRelease(ReplicationSlotControlLock);
for (;;)
{
- int wspid = ReplicationSlotAcquire(NameStr(slotname),
- SAB_Inquire);
+ wspid = ReplicationSlotAcquireInternal(s, NameStr(slotname),
+ SAB_Inquire);
- /* no walsender? success! */
- if (wspid == 0)
+ /*
+ * Exit the loop if we successfully acquired the slot or
+ * the slot was dropped while waiting for the owning process
+ * to be terminated. For example, the latter case is likely to
+ * happen when the slot is temporary because it's automatically
+ * dropped by the termination of the owning process.
+ */
+ if (wspid <= 0)
break;
- ereport(LOG,
- (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
- wspid, NameStr(slotname))));
- (void) kill(wspid, SIGTERM);
+ /*
+ * Signal to terminate the process that owns the slot.
+ *
+ * There is a race condition where another process may own
+ * the slot after the process using it was terminated and before
+ * this process owns it. To handle this case, we signal again
+ * if the PID of the owning process differs from the last one signaled.
+ *
+ * XXX This logic assumes that the same PID is not reused
+ * very quickly.
+ */
+ if (last_signaled_pid != wspid)
+ {
+ ereport(LOG,
+ (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+ wspid, NameStr(slotname))));
+ (void) kill(wspid, SIGTERM);
+ last_signaled_pid = wspid;
+ }
ConditionVariableTimedSleep(&s->active_cv, 10,
WAIT_EVENT_REPLICATION_SLOT_DROP);
}
ConditionVariableCancelSleep();
+ /*
+ * Do nothing here and start from scratch if the slot has
+ * already been dropped.
+ */
+ if (wspid == -1)
+ {
+ CHECK_FOR_INTERRUPTS();
+ goto restart;
+ }
+
ereport(LOG,
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
NameStr(slotname),