Dear Hackers,
 
I'd like to introduce an improved version of my patch (see the attached file).
My original idea was to take the restart_lsn saved on disk
(slot->restart_lsn_flushed) into account for persistent slots when removing
WAL segment files. It helps tackle errors like: ERROR: requested WAL segment
000...0AA has already been removed.
 
Improvements:
 * flushed_restart_lsn is used only for RS_PERSISTENT slots.
 * Save a physical slot to disk when advancing only once, when
restart_lsn_flushed is invalid. This is needed because slots with an invalid
restart LSN are ignored when calculating the oldest LSN for WAL truncation.
Once restart_lsn becomes valid, it should be saved to disk immediately to
update restart_lsn_flushed.
Regression tests seem to be ok, except:
 * recovery/t/001_stream_rep.pl (a checkpoint is needed)
 * recovery/t/019_replslot_limit.pl (it seems the slot was invalidated; some
adjustments are needed)
 * pg_basebackup/t/020_pg_receivewal.pl (not sure about it)
 
There are some problems:
 * More WAL segments may be kept. This may lead to slot invalidations in some
tests (recovery/t/019_replslot_limit.pl); a couple of tests should be adjusted.
 
With best regards,
Vitaly Davydov


On Thursday, October 31, 2024 13:32 MSK, "Vitaly Davydov" 
<v.davy...@postgrespro.ru> wrote:

 
Sorry, attached the missing patch.

On Thursday, October 31, 2024 13:18 MSK, "Vitaly Davydov" 
<v.davy...@postgrespro.ru> wrote:

Dear Hackers,
 
I'd like to discuss a problem with replication slots' restart LSN. Physical
slots are saved to disk at the beginning of a checkpoint. At the end of the
checkpoint, old WAL segments are recycled or removed from disk if they are not
kept by slots' restart_lsn values.
 
If an existing physical slot is advanced in the middle of checkpoint execution,
WAL segments related to the restart LSN saved on disk may be removed. This is
because the minimal restart LSN across replication slots is calculated at the
end of the checkpoint, just before old WAL segments are removed. If the
postgres instance is hard-stopped (pg_ctl -m immediate) right after the
checkpoint and then restarted, the slot's restart_lsn may point to a removed
WAL segment. I believe such behaviour is not good.
 
The doc [0] describes that restart_lsn may be set to some past value after a
restart. There is a discussion [1] on pgsql-hackers where this behaviour is
discussed. The main reason for not flushing physical slots on advancing is
performance. I'm ok with such behaviour, except that the corresponding WAL
segments should not be removed.
 
I propose to keep WAL segments based on the saved-on-disk (flushed) restart_lsn
of slots: add a new field restart_lsn_flushed to the ReplicationSlot structure
and copy restart_lsn to restart_lsn_flushed in SaveSlotToPath. This doesn't
change the on-disk format of the slot contents. I attached a patch. It is not
yet complete, but it demonstrates a way to solve the problem.
 
I reproduced the problem in the following way:
 * Add some delay in CheckPointBuffers (pg_usleep) to emulate long checkpoint
execution.
 * Execute a checkpoint, and run pg_replication_slot_advance from another
connection right after the checkpoint starts.
 * Hard-restart the server right after checkpoint completion.
 * After restart, the slot's restart_lsn may point to a removed WAL segment.
The proposed patch fixes it.
 
[0] https://www.postgresql.org/docs/current/logicaldecoding-explanation.html
[1] 
https://www.postgresql.org/message-id/flat/059cc53a-8b14-653a-a24d-5f867503b0ee%40postgrespro.ru

From d52e254c558e665bc41389e02e026c1069b29861 Mon Sep 17 00:00:00 2001
From: Vitaly Davydov <v.davy...@postgrespro.ru>
Date: Thu, 31 Oct 2024 12:29:12 +0300
Subject: [PATCH] Keep WAL segments by slot's flushed restart LSN

---
 src/backend/replication/slot.c      | 27 ++++++++++++++++++++++++++-
 src/backend/replication/walsender.c | 13 +++++++++++++
 src/include/replication/slot.h      |  4 ++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6828100cf1..e6aef1f9a3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
 	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	slot->restart_lsn_flushed = InvalidXLogRecPtr;
 	slot->inactive_since = 0;
 
 	/*
@@ -1142,20 +1143,28 @@ ReplicationSlotsComputeRequiredLSN(void)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
+		XLogRecPtr	restart_lsn_flushed;
 		bool		invalidated;
+		ReplicationSlotPersistency persistency;
 
 		if (!s->in_use)
 			continue;
 
 		SpinLockAcquire(&s->mutex);
+		persistency = s->data.persistency;
 		restart_lsn = s->data.restart_lsn;
 		invalidated = s->data.invalidated != RS_INVAL_NONE;
+		restart_lsn_flushed = s->restart_lsn_flushed;
 		SpinLockRelease(&s->mutex);
 
 		/* invalidated slots need not apply */
 		if (invalidated)
 			continue;
 
