Hi,

On 2023-03-04 12:19:57 +0100, Drouvot, Bertrand wrote:
> Subject: [PATCH v52 1/6] Add info in WAL records in preparation for logical
>  slot conflict handling.
> 
> Overall design:
> 
> 1. We want to enable logical decoding on standbys, but replay of WAL
> from the primary might remove data that is needed by logical decoding,
> causing error(s) on the standby. To prevent those errors, a new replication
> conflict scenario needs to be addressed (just as hot standby does).
> 
> 2. Our chosen strategy for dealing with this type of replication slot
> is to invalidate logical slots for which needed data has been removed.
> 
> 3. To do this we need the latestRemovedXid for each change, just as we
> do for physical replication conflicts, but we also need to know
> whether any particular change was to data that logical replication
> might access. That way, during WAL replay, we know when there is a risk of
> conflict and, if so, whether there is a conflict.
> 
> 4. We can't rely on the standby's relcache entries for this purpose in
> any way, because the startup process can't access catalog contents.
> 
> 5. Therefore every WAL record that potentially removes data from the
> index or heap must carry a flag indicating whether or not it is one
> that might be accessed during logical decoding.
> 
> Why do we need this for logical decoding on standby?
> 
> First, let's forget about logical decoding on standby and recall that
> on a primary database, any catalog rows that may be needed by a logical
> decoding replication slot are not removed.
> 
> This is done thanks to the catalog_xmin associated with the logical
> replication slot.
> 
> But, with logical decoding on standby, in the following cases:
> 
> - hot_standby_feedback is off
> - hot_standby_feedback is on but there is no physical slot between
>   the primary and the standby. Then, hot_standby_feedback will work,
>   but only while the connection is alive (for example a node restart
>   would break it)
> 
> Then, the primary may delete system catalog rows that could be needed
> by the logical decoding on the standby (as it does not know about the
> catalog_xmin on the standby).
> 
> So, it’s mandatory to identify those rows and invalidate the slots
> that may need them if any. Identifying those rows is the purpose of
> this commit.

This is a very nice commit message.


> Implementation:
> 
> When a WAL replay on standby indicates that a catalog table tuple is
> to be deleted by an xid that is greater than a logical slot's
> catalog_xmin, then that means the slot's catalog_xmin conflicts with
> the xid, and we need to handle the conflict. While subsequent commits
> will do the actual conflict handling, this commit adds a new field
> isCatalogRel in such WAL records (and a new bit set in the
> xl_heap_visible flags field), that is true for catalog tables, so as to
> arrange for conflict handling.
> 
> The affected WAL records are the ones that already contain the
> snapshotConflictHorizon field, namely:
> 
> - gistxlogDelete
> - gistxlogPageReuse
> - xl_hash_vacuum_one_page
> - xl_heap_prune
> - xl_heap_freeze_page
> - xl_heap_visible
> - xl_btree_reuse_page
> - xl_btree_delete
> - spgxlogVacuumRedirect
> 
> Due to this new field being added, xl_hash_vacuum_one_page and
> gistxlogDelete now contain the offsets to be deleted as a
> FLEXIBLE_ARRAY_MEMBER. This is needed to ensure correct alignment.
> It's not needed for the other structs where isCatalogRel has
> been added.
> 
> Author: Andres Freund (in an older version), Amit Khandekar, Bertrand
> Drouvot

I think you're first author on this one by now.


I think this commit is ready to go. Unless somebody thinks differently, I
think I might push it tomorrow.
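The alignment remark in the quoted commit message can be illustrated with a small layout check. The struct below is a hypothetical stand-in modelled on the description of xl_hash_vacuum_one_page (conflict horizon, tuple count, the new one-byte isCatalogRel flag, then the target offsets); its name, the two size macros and RecordSize() are made up for this sketch and are not copied from the patch. It contrasts a header size computed as "end of the last fixed field" with offsetof() on a flexible array member.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint16_t OffsetNumber;

/* Hypothetical record header; not the patch's actual definition. */
typedef struct VacuumOnePageRecord
{
    TransactionId snapshotConflictHorizon;
    uint16_t    ntuples;
    bool        isCatalogRel;   /* the new one-byte flag */
    OffsetNumber offsets[];     /* offsets[FLEXIBLE_ARRAY_MEMBER] in PostgreSQL */
} VacuumOnePageRecord;

/* Header size if the offsets were appended right after the last fixed field. */
#define NAIVE_HEADER_SIZE \
    (offsetof(VacuumOnePageRecord, isCatalogRel) + sizeof(bool))

/* Header size that keeps the appended OffsetNumbers correctly aligned. */
#define ALIGNED_HEADER_SIZE offsetof(VacuumOnePageRecord, offsets)

static_assert(ALIGNED_HEADER_SIZE % _Alignof(OffsetNumber) == 0,
              "offsets must start at an OffsetNumber-aligned position");

/* Hypothetical: total payload size for a record carrying ntuples offsets. */
static size_t
RecordSize(uint16_t ntuples)
{
    return ALIGNED_HEADER_SIZE + (size_t) ntuples * sizeof(OffsetNumber);
}
```

On common ABIs (4-byte TransactionId, 1-byte bool) NAIVE_HEADER_SIZE is 7, so offsets written there would start at an odd offset, whereas offsetof() yields 8 because the compiler pads before the array.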


> Subject: [PATCH v52 2/6] Handle logical slot conflicts on standby.


> @@ -6807,7 +6808,8 @@ CreateCheckPoint(int flags)
>        */
>       XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
>       KeepLogSeg(recptr, &_logSegNo);
> -     if (InvalidateObsoleteReplicationSlots(_logSegNo))
> +     InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL);
> +     if (invalidated)
>       {
>               /*
>                * Some slots have been invalidated; recalculate the old-segment

I don't really understand why you changed InvalidateObsoleteReplicationSlots
to return void instead of bool, and then added an output boolean argument via
a pointer?



> @@ -7964,6 +7968,22 @@ xlog_redo(XLogReaderState *record)
>               /* Update our copy of the parameters in pg_control */
>               memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>  
> +             /*
> +              * Invalidate logical slots if we are in hot standby and the primary does not
> +              * have a WAL level sufficient for logical decoding. No need to search
> +              * for potentially conflicting logical slots if standby is running
> +              * with wal_level lower than logical, because in that case, we would
> +              * have either disallowed creation of logical slots or invalidated existing
> +              * ones.
> +              */
> +             if (InRecovery && InHotStandby &&
> +                     xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +                     wal_level >= WAL_LEVEL_LOGICAL)
> +             {
> +                     TransactionId ConflictHorizon = InvalidTransactionId;
> +                     InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon);
> +             }
> +

Are there races around changing wal_level?



> @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>               SpinLockAcquire(&s->mutex);
>               effective_xmin = s->effective_xmin;
>               effective_catalog_xmin = s->effective_catalog_xmin;
> -             invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> -                                        XLogRecPtrIsInvalid(s->data.restart_lsn));
> +             invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> +                                             XLogRecPtrIsInvalid(s->data.restart_lsn))
> +                                        || (!TransactionIdIsValid(s->data.xmin) &&
> +                                                !TransactionIdIsValid(s->data.catalog_xmin)));
>               SpinLockRelease(&s->mutex);
>  
>               /* invalidated slots need not apply */

I still would like a wrapper function to determine whether a slot has been
invalidated. This is too complicated to be repeated in other places.


> @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void)
>  }
>  
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim (and
>   * in that case we're not holding the lock at return, otherwise we are).
>   *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>   *
>   * This is inherently racy, because we release the LWLock
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> -                                                        bool *invalidated)
> +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> +                                                                                               bool *invalidated, TransactionId *xid)

This is too long a name. I'd probably just leave it at the old name.



> @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>                * Check if the slot needs to be invalidated. If it needs to be
>                * invalidated, and is not currently acquired, acquire it and mark it
>                * as having been invalidated.  We do this with the spinlock held to
> -              * avoid race conditions -- for example the restart_lsn could move
> -              * forward, or the slot could be dropped.
> +              * avoid race conditions -- for example the restart_lsn (or the
> +              * xmin(s) could) move forward or the slot could be dropped.
>                */
>               SpinLockAcquire(&s->mutex);
>  
>               restart_lsn = s->data.restart_lsn;
> +             slot_xmin = s->data.xmin;
> +             slot_catalog_xmin = s->data.catalog_xmin;
> +
> +             /* slot has been invalidated (logical decoding conflict case) */
> +             if ((xid &&
> +                      ((LogicalReplicationSlotIsInvalid(s))
> +                       ||
>  

Uh, huh?

That's very odd formatting.

>               /*
> -              * If the slot is already invalid or is fresh enough, we don't need to
> -              * do anything.
> +              * We are not forcing for invalidation because the xid is valid and
> +              * this is a non conflicting slot.
>                */
> -             if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> +                       (TransactionIdIsValid(*xid) && !(
> +                                (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, *xid))
> +                                ||
> +                                (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, *xid))
> +                                ))
> +                       ))
> +                     ||
> +             /* slot has been invalidated (obsolete LSN case) */
> +                     (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>               {
>                       SpinLockRelease(&s->mutex);
>                       if (released_lock)


This needs some cleanup.
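One way the condition could be untangled, sketched outside the backend: the positive question ("must this slot be invalidated?") is answered in a helper, with one early return per case instead of a single nested expression. XminConflicts() and SlotNeedsInvalidation() are hypothetical names; TransactionIdPrecedesOrEquals() is a re-implementation modelled on the wraparound-aware comparison in PostgreSQL's transam.c; and "already invalidated by a conflict" is assumed to mean both xmins are invalid, as in the ReplicationSlotsComputeRequiredXmin() hunk, since the patch's LogicalReplicationSlotIsInvalid() definition is not quoted here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define InvalidXLogRecPtr        ((XLogRecPtr) 0)

static bool
TransactionIdIsValid(TransactionId xid)
{
    return xid != InvalidTransactionId;
}

/* Wraparound-aware "id1 <= id2", modelled on PostgreSQL's transam.c. */
static bool
TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
{
    if (id1 < FirstNormalTransactionId || id2 < FirstNormalTransactionId)
        return id1 <= id2;
    return (int32_t) (id1 - id2) <= 0;
}

/* Hypothetical: does a single slot xmin conflict with the horizon? */
static bool
XminConflicts(TransactionId slot_xmin, TransactionId horizon)
{
    return TransactionIdIsValid(slot_xmin) &&
        TransactionIdPrecedesOrEquals(slot_xmin, horizon);
}

/*
 * Hypothetical: horizon == NULL selects the obsolete-LSN check; otherwise an
 * invalid *horizon means "primary's wal_level dropped, always invalidate".
 */
static bool
SlotNeedsInvalidation(XLogRecPtr restart_lsn, XLogRecPtr oldestLSN,
                      TransactionId slot_xmin, TransactionId slot_catalog_xmin,
                      const TransactionId *horizon)
{
    /* Obsolete LSN case. */
    if (horizon == NULL)
        return restart_lsn != InvalidXLogRecPtr && restart_lsn < oldestLSN;

    /* Already invalidated by an earlier conflict. */
    if (!TransactionIdIsValid(slot_xmin) &&
        !TransactionIdIsValid(slot_catalog_xmin))
        return false;

    /* wal_level case: no horizon to compare against, invalidate anyway. */
    if (!TransactionIdIsValid(*horizon))
        return true;

    /* xid horizon case. */
    return XminConflicts(slot_xmin, *horizon) ||
        XminConflicts(slot_catalog_xmin, *horizon);
}
```

The caller's skip test would then read `if (!SlotNeedsInvalidation(...))`, with each of the three cases spelled out exactly once.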


> @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>               {
>                       MyReplicationSlot = s;
>                       s->active_pid = MyProcPid;
> -                     s->data.invalidated_at = restart_lsn;
> -                     s->data.restart_lsn = InvalidXLogRecPtr;
> -
> +                     if (xid)
> +                     {
> +                             s->data.xmin = InvalidTransactionId;
> +                             s->data.catalog_xmin = InvalidTransactionId;
> +                     }
> +                     else
> +                     {
> +                             s->data.invalidated_at = restart_lsn;
> +                             s->data.restart_lsn = InvalidXLogRecPtr;
> +                     }
>                       /* Let caller know */
>                       *invalidated = true;
>               }
> @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>                        */
>                       if (last_signaled_pid != active_pid)
>                       {
> -                             ereport(LOG,
> -                                             errmsg("terminating process %d to release replication slot \"%s\"",
> -                                                        active_pid, NameStr(slotname)),
> -                                             errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -                                                               LSN_FORMAT_ARGS(restart_lsn),
> -                                                               (unsigned long long) (oldestLSN - restart_lsn)),
> -                                             errhint("You might need to increase max_slot_wal_keep_size."));
> +                             if (xid)
> +                             {
> +                                     if (TransactionIdIsValid(*xid))
> +                                     {
> +                                             ereport(LOG,
> +                                                             errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> +                                                                        active_pid, NameStr(slotname)),
> +                                                             errdetail("The slot conflicted with xid horizon %u.",
> +                                                                               *xid));
> +                                     }
> +                                     else
> +                                     {
> +                                             ereport(LOG,
> +                                                             errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> +                                                                        active_pid, NameStr(slotname)),
> +                                                             errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +                                     }
> +
> +                                     (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
> +                             }
> +                             else
> +                             {
> +                                     ereport(LOG,
> +                                                     errmsg("terminating process %d to release replication slot \"%s\"",
> +                                                                active_pid, NameStr(slotname)),
> +                                                     errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> +                                                                       LSN_FORMAT_ARGS(restart_lsn),
> +                                                                       (unsigned long long) (oldestLSN - restart_lsn)),
> +                                                     errhint("You might need to increase max_slot_wal_keep_size."));
> +
> +                                     (void) kill(active_pid, SIGTERM);

I think it ought to be possible to deduplicate this a fair bit. For one, two of
the errmsg()s above are identical. But I think this could be consolidated
further, e.g. by using the same message style for the three cases, and passing
in a separately translated reason for the termination?


> +                             }
>  
> -                             (void) kill(active_pid, SIGTERM);
>                               last_signaled_pid = active_pid;
>                       }
>  
> @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>                       ReplicationSlotSave();
>                       ReplicationSlotRelease();
>  
> -                     ereport(LOG,
> -                                     errmsg("invalidating obsolete replication slot \"%s\"",
> -                                                NameStr(slotname)),
> -                                     errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -                                                       LSN_FORMAT_ARGS(restart_lsn),
> -                                                       (unsigned long long) (oldestLSN - restart_lsn)),
> -                                     errhint("You might need to increase max_slot_wal_keep_size."));
> +                     if (xid)
> +                     {
> +                             pgstat_drop_replslot(s);

Why is this done here now?


> +                             if (TransactionIdIsValid(*xid))
> +                             {
> +                                     ereport(LOG,
> +                                                     errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> +                                                     errdetail("The slot conflicted with xid horizon %u.", *xid));
> +                             }
> +                             else
> +                             {
> +                                     ereport(LOG,
> +                                                     errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> +                                                     errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +                             }
> +                     }
> +                     else
> +                     {
> +                             ereport(LOG,
> +                                             errmsg("invalidating obsolete replication slot \"%s\"",
> +                                                        NameStr(slotname)),
> +                                             errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> +                                                               LSN_FORMAT_ARGS(restart_lsn),
> +                                                               (unsigned long long) (oldestLSN - restart_lsn)),
> +                                             errhint("You might need to increase max_slot_wal_keep_size."));
> +                     }
>

I don't like all these repeated elogs...
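A rough sketch of the consolidation suggested earlier in this review, covering both the "terminating" and the "invalidating" messages: one enum value per cause, a single function that owns the cause-specific detail text, and one message shape per action shared by all causes. Every name here is hypothetical, and snprintf() is used only so the sketch runs outside the backend; in slot.c the strings would go through errmsg()/errdetail()/errhint() so that they are translated, and the max_slot_wal_keep_size hint would be attached only for the WAL-removal cause.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

/* Hypothetical: one value per reason a slot can be invalidated. */
typedef enum SlotInvalidationCause
{
    SLOT_INVAL_WAL_REMOVED,     /* restart_lsn older than what is kept */
    SLOT_INVAL_HORIZON,         /* xmin/catalog_xmin conflicts with recovery */
    SLOT_INVAL_WAL_LEVEL        /* primary's wal_level dropped below logical */
} SlotInvalidationCause;

/* Hypothetical: the only place that knows the per-cause detail wording. */
static void
FormatInvalidationDetail(char *buf, size_t len, SlotInvalidationCause cause,
                         XLogRecPtr restart_lsn, XLogRecPtr oldestLSN,
                         TransactionId horizon)
{
    assert(buf != NULL && len > 0);
    buf[0] = '\0';

    switch (cause)
    {
        case SLOT_INVAL_WAL_REMOVED:
            /* %X/%X split mirrors LSN_FORMAT_ARGS() */
            snprintf(buf, len,
                     "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
                     (unsigned) (restart_lsn >> 32), (unsigned) restart_lsn,
                     (unsigned long long) (oldestLSN - restart_lsn));
            break;
        case SLOT_INVAL_HORIZON:
            snprintf(buf, len, "The slot conflicted with xid horizon %u.",
                     (unsigned) horizon);
            break;
        case SLOT_INVAL_WAL_LEVEL:
            snprintf(buf, len,
                     "Logical decoding on standby requires wal_level to be at least logical on the primary server.");
            break;
    }
}

/* Hypothetical: one errmsg shape per action, shared by all causes. */
static void
FormatInvalidationMessage(char *buf, size_t len, bool terminating,
                          int pid, const char *slotname)
{
    assert(buf != NULL && len > 0);

    if (terminating)
        snprintf(buf, len,
                 "terminating process %d to release replication slot \"%s\"",
                 pid, slotname);
    else
        snprintf(buf, len, "invalidating obsolete replication slot \"%s\"",
                 slotname);
}
```

With this split the five near-identical ereport() calls in the hunks above collapse into two call sites, one for termination and one for invalidation, each taking the cause as an argument.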



> @@ -3057,6 +3060,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>                       case PROCSIG_RECOVERY_CONFLICT_LOCK:
>                       case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
>                       case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
> +                     case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
> +
> +                             /*
> +                              * For conflicts that require a logical slot to be
> +                              * invalidated, the requirement is for the signal receiver to
> +                              * release the slot, so that it could be invalidated by the
> +                              * signal sender. So for normal backends, the transaction
> +                              * should be aborted, just like for other recovery conflicts.
> +                              * But if it's walsender on standby, we don't want to go
> +                              * through the following IsTransactionOrTransactionBlock()
> +                              * check, so break here.
> +                              */
> +                             if (am_cascading_walsender &&
> +                                     reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
> +                                     MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
> +                             {
> +                                     RecoveryConflictPending = true;
> +                                     QueryCancelPending = true;
> +                                     InterruptPending = true;
> +                                     break;
> +                             }
>  
>                               /*
>                                * If we aren't in a transaction any longer then ignore.

I can't see any reason for this to be mixed into the same case "body" as LOCK
etc?
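A heavily simplified sketch of giving the logical-slot conflict its own case label, so that the walsender-specific test no longer needs a `reason ==` check inside the body shared with LOCK, TABLESPACE and SNAPSHOT. The enum, flags and parameters are hypothetical stand-ins: is_logical_walsender represents `am_cascading_walsender && MyReplicationSlot && SlotIsLogical(MyReplicationSlot)`, and in_transaction represents IsTransactionOrTransactionBlock(); the real shared body does considerably more than set three flags.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical subset of the recovery-conflict signal reasons. */
typedef enum ConflictReason
{
    CONFLICT_LOCK,
    CONFLICT_TABLESPACE,
    CONFLICT_SNAPSHOT,
    CONFLICT_LOGICALSLOT
} ConflictReason;

static bool RecoveryConflictPending = false;
static bool QueryCancelPending = false;
static bool InterruptPending = false;

static void
RequestQueryCancel(void)
{
    RecoveryConflictPending = true;
    QueryCancelPending = true;
    InterruptPending = true;
}

static void
HandleRecoveryConflict(ConflictReason reason, bool is_logical_walsender,
                       bool in_transaction)
{
    switch (reason)
    {
        case CONFLICT_LOGICALSLOT:
            /*
             * A walsender on a standby only needs to release its slot so
             * that the signal sender can invalidate it; skip the
             * transaction check below.
             */
            if (is_logical_walsender)
            {
                RequestQueryCancel();
                break;
            }
            /* Normal backends are handled like the other conflicts. */
            /* FALLTHROUGH */
        case CONFLICT_LOCK:
        case CONFLICT_TABLESPACE:
        case CONFLICT_SNAPSHOT:
            /* If we aren't in a transaction any longer then ignore. */
            if (!in_transaction)
                break;
            RequestQueryCancel();
            break;
    }
}
```

Falling through for normal backends keeps the shared handling in one place; duplicating it under the new label would be the alternative if the two paths are expected to diverge further.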


> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index 38c6f18886..290d4b45f4 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -51,6 +51,7 @@
>  #include "storage/proc.h"
>  #include "storage/procarray.h"
>  #include "utils/builtins.h"
> +#include "access/xlogrecovery.h"

Add new includes in the "alphabetically" right place...



Greetings,

Andres Freund

