Dear Hackers, Please find attached a new version of the patch.
Patch description: The slot data is flushed to disk at the beginning of a checkpoint. If an existing slot is advanced in the middle of checkpoint execution, its advanced restart LSN is used to calculate the oldest LSN for WAL segment removal at the end of the checkpoint. If the node is restarted just after that checkpoint, the slot data is read back from disk during recovery with the older, previously flushed restart LSN, which can refer to WAL segments that have already been removed. The patch introduces a new in-memory field for slots, restart_lsn_flushed, which is used to calculate the oldest LSN for WAL segment removal: for persistent slots, the flushed value is used whenever it is valid and smaller than the current restart_lsn. This field is updated with the current restart_lsn every time the slot is saved to disk. With best regards, Vitaly
From 480ab108499d95c8befd95911524c4d77cec6e2e Mon Sep 17 00:00:00 2001 From: Vitaly Davydov <v.davy...@postgrespro.ru> Date: Mon, 3 Mar 2025 17:02:15 +0300 Subject: [PATCH 1/2] Keep WAL segments by slot's flushed restart LSN The slot data is flushed to the disk at the beginning of checkpoint. If an existing slot is advanced in the middle of checkpoint execution, its advanced restart LSN is taken to calculate the oldest LSN for WAL segments removal at the end of checkpoint. If the node is restarted just after the checkpoint, the slots data will be read from the disk at recovery with the oldest restart LSN which can refer to removed WAL segments. The patch introduces a new in-memory state for slots - flushed_restart_lsn which is used to calculate the oldest LSN for WAL segments removal. This state is updated every time with the current restart_lsn at the moment, when the slot is saving to disk. --- src/backend/replication/slot.c | 41 ++++++++++++++++++++++++++++++++++ src/include/replication/slot.h | 7 ++++++ 2 files changed, 48 insertions(+) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 719e531eb90..294418df217 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->last_saved_confirmed_flush = InvalidXLogRecPtr; + slot->restart_lsn_flushed = InvalidXLogRecPtr; slot->inactive_since = 0; /* @@ -1165,20 +1166,36 @@ ReplicationSlotsComputeRequiredLSN(void) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; + XLogRecPtr restart_lsn_flushed; bool invalidated; + ReplicationSlotPersistency persistency; if (!s->in_use) continue; SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + 
restart_lsn_flushed = s->restart_lsn_flushed; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* Get the flushed restart_lsn for the persistent slot to compute + * the oldest LSN for WAL segments removals. + */ + if (persistency == RS_PERSISTENT) + { + if (restart_lsn_flushed != InvalidXLogRecPtr && + restart_lsn > restart_lsn_flushed) + { + restart_lsn = restart_lsn_flushed; + } + } + if (restart_lsn != InvalidXLogRecPtr && (min_required == InvalidXLogRecPtr || restart_lsn < min_required)) @@ -1216,7 +1233,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void) { ReplicationSlot *s; XLogRecPtr restart_lsn; + XLogRecPtr restart_lsn_flushed; bool invalidated; + ReplicationSlotPersistency persistency; s = &ReplicationSlotCtl->replication_slots[i]; @@ -1230,14 +1249,28 @@ ReplicationSlotsComputeLogicalRestartLSN(void) /* read once, it's ok if it increases while we're checking */ SpinLockAcquire(&s->mutex); + persistency = s->data.persistency; restart_lsn = s->data.restart_lsn; invalidated = s->data.invalidated != RS_INVAL_NONE; + restart_lsn_flushed = s->restart_lsn_flushed; SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ if (invalidated) continue; + /* Get the flushed restart_lsn for the persistent slot to compute + * the oldest LSN for WAL segments removals. 
+ */ + if (persistency == RS_PERSISTENT) + { + if (restart_lsn_flushed != InvalidXLogRecPtr && + restart_lsn > restart_lsn_flushed) + { + restart_lsn = restart_lsn_flushed; + } + } + if (restart_lsn == InvalidXLogRecPtr) continue; @@ -1455,6 +1488,7 @@ ReplicationSlotReserveWal(void) Assert(slot != NULL); Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + Assert(slot->restart_lsn_flushed == InvalidXLogRecPtr); /* * The replication slot mechanism is used to prevent removal of required @@ -1766,6 +1800,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, */ SpinLockAcquire(&s->mutex); + Assert(s->data.restart_lsn >= s->restart_lsn_flushed); + restart_lsn = s->data.restart_lsn; /* we do nothing if the slot is already invalid */ @@ -1835,7 +1871,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * just rely on .invalidated. */ if (invalidation_cause == RS_INVAL_WAL_REMOVED) + { s->data.restart_lsn = InvalidXLogRecPtr; + s->restart_lsn_flushed = InvalidXLogRecPtr; + } /* Let caller know */ *invalidated = true; @@ -2354,6 +2393,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) if (!slot->just_dirtied) slot->dirty = false; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->restart_lsn_flushed = cp.slotdata.restart_lsn; SpinLockRelease(&slot->mutex); LWLockRelease(&slot->io_in_progress_lock); @@ -2569,6 +2609,7 @@ RestoreSlotFromDisk(const char *name) slot->effective_xmin = cp.slotdata.xmin; slot->effective_catalog_xmin = cp.slotdata.catalog_xmin; slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush; + slot->restart_lsn_flushed = cp.slotdata.restart_lsn; slot->candidate_catalog_xmin = InvalidTransactionId; slot->candidate_xmin_lsn = InvalidXLogRecPtr; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index f5a24ccfbf2..b04d2401d6e 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -215,6 +215,13 @@ typedef struct ReplicationSlot * recently 
stopped. */ TimestampTz inactive_since; + + /* Latest restart_lsn that has been flushed to disk. For persistent slots + * the flushed LSN should be taken into account when calculating the oldest + * LSN for WAL segments removal. + */ + XLogRecPtr restart_lsn_flushed; + } ReplicationSlot; #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) -- 2.34.1
From 2413ab4468b94280d19316b203848912ed6d713f Mon Sep 17 00:00:00 2001 From: Vitaly Davydov <v.davy...@postgrespro.ru> Date: Fri, 13 Dec 2024 16:02:14 +0300 Subject: [PATCH 2/2] Fix src/recovery/t/001_stream_rep.pl --- src/test/recovery/t/001_stream_rep.pl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index ee57d234c86..eae9d00b9b4 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -553,6 +553,9 @@ chomp($phys_restart_lsn_post); ok( ($phys_restart_lsn_pre cmp $phys_restart_lsn_post) == 0, "physical slot advance persists across restarts"); +# Cleanup unused WAL segments +$node_primary->safe_psql('postgres', "CHECKPOINT;"); + # Check if the previous segment gets correctly recycled after the # server stopped cleanly, causing a shutdown checkpoint to be generated. my $primary_data = $node_primary->data_dir; -- 2.34.1