On 2020/06/18 11:44, Kyotaro Horiguchi wrote:
At Wed, 17 Jun 2020 20:13:01 +0900, Fujii Masao <masao.fu...@oss.nttdata.com>
wrote in
ReplicationSlotAcquireInternal. I think we should call
ConditionVariablePrepareToSleep before the surrounding for statement
block.
OK, so what about the attached patch? I added
ConditionVariablePrepareToSleep()
just before entering the "for" loop in
InvalidateObsoleteReplicationSlots().
Thanks.
Thanks for the review!
ReplicationSlotAcquireInternal:
+ * If *slot == NULL, search for the slot with the given name.
'*' seems needless here.
Fixed.
I also added "Only one of slot and name can be specified." to
the comments of ReplicationSlotAcquireInternal().
The patch moves ConditionVariablePrepareToSleep. We need to call the
function before looking into active_pid as originally commented.
Since active_pid is not protected by ReplicationSlotControlLock, calling
the function just before releasing the lock is not correct.
The attached patch, on top of v3, fixes that.
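
The ordering concern above can be illustrated with a toy model. This is
a single-threaded sketch, not PostgreSQL code: ToyCV, ToyCvSlot and the
toy_* functions are hypothetical names. It assumes only that a broadcast
reaches waiters that have already registered on the wait list, which is
the role ConditionVariablePrepareToSleep plays in the discussion.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy model; none of these names exist in PostgreSQL. */
typedef struct ToyCV
{
    bool registered;    /* has the waiter joined the wait list? */
    bool woken;         /* was a wakeup delivered to the waiter? */
} ToyCV;

typedef struct ToyCvSlot
{
    int   active_pid;   /* 0 means the slot is free */
    ToyCV active_cv;
} ToyCvSlot;

static void
toy_prepare_to_sleep(ToyCV *cv)
{
    cv->registered = true;
}

/* A broadcast reaches only waiters that are already registered. */
static void
toy_broadcast(ToyCV *cv)
{
    if (cv->registered)
        cv->woken = true;
}

/* The owner releases the slot and notifies waiters. */
static void
toy_release(ToyCvSlot *s)
{
    s->active_pid = 0;
    toy_broadcast(&s->active_cv);
}

/*
 * Replay one interleaving: the waiter reads active_pid while the slot is
 * busy, and the owner releases the slot immediately afterwards.  Returns
 * true if the waiter learns of the release.
 */
static bool
toy_waiter_sees_release(bool prepare_before_check)
{
    ToyCvSlot s = {1234, {false, false}};
    int       observed_pid;

    if (prepare_before_check)
        toy_prepare_to_sleep(&s.active_cv);

    observed_pid = s.active_pid;    /* waiter sees the slot as busy */

    toy_release(&s);                /* owner releases right after the read */

    if (!prepare_before_check)
        toy_prepare_to_sleep(&s.active_cv); /* too late, broadcast is gone */

    return observed_pid == 0 || s.active_cv.woken;
}
```

In this model toy_waiter_sees_release(true) reports the wakeup, while
toy_waiter_sees_release(false) misses it. With a timed sleep the real
cost of a missed wakeup would be a delay rather than a hang.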
Yes, you're right. I merged your 0001.patch into mine.
+ if (behavior != SAB_Inquire)
+ ConditionVariablePrepareToSleep(&s->active_cv);
+ else if (behavior != SAB_Inquire)
Isn't "behavior == SAB_Block" condition better here?
I changed the patch that way.
The attached is the updated version of the patch.
I also merged Alvaro's patch into this.
+ s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
+ if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)
The conditions in the second line are needed for the case where slot is
given, but they are already checked in SearchNamedReplicationSlot if slot is
not given. I would like something like the following instead, but I
don't insist on it.
Yes, I got rid of the strcmp() check, but left the in_use check as it is.
I like that because it's simpler.
ReplicationSlot *s = NULL;
...
if (!slot)
s = SearchNamedReplicationSlot(name);
else if (slot->in_use && strcmp(name, NameStr(slot->data.name)) == 0)
s = slot;
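
The simplified check that was kept (resolve the slot, then test only
in_use) can be sketched as follows. This is a standalone illustration
under stated assumptions: LookupSlot, toy_slots, TOY_MAX_SLOTS and the
toy_* functions are hypothetical stand-ins, and locking is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_SLOTS 4     /* hypothetical stand-in for max_replication_slots */

/* Hypothetical stand-in for ReplicationSlot; only two fields are modeled. */
typedef struct LookupSlot
{
    bool in_use;
    char name[64];
} LookupSlot;

static LookupSlot toy_slots[TOY_MAX_SLOTS];

/* Linear search by name over in-use slots; returns NULL if not found. */
static LookupSlot *
toy_search_named_slot(const char *name)
{
    for (int i = 0; i < TOY_MAX_SLOTS; i++)
    {
        LookupSlot *s = &toy_slots[i];

        if (s->in_use && strcmp(name, s->name) == 0)
            return s;
    }
    return NULL;
}

/*
 * Resolve the slot to acquire.  If a slot pointer is given, use it as is;
 * otherwise search by name.  NULL means "slot does not exist", which also
 * covers a given slot that has been dropped in the meantime.
 */
static LookupSlot *
toy_resolve_slot(LookupSlot *slot, const char *name)
{
    LookupSlot *s = slot ? slot : toy_search_named_slot(name);

    if (s == NULL || !s->in_use)
        return NULL;
    return s;
}
```

The in_use test is redundant for the by-name path, since the search
already filters on it, but it is what catches a dropped slot when the
caller passes the pointer directly.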
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("replication slot \"%s\" does not exist", name)));
The error message is not right when the given slot doesn't match the
given name.
This doesn't happen after applying Alvaro's patch.
BTW, using "name" here is not valid because it may be NULL.
So I added the following code and used "slot_name" in log messages.
+ slot_name = name ? name : NameStr(slot->data.name);
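
As a minimal sketch of that name selection, assuming exactly one of slot
and name is supplied: NamedSlot and toy_slot_name are hypothetical names,
and a plain char array stands in for the slot's NameData.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for ReplicationSlot; only the name is modeled. */
typedef struct NamedSlot
{
    char name[64];
} NamedSlot;

/*
 * Pick the name to use in log messages: the caller-supplied name if there
 * is one, otherwise the name stored in the slot itself.  Exactly one of
 * slot and name must be given.
 */
static const char *
toy_slot_name(const NamedSlot *slot, const char *name)
{
    assert((slot == NULL) != (name == NULL));

    return name ? name : slot->name;
}
```

Returning const char * here avoids dropping the const qualifier of the
caller-supplied name.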
Regards,
--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..77cf366ef1 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication
* slots */
+static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
+ const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -322,77 +325,123 @@ ReplicationSlotCreate(const char *name, bool db_specific,
}
/*
- * Find a previously created slot and mark it as used by this backend.
+ * Search for the named replication slot.
+ *
+ * Return the replication slot if found, otherwise NULL.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode.
+ */
+static ReplicationSlot *
+SearchNamedReplicationSlot(const char *name)
+{
+ int i;
+ ReplicationSlot *slot = NULL;
+
+ Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
+ LW_SHARED));
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+ {
+ slot = s;
+ break;
+ }
+ }
+
+ return slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this process.
*
* The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, or the PID of the
- * owning process otherwise. If behavior is SAB_Error, then trying to
- * acquire an owned slot is an error. If SAB_Block, we sleep until the
- * slot is released by the owning process.
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
+ * exists, or the PID of the owning process otherwise. If behavior is
+ * SAB_Error, then trying to acquire an owned slot is an error.
+ * If SAB_Block, we sleep until the slot is released by the owning process.
*/
int
ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
{
- ReplicationSlot *slot;
+ return ReplicationSlotAcquireInternal(NULL, name, behavior);
+}
+
+/*
+ * Mark the specified slot as used by this process.
+ *
+ * Only one of slot and name can be specified.
+ * If slot == NULL, search for the slot with the given name.
+ *
+ * See comments about the return value in ReplicationSlotAcquire().
+ */
+static int
+ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
+ SlotAcquireBehavior behavior)
+{
+ ReplicationSlot *s;
int active_pid;
- int i;
+ char *slot_name;
+
+ AssertArg((slot == NULL) ^ (name == NULL));
+
+ /*
+ * Determine the name of slot to acquire. This name is used in
+ * log messages.
+ */
+ slot_name = name ? name : NameStr(slot->data.name);
retry:
Assert(MyReplicationSlot == NULL);
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
/*
- * Search for the named slot and mark it active if we find it. If the
- * slot is already active, we exit the loop with active_pid set to the PID
- * of the backend that owns it.
+ * Search for the slot with the specified name if the slot to acquire is
+ * not given. If the slot is not found, we either return -1 or error out.
*/
- active_pid = 0;
- slot = NULL;
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ s = slot ? slot : SearchNamedReplicationSlot(name);
+ if (s == NULL || !s->in_use)
{
- ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ LWLockRelease(ReplicationSlotControlLock);
- if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
- {
- /*
- * This is the slot we want; check if it's active under some other
- * process. In single user mode, we don't need this check.
- */
- if (IsUnderPostmaster)
- {
- /*
- * Get ready to sleep on it in case it is active. (We may end
- * up not sleeping, but we don't want to do this while holding
- * the spinlock.)
- */
- ConditionVariablePrepareToSleep(&s->active_cv);
-
- SpinLockAcquire(&s->mutex);
-
- active_pid = s->active_pid;
- if (active_pid == 0)
- active_pid = s->active_pid = MyProcPid;
-
- SpinLockRelease(&s->mutex);
- }
- else
- active_pid = MyProcPid;
- slot = s;
-
- break;
- }
- }
- LWLockRelease(ReplicationSlotControlLock);
-
- /* If we did not find the slot, error out. */
- if (slot == NULL)
+ if (behavior == SAB_Inquire)
+ return -1;
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("replication slot \"%s\" does not exist", name)));
+ errmsg("replication slot \"%s\" does not exist", slot_name)));
+ }
/*
- * If we found the slot but it's already active in another backend, we
- * either error out or retry after a short wait, as caller specified.
+ * This is the slot we want; check if it's active under some other
+ * process. In single user mode, we don't need this check.
+ */
+ if (IsUnderPostmaster)
+ {
+ /*
+ * Get ready to sleep on the slot in case it is active if SAB_Block.
+ * (We may end up not sleeping, but we don't want to do this while
+ * holding the spinlock.)
+ */
+ if (behavior == SAB_Block)
+ ConditionVariablePrepareToSleep(&s->active_cv);
+
+ SpinLockAcquire(&s->mutex);
+ if (s->active_pid == 0)
+ s->active_pid = MyProcPid;
+ active_pid = s->active_pid;
+ SpinLockRelease(&s->mutex);
+ }
+ else
+ active_pid = MyProcPid;
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * If we found the slot but it's already active in another process, we
+ * either error out, return the PID of the owning process, or retry
+ * after a short wait, as caller specified.
*/
if (active_pid != MyProcPid)
{
@@ -400,24 +449,24 @@ retry:
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
- name, active_pid)));
+ slot_name, active_pid)));
else if (behavior == SAB_Inquire)
return active_pid;
/* Wait here until we get signaled, and then restart */
- ConditionVariableSleep(&slot->active_cv,
+ ConditionVariableSleep(&s->active_cv,
WAIT_EVENT_REPLICATION_SLOT_DROP);
ConditionVariableCancelSleep();
goto retry;
}
- else
- ConditionVariableCancelSleep(); /* no sleep needed after all */
+ else if (behavior == SAB_Block)
+ ConditionVariableCancelSleep(); /* no sleep needed after all */
/* Let everybody know we've modified this slot */
- ConditionVariableBroadcast(&slot->active_cv);
+ ConditionVariableBroadcast(&s->active_cv);
/* We made this slot active, so it's ours now. */
- MyReplicationSlot = slot;
+ MyReplicationSlot = s;
/* success */
return 0;
@@ -1100,43 +1149,82 @@ restart:
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn = InvalidXLogRecPtr;
NameData slotname;
+ int wspid;
+ int last_signaled_pid = 0;
if (!s->in_use)
continue;
SpinLockAcquire(&s->mutex);
- if (s->data.restart_lsn == InvalidXLogRecPtr ||
- s->data.restart_lsn >= oldestLSN)
- {
- SpinLockRelease(&s->mutex);
- continue;
- }
-
slotname = s->data.name;
restart_lsn = s->data.restart_lsn;
-
SpinLockRelease(&s->mutex);
+
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+ continue;
LWLockRelease(ReplicationSlotControlLock);
+ /* Get ready to sleep on the slot in case it is active */
+ ConditionVariablePrepareToSleep(&s->active_cv);
+
for (;;)
{
- int wspid = ReplicationSlotAcquire(NameStr(slotname),
- SAB_Inquire);
+ /*
+ * Try to mark this slot as used by this process.
+ *
+ * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
+ * should not cancel the prepared condition variable
+ * if this slot is active in other process. Because in this case
+ * we have to wait on that CV for the process owning
+ * the slot to be terminated, later.
+ */
+ wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
- /* no walsender? success! */
- if (wspid == 0)
+ /*
+ * Exit the loop if we successfully acquired the slot or
+ * the slot was dropped during waiting for the owning process
+ * to be terminated. For example, the latter case is likely to
+ * happen when the slot is temporary because it's automatically
+ * dropped by the termination of the owning process.
+ */
+ if (wspid <= 0)
break;
- ereport(LOG,
- (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
- wspid, NameStr(slotname))));
- (void) kill(wspid, SIGTERM);
+ /*
+ * Signal to terminate the process that owns the slot.
+ *
+ * There is the race condition where other process may own
+ * the slot after the process using it was terminated and before
+ * this process owns it. To handle this case, we signal again
+ * if the PID of the owning process is changed than the last.
+ *
+ * XXX This logic assumes that the same PID is not reused
+ * very quickly.
+ */
+ if (last_signaled_pid != wspid)
+ {
+ ereport(LOG,
+ (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+ wspid, NameStr(slotname))));
+ (void) kill(wspid, SIGTERM);
+ last_signaled_pid = wspid;
+ }
ConditionVariableTimedSleep(&s->active_cv, 10,
WAIT_EVENT_REPLICATION_SLOT_DROP);
}
ConditionVariableCancelSleep();
+ /*
+ * Do nothing here and start from scratch if the slot has
+ * already been dropped.
+ */
+ if (wspid == -1)
+ {
+ CHECK_FOR_INTERRUPTS();
+ goto restart;
+ }
+
ereport(LOG,
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
NameStr(slotname),