On 2020/06/18 14:40, Fujii Masao wrote:


On 2020/06/18 11:44, Kyotaro Horiguchi wrote:
At Wed, 17 Jun 2020 20:13:01 +0900, Fujii Masao <masao.fu...@oss.nttdata.com> wrote in
ReplicationSlotAcquireInternal.  I think we should call
ConditionVariablePrepareToSleep before the surrounding for statement
block.

OK, so what about the attached patch? I added ConditionVariablePrepareToSleep()
just before entering the "for" loop in InvalidateObsoleteReplicationSlots().

Thanks.

Thanks for the review!


ReplicationSlotAcquireInternal:
+ * If *slot == NULL, search for the slot with the given name.

'*' seems needless here.

Fixed.

Also I added "Only one of slot and name can be specified." into
the comments of ReplicationSlotAcquireInternal().
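
For illustration only (this is not part of the patch), the "only one of
slot and name" rule and the lookup it drives can be sketched in standalone C.
MockSlot, exactly_one_given() and resolve_slot() are hypothetical names made
up for this sketch; the patch itself expresses the rule with
AssertArg((slot == NULL) ^ (name == NULL)) and searches by name with
SearchNamedReplicationSlot().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for ReplicationSlot; not the real struct. */
typedef struct MockSlot
{
    bool in_use;
    char name[64];
} MockSlot;

/* True when exactly one of slot and name is supplied. */
static bool
exactly_one_given(const MockSlot *slot, const char *name)
{
    return (slot == NULL) != (name == NULL);
}

/*
 * Return the slot to act on, or NULL if it does not exist.
 * A slot passed directly is used as-is; otherwise the array is
 * searched by name. Either way the result must still be in use.
 */
static MockSlot *
resolve_slot(MockSlot *slots, int nslots, MockSlot *slot, const char *name)
{
    MockSlot *s = slot;

    assert(exactly_one_given(slot, name));

    if (s == NULL)
    {
        for (int i = 0; i < nslots; i++)
        {
            if (slots[i].in_use && strcmp(name, slots[i].name) == 0)
            {
                s = &slots[i];
                break;
            }
        }
    }

    return (s != NULL && s->in_use) ? s : NULL;
}
```

A caller that already holds a slot pointer would pass (slot, NULL), and a
caller that only knows the name would pass (NULL, name). The sketch leaves
out all locking, which the real function needs.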


The patch moves ConditionVariablePrepareToSleep. We need to call the
function before looking into active_pid, as originally commented.
Since active_pid is not protected by ReplicationSlotControlLock, calling
the function just before releasing the lock is not correct.

The attached patch, applied on top of v3, fixes that.

Yes, you're right. I merged your 0001.patch into mine.

+        if (behavior != SAB_Inquire)
+            ConditionVariablePrepareToSleep(&s->active_cv);
+    else if (behavior != SAB_Inquire)

Isn't the "behavior == SAB_Block" condition better here?
I changed the patch that way.

The attached is the updated version of the patch.
I also merged Alvaro's patch into this.


+   s = (slot == NULL) ? SearchNamedReplicationSlot(name) : slot;
+   if (s == NULL || !s->in_use || strcmp(name, NameStr(s->data.name)) != 0)

The conditions in the second line are needed for the case where slot is
given, but they are already checked in SearchNamedReplicationSlot when
slot is not given.  I would like something like the following instead,
but I don't insist on it.

Yes, I got rid of the strcmp() check, but left the in_use check as it is.
I like that because it's simpler.


     ReplicationSlot *s = NULL;
     ...
     if (!slot)
         s = SearchNamedReplicationSlot(name);
     else if (slot->in_use && strcmp(name, NameStr(slot->data.name)) == 0)
         s = slot;


+        ereport(ERROR,
+                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                 errmsg("replication slot \"%s\" does not exist", name)));

The error message is not right when the given slot doesn't match the
given name.

This doesn't happen after applying Alvaro's patch.

BTW, using "name" here is not valid because it may be NULL.
So I added the following code and used "slot_name" in log messages.

+    slot_name = name ? name : NameStr(slot->data.name);

Sorry, this caused a compilation failure. So I fixed that and
attached the updated version of the patch.
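
For illustration only, the NULL-safe choice of the name to report can be
sketched in standalone C. MockSlot, slot_display_name() and
format_missing_slot() are hypothetical names for this sketch; the patch
itself uses the expression name ? name : NameStr(slot->data.name) inside
errmsg().

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for ReplicationSlot; only the name is modeled. */
typedef struct MockSlot
{
    char name[64];
} MockSlot;

/*
 * Pick the name to show in messages: the caller-supplied name when
 * there is one, otherwise the name stored in the slot itself.
 */
static const char *
slot_display_name(const MockSlot *slot, const char *name)
{
    assert(name != NULL || slot != NULL);
    return name ? name : slot->name;
}

/* Format the "does not exist" message without ever passing NULL to %s. */
static int
format_missing_slot(char *buf, size_t buflen,
                    const MockSlot *slot, const char *name)
{
    return snprintf(buf, buflen,
                    "replication slot \"%s\" does not exist",
                    slot_display_name(slot, name));
}
```

The point of the helper is that a caller who passed a slot pointer and a
NULL name still gets a valid string for the message.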

Regards,

--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..a7bbcf3499 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int                    max_replication_slots = 0;      /* the maximum number of replication
                                                         * slots */
 
+static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
+                                          const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -322,77 +325,117 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 }
 
 /*
- * Find a previously created slot and mark it as used by this backend.
+ * Search for the named replication slot.
+ *
+ * Return the replication slot if found, otherwise NULL.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode.
+ */
+static ReplicationSlot *
+SearchNamedReplicationSlot(const char *name)
+{
+       int                     i;
+       ReplicationSlot *slot = NULL;
+
+       Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
+                                                               LW_SHARED));
+
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+               {
+                       slot = s;
+                       break;
+               }
+       }
+
+       return slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this process.
  *
  * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, or the PID of the
- * owning process otherwise.  If behavior is SAB_Error, then trying to
- * acquire an owned slot is an error.  If SAB_Block, we sleep until the
- * slot is released by the owning process.
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
+ * exists, or the PID of the owning process otherwise.  If behavior is
+ * SAB_Error, then trying to acquire an owned slot is an error.
+ * If SAB_Block, we sleep until the slot is released by the owning process.
  */
 int
 ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
 {
-       ReplicationSlot *slot;
+       return ReplicationSlotAcquireInternal(NULL, name, behavior);
+}
+
+/*
+ * Mark the specified slot as used by this process.
+ *
+ * Only one of slot and name can be specified.
+ * If slot == NULL, search for the slot with the given name.
+ *
+ * See comments about the return value in ReplicationSlotAcquire().
+ */
+static int
+ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
+                                                          SlotAcquireBehavior behavior)
+{
+       ReplicationSlot *s;
        int                     active_pid;
-       int                     i;
+
+       AssertArg((slot == NULL) ^ (name == NULL));
 
 retry:
        Assert(MyReplicationSlot == NULL);
 
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
        /*
-        * Search for the named slot and mark it active if we find it.  If the
-        * slot is already active, we exit the loop with active_pid set to the PID
-        * of the backend that owns it.
+        * Search for the slot with the specified name if the slot to acquire is
+        * not given. If the slot is not found, we either return -1 or error out.
         */
-       active_pid = 0;
-       slot = NULL;
-       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (i = 0; i < max_replication_slots; i++)
+       s = slot ? slot : SearchNamedReplicationSlot(name);
+       if (s == NULL || !s->in_use)
        {
-               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+               LWLockRelease(ReplicationSlotControlLock);
 
-               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
-               {
-                       /*
-                        * This is the slot we want; check if it's active under some other
-                        * process.  In single user mode, we don't need this check.
-                        */
-                       if (IsUnderPostmaster)
-                       {
-                               /*
-                                * Get ready to sleep on it in case it is active.  (We may end
-                                * up not sleeping, but we don't want to do this while holding
-                                * the spinlock.)
-                                */
-                               ConditionVariablePrepareToSleep(&s->active_cv);
-
-                               SpinLockAcquire(&s->mutex);
-
-                               active_pid = s->active_pid;
-                               if (active_pid == 0)
-                                       active_pid = s->active_pid = MyProcPid;
-
-                               SpinLockRelease(&s->mutex);
-                       }
-                       else
-                               active_pid = MyProcPid;
-                       slot = s;
-
-                       break;
-               }
-       }
-       LWLockRelease(ReplicationSlotControlLock);
-
-       /* If we did not find the slot, error out. */
-       if (slot == NULL)
+               if (behavior == SAB_Inquire)
+                       return -1;
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
-                                errmsg("replication slot \"%s\" does not exist", name)));
+                                errmsg("replication slot \"%s\" does not exist",
+                                               name ? name : NameStr(slot->data.name))));
+       }
 
        /*
-        * If we found the slot but it's already active in another backend, we
-        * either error out or retry after a short wait, as caller specified.
+        * This is the slot we want; check if it's active under some other
+        * process.  In single user mode, we don't need this check.
+        */
+       if (IsUnderPostmaster)
+       {
+               /*
+                * Get ready to sleep on the slot in case it is active if SAB_Block.
+                * (We may end up not sleeping, but we don't want to do this while
+                * holding the spinlock.)
+                */
+               if (behavior == SAB_Block)
+                       ConditionVariablePrepareToSleep(&s->active_cv);
+
+               SpinLockAcquire(&s->mutex);
+               if (s->active_pid == 0)
+                       s->active_pid = MyProcPid;
+               active_pid = s->active_pid;
+               SpinLockRelease(&s->mutex);
+       }
+       else
+               active_pid = MyProcPid;
+       LWLockRelease(ReplicationSlotControlLock);
+
+       /*
+        * If we found the slot but it's already active in another process, we
+        * either error out, return the PID of the owning process, or retry
+        * after a short wait, as caller specified.
         */
        if (active_pid != MyProcPid)
        {
@@ -400,24 +443,24 @@ retry:
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_IN_USE),
                                         errmsg("replication slot \"%s\" is active for PID %d",
-                                                       name, active_pid)));
+                                                       NameStr(s->data.name), active_pid)));
                else if (behavior == SAB_Inquire)
                        return active_pid;
 
                /* Wait here until we get signaled, and then restart */
-               ConditionVariableSleep(&slot->active_cv,
+               ConditionVariableSleep(&s->active_cv,
                                                            WAIT_EVENT_REPLICATION_SLOT_DROP);
                ConditionVariableCancelSleep();
                goto retry;
        }
-       else
-               ConditionVariableCancelSleep(); /* no sleep needed after all */
+       else if (behavior == SAB_Block)
+               ConditionVariableCancelSleep(); /* no sleep needed after all */
 
        /* Let everybody know we've modified this slot */
-       ConditionVariableBroadcast(&slot->active_cv);
+       ConditionVariableBroadcast(&s->active_cv);
 
        /* We made this slot active, so it's ours now. */
-       MyReplicationSlot = slot;
+       MyReplicationSlot = s;
 
        /* success */
        return 0;
@@ -1100,43 +1143,82 @@ restart:
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
                XLogRecPtr      restart_lsn = InvalidXLogRecPtr;
                NameData        slotname;
+               int             wspid;
+               int             last_signaled_pid = 0;
 
                if (!s->in_use)
                        continue;
 
                SpinLockAcquire(&s->mutex);
-               if (s->data.restart_lsn == InvalidXLogRecPtr ||
-                       s->data.restart_lsn >= oldestLSN)
-               {
-                       SpinLockRelease(&s->mutex);
-                       continue;
-               }
-
                slotname = s->data.name;
                restart_lsn = s->data.restart_lsn;
-
                SpinLockRelease(&s->mutex);
+
+               if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+                       continue;
                LWLockRelease(ReplicationSlotControlLock);
 
+               /* Get ready to sleep on the slot in case it is active */
+               ConditionVariablePrepareToSleep(&s->active_cv);
+
                for (;;)
                {
-                       int                     wspid = ReplicationSlotAcquire(NameStr(slotname),
-                                                                              SAB_Inquire);
+                       /*
+                        * Try to mark this slot as used by this process.
+                        *
+                        * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
+                        * should not cancel the prepared condition variable
+                        * if this slot is active in other process. Because in this case
+                        * we have to wait on that CV for the process owning
+                        * the slot to be terminated, later.
+                        */
+                       wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
 
-                       /* no walsender? success! */
-                       if (wspid == 0)
+                       /*
+                        * Exit the loop if we successfully acquired the slot or
+                        * the slot was dropped during waiting for the owning process
+                        * to be terminated. For example, the latter case is likely to
+                        * happen when the slot is temporary because it's automatically
+                        * dropped by the termination of the owning process.
+                        */
+                       if (wspid <= 0)
                                break;
 
-                       ereport(LOG,
-                                       (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
-                                                       wspid, NameStr(slotname))));
-                       (void) kill(wspid, SIGTERM);
+                       /*
+                        * Signal to terminate the process that owns the slot.
+                        *
+                        * There is the race condition where other process may own
+                        * the slot after the process using it was terminated and before
+                        * this process owns it. To handle this case, we signal again
+                        * if the PID of the owning process is changed than the last.
+                        *
+                        * XXX This logic assumes that the same PID is not reused
+                        * very quickly.
+                        */
+                       if (last_signaled_pid != wspid)
+                       {
+                               ereport(LOG,
+                                               (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+                                                               wspid, NameStr(slotname))));
+                               (void) kill(wspid, SIGTERM);
+                               last_signaled_pid = wspid;
+                       }
 
                         ConditionVariableTimedSleep(&s->active_cv, 10,
                                                                                 WAIT_EVENT_REPLICATION_SLOT_DROP);
                }
                ConditionVariableCancelSleep();
 
+               /*
+                * Do nothing here and start from scratch if the slot has
+                * already been dropped.
+                */
+               if (wspid == -1)
+               {
+                       CHECK_FOR_INTERRUPTS();
+                       goto restart;
+               }
+
                ereport(LOG,
                                 (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
                                                NameStr(slotname),
