Dear Alexander, Amit, All

> Amit wrote:
> > Is my understanding correct that we need 0001 because
> > PhysicalConfirmReceivedLocation() doesn't save the slot to disk after
> > changing the slot's restart_lsn?
>
> Yes.  Also, even if it would save slot to the disk, there is still
> race condition that concurrent checkpoint could use updated value from
> the shared memory to clean old WAL segments, and then crash happens
> before we managed to write the slot to the disk.
>
> How can that happen, if we first write the updated value to disk and
> then update the shared memory as we do in
> LogicalConfirmReceivedLocation?

I suspect the problem still exists for logical slots. Please see the TAP
test src/test/recovery/t/046_logical_slot.pl from v6 of the patch.
A race condition may occur when a logical slot's restart_lsn has been changed
in shared memory but not yet written to disk. Suppose another, physical slot
is advanced at that moment. It recomputes the oldest required LSN and takes
into account the logical slot's restart_lsn that has been changed but not yet
saved. As a result, after an immediate restart, the WAL segment for the
logical slot's on-disk restart_lsn may already have been removed.
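
To illustrate the interleaving, here is a small stand-alone C model. The Slot
struct, the function names, and the 16 MB segment size are assumptions made
for illustration only; this is a sketch, not PostgreSQL code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified model; these are not PostgreSQL's structures. */
typedef uint64_t Lsn;

#define WAL_SEGMENT_SIZE ((Lsn) 16 * 1024 * 1024)	/* assumed 16 MB segments */

typedef struct Slot
{
	Lsn		restart_lsn_in_memory;	/* value other processes can see */
	Lsn		restart_lsn_on_disk;	/* value that survives a crash */
} Slot;

/* Oldest LSN required by any slot, judged by in-memory values only. */
Lsn
oldest_required_lsn(const Slot *slots, size_t nslots)
{
	Lsn		oldest = UINT64_MAX;

	assert(nslots > 0);
	for (size_t i = 0; i < nslots; i++)
		if (slots[i].restart_lsn_in_memory < oldest)
			oldest = slots[i].restart_lsn_in_memory;
	return oldest;
}

/* Does the slot's saved restart_lsn fall in a segment below the horizon? */
int
slot_needs_removed_wal(const Slot *slot, Lsn removal_horizon)
{
	return slot->restart_lsn_on_disk / WAL_SEGMENT_SIZE
		< removal_horizon / WAL_SEGMENT_SIZE;
}

/* Replays the interleaving described above; returns 1 if WAL is lost. */
int
simulate_logical_slot_race(void)
{
	Slot	slots[2] = {
		{1 * WAL_SEGMENT_SIZE, 1 * WAL_SEGMENT_SIZE},	/* logical slot */
		{1 * WAL_SEGMENT_SIZE, 1 * WAL_SEGMENT_SIZE},	/* physical slot */
	};
	Lsn		horizon;

	/* The logical slot advances in memory; the save has not happened yet. */
	slots[0].restart_lsn_in_memory = 5 * WAL_SEGMENT_SIZE;

	/* The physical slot advances and the oldest required LSN is recomputed. */
	slots[1].restart_lsn_in_memory = 6 * WAL_SEGMENT_SIZE;
	horizon = oldest_required_lsn(slots, 2);

	/* A checkpoint removes segments below the horizon, then a crash follows. */
	return slot_needs_removed_wal(&slots[0], horizon);
}
```

In this model the horizon is computed purely from in-memory values, so the
logical slot's saved restart_lsn ends up below it.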

I'm not sure what may happen when two checkpoints execute in parallel, but
I would say that patch 0001 guarantees that every checkpoint run trims WAL
segments based on the slots' restart LSNs that are already saved on disk. The
rule of trimming WAL by the slots' saved restart_lsn will not be violated.
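
A minimal sketch of that rule, again as a stand-alone C model with
hypothetical names (Slot, save_slots, and so on are not taken from the patch):
the minimum is remembered at the start of the checkpoint, the slots are then
saved, and the remembered value is used for removal even if a slot advances
in the meantime.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified model; these are not PostgreSQL's structures. */
typedef uint64_t Lsn;

typedef struct Slot
{
	Lsn		restart_lsn_in_memory;	/* value other processes can see */
	Lsn		restart_lsn_on_disk;	/* value that survives a crash */
} Slot;

/* Minimum restart_lsn over all slots, judged by in-memory values. */
Lsn
min_restart_lsn_in_memory(const Slot *slots, size_t nslots)
{
	Lsn		min = UINT64_MAX;

	assert(nslots > 0);
	for (size_t i = 0; i < nslots; i++)
		if (slots[i].restart_lsn_in_memory < min)
			min = slots[i].restart_lsn_in_memory;
	return min;
}

/* Stand-in for saving every slot to disk during the checkpoint. */
void
save_slots(Slot *slots, size_t nslots)
{
	for (size_t i = 0; i < nslots; i++)
		slots[i].restart_lsn_on_disk = slots[i].restart_lsn_in_memory;
}

/* A horizon is safe if no saved restart_lsn lies below it. */
int
horizon_keeps_saved_lsns(const Slot *slots, size_t nslots, Lsn horizon)
{
	for (size_t i = 0; i < nslots; i++)
		if (slots[i].restart_lsn_on_disk < horizon)
			return 0;
	return 1;
}

/* Returns 1 if the early snapshot is safe while a late recomputation is not. */
int
demo_checkpoint_snapshot(void)
{
	Slot	slots[2] = {{300, 100}, {200, 200}};
	Lsn		snapshot;
	Lsn		late;

	snapshot = min_restart_lsn_in_memory(slots, 2);	/* start of checkpoint */
	save_slots(slots, 2);							/* slots reach disk */
	slots[1].restart_lsn_in_memory = 500;			/* concurrent advance */
	late = min_restart_lsn_in_memory(slots, 2);		/* recomputed too late */

	return horizon_keeps_saved_lsns(slots, 2, snapshot)
		&& !horizon_keeps_saved_lsns(slots, 2, late);
}
```

Since LSNs only move forward in this model, every value saved after the
snapshot is at least as large as the snapshot, which is why the remembered
minimum stays safe.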

> Amit wrote:
> As per my understanding, neither the xmin nor the LSN problem exists
> for logical slots. I am pointing this out to indicate we may need to
> think of a different solution for physical slots, if these are
> problems only for physical slots.

As I said above, it indirectly affects logical slots as well.

> Alexander wrote:
> I spend more time on this.  The next revision is attached.  It
> contains revised comments and other cosmetic changes.  I'm going to
> backpatch 0001 to all supported branches, and 0002 to 17 where
> injection points were introduced.

Alexander, thank you for polishing the patch. Just my opinion: I would prefer
to put the tests before the fix, because then you can reproduce the problem
by simply checking out the commit with the tests. Once the tests come after
the fix, you can no longer do that. Anyway, I'm OK with your changes. Thank you!

I made some changes to the patch (v7 is attached):
* Removed the modules/test_replslot_required_lsn directory. It is no longer
  needed now that you've moved the test files to another directory.
* Renamed the tests to 046_checkpoint_logical_slot.pl and
  047_checkpoint_physical_slot.pl. I believe these names are more descriptive.

Please consider these changes.

With best regards,
Vitaly
From b85ffd56eb1e28d5e61e6221ba97e7e3bea7a982 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorot...@postgresql.org>
Date: Sat, 24 May 2025 13:34:36 +0300
Subject: [PATCH 3/3] Remove redundant ReplicationSlotsComputeRequiredLSN calls

The function ReplicationSlotsComputeRequiredLSN is used to calculate the
oldest LSN required by any slot. It is called every time the restart_lsn
value of any slot changes (for example, when a slot is advanced).
The slots' oldest required LSN is used to remove old WAL segments in two
places: when a checkpoint or a restart point is created (the
CreateCheckPoint and CreateRestartPoint functions). Old WAL segments seem to
be removed in these two functions only.

The idea of the patch is to call ReplicationSlotsComputeRequiredLSN only in
the CreateCheckPoint and CreateRestartPoint functions, before the call to
the RemoveOldXlogFiles function, where old WAL segments are removed. There
is no obvious need to recalculate the oldest required LSN every time a
slot's restart_lsn changes.

The value of the oldest required LSN can affect slot invalidation.
The function InvalidateObsoleteReplicationSlots with a non-zero second
parameter (oldestSegno) is called only in the CreateCheckPoint and
CreateRestartPoint functions, where slot invalidation occurs with the
reason RS_INVAL_WAL_REMOVED. Since we update the slots' oldest required
LSN at the beginning of these functions, the proposed patch should not
break the behaviour of the slot invalidation function in this case.
---
 src/backend/access/transam/xlog.c          | 4 ++++
 src/backend/replication/logical/logical.c  | 1 -
 src/backend/replication/logical/slotsync.c | 4 ----
 src/backend/replication/slot.c             | 5 -----
 src/backend/replication/slotfuncs.c        | 2 --
 src/backend/replication/walsender.c        | 1 -
 6 files changed, 4 insertions(+), 13 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0a7f7a71d8b..bdfd0a59ab7 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7324,6 +7324,7 @@ CreateCheckPoint(int flags)
 	 * might be advanced concurrently, so we call this before
 	 * CheckPointReplicationSlots() synchronizes replication slots.
 	 */
+	ReplicationSlotsComputeRequiredLSN();
 	slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
 
 	/*
@@ -7528,6 +7529,7 @@ CreateCheckPoint(int flags)
 		 * cleanup.  Then, we must synchronize the replication slots again in
 		 * order to make this LSN safe to use.
 		 */
+		ReplicationSlotsComputeRequiredLSN();
 		slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
 		CheckPointReplicationSlots(shutdown);
 
@@ -7901,6 +7903,7 @@ CreateRestartPoint(int flags)
 	 * might be advanced concurrently, so we call this before
 	 * CheckPointReplicationSlots() synchronizes replication slots.
 	 */
+	ReplicationSlotsComputeRequiredLSN();
 	slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
 
 	if (log_checkpoints)
@@ -8001,6 +8004,7 @@ CreateRestartPoint(int flags)
 		 * cleanup.  Then, we must synchronize the replication slots again in
 		 * order to make this LSN safe to use.
 		 */
+		ReplicationSlotsComputeRequiredLSN();
 		slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
 		CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 081e6593722..34e973393c2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1925,7 +1925,6 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
 			ReplicationSlotsComputeRequiredXmin(false);
-			ReplicationSlotsComputeRequiredLSN();
 		}
 	}
 	else
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 656e66e0ae0..30662c09275 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -335,7 +335,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		SpinLockRelease(&slot->mutex);
 
 		ReplicationSlotsComputeRequiredXmin(false);
-		ReplicationSlotsComputeRequiredLSN();
 	}
 
 	return updated_config || updated_xmin_or_lsn;
@@ -502,9 +501,6 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
-		/* Prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
-
 		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
 
 		/*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 600b87fa9cb..dd18fe10f7d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1008,7 +1008,6 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * limits.
 	 */
 	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
 
 	/*
 	 * If removing the directory fails, the worst thing that will happen is
@@ -1494,9 +1493,6 @@ ReplicationSlotReserveWal(void)
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
-		/* prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
-
 		/*
 		 * If all required WAL is still there, great, otherwise retry. The
 		 * slot should prevent further removal of WAL, unless there's a
@@ -2014,7 +2010,6 @@ restart:
 	if (invalidated)
 	{
 		ReplicationSlotsComputeRequiredXmin(false);
-		ReplicationSlotsComputeRequiredLSN();
 	}
 
 	return invalidated;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 36cc2ed4e44..3300fb9b1c9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -583,7 +583,6 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	 * advancing potentially done.
 	 */
 	ReplicationSlotsComputeRequiredXmin(false);
-	ReplicationSlotsComputeRequiredLSN();
 
 	ReplicationSlotRelease();
 
@@ -819,7 +818,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
 		ReplicationSlotMarkDirty();
 		ReplicationSlotsComputeRequiredXmin(false);
-		ReplicationSlotsComputeRequiredLSN();
 		ReplicationSlotSave();
 
 #ifdef USE_ASSERT_CHECKING
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d751d34295d..dff749f00a8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2384,7 +2384,6 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	if (changed)
 	{
 		ReplicationSlotMarkDirty();
-		ReplicationSlotsComputeRequiredLSN();
 		PhysicalWakeupLogicalWalSnd();
 	}
 
-- 
2.34.1

From c978cc88848615670fce667c83cda3fe874d80c0 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorot...@postgresql.org>
Date: Sat, 24 May 2025 13:26:28 +0300
Subject: [PATCH 2/3] Add TAP tests to check replication slot advance during
 the checkpoint

The new tests verify that logical and physical replication slots are still
valid after an immediate restart on checkpoint completion when the slot was
advanced during the checkpoint.

This commit introduces two new injection points to make these tests possible:

* checkpoint-before-old-wal-removal - triggered in the checkpointer process
  just before old WAL segments cleanup;
* logical-replication-slot-advance-segment - triggered in
  LogicalConfirmReceivedLocation() when restart_lsn was changed enough to
  point to the next WAL segment.

Discussion: https://postgr.es/m/flat/1d12d2-67235980-35-19a406a0%4063439497
Author: Vitaly Davydov <v.davy...@postgrespro.ru>
Author: Tomas Vondra <to...@vondra.me>
Reviewed-by: Alexander Korotkov <aekorot...@gmail.com>
Backpatch-through: 17
---
 src/backend/access/transam/xlog.c             |   4 +
 src/backend/replication/logical/logical.c     |  18 +++
 src/test/recovery/meson.build                 |   2 +
 .../recovery/t/046_checkpoint_logical_slot.pl | 139 ++++++++++++++++++
 .../t/047_checkpoint_physical_slot.pl         | 133 +++++++++++++++++
 5 files changed, 296 insertions(+)
 create mode 100644 src/test/recovery/t/046_checkpoint_logical_slot.pl
 create mode 100644 src/test/recovery/t/047_checkpoint_physical_slot.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a0e589e9c4b..0a7f7a71d8b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7509,6 +7509,10 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+#ifdef USE_INJECTION_POINTS
+	INJECTION_POINT("checkpoint-before-old-wal-removal", NULL);
+#endif
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6b3995133e2..081e6593722 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
 #include "postgres.h"
 
 #include "access/xact.h"
+#include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "fmgr.h"
 #include "miscadmin.h"
@@ -41,6 +42,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
 
@@ -1825,9 +1827,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		bool		updated_xmin = false;
 		bool		updated_restart = false;
+		XLogRecPtr	restart_lsn pg_attribute_unused();
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
+		/* remember the old restart lsn */
+		restart_lsn = MyReplicationSlot->data.restart_lsn;
+
 		/*
 		 * Prevent moving the confirmed_flush backwards, as this could lead to
 		 * data duplication issues caused by replicating already replicated
@@ -1889,6 +1895,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		 */
 		if (updated_xmin || updated_restart)
 		{
+#ifdef USE_INJECTION_POINTS
+			XLogSegNo	seg1,
+						seg2;
+
+			XLByteToSeg(restart_lsn, seg1, wal_segment_size);
+			XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
+
+			/* trigger injection point, but only if segment changes */
+			if (seg1 != seg2)
+				INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
+#endif
+
 			ReplicationSlotMarkDirty();
 			ReplicationSlotSave();
 			elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index cb983766c67..92429d28402 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -54,6 +54,8 @@ tests += {
       't/043_no_contrecord_switch.pl',
       't/044_invalidate_inactive_slots.pl',
       't/045_archive_restartpoint.pl',
+      't/046_checkpoint_logical_slot.pl',
+      't/047_checkpoint_physical_slot.pl'
     ],
   },
 }
diff --git a/src/test/recovery/t/046_checkpoint_logical_slot.pl b/src/test/recovery/t/046_checkpoint_logical_slot.pl
new file mode 100644
index 00000000000..b4265c4a6a5
--- /dev/null
+++ b/src/test/recovery/t/046_checkpoint_logical_slot.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# This test verifies the case when the logical slot is advanced during
+# checkpoint. The test checks that the logical slot's restart_lsn still refers
+# to an existing WAL segment after an immediate restart.
+#
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+my ($node, $result);
+
+$node = PostgreSQL::Test::Cluster->new('mike');
+$node->init;
+$node->append_conf('postgresql.conf',
+	"shared_preload_libraries = 'injection_points'");
+$node->append_conf('postgresql.conf', "wal_level = 'logical'");
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION injection_points));
+
+# Create a simple table to generate data into.
+$node->safe_psql('postgres',
+	q{create table t (id serial primary key, b text)});
+
+# Create the two slots we'll need.
+$node->safe_psql('postgres',
+	q{select pg_create_logical_replication_slot('slot_logical', 'test_decoding')}
+);
+$node->safe_psql('postgres',
+	q{select pg_create_physical_replication_slot('slot_physical', true)});
+
+# Advance both slots to the current position just to have everything "valid".
+$node->safe_psql('postgres',
+	q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null)}
+);
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run checkpoint to flush current state to disk and set a baseline.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Generate some transactions to get RUNNING_XACTS.
+my $xacts = $node->background_psql('postgres');
+$xacts->query_until(
+	qr/run_xacts/,
+	q(\echo run_xacts
+SELECT 1 \watch 0.1
+\q
+));
+
+# Insert 1M rows to generate many segments' worth of WAL.
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+# Run another checkpoint to set a new restore LSN.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Another 1M rows to generate many more segments' worth of WAL.
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+# Run another checkpoint, this time in the background, and make it wait
+# on the injection point, so that the checkpoint stops right before
+# removing old WAL segments.
+note('starting checkpoint');
+
+my $checkpoint = $node->background_psql('postgres');
+$checkpoint->query_safe(
+	q(select injection_points_attach('checkpoint-before-old-wal-removal','wait'))
+);
+$checkpoint->query_until(
+	qr/starting_checkpoint/,
+	q(\echo starting_checkpoint
+checkpoint;
+\q
+));
+
+# Wait until the checkpoint stops right before removing WAL segments.
+note('waiting for injection_point');
+$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal');
+note('injection_point is reached');
+
+# Try to advance the logical slot, but make it stop when it moves to the next
+# WAL segment (this has to happen in the background, too).
+my $logical = $node->background_psql('postgres');
+$logical->query_safe(
+	q{select injection_points_attach('logical-replication-slot-advance-segment','wait');}
+);
+$logical->query_until(
+	qr/get_changes/,
+	q(
+\echo get_changes
+select count(*) from pg_logical_slot_get_changes('slot_logical', null, null) \watch 1
+\q
+));
+
+# Wait until the slot's restart_lsn points to the next WAL segment.
+note('waiting for injection_point');
+$node->wait_for_event('client backend',
+	'logical-replication-slot-advance-segment');
+note('injection_point is reached');
+
+# OK, we're in the right situation: time to advance the physical slot, which
+# recalculates the required LSN, and then unblock the checkpoint, which
+# removes the WAL still needed by the logical slot.
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Continue the checkpoint.
+$node->safe_psql('postgres',
+	q{select injection_points_wakeup('checkpoint-before-old-wal-removal')});
+
+# Abruptly stop the server.  Nothing here waits for the checkpoint to
+# finish first, although doing so would be more robust.
+$node->stop('immediate');
+
+$node->start;
+
+eval {
+	$node->safe_psql('postgres',
+		q{select count(*) from pg_logical_slot_get_changes('slot_logical', null, null);}
+	);
+};
+is($@, '', "Logical slot still valid");
+
+done_testing();
diff --git a/src/test/recovery/t/047_checkpoint_physical_slot.pl b/src/test/recovery/t/047_checkpoint_physical_slot.pl
new file mode 100644
index 00000000000..454e56b9bd2
--- /dev/null
+++ b/src/test/recovery/t/047_checkpoint_physical_slot.pl
@@ -0,0 +1,133 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# This test verifies the case when the physical slot is advanced during
+# checkpoint. The test checks that the physical slot's restart_lsn still refers
+# to an existing WAL segment after an immediate restart.
+#
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+my ($node, $result);
+
+$node = PostgreSQL::Test::Cluster->new('mike');
+$node->init;
+$node->append_conf('postgresql.conf',
+	"shared_preload_libraries = 'injection_points'");
+$node->append_conf('postgresql.conf', "wal_level = 'replica'");
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION injection_points));
+
+# Create a simple table to generate data into.
+$node->safe_psql('postgres',
+	q{create table t (id serial primary key, b text)});
+
+# Create a physical replication slot.
+$node->safe_psql('postgres',
+	q{select pg_create_physical_replication_slot('slot_physical', true)});
+
+# Advance slot to the current position, just to have everything "valid".
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run checkpoint to flush current state to disk and set a baseline.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Insert 100K rows to generate some WAL.
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,100000) s(i)}
+);
+
+# Advance slot to the current position, just to have everything "valid".
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Run another checkpoint to set a new restore LSN.
+$node->safe_psql('postgres', q{checkpoint});
+
+# Another 1M rows to generate many segments' worth of WAL.
+$node->safe_psql('postgres',
+	q{insert into t (b) select md5(i::text) from generate_series(1,1000000) s(i)}
+);
+
+my $restart_lsn_init = $node->safe_psql('postgres',
+	q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn_init);
+note("restart lsn before checkpoint: $restart_lsn_init");
+
+# Run another checkpoint, this time in the background, and make it wait
+# on the injection point, so that the checkpoint stops right before
+# removing old WAL segments.
+note('starting checkpoint');
+
+my $checkpoint = $node->background_psql('postgres');
+$checkpoint->query_safe(
+	q{select injection_points_attach('checkpoint-before-old-wal-removal','wait')}
+);
+$checkpoint->query_until(
+	qr/starting_checkpoint/,
+	q(\echo starting_checkpoint
+checkpoint;
+\q
+));
+
+# Wait until the checkpoint stops right before removing WAL segments.
+note('waiting for injection_point');
+$node->wait_for_event('checkpointer', 'checkpoint-before-old-wal-removal');
+note('injection_point is reached');
+
+# OK, we're in the right situation: time to advance the physical slot, which
+# recalculates the required LSN and then unblock the checkpoint, which
+# removes the WAL still needed by the physical slot.
+$node->safe_psql('postgres',
+	q{select pg_replication_slot_advance('slot_physical', pg_current_wal_lsn())}
+);
+
+# Continue the checkpoint.
+$node->safe_psql('postgres',
+	q{select injection_points_wakeup('checkpoint-before-old-wal-removal')});
+
+my $restart_lsn_old = $node->safe_psql('postgres',
+	q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn_old);
+note("restart lsn before stop: $restart_lsn_old");
+
+# Abruptly stop the server.  Nothing here waits for the checkpoint to
+# finish first, although doing so would be more robust.
+$node->stop('immediate');
+
+$node->start;
+
+# Get the restart_lsn of the slot right after restarting.
+my $restart_lsn = $node->safe_psql('postgres',
+	q{select restart_lsn from pg_replication_slots where slot_name = 'slot_physical'}
+);
+chomp($restart_lsn);
+note("restart lsn: $restart_lsn");
+
+# Get the WAL segment name for the slot's restart_lsn.
+my $restart_lsn_segment = $node->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn'::pg_lsn)");
+chomp($restart_lsn_segment);
+
+# Check if the required WAL segment exists.
+note("required by slot segment name: $restart_lsn_segment");
+my $datadir = $node->data_dir;
+ok( -f "$datadir/pg_wal/$restart_lsn_segment",
+	"WAL segment $restart_lsn_segment for physical slot's restart_lsn $restart_lsn exists"
+);
+
+done_testing();
-- 
2.34.1

From 1e0629efc65f190a58ec729db6f3ada4f8b83897 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorot...@postgresql.org>
Date: Sat, 24 May 2025 16:26:27 +0300
Subject: [PATCH 1/3] Keep WAL segments by the flushed value of the slot's
 restart LSN

The patch fixes the issue with the unexpected removal of old WAL segments
after checkpoint, followed by an immediate restart. The issue occurs when
a slot is advanced after the start of the checkpoint and before old WAL
segments are removed at the end of the checkpoint.

The idea of the patch is to get the minimal restart_lsn at the beginning
of checkpoint (or restart point) creation and use this value when calculating
the oldest LSN for WAL segments removal at the end of checkpoint. This idea
was proposed by Tomas Vondra in the discussion.

Discussion: https://postgr.es/m/flat/1d12d2-67235980-35-19a406a0%4063439497
Author: Vitaly Davydov <v.davy...@postgrespro.ru>
Reviewed-by: Tomas Vondra <to...@vondra.me>
Reviewed-by: Alexander Korotkov <aekorot...@gmail.com>
Backpatch-through: 13
---
 src/backend/access/transam/xlog.c         | 55 +++++++++++++++++++----
 src/backend/replication/logical/logical.c | 10 ++++-
 src/backend/replication/walsender.c       |  4 ++
 3 files changed, 60 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1914859b2ee..a0e589e9c4b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -677,7 +677,8 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
 												  XLogRecPtr pagePtr,
 												  TimeLineID newTLI);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static void KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinLSN,
+					   XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli,
@@ -7087,6 +7088,7 @@ CreateCheckPoint(int flags)
 	VirtualTransactionId *vxids;
 	int			nvxids;
 	int			oldXLogAllowed = 0;
+	XLogRecPtr	slotsMinReqLSN;
 
 	/*
 	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -7315,6 +7317,15 @@ CreateCheckPoint(int flags)
 	 */
 	END_CRIT_SECTION();
 
+	/*
+	 * Get the current minimum LSN to be used later in the WAL segment
+	 * cleanup.  We may remove only those WAL segments that are not needed
+	 * according to the synchronized LSNs of replication slots.  A slot's LSN
+	 * might be advanced concurrently, so we call this before
+	 * CheckPointReplicationSlots() synchronizes replication slots.
+	 */
+	slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+
 	/*
 	 * In some cases there are groups of actions that must all occur on one
 	 * side or the other of a checkpoint record. Before flushing the
@@ -7503,17 +7514,25 @@ CreateCheckPoint(int flags)
 	 * prevent the disk holding the xlog from growing full.
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
-	KeepLogSeg(recptr, &_logSegNo);
+	KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo);
 	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
 										   _logSegNo, InvalidOid,
 										   InvalidTransactionId))
 	{
+		/*
+		 * Recalculate the current minimum LSN to be used in the WAL segment
+		 * cleanup.  Then, we must synchronize the replication slots again in
+		 * order to make this LSN safe to use.
+		 */
+		slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+		CheckPointReplicationSlots(shutdown);
+
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
 		 * horizon, starting again from RedoRecPtr.
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
-		KeepLogSeg(recptr, &_logSegNo);
+		KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo);
 	}
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr,
@@ -7788,6 +7807,7 @@ CreateRestartPoint(int flags)
 	XLogRecPtr	endptr;
 	XLogSegNo	_logSegNo;
 	TimestampTz xtime;
+	XLogRecPtr	slotsMinReqLSN;
 
 	/* Concurrent checkpoint/restartpoint cannot happen */
 	Assert(!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER);
@@ -7870,6 +7890,15 @@ CreateRestartPoint(int flags)
 	MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
 	CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
 
+	/*
+	 * Get the current minimum LSN to be used later in the WAL segment
+	 * cleanup.  We may remove only those WAL segments that are not needed
+	 * according to the synchronized LSNs of replication slots.  A slot's LSN
+	 * might be advanced concurrently, so we call this before
+	 * CheckPointReplicationSlots() synchronizes replication slots.
+	 */
+	slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+
 	if (log_checkpoints)
 		LogCheckpointStart(flags, true);
 
@@ -7958,17 +7987,25 @@ CreateRestartPoint(int flags)
 	receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
-	KeepLogSeg(endptr, &_logSegNo);
+	KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo);
 	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
 										   _logSegNo, InvalidOid,
 										   InvalidTransactionId))
 	{
+		/*
+		 * Recalculate the current minimum LSN to be used in the WAL segment
+		 * cleanup.  Then, we must synchronize the replication slots again in
+		 * order to make this LSN safe to use.
+		 */
+		slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+		CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
+
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
 		 * horizon, starting again from RedoRecPtr.
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
-		KeepLogSeg(endptr, &_logSegNo);
+		KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo);
 	}
 	_logSegNo--;
 
@@ -8063,6 +8100,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	XLogSegNo	oldestSegMaxWalSize;	/* oldest segid kept by max_wal_size */
 	XLogSegNo	oldestSlotSeg;	/* oldest segid kept by slot */
 	uint64		keepSegs;
+	XLogRecPtr	slotsMinReqLSN;
 
 	/*
 	 * slot does not reserve WAL. Either deactivated, or has never been active
@@ -8076,8 +8114,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	 * oldestSlotSeg to the current segment.
 	 */
 	currpos = GetXLogWriteRecPtr();
+	slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
 	XLByteToSeg(currpos, oldestSlotSeg, wal_segment_size);
-	KeepLogSeg(currpos, &oldestSlotSeg);
+	KeepLogSeg(currpos, slotsMinReqLSN, &oldestSlotSeg);
 
 	/*
 	 * Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -8138,7 +8177,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
  * invalidation is optionally done here, instead.
  */
 static void
-KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
+KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinReqLSN, XLogSegNo *logSegNo)
 {
 	XLogSegNo	currSegNo;
 	XLogSegNo	segno;
@@ -8151,7 +8190,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	 * Calculate how many segments are kept by slots first, adjusting for
 	 * max_slot_wal_keep_size.
 	 */
-	keep = XLogGetReplicationSlotMinimumLSN();
+	keep = slotsMinReqLSN;
 	if (keep != InvalidXLogRecPtr && keep < recptr)
 	{
 		XLByteToSeg(keep, segno, wal_segment_size);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d56d0c4ef3..6b3995133e2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1878,7 +1878,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
-		/* first write new xmin to disk, so we know what's up after a crash */
+		/*
+		 * First, write new xmin and restart_lsn to disk so we know what's up
+		 * after a crash.  Even when we do this, the checkpointer can see the
+		 * updated restart_lsn value in the shared memory; then, a crash can
+		 * happen before we manage to write that value to the disk.  Thus,
+		 * checkpointer still needs to make special efforts to keep WAL
+		 * segments required by the restart_lsn written to the disk.  See
+		 * CreateCheckPoint() and CreateRestartPoint() for details.
+		 */
 		if (updated_xmin || updated_restart)
 		{
 			ReplicationSlotMarkDirty();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..d751d34295d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2393,6 +2393,10 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	 * be energy wasted - the worst thing lost information could cause here is
 	 * to give wrong information in a statistics view - we'll just potentially
 	 * be more conservative in removing files.
+	 *
+	 * Checkpointer makes special efforts to keep the WAL segments required by
+	 * the restart_lsn written to the disk. See CreateCheckPoint() and
+	 * CreateRestartPoint() for details.
 	 */
 }
 
-- 
2.34.1
