Dear Hackers,

I'd like to introduce an improved version of my patch (see the attached file). My original idea was to take into account the restart_lsn saved on disk (slot->restart_lsn_flushed) for persistent slots when removing WAL segment files. It helps to tackle errors like:

ERROR: requested WAL segment 000...0AA has already been removed

Improvements:

* restart_lsn_flushed is used only for RS_PERSISTENT slots.
* A physical slot is saved to disk on advancing only once: when its restart_lsn_flushed is invalid. This is needed because slots with an invalid restart LSN are not used when calculating the oldest LSN for WAL truncation. Once restart_lsn becomes valid, it should be saved to disk immediately to update restart_lsn_flushed.

The regression tests seem to be OK, except:

* recovery/t/001_stream_rep.pl (a checkpoint is needed)
* recovery/t/019_replslot_limit.pl (it seems the slot was invalidated; some adjustments are needed)
* pg_basebackup/t/020_pg_receivewal.pl (not sure about it)

There are some problems:

* More WAL segments may be kept. This may lead to invalidation of slots in some tests (recovery/t/019_replslot_limit.pl). A couple of tests should be adjusted.

With best regards,
Vitaly Davydov
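
As a rough illustration of the two rules listed above (not the patch itself), the choice of the LSN used for WAL truncation and the "save once" condition can be sketched in standalone C. The SketchSlot type, the sketch_* function names and the simplified XLogRecPtr and persistency definitions are assumptions made for this example only; in the server the corresponding logic lives in ReplicationSlotsComputeRequiredLSN and PhysicalConfirmReceivedLocation, with locking that is omitted here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for PostgreSQL types; not the real definitions. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

typedef enum { RS_PERSISTENT, RS_EPHEMERAL, RS_TEMPORARY } SlotPersistency;

/* Hypothetical, reduced view of a replication slot. */
typedef struct SketchSlot
{
	bool		in_use;
	bool		invalidated;
	SlotPersistency persistency;
	XLogRecPtr	restart_lsn;			/* current in-memory value */
	XLogRecPtr	restart_lsn_flushed;	/* value last written to disk */
} SketchSlot;

/*
 * Oldest LSN that must be kept for the given slots, or InvalidXLogRecPtr
 * if no slot holds any WAL.
 */
XLogRecPtr
sketch_compute_required_lsn(const SketchSlot *slots, size_t nslots)
{
	XLogRecPtr	min_required = InvalidXLogRecPtr;

	for (size_t i = 0; i < nslots; i++)
	{
		const SketchSlot *s = &slots[i];
		XLogRecPtr	lsn;

		/* unused and invalidated slots do not hold WAL */
		if (!s->in_use || s->invalidated)
			continue;

		/* persistent slots hold WAL by the flushed value, others by memory */
		lsn = (s->persistency == RS_PERSISTENT)
			? s->restart_lsn_flushed
			: s->restart_lsn;

		if (lsn != InvalidXLogRecPtr &&
			(min_required == InvalidXLogRecPtr || lsn < min_required))
			min_required = lsn;
	}

	return min_required;
}

/*
 * Advance the in-memory restart_lsn. Returns true when the caller should
 * flush the slot immediately, i.e. a persistent slot has no valid flushed
 * value yet and would otherwise be ignored by the computation above.
 */
bool
sketch_advance(SketchSlot *s, XLogRecPtr lsn)
{
	assert(lsn != InvalidXLogRecPtr);

	s->restart_lsn = lsn;

	return s->persistency == RS_PERSISTENT &&
		s->restart_lsn_flushed == InvalidXLogRecPtr;
}
```

In this sketch a persistent slot advanced in memory to 500 but flushed at 300 still holds WAL from 300, which is also why more segments may be kept than before.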

On Thursday, October 31, 2024 13:32 MSK, "Vitaly Davydov" <v.davy...@postgrespro.ru> wrote:

Sorry, attached the missed patch.

On Thursday, October 31, 2024 13:18 MSK, "Vitaly Davydov" <v.davy...@postgrespro.ru> wrote:

Dear Hackers,

I'd like to discuss a problem with replication slots' restart LSN. Physical slots are saved to disk at the beginning of a checkpoint. At the end of the checkpoint, old WAL segments are recycled or removed from disk if they are not kept by the slots' restart_lsn values.

If an existing physical slot is advanced in the middle of checkpoint execution, WAL segments that are related to the restart LSN saved on disk may be removed. This is because the minimal LSN of the replication slots is calculated at the end of the checkpoint, prior to the removal of old WAL segments. If the postgres instance is hard stopped (pg_ctl stop -m immediate) right after the checkpoint and then restarted, the slot's restart_lsn may point to a removed WAL segment. I believe such behaviour is not good.

The doc [0] describes that restart_lsn may be set to some past value after reload. There is a discussion [1] on pgsql-hackers where such behaviour is discussed. The main reason for not flushing physical slots on advancing is performance. I'm OK with such behaviour, except that the corresponding WAL segments should not be removed.

I propose to keep WAL segments by the saved on disk (flushed) restart_lsn of slots: add a new field restart_lsn_flushed to the ReplicationSlot structure and copy restart_lsn to restart_lsn_flushed in SaveSlotToPath. This doesn't change the format in which the slot contents are stored on disk.

I attached a patch. It is not yet complete, but it demonstrates a way to solve the problem.

I reproduced the problem in the following way:

* Add some delay in CheckPointBuffers (pg_usleep) to emulate long checkpoint execution.
* Execute a checkpoint and, right after it starts, call pg_replication_slot_advance from another connection.
* Hard restart the server right after checkpoint completion.
* After the restart, the slot's restart_lsn may point to a removed WAL segment.

The proposed patch fixes it.

[0] https://www.postgresql.org/docs/current/logicaldecoding-explanation.html
[1] https://www.postgresql.org/message-id/flat/059cc53a-8b14-653a-a24d-5f867503b0ee%40postgrespro.ru
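
The sequence of events in the reproduction steps can be modelled with a tiny standalone C sketch. It is only a toy model under stated assumptions: LSNs are plain integers, segment boundaries are ignored, and RaceState and sketch_checkpoint_race are hypothetical names that do not exist in PostgreSQL. The flag selects whether WAL is kept by the in-memory restart LSN (current behaviour as described above) or by the flushed one (the proposal).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;	/* simplified stand-in, not the real type */

/* Hypothetical state of one physical slot plus the WAL directory. */
typedef struct RaceState
{
	XLogRecPtr	restart_lsn;			/* in-memory value */
	XLogRecPtr	restart_lsn_on_disk;	/* value in the slot's state file */
	XLogRecPtr	oldest_kept_lsn;		/* WAL before this has been removed */
} RaceState;

/*
 * Replays the scenario and returns true if, after an immediate stop and
 * restart, the WAL at the slot's restart_lsn is still available.
 */
bool
sketch_checkpoint_race(bool keep_by_flushed_lsn)
{
	RaceState	st = {100, 0, 0};

	/* 1. Checkpoint starts: the slot is saved to disk. */
	st.restart_lsn_on_disk = st.restart_lsn;

	/* 2. The slot is advanced mid-checkpoint; only memory is updated. */
	st.restart_lsn = 500;
	assert(st.restart_lsn > st.restart_lsn_on_disk);

	/* 3. Checkpoint ends: WAL older than the required LSN is removed. */
	st.oldest_kept_lsn = keep_by_flushed_lsn
		? st.restart_lsn_on_disk
		: st.restart_lsn;

	/* 4. Immediate stop and restart: the slot is restored from disk. */
	st.restart_lsn = st.restart_lsn_on_disk;

	return st.restart_lsn >= st.oldest_kept_lsn;
}
```

Calling sketch_checkpoint_race(false) models the failure case, where the restored restart_lsn points to removed WAL, while sketch_checkpoint_race(true) models keeping WAL by the flushed value.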

From d52e254c558e665bc41389e02e026c1069b29861 Mon Sep 17 00:00:00 2001
From: Vitaly Davydov <v.davy...@postgrespro.ru>
Date: Thu, 31 Oct 2024 12:29:12 +0300
Subject: [PATCH] Keep WAL segments by slot's flushed restart LSN

---
 src/backend/replication/slot.c      | 27 ++++++++++++++++++++++++++-
 src/backend/replication/walsender.c | 13 +++++++++++++
 src/include/replication/slot.h      |  4 ++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6828100cf1..e6aef1f9a3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
 	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	slot->restart_lsn_flushed = InvalidXLogRecPtr;
 	slot->inactive_since = 0;
 
 	/*
@@ -1142,20 +1143,28 @@ ReplicationSlotsComputeRequiredLSN(void)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
+		XLogRecPtr	restart_lsn_flushed;
 		bool		invalidated;
+		ReplicationSlotPersistency persistency;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
+		persistency = s->data.persistency;
 		restart_lsn = s->data.restart_lsn;
 		invalidated = s->data.invalidated != RS_INVAL_NONE;
+		restart_lsn_flushed = s->restart_lsn_flushed;
 		SpinLockRelease(&s->mutex);
 
 		/* invalidated slots need not apply */
 		if (invalidated)
 			continue;
 
+		/* truncate WAL for persistent slots by flushed restart_lsn */
+		if (persistency == RS_PERSISTENT)
+			restart_lsn = restart_lsn_flushed;
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -1193,7 +1202,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 	{
 		ReplicationSlot *s;
 		XLogRecPtr	restart_lsn;
+		XLogRecPtr	restart_lsn_flushed;
 		bool		invalidated;
+		ReplicationSlotPersistency persistency;
 
 		s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -1207,14 +1218,20 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 
 		/* read once, it's ok if it increases while we're checking */
 		SpinLockAcquire(&s->mutex);
-		restart_lsn = s->data.restart_lsn;
+		persistency = s->data.persistency;
+		restart_lsn = s->restart_lsn_flushed;
 		invalidated = s->data.invalidated != RS_INVAL_NONE;
+		restart_lsn_flushed = s->restart_lsn_flushed;
 		SpinLockRelease(&s->mutex);
 
 		/* invalidated slots need not apply */
 		if (invalidated)
 			continue;
 
+		/* truncate WAL for persistent slots by flushed restart_lsn */
+		if (persistency == RS_PERSISTENT)
+			restart_lsn = restart_lsn_flushed;
+
 		if (restart_lsn == InvalidXLogRecPtr)
 			continue;
 
@@ -1432,6 +1449,7 @@ ReplicationSlotReserveWal(void)
 
 	Assert(slot != NULL);
 	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+	Assert(slot->restart_lsn_flushed == InvalidXLogRecPtr);
 
 	/*
 	 * The replication slot mechanism is used to prevent removal of required
@@ -1607,6 +1625,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		 */
 		SpinLockAcquire(&s->mutex);
 
+		Assert(s->data.restart_lsn >= s->restart_lsn_flushed);
+
 		restart_lsn = s->data.restart_lsn;
 
 		/* we do nothing if the slot is already invalid */
@@ -1691,7 +1711,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 				 * just rely on .invalidated.
 				 */
 				if (invalidation_cause == RS_INVAL_WAL_REMOVED)
+				{
 					s->data.restart_lsn = InvalidXLogRecPtr;
+					s->restart_lsn_flushed = InvalidXLogRecPtr;
+				}
 
 				/* Let caller know */
 				*invalidated = true;
@@ -2189,6 +2212,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	if (!slot->just_dirtied)
 		slot->dirty = false;
 	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+	slot->restart_lsn_flushed = cp.slotdata.restart_lsn;
 	SpinLockRelease(&slot->mutex);
 
 	LWLockRelease(&slot->io_in_progress_lock);
@@ -2386,6 +2410,7 @@ RestoreSlotFromDisk(const char *name)
 		slot->effective_xmin = cp.slotdata.xmin;
 		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
 		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+		slot->restart_lsn_flushed = cp.slotdata.restart_lsn;
 
 		slot->candidate_catalog_xmin = InvalidTransactionId;
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 371eef3ddd..03cdce23f0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2329,6 +2329,7 @@ static void
 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	bool		changed = false;
+	XLogRecPtr	restart_lsn_flushed;
 	ReplicationSlot *slot = MyReplicationSlot;
 
 	Assert(lsn != InvalidXLogRecPtr);
@@ -2336,6 +2337,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (slot->data.restart_lsn != lsn)
 	{
 		changed = true;
+		restart_lsn_flushed = slot->restart_lsn_flushed;
 		slot->data.restart_lsn = lsn;
 	}
 	SpinLockRelease(&slot->mutex);
@@ -2343,6 +2345,17 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (changed)
 	{
 		ReplicationSlotMarkDirty();
+
+		/* Save the replication slot on disk in case of its flushed restart_lsn
+		 * is invalid. Slots with invalid restart lsn are ignored when
+		 * calculating required LSN. Once we started to keep the WAL by flushed
+		 * restart LSN, we should save to disk an initial valid value.
+		 */
+		if (slot->data.persistency == RS_PERSISTENT) {
+			if (restart_lsn_flushed == InvalidXLogRecPtr && lsn != InvalidXLogRecPtr)
+				ReplicationSlotSave();
+		}
+
 		ReplicationSlotsComputeRequiredLSN();
 		PhysicalWakeupLogicalWalSnd();
 	}
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 45582cf9d8..ca4c3aab3b 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -207,6 +207,10 @@ typedef struct ReplicationSlot
 
 	/* The time since the slot has become inactive */
 	TimestampTz inactive_since;
+
+	/* Latest restart LSN that was flushed to disk */
+	XLogRecPtr	restart_lsn_flushed;
+
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
-- 
2.34.1