Hi,

On 2023-03-04 12:19:57 +0100, Drouvot, Bertrand wrote:
> Subject: [PATCH v52 1/6] Add info in WAL records in preparation for logical
>  slot conflict handling.
>
> Overall design:
>
> 1. We want to enable logical decoding on standbys, but replay of WAL
> from the primary might remove data that is needed by logical decoding,
> causing error(s) on the standby. To prevent those errors, a new replication
> conflict scenario needs to be addressed (as much as hot standby does).
>
> 2. Our chosen strategy for dealing with this type of replication slot
> is to invalidate logical slots for which needed data has been removed.
>
> 3. To do this we need the latestRemovedXid for each change, just as we
> do for physical replication conflicts, but we also need to know
> whether any particular change was to data that logical replication
> might access. That way, during WAL replay, we know when there is a risk of
> conflict and, if so, if there is a conflict.
>
> 4. We can't rely on the standby's relcache entries for this purpose in
> any way, because the startup process can't access catalog contents.
>
> 5. Therefore every WAL record that potentially removes data from the
> index or heap must carry a flag indicating whether or not it is one
> that might be accessed during logical decoding.
>
> Why do we need this for logical decoding on standby?
>
> First, let's forget about logical decoding on standby and recall that
> on a primary database, any catalog rows that may be needed by a logical
> decoding replication slot are not removed.
>
> This is done thanks to the catalog_xmin associated with the logical
> replication slot.
>
> But, with logical decoding on standby, in the following cases:
>
> - hot_standby_feedback is off
> - hot_standby_feedback is on but there is no a physical slot between
>   the primary and the standby. Then, hot_standby_feedback will work,
>   but only while the connection is alive (for example a node restart
>   would break it)
>
> Then, the primary may delete system catalog rows that could be needed
> by the logical decoding on the standby (as it does not know about the
> catalog_xmin on the standby).
>
> So, it’s mandatory to identify those rows and invalidate the slots
> that may need them if any. Identifying those rows is the purpose of
> this commit.

This is a very nice commit message.

> Implementation:
>
> When a WAL replay on standby indicates that a catalog table tuple is
> to be deleted by an xid that is greater than a logical slot's
> catalog_xmin, then that means the slot's catalog_xmin conflicts with
> the xid, and we need to handle the conflict. While subsequent commits
> will do the actual conflict handling, this commit adds a new field
> isCatalogRel in such WAL records (and a new bit set in the
> xl_heap_visible flags field), that is true for catalog tables, so as to
> arrange for conflict handling.
>
> The affected WAL records are the ones that already contain the
> snapshotConflictHorizon field, namely:
>
> - gistxlogDelete
> - gistxlogPageReuse
> - xl_hash_vacuum_one_page
> - xl_heap_prune
> - xl_heap_freeze_page
> - xl_heap_visible
> - xl_btree_reuse_page
> - xl_btree_delete
> - spgxlogVacuumRedirect
>
> Due to this new field being added, xl_hash_vacuum_one_page and
> gistxlogDelete do now contain the offsets to be deleted as a
> FLEXIBLE_ARRAY_MEMBER. This is needed to ensure correct alignement.
> It's not needed on the others struct where isCatalogRel has
> been added.
>
> Author: Andres Freund (in an older version), Amit Khandekar, Bertrand
> Drouvot

I think you're first author on this one by now.

I think this commit is ready to go. Unless somebody thinks differently, I
think I might push it tomorrow.

> Subject: [PATCH v52 2/6] Handle logical slot conflicts on standby.
> @@ -6807,7 +6808,8 @@ CreateCheckPoint(int flags)
>       */
>      XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
>      KeepLogSeg(recptr, &_logSegNo);
> -    if (InvalidateObsoleteReplicationSlots(_logSegNo))
> +    InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL);
> +    if (invalidated)
>      {
>          /*
>           * Some slots have been invalidated; recalculate the old-segment

I don't really understand why you changed InvalidateObsoleteReplicationSlots
to return void instead of bool, and then added an output boolean argument via
a pointer?

> @@ -7964,6 +7968,22 @@ xlog_redo(XLogReaderState *record)
>          /* Update our copy of the parameters in pg_control */
>          memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>
> +        /*
> +         * Invalidate logical slots if we are in hot standby and the primary does not
> +         * have a WAL level sufficient for logical decoding. No need to search
> +         * for potentially conflicting logically slots if standby is running
> +         * with wal_level lower than logical, because in that case, we would
> +         * have either disallowed creation of logical slots or invalidated existing
> +         * ones.
> +         */
> +        if (InRecovery && InHotStandby &&
> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +            wal_level >= WAL_LEVEL_LOGICAL)
> +        {
> +            TransactionId ConflictHorizon = InvalidTransactionId;
> +            InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon);
> +        }
> +

Are there races around changing wal_level?

> @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>          SpinLockAcquire(&s->mutex);
>          effective_xmin = s->effective_xmin;
>          effective_catalog_xmin = s->effective_catalog_xmin;
> -        invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> -                       XLogRecPtrIsInvalid(s->data.restart_lsn));
> +        invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> +                        XLogRecPtrIsInvalid(s->data.restart_lsn))
> +                       || (!TransactionIdIsValid(s->data.xmin) &&
> +                           !TransactionIdIsValid(s->data.catalog_xmin)));
>          SpinLockRelease(&s->mutex);
>
>          /* invalidated slots need not apply */

I still would like a wrapper function to determine whether a slot has been
invalidated. This is too complicated to be repeated in other places.

> @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void)
>  }
>
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim (and
>   * in that case we're not holding the lock at return, otherwise we are).
>   *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>   *
>   * This is inherently racy, because we release the LWLock
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> -                               bool *invalidated)
> +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> +                                                   bool *invalidated, TransactionId *xid)

This is too long a name. I'd probably just leave it at the old name.

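To make the wrapper request above a bit more concrete, here is a minimal
standalone sketch. The typedefs and macros are stand-ins so that it compiles
on its own, SlotDataSketch only mirrors the four fields the check reads, and
all three function names are made up for illustration; none of this is
existing slot.h API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL definitions, so the sketch is self-contained. */
typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

#define InvalidXLogRecPtr       ((XLogRecPtr) 0)
#define InvalidTransactionId    ((TransactionId) 0)
#define XLogRecPtrIsInvalid(r)  ((r) == InvalidXLogRecPtr)
#define TransactionIdIsValid(x) ((x) != InvalidTransactionId)

/* Hypothetical, simplified view of the slot fields the check looks at. */
typedef struct SlotDataSketch
{
    XLogRecPtr    restart_lsn;
    XLogRecPtr    invalidated_at;
    TransactionId xmin;
    TransactionId catalog_xmin;
} SlotDataSketch;

/* Invalidated because required WAL was removed (the pre-existing case). */
static inline bool
SlotIsObsoleteSketch(const SlotDataSketch *d)
{
    return !XLogRecPtrIsInvalid(d->invalidated_at) &&
        XLogRecPtrIsInvalid(d->restart_lsn);
}

/* Invalidated by a recovery conflict: the patch resets both xmins. */
static inline bool
SlotIsConflictedSketch(const SlotDataSketch *d)
{
    return !TransactionIdIsValid(d->xmin) &&
        !TransactionIdIsValid(d->catalog_xmin);
}

/* The single place callers would ask; caller holds the slot's spinlock. */
static inline bool
SlotIsInvalidatedSketch(const SlotDataSketch *d)
{
    assert(d != NULL);
    return SlotIsObsoleteSketch(d) || SlotIsConflictedSketch(d);
}
```

With something like that, ReplicationSlotsComputeRequiredXmin() would be down
to a single call under the spinlock, and the other places that need the same
test would not have to repeat the four-way condition. A real version would
presumably take the ReplicationSlot itself rather than a cut-down struct.
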
> @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>           * Check if the slot needs to be invalidated. If it needs to be
>           * invalidated, and is not currently acquired, acquire it and mark it
>           * as having been invalidated. We do this with the spinlock held to
> -         * avoid race conditions -- for example the restart_lsn could move
> -         * forward, or the slot could be dropped.
> +         * avoid race conditions -- for example the restart_lsn (or the
> +         * xmin(s) could) move forward or the slot could be dropped.
>           */
>          SpinLockAcquire(&s->mutex);
>
>          restart_lsn = s->data.restart_lsn;
> +        slot_xmin = s->data.xmin;
> +        slot_catalog_xmin = s->data.catalog_xmin;
> +
> +        /* slot has been invalidated (logical decoding conflict case) */
> +        if ((xid &&
> +             ((LogicalReplicationSlotIsInvalid(s))
> +              ||

Uh, huh? That's very odd formatting.

>          /*
> -         * If the slot is already invalid or is fresh enough, we don't need to
> -         * do anything.
> +         * We are not forcing for invalidation because the xid is valid and
> +         * this is a non conflicting slot.
>           */
> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> +              (TransactionIdIsValid(*xid) && !(
> +                (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, *xid))
> +                ||
> +                (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, *xid))
> +                ))
> +              ))
> +            ||
> +            /* slot has been invalidated (obsolete LSN case) */
> +            (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>          {
>              SpinLockRelease(&s->mutex);
>              if (released_lock)

This needs some cleanup.

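One way the condition above could be untangled, purely as a sketch: pull the
xid comparison into a helper and turn the "nothing to do" test into a positive
"needs invalidation" predicate with early returns. The helper names are
hypothetical, the typedefs are stand-ins so the block compiles by itself, and
XidPrecedesOrEqualsSketch() is only a stand-in for
TransactionIdPrecedesOrEquals() using the same modulo-2^32 comparison for
normal xids. The xid pointer convention (NULL means the obsolete-LSN case, an
invalid xid means the wal_level case) is taken from the quoted patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

#define InvalidXLogRecPtr        ((XLogRecPtr) 0)
#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define XLogRecPtrIsInvalid(r)   ((r) == InvalidXLogRecPtr)
#define TransactionIdIsValid(x)  ((x) != InvalidTransactionId)

/* Stand-in for TransactionIdPrecedesOrEquals(): wraparound-aware compare. */
static bool
XidPrecedesOrEqualsSketch(TransactionId id1, TransactionId id2)
{
    if (id1 < FirstNormalTransactionId || id2 < FirstNormalTransactionId)
        return id1 <= id2;
    return (int32_t) (id1 - id2) <= 0;
}

/* Does the removal horizon reach either of the slot's xmins? */
static bool
HorizonConflictsSketch(TransactionId slot_xmin,
                       TransactionId slot_catalog_xmin,
                       TransactionId horizon)
{
    assert(TransactionIdIsValid(horizon));

    if (TransactionIdIsValid(slot_xmin) &&
        XidPrecedesOrEqualsSketch(slot_xmin, horizon))
        return true;
    if (TransactionIdIsValid(slot_catalog_xmin) &&
        XidPrecedesOrEqualsSketch(slot_catalog_xmin, horizon))
        return true;
    return false;
}

/* Positive form of the quoted condition: true if the slot must be invalidated. */
static bool
NeedsInvalidationSketch(XLogRecPtr restart_lsn, XLogRecPtr oldestLSN,
                        TransactionId slot_xmin,
                        TransactionId slot_catalog_xmin,
                        bool already_conflicted,
                        const TransactionId *xid)
{
    /* obsolete LSN case */
    if (xid == NULL)
        return !XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < oldestLSN;

    /* logical decoding conflict case */
    if (already_conflicted)
        return false;
    if (!TransactionIdIsValid(*xid))
        return true;            /* wal_level case: no horizon to compare */
    return HorizonConflictsSketch(slot_xmin, slot_catalog_xmin, *xid);
}
```

The caller would then just do `if (!NeedsInvalidationSketch(...))` before
releasing the spinlock, which reads a lot easier than the nested expression
and keeps the two cases visibly separate.
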
> @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>          {
>              MyReplicationSlot = s;
>              s->active_pid = MyProcPid;
> -            s->data.invalidated_at = restart_lsn;
> -            s->data.restart_lsn = InvalidXLogRecPtr;
> -
> +            if (xid)
> +            {
> +                s->data.xmin = InvalidTransactionId;
> +                s->data.catalog_xmin = InvalidTransactionId;
> +            }
> +            else
> +            {
> +                s->data.invalidated_at = restart_lsn;
> +                s->data.restart_lsn = InvalidXLogRecPtr;
> +            }
>              /* Let caller know */
>              *invalidated = true;
>          }
> @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>               */
>              if (last_signaled_pid != active_pid)
>              {
> -                ereport(LOG,
> -                        errmsg("terminating process %d to release replication slot \"%s\"",
> -                               active_pid, NameStr(slotname)),
> -                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -                                  LSN_FORMAT_ARGS(restart_lsn),
> -                                  (unsigned long long) (oldestLSN - restart_lsn)),
> -                        errhint("You might need to increase max_slot_wal_keep_size."));
> +                if (xid)
> +                {
> +                    if (TransactionIdIsValid(*xid))
> +                    {
> +                        ereport(LOG,
> +                                errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> +                                       active_pid, NameStr(slotname)),
> +                                errdetail("The slot conflicted with xid horizon %u.",
> +                                          *xid));
> +                    }
> +                    else
> +                    {
> +                        ereport(LOG,
> +                                errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> +                                       active_pid, NameStr(slotname)),
> +                                errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +                    }
> +
> +                    (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
> +                }
> +                else
> +                {
> +                    ereport(LOG,
> +                            errmsg("terminating process %d to release replication slot \"%s\"",
> +                                   active_pid, NameStr(slotname)),
> +                            errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> +                                      LSN_FORMAT_ARGS(restart_lsn),
> +                                      (unsigned long long) (oldestLSN - restart_lsn)),
> +                            errhint("You might need to increase max_slot_wal_keep_size."));
> +
> +                    (void) kill(active_pid, SIGTERM);

I think it ought to be possible to deduplicate this a fair bit. For one, two
of the errmsg()s above are identical. But I think this could be consolidated
further, e.g. by using the same message style for the three cases, and passing
in a separately translated reason for the termination?

> +                }
>
> -                (void) kill(active_pid, SIGTERM);
>                  last_signaled_pid = active_pid;
>              }
>
> @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>              ReplicationSlotSave();
>              ReplicationSlotRelease();
>
> -            ereport(LOG,
> -                    errmsg("invalidating obsolete replication slot \"%s\"",
> -                           NameStr(slotname)),
> -                    errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> -                              LSN_FORMAT_ARGS(restart_lsn),
> -                              (unsigned long long) (oldestLSN - restart_lsn)),
> -                    errhint("You might need to increase max_slot_wal_keep_size."));
> +            if (xid)
> +            {
> +                pgstat_drop_replslot(s);

Why is this done here now?

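As a rough illustration of the consolidation suggested above: the reason text
could be built once, keyed on the same xid convention the patch uses, and then
shared by the "terminating process" and the "invalidating slot" reports. This
is only a standalone sketch. The function name is hypothetical, the typedefs
are stand-ins, and snprintf() stands in for errdetail(), so translation
handling is not shown. The three strings are the ones from the quoted patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;

#define InvalidTransactionId ((TransactionId) 0)

/*
 * Write the reason for the invalidation into buf and return what
 * snprintf() returns.
 *
 * xid == NULL:        obsolete-LSN case
 * *xid valid:         conflict with a removal horizon
 * *xid invalid:       primary's wal_level is too low
 */
static int
SlotInvalidationReasonSketch(char *buf, size_t buflen,
                             const TransactionId *xid,
                             XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
{
    assert(buf != NULL);

    if (xid == NULL)
        return snprintf(buf, buflen,
                        "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
                        (unsigned int) (restart_lsn >> 32),
                        (unsigned int) restart_lsn,
                        (unsigned long long) (oldestLSN - restart_lsn));

    if (*xid != InvalidTransactionId)
        return snprintf(buf, buflen,
                        "The slot conflicted with xid horizon %u.",
                        (unsigned int) *xid);

    return snprintf(buf, buflen,
                    "Logical decoding on standby requires wal_level to be at least logical on the primary server");
}
```

Both ereport() sites would then differ only in their errmsg() and in whether
the max_slot_wal_keep_size hint applies, instead of carrying three copies of
each message.
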
> +                if (TransactionIdIsValid(*xid))
> +                {
> +                    ereport(LOG,
> +                            errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> +                            errdetail("The slot conflicted with xid horizon %u.", *xid));
> +                }
> +                else
> +                {
> +                    ereport(LOG,
> +                            errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> +                            errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> +                }
> +            }
> +            else
> +            {
> +                ereport(LOG,
> +                        errmsg("invalidating obsolete replication slot \"%s\"",
> +                               NameStr(slotname)),
> +                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> +                                  LSN_FORMAT_ARGS(restart_lsn),
> +                                  (unsigned long long) (oldestLSN - restart_lsn)),
> +                        errhint("You might need to increase max_slot_wal_keep_size."));
> +            }
>

I don't like all these repeated elogs...

> @@ -3057,6 +3060,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>              case PROCSIG_RECOVERY_CONFLICT_LOCK:
>              case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
>              case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
> +            case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
> +
> +                /*
> +                 * For conflicts that require a logical slot to be
> +                 * invalidated, the requirement is for the signal receiver to
> +                 * release the slot, so that it could be invalidated by the
> +                 * signal sender. So for normal backends, the transaction
> +                 * should be aborted, just like for other recovery conflicts.
> +                 * But if it's walsender on standby, we don't want to go
> +                 * through the following IsTransactionOrTransactionBlock()
> +                 * check, so break here.
> +                 */
> +                if (am_cascading_walsender &&
> +                    reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
> +                    MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
> +                {
> +                    RecoveryConflictPending = true;
> +                    QueryCancelPending = true;
> +                    InterruptPending = true;
> +                    break;
> +                }
>
>                  /*
>                   * If we aren't in a transaction any longer then ignore.

I can't see any reason for this to be mixed into the same case "body" as LOCK
etc?

> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index 38c6f18886..290d4b45f4 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -51,6 +51,7 @@
>  #include "storage/proc.h"
>  #include "storage/procarray.h"
>  #include "utils/builtins.h"
> +#include "access/xlogrecovery.h"

Add new includes in the "alphabetically" right place...

Greetings,

Andres Freund