Hi,

I've been investigating some issues reported by users, related to
logical replication unexpectedly breaking with messages like:

  LOG: invalidating slot "s" because its restart_lsn X/Y exceeds
       max_slot_wal_keep_size

which is pretty confusing, because the system has that GUC set to -1
(i.e. disabled, so there should be no limit). Or a message like this:

  ERROR: requested WAL segment 000...0AA has already been removed

which is a bit less confusing, but still puzzling because replication
slots are meant to prevent exactly this.

I suspected there's some sort of rare race condition in how we advance
the slot LSN values. I didn't know where to start, so I gave up on
trying to understand all of the complex code. Instead, I wrote a simple
stress test that

1) sets up a cluster (primary + 1+ logical replicas)

2) runs pgbench on the primary, checks replicas/slots

3) randomly restarts the nodes (both primary and replicas) in either fast
  or immediate mode, with random short pauses

4) runs checkpoints in a tight while loop

This is based on the observation that in past reports the issues seem
to happen only with logical replication, shortly after a reconnect (e.g.
after a connection reset). And the slots are invalidated / WAL is
removed during a checkpoint, so frequent checkpoints should make it
easier to hit ...

Attached are a couple of scripts that run this - they're not particularly
pretty and may need some tweaks to work on your machine (run.sh is the
script to run).

And unfortunately, this started to fail pretty quickly. The short story
is that it's not difficult to get into a situation where restart_lsn for
a slot moves backwards, so something like this can happen:

1) slot restart_lsn moves forward to LSN A

2) checkpoint happens, updates min required LSN for slots, recycles
segments it considers unnecessary (up to LSN A)

3) slot restart_lsn moves backwards to LSN B (where B < A)

4) kaboom
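A minimal arithmetic sketch of steps 1-4, assuming the default 16MB
wal_segment_size and modelling XLByteToSeg() as plain integer division.
The helper names and the LSN values (A = 0/3000100, B = 0/2FFFF00) are
hypothetical, picked so that A and B land in adjacent segments. The real
checkpoint code also considers the redo pointer and wal_keep_size, which
this ignores.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

/* Assumption: default 16MB WAL segments. */
#define WAL_SEGMENT_SIZE ((uint64_t) 16 * 1024 * 1024)

/* Like XLByteToSeg(): number of the segment containing the LSN. */
static XLogSegNo
lsn_to_segno(XLogRecPtr lsn)
{
	return lsn / WAL_SEGMENT_SIZE;
}

/*
 * Step 2 (simplified, hypothetical helper): the checkpoint keeps the
 * segment containing the minimum LSN required by slots (restart_lsn = A)
 * and may recycle all older segments. Returns the oldest surviving segment.
 */
static XLogSegNo
oldest_kept_segment(XLogRecPtr slot_min_lsn)
{
	assert(slot_min_lsn != 0);	/* sketch assumes a valid slot LSN */
	return lsn_to_segno(slot_min_lsn);
}

/*
 * Step 4 (hypothetical helper): decoding restarts reading at restart_lsn,
 * so its segment must still exist. false corresponds to "requested WAL
 * segment ... has already been removed".
 */
static bool
restart_segment_exists(XLogRecPtr restart_lsn, XLogSegNo oldest_kept)
{
	return lsn_to_segno(restart_lsn) >= oldest_kept;
}
```

With A = 0x3000100 (segment 3) and B = 0x2FFFF00 (segment 2),
oldest_kept_segment(A) is 3, restart_segment_exists(A, 3) is true and
restart_segment_exists(B, 3) is false: once restart_lsn moves back to B,
the WAL it points to may already be gone.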

This would perfectly explain all the symptoms I mentioned earlier. The
max_slot_wal_keep_size reference is confusing, but AFAIK it's just based
on the (reasonable) belief that the LSN can't move backwards, so the only
reason for restart_lsn being before the min required LSN is that this GUC
kicked in. But the assumption does not hold :-(
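The misleading message can be illustrated with a similarly simplified
model of KeepLogSeg() and the invalidation check. This is only a sketch:
the function names are hypothetical, and it assumes
max_slot_wal_keep_size = -1 and wal_keep_size = 0, so the only thing
holding WAL back is the cached minimum slot LSN (replicationSlotMinLSN),
computed while restart_lsn was still at A.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;

#define WAL_SEGMENT_SIZE ((uint64_t) 16 * 1024 * 1024)	/* assumed default */

/*
 * Hypothetical simplified KeepLogSeg() with max_slot_wal_keep_size = -1:
 * keep WAL back to the segment of the cached minimum slot LSN, or to the
 * redo segment if that is older. Returns the oldest segment to keep.
 */
static XLogSegNo
keep_log_seg(XLogSegNo redo_segno, XLogRecPtr cached_slot_min_lsn)
{
	XLogSegNo	slot_segno;

	assert(cached_slot_min_lsn != 0);	/* sketch assumes a valid slot LSN */
	slot_segno = cached_slot_min_lsn / WAL_SEGMENT_SIZE;
	return (slot_segno < redo_segno) ? slot_segno : redo_segno;
}

/*
 * Hypothetical simplified invalidation check: oldestLSN is the start of the
 * oldest kept segment, and a slot whose *current* restart_lsn is older is
 * invalidated, with a message blaming max_slot_wal_keep_size.
 */
static bool
slot_gets_invalidated(XLogRecPtr current_restart_lsn, XLogSegNo oldest_segno)
{
	XLogRecPtr	oldest_lsn = oldest_segno * WAL_SEGMENT_SIZE;

	return current_restart_lsn < oldest_lsn;
}
```

With the redo pointer in segment 5 and the cached minimum at
A = 0x3000100, keep_log_seg(5, A) returns 3 (oldestLSN 0x3000000). A slot
still at A survives, but one whose restart_lsn has meanwhile moved back
to B = 0x2FFFF00 is invalidated, even though no size limit is set.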

Now, why/how can this happen?

I kept adding elog(LOG) messages to all places updating LSNs for a
slot, and asserts that fail if data.restart_lsn moves backwards. See the
attached 0001 patch. An example of a failure (for the walsender backend
that triggered the assert) looks like this:

  344.139 LOG:  starting logical decoding for slot "s1"

  344.139 DETAIL:  Streaming transactions committing after 1/E97EAC30,
                   reading WAL from 1/E96FB4D0.

  344.140 LOG:  logical decoding found consistent point at 1/E96FB4D0

  344.140 DETAIL:  Logical decoding will begin using saved snapshot.

  344.140 LOG:  LogicalConfirmReceivedLocation 1/E9865398

  344.140 LOG:  LogicalConfirmReceivedLocation updating
                   data.restart_lsn to 1/E979D4C8 (from 1/E96FB4D0)
                   candidate_restart_valid 0/0 (from 1/E9865398)
                   candidate_restart_lsn 0/0 (from 1/E979D4C8)

  344.145 LOG:  LogicalIncreaseRestartDecodingForSlot
                   restart_decoding_lsn 1/E96FB4D0

  344.145 LOG:  LogicalIncreaseRestartDecodingForSlot s1
                   candidate_restart_valid_lsn 1/E979D4C8 (0/0)
                   candidate_restart_lsn 1/E96FB4D0 (0/0)

  344.147 LOG:  LogicalIncreaseRestartDecodingForSlot
                   restart_decoding_lsn 1/E979D4C8

  344.156 LOG:  LogicalIncreaseXminForSlot
                   candidate_catalog_xmin 30699
                   candidate_xmin_lsn 1/E993AD68 (0/0)
  ...
  344.235 LOG:  LogicalIncreaseRestartDecodingForSlot
                   restart_decoding_lsn 1/E9F33AF8

  344.240 LOG:  LogicalConfirmReceivedLocation 1/E9DCCD78

  344.240 LOG:  LogicalConfirmReceivedLocation updating
                   data.restart_lsn to 1/E96FB4D0 (from 1/E979D4C8)
                   candidate_restart_valid 0/0 (from 1/E979D4C8)
                   candidate_restart_lsn 0/0 (from 1/E96FB4D0)

  345.536 LOG:  server process (PID 2518127) was terminated by
                signal 6: Aborted

We start decoding at 1/E96FB4D0, and right after startup we get a
confirmation, and LogicalConfirmReceivedLocation updates restart_lsn to
1/E979D4C8.

But then LogicalIncreaseRestartDecodingForSlot comes along, and stores
the restart_decoding_lsn 1/E96FB4D0 (which is the initial restart_lsn)
into candidate_restart_lsn.

And then a bit later we get another confirmation, and
LogicalConfirmReceivedLocation propagates candidate_restart_lsn into
data.restart_lsn.

This is how restart_lsn moves backwards, causing issues. I've reproduced
this on PG14 and current master, but I believe the issue exists since
the introduction of logical replication in 9.4 :-(
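The sequence above can be replayed against a simplified model of the two
functions. This is a sketch, not the real code. SlotModel and the helper
names are hypothetical, and locking, logging and xmin handling are
omitted. The branch structure mirrors the context lines visible in the
0001 patch. The first branch, which ignores a restart_lsn not newer than
data.restart_lsn, is assumed. The initial candidate values come from the
344.140 log line, and calls that changed nothing are left out.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical model holding only the fields involved in the handoff. */
typedef struct SlotModel
{
	XLogRecPtr	restart_lsn;		/* data.restart_lsn */
	XLogRecPtr	confirmed_flush;	/* data.confirmed_flush */
	XLogRecPtr	candidate_restart_valid;
	XLogRecPtr	candidate_restart_lsn;
} SlotModel;

/*
 * Simplified LogicalIncreaseRestartDecodingForSlot(). The last check is a
 * separate "if", so it still runs when the first branch ignored a
 * restart_lsn older than data.restart_lsn.
 */
static void
increase_restart(SlotModel *s, XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
{
	if (restart_lsn <= s->restart_lsn)
	{
		/* assumed: don't overwrite if we already have a newer restart_lsn */
	}
	else if (current_lsn <= s->confirmed_flush)
	{
		s->candidate_restart_valid = current_lsn;
		s->candidate_restart_lsn = restart_lsn;
	}

	if (s->candidate_restart_valid == InvalidXLogRecPtr)
	{
		s->candidate_restart_valid = current_lsn;
		s->candidate_restart_lsn = restart_lsn;
	}
}

/* Simplified LogicalConfirmReceivedLocation(), restart_lsn part only. */
static void
confirm(SlotModel *s, XLogRecPtr lsn)
{
	s->confirmed_flush = lsn;
	if (s->candidate_restart_valid != InvalidXLogRecPtr &&
		s->candidate_restart_valid <= lsn)
	{
		s->restart_lsn = s->candidate_restart_lsn;	/* no Max() here */
		s->candidate_restart_valid = InvalidXLogRecPtr;
		s->candidate_restart_lsn = InvalidXLogRecPtr;
	}
}

/* Replays the logged sequence; returns the final restart_lsn. */
static XLogRecPtr
replay_logged_sequence(void)
{
	SlotModel	s = {
		.restart_lsn = 0x1E96FB4D0,				/* 1/E96FB4D0 */
		.confirmed_flush = 0x1E97EAC30,			/* 1/E97EAC30 */
		.candidate_restart_valid = 0x1E9865398,	/* 1/E9865398 */
		.candidate_restart_lsn = 0x1E979D4C8,	/* 1/E979D4C8 */
	};

	/* 344.140: first confirmation moves restart_lsn forward */
	confirm(&s, 0x1E9865398);
	assert(s.restart_lsn == 0x1E979D4C8);

	/* 344.145: the old restart_decoding_lsn becomes the new candidate */
	increase_restart(&s, 0x1E979D4C8, 0x1E96FB4D0);

	/* 344.240: the next confirmation propagates it, moving restart_lsn back */
	confirm(&s, 0x1E9DCCD78);
	return s.restart_lsn;
}
```

In this model replay_logged_sequence() ends with restart_lsn back at
1/E96FB4D0, matching the last "updating data.restart_lsn" line in the log.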

I'm not claiming this is the only way this can happen, but all the
cases I've seen in my stress testing look like this. Moreover, I'm not
claiming this is the only LSN field that can move backwards like this.
It seems to me the other candidate_ fields have the same issue, but it
may have consequences other than discarding "unnecessary" WAL.

I've been reminded of this thread [1] from 2022. I'm 99% sure it's the
same issue - it happened shortly after a reconnect, etc. And it seems to
me Masahiko-san was right about the causes in [2]. I don't think the fix
suggested in that message (changing the branch to "else if") can work,
though. At least it did not really help in my testing.

And I'm not sure it'd fix all the issues - it only affects restart_lsn,
but AFAICS the same issue (LSNs moving backwards) can happen for the
other LSN slot field (candidate_xmin_lsn).

I don't see how a backwards move could be valid for any of those
fields, or when "propagating" the candidate values into restart_lsn.

But there's no protection against the backward moves - so either it's
considered to be OK (which seems incorrect), or it was not expected to
happen in practice.

The 0001 patch adds asserts against those backward moves on all the
fields. This means it fails with ABORT earlier, even before a checkpoint
gets a chance to invalidate the slot or remove the segment.

The 0002 part replaces the asserts with elog(LOG), and instead tweaks the
updates to do Max(old,new) to prevent the backward moves. With this I'm
no longer able to reproduce the issue - and there are a lot of LOG
messages about the (prevented) backward moves.
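To see what the 0002 clamping does to the logged sequence, here is a
simplified model with hypothetical names. It has no locking and covers
only the branches exercised in that log, with the Max() expressions copied
from the patch. It sketches the effect only and is not evidence that the
clamping is correct.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define Max(x, y) ((x) > (y) ? (x) : (y))	/* same shape as the c.h macro */

/* Hypothetical model: only the fields touched by these two paths. */
typedef struct SlotModel
{
	XLogRecPtr	restart_lsn;		/* data.restart_lsn */
	XLogRecPtr	candidate_restart_valid;
	XLogRecPtr	candidate_restart_lsn;
} SlotModel;

/* Only the "candidate_restart_valid == Invalid" branch, clamped as in 0002. */
static void
increase_restart_clamped(SlotModel *s, XLogRecPtr current_lsn,
						 XLogRecPtr restart_lsn)
{
	if (s->candidate_restart_valid == InvalidXLogRecPtr)
	{
		s->candidate_restart_valid = Max(s->candidate_restart_valid, current_lsn);
		s->candidate_restart_lsn = Max(s->restart_lsn,
									   Max(s->candidate_restart_lsn, restart_lsn));
	}
}

/* Confirmation with the restart_lsn update clamped as in 0002. */
static void
confirm_clamped(SlotModel *s, XLogRecPtr lsn)
{
	if (s->candidate_restart_valid != InvalidXLogRecPtr &&
		s->candidate_restart_valid <= lsn)
	{
		s->restart_lsn = Max(s->restart_lsn, s->candidate_restart_lsn);
		s->candidate_restart_valid = InvalidXLogRecPtr;
		s->candidate_restart_lsn = InvalidXLogRecPtr;
	}
}

/* The logged sequence, starting right after the first confirmation. */
static XLogRecPtr
replay_clamped(void)
{
	SlotModel	s = {.restart_lsn = 0x1E979D4C8};	/* 1/E979D4C8 */

	increase_restart_clamped(&s, 0x1E979D4C8, 0x1E96FB4D0);	/* 344.145 */
	confirm_clamped(&s, 0x1E9DCCD78);						/* 344.240 */
	return s.restart_lsn;
}
```

The candidate that previously dragged restart_lsn back to 1/E96FB4D0 is
clamped to the current restart_lsn here, so the later confirmation leaves
restart_lsn at 1/E979D4C8.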

Unfortunately, I'm not sure this is quite correct. Consider, for
example, this:

    slot->candidate_catalog_xmin = xmin;
    slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn,
                                   current_lsn);

I suspect this means the fields could get "out of sync", though I'm not
sure what could break because of this.
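A tiny sketch of what "out of sync" means for the snippet above, using a
hypothetical struct in place of the two slot fields and made-up values.
It shows only that the resulting (xmin, LSN) pair may never have been
observed together. It says nothing about whether that breaks anything.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;
#define Max(x, y) ((x) > (y) ? (x) : (y))

/* Hypothetical stand-in for candidate_catalog_xmin / candidate_xmin_lsn. */
typedef struct XminCandidate
{
	TransactionId catalog_xmin;
	XLogRecPtr	xmin_lsn;
} XminCandidate;

/* The update from the snippet: xmin is taken as-is, the LSN is clamped. */
static void
set_candidate_clamped(XminCandidate *c, XLogRecPtr current_lsn,
					  TransactionId xmin)
{
	c->catalog_xmin = xmin;
	c->xmin_lsn = Max(c->xmin_lsn, current_lsn);
}
```

Starting from a candidate (1000, 0/2000), calling
set_candidate_clamped(&c, 0x1800, 1005) leaves (1005, 0/2000): the xmin
from the new call paired with the LSN from the old one.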

I have to admit the "protocol" for the candidate fields (and how the
values propagate) is not very clear to me. And AFAICS it's not really
described/explained anywhere :-(


Note: While working on this, I realized PG14 and PG15 need the fix
eb27d3dc8887, which was backpatched only to 16+. But I hit that on 14
too during testing. I already pinged Daniel about this, but until he has
time for that, cherry-pick that commit before testing.


regards


[1] https://www.postgresql.org/message-id/Yz2hivgyjS1RfMKs%40depesz.com

[2] https://www.postgresql.org/message-id/CAD21AoBVhYnGBuW_o%3DwEGgTp01qiHNAx1a14b1X9kFXmuBe%3Dsg%40mail.gmail.com

-- 
Tomas Vondra

Attachment: logrep-script.tgz
Description: application/compressed-tar

From 8cd1b0472f1189be9ba19455df50fd1cc1f730e5 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:41:25 +0100
Subject: [PATCH vnew 1/2] WIP: add elog messages and asserts

shows how the LSNs move, protects against move backwards
---
 src/backend/access/transam/xlog.c           | 34 ++++++++++++
 src/backend/replication/logical/logical.c   | 60 +++++++++++++++++++++
 src/backend/replication/logical/snapbuild.c |  6 +++
 src/backend/replication/slot.c              | 17 +++++-
 4 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f1a795bba9f..ba9415b9149 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2782,9 +2782,17 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 void
 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
 {
+	XLogRecPtr	oldlsn;
+
 	SpinLockAcquire(&XLogCtl->info_lck);
+	oldlsn = XLogCtl->replicationSlotMinLSN;
 	XLogCtl->replicationSlotMinLSN = lsn;
 	SpinLockRelease(&XLogCtl->info_lck);
+
+	elog(LOG, "XLogSetReplicationSlotMinimumLSN set %X/%X (old %X/%X)", LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(oldlsn));
+
+	/* no backward moves */
+	Assert(lsn >= oldlsn);
 }
 
 
@@ -2801,6 +2809,8 @@ XLogGetReplicationSlotMinimumLSN(void)
 	retval = XLogCtl->replicationSlotMinLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
 
+	elog(LOG, "XLogGetReplicationSlotMinimumLSN got %X/%X", LSN_FORMAT_ARGS(retval));
+
 	return retval;
 }
 
@@ -9462,12 +9472,19 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation start / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
+
+	elog(LOG, "CreateCheckPoint / slot invalidation after KeepLogSeg / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9476,10 +9493,15 @@ CreateCheckPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(recptr, &_logSegNo);
+		elog(LOG, "CreateCheckPoint / slot invalidation recalculate / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
 	}
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation done / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Make more log segments if needed.  (Do this after recycling old log
 	 * segments, since that may supply some of the needed files.)
@@ -9877,6 +9899,10 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
+
+	elog(LOG, "CreateRestartPoint / slot invalidation start / RedoRecPtr %X/%X receivePtr %X/%X replayPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(receivePtr), LSN_FORMAT_ARGS(replayPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9885,9 +9911,15 @@ CreateRestartPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(endptr, &_logSegNo);
+
+		elog(LOG, "CreateRestartPoint / slot invalidation recalculate / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
 	}
 	_logSegNo--;
 
+	elog(LOG, "CreateRestartPoint / slot invalidation done / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	/*
 	 * Try to recycle segments on a useful timeline. If we've been promoted
 	 * since the beginning of this restartpoint, use the new timeline chosen
@@ -10109,6 +10141,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
+	elog(LOG, "KeepLogSeg recptr %X/%X logSegNo %lu segno %lu", LSN_FORMAT_ARGS(recptr), *logSegNo, segno);
+
 	/* don't delete WAL segments newer than the calculated segment */
 	if (segno < *logSegNo)
 		*logSegNo = segno;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f7b2c85d9b..16fed53063b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -375,9 +375,16 @@ CreateInitDecodingContext(const char *plugin,
 		ReplicationSlotReserveWal();
 	else
 	{
+		XLogRecPtr oldlsn;
 		SpinLockAcquire(&slot->mutex);
+		oldlsn = slot->data.restart_lsn;
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
+
+		elog(LOG, "CreateInitDecodingContext updated restart_lsn %X/%X (old %X/%X) for slot %s",
+			LSN_FORMAT_ARGS(restart_lsn),
+			LSN_FORMAT_ARGS(oldlsn),
+			NameStr(slot->data.name));
 	}
 
 	/* ----
@@ -1598,6 +1605,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1611,6 +1625,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 	}
@@ -1653,6 +1674,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			NameStr(slot->data.name),
+			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/* also don't even consider going back for actual restart_lsn */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1667,6 +1700,21 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			  NameStr(slot->data.name),
+			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/*
+		 * also don't even consider going back for actual restart_lsn
+		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+		 */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
@@ -1707,6 +1755,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	Assert(lsn != InvalidXLogRecPtr);
 
+	elog(LOG, "LogicalConfirmReceivedLocation %X/%X", LSN_FORMAT_ARGS(lsn));
+
 	/* Do an unlocked check for candidate_lsn first. */
 	if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
 		MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
@@ -1746,6 +1796,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			elog(LOG, "LogicalConfirmReceivedLocation updating data.restart_lsn to %X/%X (from %X/%X) candidate_restart_valid 0/0 (from %X/%X) candidate_restart_lsn 0/0 (from %X/%X)",
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
+
+			/* don't allow the LSN to go backwards */
+			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
@@ -1774,6 +1833,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
+			elog(LOG, "LogicalConfirmReceivedLocation calculating required LSN/xmin after update");
 			ReplicationSlotsComputeRequiredXmin(false);
 			ReplicationSlotsComputeRequiredLSN();
 		}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7b761510f1e..05184516704 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1270,7 +1270,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * anything because we hadn't reached a consistent state yet.
 	 */
 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn %X/%X", LSN_FORMAT_ARGS(txn->restart_decoding_lsn));
 		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+	}
 
 	/*
 	 * No in-progress transaction, can reuse the last serialized snapshot if
@@ -1279,8 +1282,11 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	else if (txn == NULL &&
 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot last_serialized_snapshot %X/%X", LSN_FORMAT_ARGS(builder->last_serialized_snapshot));
 		LogicalIncreaseRestartDecodingForSlot(lsn,
 											  builder->last_serialized_snapshot);
+	}
 }
 
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 037a347cba0..8a7aa257293 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -827,6 +827,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 
 	Assert(ReplicationSlotCtl != NULL);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / start");
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -840,6 +842,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
 
+		elog(LOG, "ReplicationSlotsComputeRequiredLSN found slot %s with restart_lsn %X/%X", NameStr(s->data.name), LSN_FORMAT_ARGS(restart_lsn));
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -847,6 +851,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / done");
+
 	XLogSetReplicationSlotMinimumLSN(min_required);
 }
 
@@ -1285,9 +1291,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotRelease();
 
 			ereport(LOG,
-					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size (%d)",
 							NameStr(slotname),
-							LSN_FORMAT_ARGS(restart_lsn))));
+							LSN_FORMAT_ARGS(restart_lsn), max_slot_wal_keep_size_mb)));
 
 			/* done with this slot for now */
 			break;
@@ -1315,6 +1321,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
+	elog(LOG, "InvalidateObsoleteReplicationSlots oldestSegno %lu oldestLSN %X/%X",
+		oldestSegno, LSN_FORMAT_ARGS(oldestLSN));
+
 restart:
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1327,6 +1336,7 @@ restart:
 		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
+			elog(LOG, "InvalidateObsoleteReplicationSlots restart invalidated=%d", invalidated);
 			goto restart;
 		}
 	}
@@ -1337,6 +1347,7 @@ restart:
 	 */
 	if (invalidated)
 	{
+		elog(LOG, "InvalidateObsoleteReplicationSlots slot invalidated, recalculate required");
 		ReplicationSlotsComputeRequiredXmin(false);
 		ReplicationSlotsComputeRequiredLSN();
 	}
@@ -1854,6 +1865,8 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
+		elog(LOG, "RestoreSlotFromDisk restart_lsn %X/%X candidate_xmin_lsn 0/0 candidate_restart_lsn 0/0 candidate_restart_valid 0/0", LSN_FORMAT_ARGS(slot->data.restart_lsn));
+
 		slot->in_use = true;
 		slot->active_pid = 0;
 
-- 
2.39.5

From 81981f60949ab4c0fb9418b17dfcc37c0aa063f1 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:46:16 +0100
Subject: [PATCH vnew 2/2] WIP: change asserts to elog + defense

When updating LSNs, use Max() with the preceding value to protect against
moves backwards.
---
 src/backend/replication/logical/logical.c | 57 ++++++++++++-----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 16fed53063b..988f4add977 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1609,11 +1609,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 
 		/* our candidate can directly be used */
 		updated_xmin = true;
@@ -1629,11 +1630,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 	}
 	SpinLockRelease(&slot->mutex);
 
@@ -1679,15 +1681,17 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/* also don't even consider going back for actual restart_lsn */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
 		/* our candidate can directly be used */
 		updated_lsn = true;
@@ -1705,18 +1709,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/*
-		 * also don't even consider going back for actual restart_lsn
-		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
-		 */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
+
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
@@ -1802,10 +1806,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
 
-			/* don't allow the LSN to go backwards */
-			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+			if (MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn)
+				elog(LOG, "ASSERT: LogicalConfirmReceivedLocation: MyReplicationSlot->data.restart_lsn > MyReplicationSlot->candidate_restart_lsn");
 
-			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
+			MyReplicationSlot->data.restart_lsn = Max(MyReplicationSlot->data.restart_lsn,
+								  MyReplicationSlot->candidate_restart_lsn);
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
 			updated_restart = true;
-- 
2.39.5

From 8cd1b0472f1189be9ba19455df50fd1cc1f730e5 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:41:25 +0100
Subject: [PATCH v14 1/2] WIP: add elog messages and asserts

shows how the LSNs move, protects against move backwards
---
 src/backend/access/transam/xlog.c           | 34 ++++++++++++
 src/backend/replication/logical/logical.c   | 60 +++++++++++++++++++++
 src/backend/replication/logical/snapbuild.c |  6 +++
 src/backend/replication/slot.c              | 17 +++++-
 4 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f1a795bba9f..ba9415b9149 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2782,9 +2782,17 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 void
 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
 {
+	XLogRecPtr	oldlsn;
+
 	SpinLockAcquire(&XLogCtl->info_lck);
+	oldlsn = XLogCtl->replicationSlotMinLSN;
 	XLogCtl->replicationSlotMinLSN = lsn;
 	SpinLockRelease(&XLogCtl->info_lck);
+
+	elog(LOG, "XLogSetReplicationSlotMinimumLSN set %X/%X (old %X/%X)", LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(oldlsn));
+
+	/* no backward moves */
+	Assert(lsn >= oldlsn);
 }
 
 
@@ -2801,6 +2809,8 @@ XLogGetReplicationSlotMinimumLSN(void)
 	retval = XLogCtl->replicationSlotMinLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
 
+	elog(LOG, "XLogGetReplicationSlotMinimumLSN got %X/%X", LSN_FORMAT_ARGS(retval));
+
 	return retval;
 }
 
@@ -9462,12 +9472,19 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation start / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
+
+	elog(LOG, "CreateCheckPoint / slot invalidation after KeepLogSeg / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9476,10 +9493,15 @@ CreateCheckPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(recptr, &_logSegNo);
+		elog(LOG, "CreateCheckPoint / slot invalidation recalculate / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
 	}
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation done / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Make more log segments if needed.  (Do this after recycling old log
 	 * segments, since that may supply some of the needed files.)
@@ -9877,6 +9899,10 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
+
+	elog(LOG, "CreateRestartPoint / slot invalidation start / RedoRecPtr %X/%X receivePtr %X/%X replayPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(receivePtr), LSN_FORMAT_ARGS(replayPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9885,9 +9911,15 @@ CreateRestartPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(endptr, &_logSegNo);
+
+		elog(LOG, "CreateRestartPoint / slot invalidation recalculate / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+			LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
 	}
 	_logSegNo--;
 
+	elog(LOG, "CreateRestartPoint / slot invalidation done / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+		LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	/*
 	 * Try to recycle segments on a useful timeline. If we've been promoted
 	 * since the beginning of this restartpoint, use the new timeline chosen
@@ -10109,6 +10141,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
+	elog(LOG, "KeepLogSeg recptr %X/%X logSegNo " UINT64_FORMAT " segno " UINT64_FORMAT, LSN_FORMAT_ARGS(recptr), *logSegNo, segno);
+
 	/* don't delete WAL segments newer than the calculated segment */
 	if (segno < *logSegNo)
 		*logSegNo = segno;
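For reference, the checkpoint-side cutoff that these log lines trace can be modelled roughly as below. This is a simplified sketch, not the actual xlog.c code. XLogRecPtr and XLogSegNo mirror the PostgreSQL typedefs, but lsn_to_segno(), last_removable_segno() and wal_still_available() are hypothetical helpers. The wal_keep_size / max_slot_wal_keep_size handling in KeepLogSeg() is left out, as if max_slot_wal_keep_size = -1.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL typedefs (not the real headers). */
typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Same arithmetic as XLByteToSeg(): the segment that contains 'lsn'. */
static XLogSegNo
lsn_to_segno(XLogRecPtr lsn, uint64_t wal_segment_size)
{
	return lsn / wal_segment_size;
}

/*
 * Hypothetical model of the checkpoint cutoff: start at the segment holding
 * the redo pointer, clamp it to the segment holding the oldest slot
 * restart_lsn (the slot part of KeepLogSeg()), then step back one segment,
 * as CreateCheckPoint() does before RemoveOldXlogFiles().  Segments up to
 * and including the returned number may be recycled.  Assumes the clamped
 * segment number is >= 1.
 */
static XLogSegNo
last_removable_segno(XLogRecPtr redo, XLogRecPtr slot_min_lsn,
					 uint64_t wal_segment_size)
{
	XLogSegNo	logSegNo = lsn_to_segno(redo, wal_segment_size);

	if (slot_min_lsn != InvalidXLogRecPtr)
	{
		XLogSegNo	segno = lsn_to_segno(slot_min_lsn, wal_segment_size);

		/* don't delete WAL segments newer than the calculated segment */
		if (segno < logSegNo)
			logSegNo = segno;
	}

	return logSegNo - 1;
}

/* Is the segment containing 'lsn' still on disk after that recycling? */
static bool
wal_still_available(XLogRecPtr lsn, XLogSegNo last_removed,
					uint64_t wal_segment_size)
{
	return lsn_to_segno(lsn, wal_segment_size) > last_removed;
}
```

Take 16MB segments, a redo pointer at 0/9000000 and the oldest slot at 0/5000000. Segments up to 4 may then be recycled. If that slot's restart_lsn later moves back to 0/3800000 (segment 3), the WAL it needs is already gone.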
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f7b2c85d9b..16fed53063b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -375,9 +375,16 @@ CreateInitDecodingContext(const char *plugin,
 		ReplicationSlotReserveWal();
 	else
 	{
+		XLogRecPtr oldlsn;
 		SpinLockAcquire(&slot->mutex);
+		oldlsn = slot->data.restart_lsn;
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
+
+		elog(LOG, "CreateInitDecodingContext updated restart_lsn %X/%X (old %X/%X) for slot %s",
+			LSN_FORMAT_ARGS(restart_lsn),
+			LSN_FORMAT_ARGS(oldlsn),
+			NameStr(slot->data.name));
 	}
 
 	/* ----
@@ -1598,6 +1605,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1611,6 +1625,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			  xmin, LSN_FORMAT_ARGS(current_lsn),
+			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 	}
@@ -1653,6 +1674,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			NameStr(slot->data.name),
+			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/* also don't even consider going back for actual restart_lsn */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1667,6 +1700,21 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			  NameStr(slot->data.name),
+			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/*
+		 * also don't even consider going back for actual restart_lsn
+		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+		 */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
@@ -1707,6 +1755,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	Assert(lsn != InvalidXLogRecPtr);
 
+	elog(LOG, "LogicalConfirmReceivedLocation %X/%X", LSN_FORMAT_ARGS(lsn));
+
 	/* Do an unlocked check for candidate_lsn first. */
 	if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
 		MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
@@ -1746,6 +1796,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			elog(LOG, "LogicalConfirmReceivedLocation updating data.restart_lsn to %X/%X (from %X/%X) candidate_restart_valid 0/0 (from %X/%X) candidate_restart_lsn 0/0 (from %X/%X)",
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
+				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
+
+			/* don't allow the LSN to go backwards */
+			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
@@ -1774,6 +1833,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
+			elog(LOG, "LogicalConfirmReceivedLocation calculating required LSN/xmin after update");
 			ReplicationSlotsComputeRequiredXmin(false);
 			ReplicationSlotsComputeRequiredLSN();
 		}
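The model below shows how the new Asserts in LogicalIncreaseRestartDecodingForSlot() can fire. It is a rough sketch of that function and of the restart_lsn part of LogicalConfirmReceivedLocation(), not the real code:

- SlotModel, model_increase_restart() and model_confirm() are hypothetical names.
- Locking, xmin handling and saving the slot are omitted.
- The first branch (ignore a restart_lsn that is not newer than data.restart_lsn) is assumed from the upstream function, since it is not visible in the hunks above.

The point is that the last branch is a plain `if`, so it can still install an older candidate.

```c
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, stripped-down slot: only the fields used below. */
typedef struct SlotModel
{
	XLogRecPtr	restart_lsn;		/* data.restart_lsn */
	XLogRecPtr	confirmed_flush;	/* data.confirmed_flush */
	XLogRecPtr	candidate_restart_valid;
	XLogRecPtr	candidate_restart_lsn;
} SlotModel;

/* Rough model of LogicalIncreaseRestartDecodingForSlot() (unpatched). */
static void
model_increase_restart(SlotModel *s, XLogRecPtr current_lsn,
					   XLogRecPtr restart_lsn)
{
	if (restart_lsn <= s->restart_lsn)
	{
		/* assumed: don't overwrite if we already have a newer restart lsn */
	}
	else if (current_lsn <= s->confirmed_flush)
	{
		s->candidate_restart_valid = current_lsn;
		s->candidate_restart_lsn = restart_lsn;
	}

	/* plain "if": also reached when the first branch ignored restart_lsn */
	if (s->candidate_restart_valid == InvalidXLogRecPtr)
	{
		s->candidate_restart_valid = current_lsn;
		s->candidate_restart_lsn = restart_lsn;
	}
}

/* Rough model of the restart_lsn part of LogicalConfirmReceivedLocation(). */
static void
model_confirm(SlotModel *s, XLogRecPtr lsn)
{
	if (lsn > s->confirmed_flush)
		s->confirmed_flush = lsn;

	if (s->candidate_restart_valid != InvalidXLogRecPtr &&
		s->candidate_restart_valid <= lsn)
	{
		s->restart_lsn = s->candidate_restart_lsn;	/* may move backwards */
		s->candidate_restart_lsn = InvalidXLogRecPtr;
		s->candidate_restart_valid = InvalidXLogRecPtr;
	}
}
```

Consider a slot freshly restored with restart_lsn 0/5000 and no candidates. A running_xacts record at 0/6000 then reports an oldest transaction with restart_decoding_lsn 0/4000. Confirming 0/6000 moves the slot back to 0/4000. That is the case `Assert(slot->data.restart_lsn <= restart_lsn)` in the second branch is meant to catch.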
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7b761510f1e..05184516704 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1270,7 +1270,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * anything because we hadn't reached a consistent state yet.
 	 */
 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn %X/%X", LSN_FORMAT_ARGS(txn->restart_decoding_lsn));
 		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+	}
 
 	/*
 	 * No in-progress transaction, can reuse the last serialized snapshot if
@@ -1279,8 +1282,11 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	else if (txn == NULL &&
 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot last_serialized_snapshot %X/%X", LSN_FORMAT_ARGS(builder->last_serialized_snapshot));
 		LogicalIncreaseRestartDecodingForSlot(lsn,
 											  builder->last_serialized_snapshot);
+	}
 }
 
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 037a347cba0..8a7aa257293 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -827,6 +827,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 
 	Assert(ReplicationSlotCtl != NULL);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / start");
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -840,6 +842,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
 
+		elog(LOG, "ReplicationSlotsComputeRequiredLSN found slot %s with restart_lsn %X/%X", NameStr(s->data.name), LSN_FORMAT_ARGS(restart_lsn));
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -847,6 +851,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / done");
+
 	XLogSetReplicationSlotMinimumLSN(min_required);
 }
 
@@ -1285,9 +1291,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 			ReplicationSlotRelease();
 
 			ereport(LOG,
-					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size (%d)",
 							NameStr(slotname),
-							LSN_FORMAT_ARGS(restart_lsn))));
+							LSN_FORMAT_ARGS(restart_lsn), max_slot_wal_keep_size_mb)));
 
 			/* done with this slot for now */
 			break;
@@ -1315,6 +1321,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
+	elog(LOG, "InvalidateObsoleteReplicationSlots oldestSegno " UINT64_FORMAT " oldestLSN %X/%X",
+		oldestSegno, LSN_FORMAT_ARGS(oldestLSN));
+
 restart:
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1327,6 +1336,7 @@ restart:
 		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
+			elog(LOG, "InvalidateObsoleteReplicationSlots restart invalidated=%d", invalidated);
 			goto restart;
 		}
 	}
@@ -1337,6 +1347,7 @@ restart:
 	 */
 	if (invalidated)
 	{
+		elog(LOG, "InvalidateObsoleteReplicationSlots slot invalidated, recalculate required");
 		ReplicationSlotsComputeRequiredXmin(false);
 		ReplicationSlotsComputeRequiredLSN();
 	}
@@ -1854,6 +1865,8 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
+		elog(LOG, "RestoreSlotFromDisk restart_lsn %X/%X candidate_xmin_lsn 0/0 candidate_restart_lsn 0/0 candidate_restart_valid 0/0", LSN_FORMAT_ARGS(slot->data.restart_lsn));
+
 		slot->in_use = true;
 		slot->active_pid = 0;
 
-- 
2.39.5

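The slot.c side can be sketched the same way. The required LSN is the minimum valid restart_lsn across all slots. The WAL-removal invalidation then compares each slot's restart_lsn against the start of the oldest kept segment (XLogSegNoOffsetToRecPtr(oldestSegno, 0, ...)).

This is a hypothetical model: model_required_lsn() and model_slot_obsolete() are not PostgreSQL functions. It assumes the invalidation check is a plain `restart_lsn < oldestLSN` comparison, and it omits locking and the other invalidation causes. Nothing in the check itself consults max_slot_wal_keep_size. That is presumably why the message can name that GUC even when it is set to -1, and why printing its value, as the patch now does, helps.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint64_t XLogSegNo;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Model of ReplicationSlotsComputeRequiredLSN(): oldest valid restart_lsn. */
static XLogRecPtr
model_required_lsn(const XLogRecPtr *restart_lsns, size_t nslots)
{
	XLogRecPtr	min_required = InvalidXLogRecPtr;

	for (size_t i = 0; i < nslots; i++)
	{
		XLogRecPtr	restart_lsn = restart_lsns[i];

		/* slots without a restart_lsn don't hold back WAL */
		if (restart_lsn != InvalidXLogRecPtr &&
			(min_required == InvalidXLogRecPtr || restart_lsn < min_required))
			min_required = restart_lsn;
	}
	return min_required;
}

/* Model of the WAL-removal check: restart_lsn before the oldest kept LSN. */
static bool
model_slot_obsolete(XLogRecPtr restart_lsn, XLogSegNo oldestSegno,
					uint64_t wal_segment_size)
{
	XLogRecPtr	oldestLSN = oldestSegno * wal_segment_size;	/* offset 0 */

	return restart_lsn != InvalidXLogRecPtr && restart_lsn < oldestLSN;
}
```

Suppose the checkpoint keeps everything from segment 5 because a slot was at 0/5000000. That slot is safe only as long as its restart_lsn does not drop below 0/5000000 before the next recomputation.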
From 81981f60949ab4c0fb9418b17dfcc37c0aa063f1 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:46:16 +0100
Subject: [PATCH v14 2/2] WIP: change asserts to elog + defense

When updating the LSNs, use Max() with the previous value to prevent them
from moving backwards.
---
 src/backend/replication/logical/logical.c | 57 ++++++++++++-----------
 1 file changed, 31 insertions(+), 26 deletions(-)

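The effect of the Max() clamps can be illustrated with a small model of the patched candidate update and confirm steps. SlotModel, model_set_candidate() and model_confirm() are hypothetical. Locking and the xmin side are omitted, and Max() is defined the same way as in PostgreSQL's c.h. Starting from restart_lsn 0/5000, a candidate of 0/4000 proposed at 0/6000 is clamped to 0/5000, so confirming 0/6000 leaves the slot where it was. A later candidate of 0/5800 still advances it.

```c
#include <stdint.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define Max(x, y) ((x) > (y) ? (x) : (y))	/* as in c.h */

/* Hypothetical, stripped-down slot: only the restart-related fields. */
typedef struct SlotModel
{
	XLogRecPtr	restart_lsn;		/* data.restart_lsn */
	XLogRecPtr	candidate_restart_valid;
	XLogRecPtr	candidate_restart_lsn;
} SlotModel;

/* Candidate update as in the patched branches: both LSNs clamped. */
static void
model_set_candidate(SlotModel *s, XLogRecPtr current_lsn,
					XLogRecPtr restart_lsn)
{
	s->candidate_restart_valid = Max(s->candidate_restart_valid, current_lsn);
	s->candidate_restart_lsn = Max(s->restart_lsn,
								   Max(s->candidate_restart_lsn, restart_lsn));
}

/* Confirm step as patched: data.restart_lsn can only move forward. */
static void
model_confirm(SlotModel *s, XLogRecPtr lsn)
{
	if (s->candidate_restart_valid != InvalidXLogRecPtr &&
		s->candidate_restart_valid <= lsn)
	{
		s->restart_lsn = Max(s->restart_lsn, s->candidate_restart_lsn);
		s->candidate_restart_lsn = InvalidXLogRecPtr;
		s->candidate_restart_valid = InvalidXLogRecPtr;
	}
}
```

The XXX comments in the patch already raise one consequence. After clamping, candidate_restart_valid and candidate_restart_lsn (or candidate_xmin_lsn and candidate_catalog_xmin) may no longer come from the same call. The pair then no longer describes a single decoding position, which is a defense rather than a fix of whatever produces the older value.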
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 16fed53063b..988f4add977 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1609,11 +1609,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 
 		/* our candidate can directly be used */
 		updated_xmin = true;
@@ -1629,11 +1630,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			  xmin, LSN_FORMAT_ARGS(current_lsn),
 			  LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 	}
 	SpinLockRelease(&slot->mutex);
 
@@ -1679,15 +1681,17 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/* also don't even consider going back for actual restart_lsn */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
 		/* our candidate can directly be used */
 		updated_lsn = true;
@@ -1705,18 +1709,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			  LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			  LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/*
-		 * also don't even consider going back for actual restart_lsn
-		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
-		 */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
+
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
@@ -1802,10 +1806,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
 				LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
 
-			/* don't allow the LSN to go backwards */
-			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+			if (MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn)
+				elog(LOG, "ASSERT: LogicalConfirmReceivedLocation: MyReplicationSlot->data.restart_lsn > MyReplicationSlot->candidate_restart_lsn");
 
-			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
+			MyReplicationSlot->data.restart_lsn = Max(MyReplicationSlot->data.restart_lsn,
+								  MyReplicationSlot->candidate_restart_lsn);
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
 			updated_restart = true;
-- 
2.39.5
