On 11/8/24 15:57, Ashutosh Bapat wrote:
> ...
> 
> After examining the code before reading [2], I came to the same
> conclusion as Masahiko-san in [2]. We install candidate_restart_lsn
> based on the running transaction record whose LSN is between
> restart_lsn and confirmed_flush_lsn. Since candidate_restart_valid of
> such candidates would be lesser than any confirmed_flush_lsn received
> from downstream. I am surprised that the fix suggested by Masahiko-san
> didn't work though. The fix also fixes the asymmetry, between
> LogicalIncreaseXminForSlot and LogicalIncreaseRestartDecodingForSlot,
> that you have pointed out in your next email. What behaviour do you
> see with that fix applied?
> 
> 
> [1] https://www.postgresql.org/message-id/Yz2hivgyjS1RfMKs%40depesz.com
> [2] https://www.postgresql.org/message-id/CAD21AoBVhYnGBuW_o%3DwEGgTp01qiHNAx1a14b1X9kFXmuBe%3Dsg%40mail.gmail.com
> 
> 

I read that message (and the earlier discussion) multiple times while
investigating the issue, and TBH it's not very clear to me what the
conclusion is :-(

There's some discussion about whether the candidate fields should be
reset on release or not. There are even claims that it might be
legitimate to not reset the fields and update the restart_lsn. Using
such "stale" LSN values seems rather suspicious to me, but I don't have
proof that it's incorrect. FWIW this lack of clarity is what I meant
when I said the policy/contract for the candidate fields is not
explained anywhere.

That being said, I gave that fix a try - see the attached 0001 patch. It
tweaks LogicalIncreaseRestartDecodingForSlot (it needs a bit more care
because of the spinlock), and it adds a couple asserts to make sure the
data.restart_lsn never moves back.
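
To make the intent of the tweak easier to follow, here is a minimal
standalone sketch of the branch structure as I understand it with 0001
applied. It is a model, not the PostgreSQL code: Lsn, INVALID_LSN,
SlotModel and model_increase_restart are hypothetical names, and the
spinlock and logging are left out. The point is that all three cases
form a single if / else-if chain, so a proposal that is not newer than
data.restart_lsn can no longer fall through and get installed as a
candidate.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;			/* stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)	/* stand-in for InvalidXLogRecPtr */

/* Hypothetical, simplified slot holding only the fields discussed here. */
typedef struct SlotModel
{
	Lsn			restart_lsn;		/* models data.restart_lsn */
	Lsn			confirmed_flush;	/* models data.confirmed_flush */
	Lsn			candidate_restart_valid;
	Lsn			candidate_restart_lsn;
} SlotModel;

/*
 * Models the branches of LogicalIncreaseRestartDecodingForSlot with the
 * "else if" change. Returns true when the candidate can be applied right
 * away (the updated_lsn case in the real function).
 */
bool
model_increase_restart(SlotModel *slot, Lsn current_lsn, Lsn restart_lsn)
{
	if (restart_lsn <= slot->restart_lsn)
	{
		/* not newer than the current restart_lsn: ignore the proposal */
		return false;
	}
	else if (current_lsn <= slot->confirmed_flush)
	{
		/* already flushed far enough, the candidate is usable directly */
		slot->candidate_restart_valid = current_lsn;
		slot->candidate_restart_lsn = restart_lsn;
		return true;
	}
	else if (slot->candidate_restart_valid == INVALID_LSN)
	{
		/* no pending candidate, remember this one for a later ack */
		slot->candidate_restart_valid = current_lsn;
		slot->candidate_restart_lsn = restart_lsn;
	}

	return false;
}
```

With a plain "if" in place of the last "else if" (the pre-patch shape),
a call with restart_lsn below data.restart_lsn and no pending candidate
would reach the third block and store the older value as the candidate.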

And indeed, with this my stress test script does not crash anymore.

But is that really correct? The lack of failure in one specific test
does not really prove that. And then again - why should it be OK for the
other candidate fields to move backwards? Isn't that suspicious? It sure
seems counter-intuitive to me, and I'm not sure the code expects that.

So in 0002 I added a couple more asserts to make sure the LSN fields
only move forward, and those *do* continue to fail, and in some cases
the amount by which the fields move back is pretty significant
(multiple megabytes).
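
For reference, the check behind those asserts, and the way a backwards
distance can be measured, boils down to something like the sketch
below. This is only an illustration with hypothetical helper names
(lsn_moves_forward, lsn_backward_distance), not code from the patch. It
treats an LSN as a plain 64-bit byte position, so the difference
between two values is a distance in bytes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;			/* stand-in for XLogRecPtr */

/* The condition the asserts check: the field does not move backwards. */
bool
lsn_moves_forward(Lsn old_value, Lsn new_value)
{
	return old_value <= new_value;
}

/*
 * Number of bytes by which new_value lies behind old_value, or 0 when
 * the field stays put or moves forward.
 */
uint64_t
lsn_backward_distance(Lsn old_value, Lsn new_value)
{
	return (new_value < old_value) ? (old_value - new_value) : 0;
}
```

Note that an invalid (zero) old value trivially passes such a check, so
an assert of this kind can only fire when a candidate is already set.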

Maybe it's fine if this "backwards move" never propagates to e.g.
"restart_lsn", not sure. And I'm not sure which other fields should not
move backwards (what about data.confirmed_flush, for example?).


regards

-- 
Tomas Vondra
From a81c50bbb8b7384a48752316468c59ab5faf9114 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Sat, 9 Nov 2024 12:14:19 +0100
Subject: [PATCH 1/2] restart_lsn backwards move - fix and asserts

---
 src/backend/replication/logical/logical.c | 13 ++++++++++++-
 src/backend/replication/slot.c            |  1 +
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3fe1774a1e9..28d70e732d5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -391,6 +391,7 @@ CreateInitDecodingContext(const char *plugin,
 	else
 	{
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 	}
@@ -1762,6 +1763,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 {
 	bool		updated_lsn = false;
 	ReplicationSlot *slot;
+	bool		spin_released = false;
 
 	slot = MyReplicationSlot;
 
@@ -1794,12 +1796,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
-	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn));
@@ -1815,6 +1819,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		confirmed_flush = slot->data.confirmed_flush;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn),
@@ -1823,6 +1829,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(confirmed_flush));
 	}
 
+	if (!spin_released)
+		SpinLockRelease(&slot->mutex);
+
 	/* candidates are already valid with the current flush position, apply */
 	if (updated_lsn)
 		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
@@ -1875,6 +1884,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			Assert(MyReplicationSlot->data.restart_lsn <= MyReplicationSlot->candidate_restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 6828100cf1a..0f38ccc8353 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1468,6 +1468,7 @@ ReplicationSlotReserveWal(void)
 			restart_lsn = GetXLogInsertRecPtr();
 
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
-- 
2.47.0

From 380dcc26d5b4db75233938130057d66547ddeae5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Sat, 9 Nov 2024 12:14:49 +0100
Subject: [PATCH 2/2] asserts for candidate lsn fields

---
 src/backend/replication/logical/logical.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 28d70e732d5..fe030d147c6 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1718,6 +1718,8 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1731,6 +1733,8 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1784,6 +1788,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1798,6 +1805,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
-- 
2.47.0
