On 2020/06/15 13:42, Kyotaro Horiguchi wrote:
At Sat, 13 Jun 2020 01:38:49 +0900, Fujii Masao <masao.fu...@oss.nttdata.com> 
wrote in
Hi,

The document explains that "lost" value that
pg_replication_slots.wal_status reports means

     some WAL files are definitely lost and this slot cannot be used to
     resume replication anymore.

However, I observed "lost" value while inserting lots of records,
but replication could continue normally. So I wonder if
pg_replication_slots.wal_status may have a bug.

wal_status is calculated in GetWALAvailability(), and probably I found
some issues in it.


        keepSegs = ConvertToXSegs(Max(max_wal_size_mb, wal_keep_segments),
                                                          wal_segment_size) +
                                                          1;

max_wal_size_mb is the number of megabytes. wal_keep_segments is
the number of WAL segment files. So it's strange to calculate max of
them.

Oops!  I don't want to believe I did that but it's definitely wrong.

The above should be the following?

     Max(ConvertToXSegs(max_wal_size_mb, wal_segment_size),
     wal_keep_segments) + 1

Looks reasonable.

                if ((max_slot_wal_keep_size_mb <= 0 ||
                         max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
                        oldestSegMaxWalSize <= targetSeg)
                        return WALAVAIL_NORMAL;

This code means that wal_status reports "normal" only when
max_slot_wal_keep_size is negative or larger than max_wal_size.
Why is this condition necessary? The document explains "normal
  means that the claimed files are within max_wal_size". So whatever
  max_slot_wal_keep_size value is, IMO that "normal" should be
  reported if the WAL files claimed by the slot are within max_wal_size.
  Thought?

It was a kind of hard to decide. Even when max_slot_wal_keep_size is
smaller than max_wal_size, the segments more than
max_slot_wal_keep_size are not guaranteed to be kept.  In that case
the state transits as NORMAL->LOST skipping the "RESERVED" state.
Putting aside whether the setting is useful or not, I thought that the
state transition is somewhat abrupt.

Or, if that condition is really necessary, the document should be
updated so that the note about the condition is added.

Does the following make sense?

https://www.postgresql.org/docs/13/view-pg-replication-slots.html

normal means that the claimed files are within max_wal_size.
+ If max_slot_wal_keep_size is smaller than max_wal_size, this state
+ will not appear.

If the WAL files claimed by the slot exceeds max_slot_wal_keep_size
but any those WAL files have not been removed yet, wal_status seems
to report "lost". Is this expected behavior? Per the meaning of "lost"
described in the document, "lost" should be reported only when
any claimed files are removed, I think. Thought?

Or this behavior is expected and the document is incorrect?

In short, it is known behavior but it was judged as useless to prevent
that.

That can happen when checkpointer removes up to the segment that is
being read by walsender.  I think that that doesn't happen (or
happenswithin a narrow time window?) for physical replication but
happenes for logical replication.

While development, I once added walsender a code to exit for that
reason, but finally it is moved to InvalidateObsoleteReplicationSlots
as a bit defferent function.

BTW, I read the code of InvalidateObsoleteReplicationSlots() and probably
found some issues in it.

1. Each cycle of the "for" loop in InvalidateObsoleteReplicationSlots()
    emits the log message  "terminating walsender ...". This means that
    if it takes more than 10ms for walsender to exit after it's signaled,
    the second and subsequent cycles would happen and output the same
    log message several times. IMO that log message should be output
    only once.

2. InvalidateObsoleteReplicationSlots() uses the loop to scan replication
    slots array and uses the "for" loop in each scan. Also it calls
    ReplicationSlotAcquire() for each "for" loop cycle, and
    ReplicationSlotAcquire() uses another loop to scan replication slots
    array. I don't think this is good design.

    ISTM that we can get rid of ReplicationSlotAcquire()'s loop because
    InvalidateObsoleteReplicationSlots() already know the index of the slot
    that we want to find. The attached patch does that. Thought?

3. There is a corner case where the termination of walsender cleans up
    the temporary replication slot while InvalidateObsoleteReplicationSlots()
    is sleeping on ConditionVariableTimedSleep(). In this case,
    ReplicationSlotAcquire() is called in the subsequent cycle of the "for"
    loop, cannot find the slot and then emits ERROR message. This leads
    to the failure of checkpoint by the checkpointer.

    To avoid this case, if SAB_Inquire is specified, ReplicationSlotAcquire()
    should return the special value instead of emitting ERROR even when
    it cannot find the slot. Also InvalidateObsoleteReplicationSlots() should
    handle that special returned value.

Regards,


--
Fujii Masao
Advanced Computing Technology Center
Research and Development Headquarters
NTT DATA CORPORATION
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..72aa5de60c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int                    max_replication_slots = 0;      /* the maximum number 
of replication
                                                                                
 * slots */
 
+static int ReplicationSlotAcquireInternal(const char *name,
+                                                                               
  SlotAcquireBehavior behavior, int index);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -332,25 +334,45 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  */
 int
 ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
+{
+       int                     i;
+
+       /*
+        * Search for the named slot and mark it active if we find it.
+        */
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
+                       break;
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       return ReplicationSlotAcquireInternal(name, behavior, i);
+}
+
+static int
+ReplicationSlotAcquireInternal(const char *name,
+                                                          SlotAcquireBehavior 
behavior, int index)
 {
        ReplicationSlot *slot;
        int                     active_pid;
-       int                     i;
 
 retry:
        Assert(MyReplicationSlot == NULL);
 
        /*
-        * Search for the named slot and mark it active if we find it.  If the
-        * slot is already active, we exit the loop with active_pid set to the 
PID
+        * If the slot is already active, we set active_pid to the PID
         * of the backend that owns it.
         */
        active_pid = 0;
        slot = NULL;
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (i = 0; i < max_replication_slots; i++)
+       if (index >= 0 && index < max_replication_slots)
        {
-               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+               ReplicationSlot *s = 
&ReplicationSlotCtl->replication_slots[index];
 
                if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
                {
@@ -378,8 +400,6 @@ retry:
                        else
                                active_pid = MyProcPid;
                        slot = s;
-
-                       break;
                }
        }
        LWLockRelease(ReplicationSlotControlLock);
@@ -1120,8 +1140,9 @@ restart:
 
                for (;;)
                {
-                       int                     wspid = 
ReplicationSlotAcquire(NameStr(slotname),
-                                                                               
                           SAB_Inquire);
+                       int                     wspid =
+                               
ReplicationSlotAcquireInternal(NameStr(slotname),
+                                                                               
           SAB_Inquire, i);
 
                        /* no walsender? success! */
                        if (wspid == 0)

Reply via email to