On 11/11/24 15:17, Tomas Vondra wrote:
> On 11/11/24 14:51, Ashutosh Bapat wrote:
>> ...
>>
>> I think the problem is about processing older running transactions
>> record and setting data.restart_lsn based on the candidates those
>> records produce. But what is not clear to me is how come a newer
>> candidate_restart_lsn is available immediately upon WAL sender
>> restart. I.e. in the sequence of events you mentioned in your first
>> email
>> 1. 344.139 LOG: starting logical decoding for slot "s1"
>>
>> 2. 344.139 DETAIL: Streaming transactions committing after 1/E97EAC30,
>> reading WAL from 1/E96FB4D0.
>>
>> 3. 344.140 LOG: logical decoding found consistent point at 1/E96FB4D0
>>
>> 4. 344.140 DETAIL: Logical decoding will begin using saved snapshot.
>>
>> 5. 344.140 LOG: LogicalConfirmReceivedLocation 1/E9865398
>>
>> 6. 344.140 LOG: LogicalConfirmReceivedLocation updating
>> data.restart_lsn to 1/E979D4C8 (from 1/E96FB4D0)
>> candidate_restart_valid 0/0 (from 1/E9865398)
>> candidate_restart_lsn 0/0 (from 1/E979D4C8)
>>
>> how did candidate_restart_lsn = 1/E979D4C8 and candidate_restart_valid
>> = 1/E9865398 were set in ReplicationSlot after WAL sender? It means it
>> must have read and processed running transaction record at 1/E9865398.
>> If that's true, how come it went back to a running transactions WAL
>> record at 1/E979D4C8? It should be reading WAL records sequentially,
>> hence read 1/E979D4C8 first then 1/E9865398.
>>
>> 344.145 LOG: LogicalIncreaseRestartDecodingForSlot s1
>> candidate_restart_valid_lsn 1/E979D4C8 (0/0)
>> candidate_restart_lsn 1/E96FB4D0 (0/0)
>>
>
> Those are good questions, but IIUC that's explained by this comment from
> Masahiko-san's analysis [1]:
>
> Thinking about the root cause more, it seems to me that the root cause
> is not the fact that candidate_xxx values are not cleared when being
> released.
>
> In the scenario I reproduced, after restarting the logical decoding,
> the walsender sets the restart_lsn to a candidate_restart_lsn left in
> the slot upon receiving the ack from the subscriber. ...
>
> If this is correct, then what happens is:
>
> 1) replication is running, at some point we set candidate LSN to B
>
> 2) something breaks, causing reconnect with restart LSN A (< B)
>
> 3) we still have the candidate LSN B in memory, and after receiving
> some confirmation we set it as restart_lsn
>
> 4) we get to decode the RUNNING_XACTS, which moves restart_lsn back
>
>
> If this analysis is correct, I think it's rather suspicious we don't
> reset the candidate fields on restart. Can those "old" values ever be
> valid? But I haven't tried resetting them.
>
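A toy model may help make the four quoted steps concrete. The sketch below is not PostgreSQL code. `ToySlot`, `toy_confirm()`, `toy_increase_restart()` and `run_scenario()` are hypothetical. The LSNs 100/200/300/350 are made up (A = 100, B = 200), and spinlocks, xmin handling and persistence are left out. `toy_confirm()` loosely follows LogicalConfirmReceivedLocation(), and `toy_increase_restart()` loosely follows LogicalIncreaseRestartDecodingForSlot(), as both appear in the patch context attached to this message.

Two switches mirror the attached patches:

- `reset_on_acquire` clears the candidates when the slot is re-acquired (the 0003 idea).
- `else_if_fix` stops a proposal that is not newer than restart_lsn from becoming a candidate (the `if` to `else if` change in 0004).

In this model, step 4 can only move restart_lsn back because the plain `if` lets the older proposal through after step 3 has cleared the candidates.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t lsn_t;             /* hypothetical stand-in for XLogRecPtr */
#define INVALID_LSN ((lsn_t) 0)     /* stand-in for InvalidXLogRecPtr */

/* Toy version of the slot fields involved (not the real ReplicationSlot). */
typedef struct ToySlot
{
    lsn_t       restart_lsn;        /* data.restart_lsn */
    lsn_t       confirmed_flush;    /* data.confirmed_flush */
    lsn_t       candidate_restart_lsn;
    lsn_t       candidate_restart_valid;
    bool        moved_backwards;    /* what the 0001 asserts would catch */
} ToySlot;

/* Roughly LogicalConfirmReceivedLocation(): apply a candidate once acked. */
static void
toy_confirm(ToySlot *s, lsn_t lsn)
{
    assert(lsn != INVALID_LSN);
    s->confirmed_flush = lsn;
    if (s->candidate_restart_valid != INVALID_LSN &&
        s->candidate_restart_valid <= lsn)
    {
        if (s->candidate_restart_lsn < s->restart_lsn)
            s->moved_backwards = true;
        s->restart_lsn = s->candidate_restart_lsn;
        s->candidate_restart_lsn = INVALID_LSN;
        s->candidate_restart_valid = INVALID_LSN;
    }
}

/*
 * Roughly LogicalIncreaseRestartDecodingForSlot(). With else_if_fix = false,
 * a proposal that is not newer than restart_lsn can still become the
 * candidate, like the plain "if" that the 0004 patch turns into "else if".
 */
static void
toy_increase_restart(ToySlot *s, lsn_t current_lsn, lsn_t restart_lsn,
                     bool else_if_fix)
{
    assert(current_lsn != INVALID_LSN && restart_lsn != INVALID_LSN);

    if (else_if_fix && restart_lsn <= s->restart_lsn)
        return;                     /* not newer: ignore it */

    if (restart_lsn > s->restart_lsn && current_lsn <= s->confirmed_flush)
    {
        /* already confirmed far enough: accept and apply right away */
        s->candidate_restart_valid = current_lsn;
        s->candidate_restart_lsn = restart_lsn;
        toy_confirm(s, s->confirmed_flush);
    }
    else if (s->candidate_restart_valid == INVALID_LSN)
    {
        /* remember it until the receiver confirms current_lsn */
        s->candidate_restart_valid = current_lsn;
        s->candidate_restart_lsn = restart_lsn;
    }
}

/*
 * The four quoted steps with made-up LSNs A = 100 and B = 200.
 * Returns true if restart_lsn ever moved backwards.
 */
static bool
run_scenario(bool reset_on_acquire, bool else_if_fix)
{
    ToySlot     s = {.restart_lsn = 100, .confirmed_flush = 250};

    /* 1) RUNNING_XACTS at 300 proposes restart_lsn B = 200 (not acked yet) */
    toy_increase_restart(&s, 300, 200, else_if_fix);

    /* 2) reconnect: the slot is re-acquired, decoding resumes from A = 100 */
    if (reset_on_acquire)
    {
        s.candidate_restart_lsn = INVALID_LSN;      /* the 0003 idea */
        s.candidate_restart_valid = INVALID_LSN;
    }

    /* 3) the subscriber acks 300, which applies any leftover candidate */
    toy_confirm(&s, 300);

    /* 4) the RUNNING_XACTS at 200 is decoded again and proposes A = 100 */
    toy_increase_restart(&s, 200, 100, else_if_fix);
    toy_confirm(&s, 350);

    return s.moved_backwards;
}
```

With these values, `run_scenario(false, false)` reports a backwards move (restart_lsn goes 100 -> 200 -> 100), while none of the other three combinations does. That is consistent with either 0003 or 0004 alone being enough for this particular sequence. A toy model says nothing about other paths in the real code, though.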
To clarify this a bit, I mean something like the attached 0003 patch. The
reasoning is that after ReplicationSlotAcquire() we should get the slot in
the same state as if we had just read it from disk. Because why not? Why
should the result be different from what we'd get if the primary restarted
right before the reconnect?

Parts 0001 and 0002 add a couple of asserts to prevent a backwards move of
both restart_lsn and the various candidate LSN fields.

The 0003 and 0004 patches (each applied separately) both seem to fix the
crashes in my stress test, and none of the asserts from 0001+0002 seem to
fail. I'm not sure if we need both fixes or just one of them.

But neither of those fixes prevents a backwards move of the confirmed_flush
LSN, as enforced by the asserts in the 0005 patch. I don't know whether
these asserts are correct or not. It seems natural that once we get a
confirmation for some LSN, we can't move before that position, but I'm not
sure about that. Maybe it's too strict.

regards

-- 
Tomas Vondra
From be7ba029036613df80562cc735a3040d6f760a90 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Sat, 9 Nov 2024 12:09:04 +0100
Subject: [PATCH 1/5] asserts on restart_lsn backwards move

---
 src/backend/replication/logical/logical.c | 3 +++
 src/backend/replication/slot.c            | 2 ++
 2 files changed, 5 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f7b2c85d9b..d76889664a8 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -376,6 +376,7 @@ CreateInitDecodingContext(const char *plugin,
 	else
 	{
 		SpinLockAcquire(&slot->mutex);
+		Assert(slot->data.restart_lsn <= restart_lsn);
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 	}
@@ -1746,6 +1747,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			Assert(MyReplicationSlot->data.restart_lsn <= MyReplicationSlot->candidate_restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 037a347cba0..01e854b4486 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1109,6 +1109,7 @@ ReplicationSlotReserveWal(void)
 			/* start at current insert position */
 			restart_lsn = GetXLogInsertRecPtr();
 			SpinLockAcquire(&slot->mutex);
+			Assert(slot->data.restart_lsn <= restart_lsn);
 			slot->data.restart_lsn = restart_lsn;
 			SpinLockRelease(&slot->mutex);
 
@@ -1122,6 +1123,7 @@ ReplicationSlotReserveWal(void)
 		{
 			restart_lsn = GetRedoRecPtr();
 			SpinLockAcquire(&slot->mutex);
+			Assert(slot->data.restart_lsn <= restart_lsn);
 			slot->data.restart_lsn = restart_lsn;
 			SpinLockRelease(&slot->mutex);
 		}
-- 
2.39.5
From 39024b07c5bc8c3f43fba7a1368368c64a5b6f14 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Sat, 9 Nov 2024 12:09:42 +0100
Subject: [PATCH 2/5] asserts for candidate lsn fields

---
 src/backend/replication/logical/logical.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d76889664a8..8fe77f8c463 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1599,6 +1599,8 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1612,6 +1614,8 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 	}
@@ -1654,6 +1658,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1668,6 +1675,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
-- 
2.39.5
From 2437880b414e61cbfd02dca7a5d70ca9853c8a51 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Mon, 11 Nov 2024 20:52:01 +0100
Subject: [PATCH 3/5] reset slot in acquire

---
 src/backend/replication/slot.c | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 01e854b4486..c398b91f40c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -456,6 +456,30 @@ retry:
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
+
+	/*
+	 * Reset the in-memory effective xmin and candidate fields, so that stale
+	 * values left over from a previous walsender cannot be applied.
+	 */
+	SpinLockAcquire(&s->mutex);
+
+	/*
+	 * ReplicationSlotCreate would do this:
+	 * s->effective_xmin = InvalidTransactionId;
+	 * s->effective_catalog_xmin = InvalidTransactionId;
+	 * s->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	 *
+	 * But we do this more like RestoreSlotFromDisk, as if we loaded the
+	 * slot from disk.
+	 */
+	s->effective_xmin = s->data.xmin;
+	s->effective_catalog_xmin = s->data.catalog_xmin;
+
+	s->candidate_catalog_xmin = InvalidTransactionId;
+	s->candidate_xmin_lsn = InvalidXLogRecPtr;
+	s->candidate_restart_valid = InvalidXLogRecPtr;
+	s->candidate_restart_lsn = InvalidXLogRecPtr;
+	SpinLockRelease(&s->mutex);
 }
 
 /*
-- 
2.39.5
From e3028d078b680b007126c6941a49f3335147b775 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Mon, 11 Nov 2024 21:00:07 +0100
Subject: [PATCH 4/5] fix LogicalIncreaseRestartDecodingForSlot

---
 src/backend/replication/logical/logical.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8fe77f8c463..40ac9f43ce2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1638,6 +1638,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 {
 	bool		updated_lsn = false;
 	ReplicationSlot *slot;
+	bool		spin_released = false;
 
 	slot = MyReplicationSlot;
 
@@ -1673,7 +1674,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 * might never end up updating if the receiver acks too slowly. A missed
 	 * value here will just cause some extra effort after reconnecting.
 	 */
-	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
+	else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
 		Assert(slot->candidate_restart_valid <= current_lsn);
 		Assert(slot->candidate_restart_lsn <= restart_lsn);
@@ -1682,6 +1683,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn));
@@ -1697,6 +1700,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 		confirmed_flush = slot->data.confirmed_flush;
 		SpinLockRelease(&slot->mutex);
 
+		spin_released = true;
+
 		elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
 			 LSN_FORMAT_ARGS(restart_lsn),
 			 LSN_FORMAT_ARGS(current_lsn),
@@ -1705,6 +1710,9 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(confirmed_flush));
 	}
 
+	if (!spin_released)
+		SpinLockRelease(&slot->mutex);
+
 	/* candidates are already valid with the current flush position, apply */
 	if (updated_lsn)
 		LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
-- 
2.39.5
From 017a1fd52a125b4551a22444ae4d67889f20a0ac Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Mon, 11 Nov 2024 20:31:02 +0100
Subject: [PATCH 5/5] confirmed_flush asserts

---
 src/backend/replication/logical/logical.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 40ac9f43ce2..73d6149afde 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -605,7 +605,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	}
 
 	SpinLockAcquire(&slot->mutex);
+	Assert(slot->data.confirmed_flush <= ctx->reader->EndRecPtr);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	Assert(slot->data.initial_consistent_point <= ctx->reader->EndRecPtr);
 	slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
 	SpinLockRelease(&slot->mutex);
 }
@@ -1735,6 +1737,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
+		Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
 		MyReplicationSlot->data.confirmed_flush = lsn;
 
 		/* if we're past the location required for bumping xmin, do so */
@@ -1802,6 +1805,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	else
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
+		Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
 		MyReplicationSlot->data.confirmed_flush = lsn;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
-- 
2.39.5
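The asserts in 0001, 0002 and 0005 all follow the same `Assert(old <= new); old = new;` pattern. The code below is a minimal sketch of that pattern as a hypothetical helper that reports a violation instead of aborting. `advance_lsn()` and `lsn_t` are made up here and are not PostgreSQL APIs. Because the comparison is `<=`, re-confirming the same LSN passes the check. That matters for the `LogicalConfirmReceivedLocation(slot->data.confirmed_flush)` call at the end of LogicalIncreaseRestartDecodingForSlot(); only a strictly older LSN trips the check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t lsn_t;     /* hypothetical stand-in for XLogRecPtr */

/*
 * Hypothetical helper: set *field to new_lsn unless that would move it
 * backwards. Returns false (and leaves *field unchanged) exactly where the
 * Assert(old <= new) in the patches would fail.
 */
static bool
advance_lsn(lsn_t *field, lsn_t new_lsn)
{
    assert(field != NULL);

    if (new_lsn < *field)
        return false;       /* would be a backwards move */
    *field = new_lsn;       /* equal or newer: accepted */
    return true;
}
```

Starting from 100, advancing to 100 and then to 150 succeeds. A later attempt with 120 is rejected and leaves the field at 150.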