On 2020/06/16 14:00, Kyotaro Horiguchi wrote:
At Tue, 16 Jun 2020 01:46:21 +0900, Fujii Masao <masao.fu...@oss.nttdata.com> wrote in
In short, this is known behavior, but preventing it was judged not to
be worth the effort.
That can happen when the checkpointer removes WAL segments up to the
one that walsender is reading. I think this doesn't happen (or happens
only within a narrow time window?) for physical replication, but it
does happen for logical replication.
During development, I once added code to make walsender exit for that
reason, but it was eventually moved into
InvalidateObsoleteReplicationSlots() as a slightly different function.
BTW, I read the code of InvalidateObsoleteReplicationSlots() and I
think I found some issues in it.
1. Each cycle of the "for" loop in InvalidateObsoleteReplicationSlots()
emits the log message "terminating walsender ...". This means that if
walsender takes more than 10ms to exit after it's signaled, the second
and subsequent cycles run and output the same log message several
times. IMO that log message should be output only once.
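A minimal standalone sketch of the "log only once" idea, assuming a
simulated slot whose holder needs a few wait cycles to exit after being
signaled. FakeSlot, fake_timed_sleep() and terminate_holder_log_once()
are hypothetical names, not PostgreSQL APIs: the loop keeps signaling
and sleeping, but a flag ensures the message is printed a single time
however many 10ms cycles the holder needs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/*
 * Hypothetical stand-in for a replication slot: after being signaled,
 * the holder needs exit_delay more wait cycles before it releases the
 * slot (sets active_pid to 0).
 */
typedef struct FakeSlot
{
	int		active_pid;
	int		exit_delay;
	bool	signaled;
} FakeSlot;

/* Simulates one ~10ms timed sleep on the slot's condition variable. */
static void
fake_timed_sleep(FakeSlot *slot)
{
	if (slot->signaled && slot->active_pid != 0)
	{
		if (slot->exit_delay > 0)
			slot->exit_delay--;
		else
			slot->active_pid = 0;	/* holder has exited */
	}
}

/*
 * Terminate the slot's holder, logging only once no matter how many
 * wait cycles it takes. Returns the number of log lines emitted.
 */
static int
terminate_holder_log_once(FakeSlot *slot)
{
	int		nlogs = 0;
	bool	logged = false;

	assert(slot != NULL);
	while (slot->active_pid != 0)
	{
		if (!logged)
		{
			printf("terminating process %d using replication slot\n",
				   slot->active_pid);
			logged = true;
			nlogs++;
		}
		slot->signaled = true;	/* stands in for kill(pid, SIGTERM) */
		fake_timed_sleep(slot);
	}
	return nlogs;
}
```

With a holder that needs five extra cycles, the loop iterates six times
but prints one line; an already inactive slot prints nothing.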
Sounds reasonable.
2. InvalidateObsoleteReplicationSlots() uses a loop to scan the
replication slots array, and runs a "for" loop in each scan. It also
calls ReplicationSlotAcquire() in each cycle of that "for" loop, and
ReplicationSlotAcquire() uses yet another loop to scan the replication
slots array. I don't think this is a good design.
ISTM that we can get rid of ReplicationSlotAcquire()'s loop because
InvalidateObsoleteReplicationSlots() already knows the index of the
slot that we want to find. The attached patch does that. Thoughts?
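To illustrate the redundancy, here is a hypothetical sketch (FakeSlot,
MAX_SLOTS, find_slot_by_name() and slot_active_pid_at() are
illustrative names, not PostgreSQL code): a name-based lookup like the
one in ReplicationSlotAcquire() has to rescan the array, while a caller
that is already iterating over the array can inspect the slot at the
index it holds.

```c
#include <assert.h>
#include <string.h>

#define MAX_SLOTS 8		/* hypothetical stand-in for max_replication_slots */

typedef struct FakeSlot
{
	int		in_use;
	char	name[64];
	int		active_pid;
} FakeSlot;

/*
 * What a name-based lookup has to do: scan the whole array again.
 * Returns the index of the slot, or -1 if no in-use slot has that name.
 */
static int
find_slot_by_name(const FakeSlot *slots, int nslots, const char *name)
{
	for (int i = 0; i < nslots; i++)
	{
		if (slots[i].in_use && strcmp(slots[i].name, name) == 0)
			return i;
	}
	return -1;
}

/*
 * A caller that is already iterating knows the index and can inspect
 * the slot directly, without a second scan.
 */
static int
slot_active_pid_at(const FakeSlot *slots, int nslots, int idx)
{
	assert(idx >= 0 && idx < nslots);
	return slots[idx].in_use ? slots[idx].active_pid : 0;
}
```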
The inner loop is expected to run at most several times per
checkpoint, which won't be a serious problem. However, it would be
better if we could get rid of it in a reasonable way.
The attached patch changes the behavior for SAB_Block. Before the
patch, it rescans from the first slot for the same name, but with the
patch it just rechecks the same slot. The only caller of the function
with SAB_Block is ReplicationSlotDrop, and I can't come up with a case
where another slot with the same name is created at a different place
before the condition variable fires. But I'm not sure the change is
completely safe.
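The behavioral difference for SAB_Block can be sketched as follows,
under the assumption that, while we slept, the slot was dropped from
index 1 and a new slot with the same name was created at index 4. All
names here (FakeSlot, NSLOTS, rescan_by_name(), recheck_same_index())
are hypothetical: rescanning by name finds the new slot, whereas
rechecking the old index does not.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define NSLOTS 8

typedef struct FakeSlot
{
	bool	in_use;
	char	name[64];
} FakeSlot;

/* Pre-patch style: after waking up, rescan from slot 0 for the name. */
static int
rescan_by_name(const FakeSlot *slots, const char *name)
{
	for (int i = 0; i < NSLOTS; i++)
	{
		if (slots[i].in_use && strcmp(slots[i].name, name) == 0)
			return i;
	}
	return -1;
}

/* Patched style: only re-examine the index found before sleeping. */
static int
recheck_same_index(const FakeSlot *slots, int idx, const char *name)
{
	assert(idx >= 0 && idx < NSLOTS);
	if (slots[idx].in_use && strcmp(slots[idx].name, name) == 0)
		return idx;
	return -1;
}
```

In that scenario the two strategies disagree, which is exactly the case
whose safety is in question.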
Yes, that change might not be safe. So I'm thinking of another approach
to fix the issues.
Maybe some assertion is needed?
3. There is a corner case where the termination of walsender cleans up
the temporary replication slot while InvalidateObsoleteReplicationSlots()
is sleeping on ConditionVariableTimedSleep(). In this case,
ReplicationSlotAcquire() is called in the subsequent cycle of the "for"
loop, cannot find the slot, and then emits an ERROR. This makes the
checkpoint performed by the checkpointer fail.
Agreed.
To avoid this case, if SAB_Inquire is specified, ReplicationSlotAcquire()
should return a special value instead of emitting an ERROR when it
cannot find the slot. InvalidateObsoleteReplicationSlots() should also
handle that special return value.
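A minimal sketch of the special-return-value idea, assuming a
hypothetical SLOT_NOT_FOUND sentinel and a simulated slot array
(inquire_slot() and handle_inquiry() are illustrative, not the actual
ReplicationSlotAcquire() signature): a vanished slot becomes a distinct
outcome the caller can skip, instead of an ERROR that aborts the
checkpoint.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define NSLOTS 4
#define SLOT_NOT_FOUND (-1)		/* hypothetical special return value */

typedef struct FakeSlot
{
	bool	in_use;
	char	name[64];
	int		active_pid;
} FakeSlot;

typedef enum
{
	SLOT_FREE,					/* nobody holds it: safe to invalidate */
	SLOT_BUSY,					/* held: signal the holder and sleep again */
	SLOT_GONE					/* dropped meanwhile: skip it, no ERROR */
} SlotInquiry;

/*
 * Inquire-mode lookup: return SLOT_NOT_FOUND when the slot has vanished,
 * otherwise the holder's PID, or 0 if nobody holds it.
 */
static int
inquire_slot(const FakeSlot *slots, const char *name)
{
	assert(name != NULL);
	for (int i = 0; i < NSLOTS; i++)
	{
		if (slots[i].in_use && strcmp(slots[i].name, name) == 0)
			return slots[i].active_pid;
	}
	return SLOT_NOT_FOUND;
}

/* How a caller like InvalidateObsoleteReplicationSlots() could react. */
static SlotInquiry
handle_inquiry(const FakeSlot *slots, const char *name)
{
	int		pid = inquire_slot(slots, name);

	if (pid == SLOT_NOT_FOUND)
		return SLOT_GONE;
	if (pid != 0)
		return SLOT_BUSY;
	return SLOT_FREE;
}
```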
I thought the same thing when I heard that.
While reading InvalidateObsoleteReplicationSlots() code, I found another issue.
ereport(LOG,
        (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
                wspid, NameStr(slotname))));
(void) kill(wspid, SIGTERM);
wspid indicates the PID of the process using the slot. That process can
be a regular backend, for example one executing
pg_replication_slot_advance(). So "walsender" in the above log message
is not always correct.
int wspid = ReplicationSlotAcquire(NameStr(slotname),
                                   SAB_Inquire);
Why do we need to call ReplicationSlotAcquire() here and mark the slot
as used by the checkpointer? Isn't it enough to check the slot's
active_pid directly instead?
Maybe ReplicationSlotAcquire() is necessary because
ReplicationSlotRelease() is called later? If so, why do we need to call
ReplicationSlotRelease()? ISTM that we don't need to do that if the slot's
active_pid is zero. No?
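Checking active_pid directly only needs a short critical section on the
slot's mutex and leaves nothing to release afterwards. A standalone
sketch, assuming a C11 atomic_flag spinlock as a stand-in for
PostgreSQL's slock_t and SpinLockAcquire()/SpinLockRelease()
(FakeSlot, spin_acquire(), spin_release() and slot_is_active() are
hypothetical names):

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical slot whose mutex is a tiny C11 spinlock. */
typedef struct FakeSlot
{
	atomic_flag mutex;
	int			active_pid;
} FakeSlot;

static void
spin_acquire(atomic_flag *m)
{
	while (atomic_flag_test_and_set(m))
		;						/* busy-wait until the flag was clear */
}

static void
spin_release(atomic_flag *m)
{
	atomic_flag_clear(m);
}

/*
 * Peek at the holder's PID under the slot's lock without acquiring the
 * slot for ourselves, so there is nothing to release afterwards.
 */
static bool
slot_is_active(FakeSlot *slot, int *active_pid)
{
	int			pid;

	assert(slot != NULL);
	spin_acquire(&slot->mutex);
	pid = slot->active_pid;
	spin_release(&slot->mutex);

	if (active_pid != NULL)
		*active_pid = pid;
	return pid != 0;
}
```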
If my understanding is right, I'd like to propose the attached patch.
It introduces DeactivateReplicationSlot() and replaces the "for" loop
in InvalidateObsoleteReplicationSlots() with it. ReplicationSlotAcquire()
and ReplicationSlotRelease() are no longer called there.
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..b89b6da768 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication
* slots */
+static bool ReplicationSlotIsActive(ReplicationSlot *slot, int *active_pid);
+static void DeactivateReplicationSlot(ReplicationSlot *slot);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -1080,6 +1082,61 @@ ReplicationSlotReserveWal(void)
}
}
+/*
+ * Is the specified replication slot currently actively being used?
+ *
+ * Set *active_pid to the PID of the process using this slot if active.
+ */
+static bool
+ReplicationSlotIsActive(ReplicationSlot *slot, int *active_pid)
+{
+ int pid;
+
+ SpinLockAcquire(&slot->mutex);
+ pid = slot->active_pid;
+ SpinLockRelease(&slot->mutex);
+
+ if (active_pid != NULL)
+ *active_pid = pid;
+
+ return (pid != 0);
+}
+
+/*
+ * Deactivate the specified replication slot.
+ *
+ * Terminate the process using this slot if active.
+ */
+static void
+DeactivateReplicationSlot(ReplicationSlot *slot)
+{
+ int active_pid;
+ bool killed = false;
+
+ /* Quick exit if already inactive */
+ if (!ReplicationSlotIsActive(slot, &active_pid))
+ return;
+
+ ereport(LOG,
+ (errmsg("terminating the process %d using replication slot \"%s\"",
+ active_pid, NameStr(slot->data.name))));
+
+ ConditionVariablePrepareToSleep(&slot->active_cv);
+ do
+ {
+ /*
+ * Signal to terminate the process using the replication slot.
+ *
+ * Try to signal every 100ms until it succeeds.
+ */
+ if (!killed && kill(active_pid, SIGTERM) == 0)
+ killed = true;
+ ConditionVariableTimedSleep(&slot->active_cv, 100,
+ WAIT_EVENT_REPLICATION_SLOT_DROP);
+ } while (ReplicationSlotIsActive(slot, NULL));
+ ConditionVariableCancelSleep();
+}
+
/*
* Mark any slot that points to an LSN older than the given segment
* as invalid; it requires WAL that's about to be removed.
@@ -1105,37 +1162,18 @@ restart:
continue;
SpinLockAcquire(&s->mutex);
- if (s->data.restart_lsn == InvalidXLogRecPtr ||
- s->data.restart_lsn >= oldestLSN)
- {
- SpinLockRelease(&s->mutex);
- continue;
- }
-
slotname = s->data.name;
restart_lsn = s->data.restart_lsn;
-
SpinLockRelease(&s->mutex);
+
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+ continue;
LWLockRelease(ReplicationSlotControlLock);
- for (;;)
- {
- int wspid = ReplicationSlotAcquire(NameStr(slotname),
- SAB_Inquire);
-
- /* no walsender? success! */
- if (wspid == 0)
- break;
-
- ereport(LOG,
- (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
- wspid, NameStr(slotname))));
- (void) kill(wspid, SIGTERM);
-
- ConditionVariableTimedSleep(&s->active_cv, 10,
- WAIT_EVENT_REPLICATION_SLOT_DROP);
- }
- ConditionVariableCancelSleep();
+ DeactivateReplicationSlot(s);
+ SpinLockAcquire(&s->mutex);
+ s->data.restart_lsn = InvalidXLogRecPtr;
+ SpinLockRelease(&s->mutex);
ereport(LOG,
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
@@ -1143,11 +1181,6 @@ restart:
(uint32) (restart_lsn >> 32),
(uint32) restart_lsn)));
- SpinLockAcquire(&s->mutex);
- s->data.restart_lsn = InvalidXLogRecPtr;
- SpinLockRelease(&s->mutex);
- ReplicationSlotRelease();
-
/* if we did anything, start from scratch */
CHECK_FOR_INTERRUPTS();
goto restart;