+		/* truncate WAL for persistent slots by flushed restart_lsn */
+		if (persistency == RS_PERSISTENT)
+			restart_lsn = restart_lsn_flushed;
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -1193,7 +1202,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 	{
 		ReplicationSlot *s;
 		XLogRecPtr	restart_lsn;
+		XLogRecPtr	restart_lsn_flushed;
 		bool		invalidated;
+		ReplicationSlotPersistency persistency;
 
 		s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -1207,14 +1218,20 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 
 		/* read once, it's ok if it increases while we're checking */
 		SpinLockAcquire(&s->mutex);
-		restart_lsn = s->data.restart_lsn;
+		persistency = s->data.persistency;
+		restart_lsn = s->data.restart_lsn;
 		invalidated = s->data.invalidated != RS_INVAL_NONE;
+		restart_lsn_flushed = s->restart_lsn_flushed;
 		SpinLockRelease(&s->mutex);
 
 		/* invalidated slots need not apply */
 		if (invalidated)
 			continue;
 
+		/* truncate WAL for persistent slots by flushed restart_lsn */
+		if (persistency == RS_PERSISTENT)
+			restart_lsn = restart_lsn_flushed;
+
 		if (restart_lsn == InvalidXLogRecPtr)
 			continue;
 
@@ -1432,6 +1449,7 @@ ReplicationSlotReserveWal(void)
 
 	Assert(slot != NULL);
 	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+	Assert(slot->restart_lsn_flushed == InvalidXLogRecPtr);
 
 	/*
 	 * The replication slot mechanism is used to prevent removal of required
@@ -1607,6 +1625,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		 */
 		SpinLockAcquire(&s->mutex);
 
+		Assert(s->data.restart_lsn >= s->restart_lsn_flushed);
+
 		restart_lsn = s->data.restart_lsn;
 
 		/* we do nothing if the slot is already invalid */
@@ -1691,7 +1711,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			 * just rely on .invalidated.
 			 */
 			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
+			{
 				s->data.restart_lsn = InvalidXLogRecPtr;
+				s->restart_lsn_flushed = InvalidXLogRecPtr;
+			}
 
 			/* Let caller know */
 			*invalidated = true;
@@ -2189,6 +2212,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	if (!slot->just_dirtied)
 		slot->dirty = false;
 	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+	slot->restart_lsn_flushed = cp.slotdata.restart_lsn;
 	SpinLockRelease(&slot->mutex);
 
 	LWLockRelease(&slot->io_in_progress_lock);
@@ -2386,6 +2410,7 @@ RestoreSlotFromDisk(const char *name)
 		slot->effective_xmin = cp.slotdata.xmin;
 		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
 		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
+		slot->restart_lsn_flushed = cp.slotdata.restart_lsn;
 
 		slot->candidate_catalog_xmin = InvalidTransactionId;
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 371eef3ddd..03cdce23f0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2329,6 +2329,7 @@ static void
 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	bool		changed = false;
+	XLogRecPtr	restart_lsn_flushed;
 	ReplicationSlot *slot = MyReplicationSlot;
 
 	Assert(lsn != InvalidXLogRecPtr);
@@ -2336,6 +2337,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (slot->data.restart_lsn != lsn)
 	{
 		changed = true;
+		restart_lsn_flushed = slot->restart_lsn_flushed;
 		slot->data.restart_lsn = lsn;
 	}
 	SpinLockRelease(&slot->mutex);
@@ -2343,6 +2345,17 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (changed)
 	{
 		ReplicationSlotMarkDirty();
+
+		/*
+		 * Save the slot to disk if its flushed restart_lsn is still
+		 * invalid: such slots are ignored when calculating the required
+		 * LSN, so the first valid restart_lsn must be flushed immediately.
+		 */
+		if (slot->data.persistency == RS_PERSISTENT &&
+			restart_lsn_flushed == InvalidXLogRecPtr &&
+			lsn != InvalidXLogRecPtr)
+			ReplicationSlotSave();
+
 		ReplicationSlotsComputeRequiredLSN();
 		PhysicalWakeupLogicalWalSnd();
 	}
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 45582cf9d8..ca4c3aab3b 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -207,6 +207,10 @@ typedef struct ReplicationSlot
 
 	/* The time since the slot has become inactive */
 	TimestampTz inactive_since;
+
+	/* Latest restart LSN that was flushed to disk */
+	XLogRecPtr restart_lsn_flushed;
+
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
-- 
2.34.1
