Hi,

I've been investigating some issues reported by users, related to logical replication unexpectedly breaking with messages like:

  LOG: invalidating slot "s" because its restart_lsn X/Y exceeds max_slot_wal_keep_size

which is pretty confusing, because the system has that GUC set to -1 (i.e. disabled, so there should be no limit). Or a message like this:

  ERROR: requested WAL segment 000...0AA has already been removed

which is a bit less confusing, but still puzzling, because replication slots are meant to prevent exactly this.

I speculated there's some sort of rare race condition in how we advance the slot LSN values. I didn't know where to start, so I gave up on trying to understand the whole complex code. Instead, I wrote a simple stress test that

1) sets up a cluster (primary + 1+ logical replicas)
2) runs pgbench on the primary, checks replicas/slots
3) randomly restarts the nodes (both primary/replicas) in either fast or immediate mode, with random short pauses
4) runs checkpoints in a tight while loop

This is based on the observation that in past reports the issues seem to happen only with logical replication, shortly after a reconnect (e.g. after a connection reset etc.). And the slots are invalidated / WAL removed during a checkpoint, so frequent checkpoints should make it easier to hit ...

Attached are a couple of scripts running this. It's not particularly pretty and may need some tweaks to make it work on your machine (run.sh is the script to run).

And unfortunately, this started to fail pretty quickly. The short story is that it's not difficult to get into a situation where restart_lsn for a slot moves backwards, so something like this can happen:

1) slot restart_lsn moves forward to LSN A
2) checkpoint happens, updates min required LSN for slots, recycles segments it considers unnecessary (up to LSN A)
3) slot restart_lsn moves backwards to LSN B (where B < A)
4) kaboom

This would perfectly explain all the symptoms I mentioned earlier.
The max_slot_wal_keep_size reference is confusing, but AFAIK it's just based on the (reasonable) belief that an LSN can't move backwards. Under that belief, the only reason for restart_lsn being before the min required LSN is that this GUC kicked in. But the assumption does not hold :-(

Now, why/how can this happen?

I kept adding elog(LOG) messages to all places updating LSNs for a slot, and asserts to fail if data.restart_lsn moves backwards. See the attached 0001 patch. An example of a failure (for the walsender backend that triggered the assert) looks like this:

  344.139 LOG: starting logical decoding for slot "s1"
  344.139 DETAIL: Streaming transactions committing after 1/E97EAC30, reading WAL from 1/E96FB4D0.
  344.140 LOG: logical decoding found consistent point at 1/E96FB4D0
  344.140 DETAIL: Logical decoding will begin using saved snapshot.
  344.140 LOG: LogicalConfirmReceivedLocation 1/E9865398
  344.140 LOG: LogicalConfirmReceivedLocation updating data.restart_lsn to 1/E979D4C8 (from 1/E96FB4D0) candidate_restart_valid 0/0 (from 1/E9865398) candidate_restart_lsn 0/0 (from 1/E979D4C8)
  344.145 LOG: LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn 1/E96FB4D0
  344.145 LOG: LogicalIncreaseRestartDecodingForSlot s1 candidate_restart_valid_lsn 1/E979D4C8 (0/0) candidate_restart_lsn 1/E96FB4D0 (0/0)
  344.147 LOG: LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn 1/E979D4C8
  344.156 LOG: LogicalIncreaseXminForSlot candidate_catalog_xmin 30699 candidate_xmin_lsn 1/E993AD68 (0/0)
  ...
  344.235 LOG: LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn 1/E9F33AF8
  344.240 LOG: LogicalConfirmReceivedLocation 1/E9DCCD78
  344.240 LOG: LogicalConfirmReceivedLocation updating data.restart_lsn to 1/E96FB4D0 (from 1/E979D4C8) candidate_restart_valid 0/0 (from 1/E979D4C8) candidate_restart_lsn 0/0 (from 1/E96FB4D0)
  345.536 LOG: server process (PID 2518127) was terminated by signal 6: Aborted

We start decoding at 1/E96FB4D0. Right after startup we get a confirmation, and LogicalConfirmReceivedLocation updates restart_lsn to 1/E979D4C8.

But then LogicalIncreaseRestartDecodingForSlot comes along, and stores the restart_decoding_lsn 1/E96FB4D0 (which is the initial restart_lsn) into candidate_restart_lsn.

And then a little bit later we get another confirmation, and LogicalConfirmReceivedLocation propagates candidate_restart_lsn into data.restart_lsn.

This is how restart_lsn moves backwards, causing issues. I've reproduced this on PG14 and current master, but I believe the issue has existed since the introduction of logical decoding in 9.4 :-(

I'm not claiming this is the only way this can happen, but all the cases I've seen in my stress testing look like this. Moreover, I'm not claiming this is the only LSN field that can move backwards like this. It seems to me various other candidate_ fields have the same issue, but may have consequences other than discarding "unnecessary" WAL.

I've been reminded of this [1] thread from 2022. I'm 99% sure it's the same issue: it happened shortly after a reconnect, etc. And it seems to me Masahiko-san was right about the causes in [2]. I don't think the fix suggested in that message (changing the branch to "else if") can work, though. At least it did not really help in my testing. And I'm not sure it'd fix all the issues. It only affects restart_lsn, but AFAICS the same issue (LSNs moving backwards) can happen for the other LSN slot field (candidate_xmin_lsn).
I don't see how the backward move could be valid for any of those fields, or for "propagating" the candidate values into restart_lsn. But there's no protection against the backward moves. So either it's considered to be OK (which seems incorrect), or it was not expected to happen in practice.

The 0001 patch adds an assert preventing those backward moves on all the fields. This means it fails with ABORT earlier, even before a checkpoint gets a chance to invalidate the slot or remove the segment.

The 0002 part replaces the asserts with elog(LOG), and instead tweaks the updates to do Max(old,new) to prevent the backward moves. With this I'm no longer able to reproduce the issue, and there are a lot of LOG messages about the (prevented) backward moves.

Unfortunately, I'm not sure this is quite correct. Consider for example this:

  slot->candidate_catalog_xmin = xmin;
  slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);

I suspect this means the fields could get "out of sync". Not sure what could break because of this.

I have to admit the "protocol" for the candidate fields (and how the values propagate) is not very clear to me. And AFAICS it's not really described/explained anywhere :-(

Note: While working on this, I realized PG14 and PG15 need the fix eb27d3dc8887, which was backpatched only to 16+. But I hit that on 14 too during testing. I already pinged Daniel about this, but until he has time for that, cherry-pick it before testing.

regards

[1] https://www.postgresql.org/message-id/Yz2hivgyjS1RfMKs%40depesz.com
[2] https://www.postgresql.org/message-id/CAD21AoBVhYnGBuW_o%3DwEGgTp01qiHNAx1a14b1X9kFXmuBe%3Dsg%40mail.gmail.com

-- 
Tomas Vondra
logrep-script.tgz
Description: application/compressed-tar
From 8cd1b0472f1189be9ba19455df50fd1cc1f730e5 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:41:25 +0100
Subject: [PATCH vnew 1/2] WIP: add elog messages and asserts

shows how the LSNs move, protects against move backwards
---
 src/backend/access/transam/xlog.c           | 34 ++++++++++++
 src/backend/replication/logical/logical.c   | 60 +++++++++++++++++++++
 src/backend/replication/logical/snapbuild.c |  6 +++
 src/backend/replication/slot.c              | 17 +++++-
 4 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f1a795bba9f..ba9415b9149 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2782,9 +2782,17 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 void
 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
 {
+	XLogRecPtr	oldlsn;
+
 	SpinLockAcquire(&XLogCtl->info_lck);
+	oldlsn = XLogCtl->replicationSlotMinLSN;
 	XLogCtl->replicationSlotMinLSN = lsn;
 	SpinLockRelease(&XLogCtl->info_lck);
+
+	elog(LOG, "XLogSetReplicationSlotMinimumLSN set %X/%X (old %X/%X)", LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(oldlsn));
+
+	/* no backward moves */
+	Assert(lsn >= oldlsn);
 }
 
 
@@ -2801,6 +2809,8 @@ XLogGetReplicationSlotMinimumLSN(void)
 	retval = XLogCtl->replicationSlotMinLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
 
+	elog(LOG, "XLogGetReplicationSlotMinimumLSN got %X/%X", LSN_FORMAT_ARGS(retval));
+
 	return retval;
 }
 
@@ -9462,12 +9472,19 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation start / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		 LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
+
+	elog(LOG, "CreateCheckPoint / slot invalidation after KeepLogSeg / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		 LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9476,10 +9493,15 @@ CreateCheckPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(recptr, &_logSegNo);
+		elog(LOG, "CreateCheckPoint / slot invalidation recalculate / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+			 LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
 	}
 	_logSegNo--;
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
+	elog(LOG, "CreateCheckPoint / slot invalidation done / RedoRecPtr %X/%X recptr %X/%X _logSegNo %lu",
+		 LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(recptr), _logSegNo);
+
 	/*
 	 * Make more log segments if needed.  (Do this after recycling old log
 	 * segments, since that may supply some of the needed files.)
@@ -9877,6 +9899,10 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
+
+	elog(LOG, "CreateRestartPoint / slot invalidation start / RedoRecPtr %X/%X receivePtr %X/%X replayPtr %X/%X endptr %X/%X _logSegNo %lu",
+		 LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(receivePtr), LSN_FORMAT_ARGS(replayPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	if (InvalidateObsoleteReplicationSlots(_logSegNo))
 	{
 		/*
@@ -9885,9 +9911,15 @@ CreateRestartPoint(int flags)
 		 */
 		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 		KeepLogSeg(endptr, &_logSegNo);
+
+		elog(LOG, "CreateRestartPoint / slot invalidation recalculate / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+			 LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
 	}
 	_logSegNo--;
 
+	elog(LOG, "CreateRestartPoint / slot invalidation done / RedoRecPtr %X/%X endptr %X/%X _logSegNo %lu",
+		 LSN_FORMAT_ARGS(RedoRecPtr), LSN_FORMAT_ARGS(endptr), _logSegNo);
+
 	/*
 	 * Try to recycle segments on a useful timeline. If we've been promoted
 	 * since the beginning of this restartpoint, use the new timeline chosen
@@ -10109,6 +10141,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 		}
 	}
 
+	elog(LOG, "KeepLogSeg recptr %X/%X logSegNo %lu segno %lu", LSN_FORMAT_ARGS(recptr), *logSegNo, segno);
+
 	/* don't delete WAL segments newer than the calculated segment */
 	if (segno < *logSegNo)
 		*logSegNo = segno;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f7b2c85d9b..16fed53063b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -375,9 +375,16 @@ CreateInitDecodingContext(const char *plugin,
 		ReplicationSlotReserveWal();
 	else
 	{
+		XLogRecPtr	oldlsn;
 		SpinLockAcquire(&slot->mutex);
+		oldlsn = slot->data.restart_lsn;
 		slot->data.restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
+
+		elog(LOG, "CreateInitDecodingContext updated restart_lsn %X/%X (old %X/%X) for slot %s",
+			 LSN_FORMAT_ARGS(restart_lsn),
+			 LSN_FORMAT_ARGS(oldlsn),
+			 NameStr(slot->data.name));
 	}
 
 	/* ----
@@ -1598,6 +1605,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			 xmin, LSN_FORMAT_ARGS(current_lsn),
+			 LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 
@@ -1611,6 +1625,13 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 */
 	else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseXminForSlot candidate_catalog_xmin %u candidate_xmin_lsn %X/%X (%X/%X)",
+			 xmin, LSN_FORMAT_ARGS(current_lsn),
+			 LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
+
+		/* don't allow the LSN to go backwards */
+		Assert(slot->candidate_xmin_lsn <= current_lsn);
+
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
 	}
@@ -1653,6 +1674,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	else if (current_lsn <= slot->data.confirmed_flush)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			 NameStr(slot->data.name),
+			 LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			 LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/* also don't even consider going back for actual restart_lsn */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 
@@ -1667,6 +1700,21 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	 */
 	if (slot->candidate_restart_valid == InvalidXLogRecPtr)
 	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot %s candidate_restart_valid_lsn %X/%X (%X/%X) candidate_restart_lsn %X/%X (%X/%X)",
+			 NameStr(slot->data.name),
+			 LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
+			 LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
+
+		/* don't allow LSNs to go back */
+		Assert(slot->candidate_restart_valid <= current_lsn);
+		Assert(slot->candidate_restart_lsn <= restart_lsn);
+
+		/*
+		 * also don't even consider going back for actual restart_lsn
+		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+		 */
+		Assert(slot->data.restart_lsn <= restart_lsn);
+
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
@@ -1707,6 +1755,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 {
 	Assert(lsn != InvalidXLogRecPtr);
 
+	elog(LOG, "LogicalConfirmReceivedLocation %X/%X", LSN_FORMAT_ARGS(lsn));
+
 	/* Do an unlocked check for candidate_lsn first. */
 	if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
 		MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
@@ -1746,6 +1796,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		{
 			Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
 
+			elog(LOG, "LogicalConfirmReceivedLocation updating data.restart_lsn to %X/%X (from %X/%X) candidate_restart_valid 0/0 (from %X/%X) candidate_restart_lsn 0/0 (from %X/%X)",
+				 LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn),
+				 LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				 LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
+				 LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
+
+			/* don't allow the LSN to go backwards */
+			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+
 			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
@@ -1774,6 +1833,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
+			elog(LOG, "LogicalConfirmReceivedLocation calculating required LSN/xmin after update");
 			ReplicationSlotsComputeRequiredXmin(false);
 			ReplicationSlotsComputeRequiredLSN();
 		}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7b761510f1e..05184516704 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1270,7 +1270,10 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * anything because we hadn't reached a consistent state yet.
 	 */
 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn %X/%X", LSN_FORMAT_ARGS(txn->restart_decoding_lsn));
 		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+	}
 
 	/*
 	 * No in-progress transaction, can reuse the last serialized snapshot if
@@ -1279,8 +1282,11 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	else if (txn == NULL &&
 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
+	{
+		elog(LOG, "LogicalIncreaseRestartDecodingForSlot last_serialized_snapshot %X/%X", LSN_FORMAT_ARGS(builder->last_serialized_snapshot));
 		LogicalIncreaseRestartDecodingForSlot(lsn,
 											  builder->last_serialized_snapshot);
+	}
 }
 
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 037a347cba0..8a7aa257293 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -827,6 +827,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 
 	Assert(ReplicationSlotCtl != NULL);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / start");
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -840,6 +842,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 		restart_lsn = s->data.restart_lsn;
 		SpinLockRelease(&s->mutex);
 
+		elog(LOG, "ReplicationSlotsComputeRequiredLSN found slot %s with restart_lsn %X/%X", NameStr(s->data.name), LSN_FORMAT_ARGS(restart_lsn));
+
 		if (restart_lsn != InvalidXLogRecPtr &&
 			(min_required == InvalidXLogRecPtr ||
 			 restart_lsn < min_required))
@@ -847,6 +851,8 @@ ReplicationSlotsComputeRequiredLSN(void)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
+	elog(LOG, "ReplicationSlotsComputeRequiredLSN / done");
+
 	XLogSetReplicationSlotMinimumLSN(min_required);
 }
 
@@ -1285,9 +1291,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
 				ReplicationSlotRelease();
 
 				ereport(LOG,
-						(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+						(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size (%d)",
 								NameStr(slotname),
-								LSN_FORMAT_ARGS(restart_lsn))));
+								LSN_FORMAT_ARGS(restart_lsn), max_slot_wal_keep_size_mb)));
 
 				/* done with this slot for now */
 				break;
@@ -1315,6 +1321,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
 
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
+	elog(LOG, "InvalidateObsoleteReplicationSlots oldestSegno %lu oldestLSN %X/%X",
+		 oldestSegno, LSN_FORMAT_ARGS(oldestLSN));
+
 restart:
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1327,6 +1336,7 @@ restart:
 		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
+			elog(LOG, "InvalidateObsoleteReplicationSlots restart invalidated=%d", invalidated);
 			goto restart;
 		}
 	}
@@ -1337,6 +1347,7 @@ restart:
 	 */
 	if (invalidated)
 	{
+		elog(LOG, "InvalidateObsoleteReplicationSlots slot invalidated, recalculate required");
 		ReplicationSlotsComputeRequiredXmin(false);
 		ReplicationSlotsComputeRequiredLSN();
 	}
@@ -1854,6 +1865,8 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
+		elog(LOG, "RestoreSlotFromDisk restart_lsn %X/%X candidate_xmin_lsn 0/0 candidate_restart_lsn 0/0 candidate_restart_valid 0/0", LSN_FORMAT_ARGS(slot->data.restart_lsn));
+
 		slot->in_use = true;
 		slot->active_pid = 0;
 
-- 
2.39.5
From 81981f60949ab4c0fb9418b17dfcc37c0aa063f1 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:46:16 +0100
Subject: [PATCH vnew 2/2] WIP: change asserts to elog + defense

When updating LSNs, use Max() with the preceding value to protect
against moves backwards.
---
 src/backend/replication/logical/logical.c | 57 ++++++++++++-----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 16fed53063b..988f4add977 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1609,11 +1609,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			 xmin, LSN_FORMAT_ARGS(current_lsn),
 			 LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 
 		/* our candidate can directly be used */
 		updated_xmin = true;
@@ -1629,11 +1630,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			 xmin, LSN_FORMAT_ARGS(current_lsn),
 			 LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 	}
 
 	SpinLockRelease(&slot->mutex);
@@ -1679,15 +1681,17 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			 LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/* also don't even consider going back for actual restart_lsn */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
 		/* our candidate can directly be used */
 		updated_lsn = true;
@@ -1705,18 +1709,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			 LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/*
-		 * also don't even consider going back for actual restart_lsn
-		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
-		 */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
+
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
@@ -1802,10 +1806,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 				 LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
 				 LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
 
-			/* don't allow the LSN to go backwards */
-			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+			if (MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn)
+				elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > MyReplicationSlot->candidate_restart_lsn");
 
-			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
+			MyReplicationSlot->data.restart_lsn = Max(MyReplicationSlot->data.restart_lsn,
+													  MyReplicationSlot->candidate_restart_lsn);
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
 			updated_restart = true;
-- 
2.39.5
*/ if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr) + { + elog(LOG, "LogicalIncreaseRestartDecodingForSlot restart_decoding_lsn %X/%X", LSN_FORMAT_ARGS(txn->restart_decoding_lsn)); LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); + } /* * No in-progress transaction, can reuse the last serialized snapshot if @@ -1279,8 +1282,11 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact else if (txn == NULL && builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && builder->last_serialized_snapshot != InvalidXLogRecPtr) + { + elog(LOG, "LogicalIncreaseRestartDecodingForSlot last_serialized_snapshot %X/%X", LSN_FORMAT_ARGS(builder->last_serialized_snapshot)); LogicalIncreaseRestartDecodingForSlot(lsn, builder->last_serialized_snapshot); + } } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 037a347cba0..8a7aa257293 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -827,6 +827,8 @@ ReplicationSlotsComputeRequiredLSN(void) Assert(ReplicationSlotCtl != NULL); + elog(LOG, "ReplicationSlotsComputeRequiredLSN / start"); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -840,6 +842,8 @@ ReplicationSlotsComputeRequiredLSN(void) restart_lsn = s->data.restart_lsn; SpinLockRelease(&s->mutex); + elog(LOG, "ReplicationSlotsComputeRequiredLSN found slot %s with restart_lsn %X/%X", NameStr(s->data.name), LSN_FORMAT_ARGS(restart_lsn)); + if (restart_lsn != InvalidXLogRecPtr && (min_required == InvalidXLogRecPtr || restart_lsn < min_required)) @@ -847,6 +851,8 @@ ReplicationSlotsComputeRequiredLSN(void) } LWLockRelease(ReplicationSlotControlLock); + elog(LOG, "ReplicationSlotsComputeRequiredLSN / done"); + XLogSetReplicationSlotMinimumLSN(min_required); } @@ -1285,9 +1291,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, ReplicationSlotRelease(); 
ereport(LOG, - (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", + (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size (%d)", NameStr(slotname), - LSN_FORMAT_ARGS(restart_lsn)))); + LSN_FORMAT_ARGS(restart_lsn), max_slot_wal_keep_size_mb))); /* done with this slot for now */ break; @@ -1315,6 +1321,9 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); + elog(LOG, "InvalidateObsoleteReplicationSlots oldestSegno %lu oldestLSN %X/%X", + oldestSegno, LSN_FORMAT_ARGS(oldestLSN)); + restart: LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (int i = 0; i < max_replication_slots; i++) @@ -1327,6 +1336,7 @@ restart: if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated)) { /* if the lock was released, start from scratch */ + elog(LOG, "InvalidateObsoleteReplicationSlots restart invalidated=%d", invalidated); goto restart; } } @@ -1337,6 +1347,7 @@ restart: */ if (invalidated) { + elog(LOG, "InvalidateObsoleteReplicationSlots slot invalidated, recalculate required"); ReplicationSlotsComputeRequiredXmin(false); ReplicationSlotsComputeRequiredLSN(); } @@ -1854,6 +1865,8 @@ RestoreSlotFromDisk(const char *name) slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->candidate_restart_valid = InvalidXLogRecPtr; + elog(LOG, "RestoreSlotFromDisk restart_lsn %X/%X candidate_xmin_lsn 0/0 candidate_restart_lsn 0/0 candidate_restart_valid 0/0", LSN_FORMAT_ARGS(slot->data.restart_lsn)); + slot->in_use = true; slot->active_pid = 0; -- 2.39.5
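To make the invariant behind these debugging asserts easier to see, here is a toy model of the 1) to 4) sequence described above: restart_lsn advances to A, a checkpoint recycles WAL up to A, and restart_lsn then moves back to B < A. This is only a sketch under simplifying assumptions. ToyLSN, ToyState and the toy_* functions are hypothetical stand-ins, not PostgreSQL code; a single slot is modelled, and the "checkpoint" simply treats the slot's current restart_lsn as the minimum required LSN (no segment rounding, no max_slot_wal_keep_size).

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr (a 64-bit WAL position). */
typedef uint64_t ToyLSN;

/* Hypothetical toy state: one slot plus how far WAL has been recycled. */
typedef struct ToyState
{
	ToyLSN		restart_lsn;	/* oldest WAL the slot says it needs */
	ToyLSN		removed_upto;	/* WAL before this LSN is already gone */
	bool		went_backwards; /* set if restart_lsn ever decreased */
} ToyState;

/* Toy checkpoint: recycle all WAL older than the slot's restart_lsn. */
void
toy_checkpoint(ToyState *st)
{
	if (st->restart_lsn > st->removed_upto)
		st->removed_upto = st->restart_lsn;
}

/*
 * Unguarded update, as in the unpatched code; instead of asserting,
 * record whether the new value is lower than the old one.
 */
void
toy_set_restart_lsn(ToyState *st, ToyLSN lsn)
{
	if (lsn < st->restart_lsn)
		st->went_backwards = true;
	st->restart_lsn = lsn;
}

/* True if the slot now claims to need WAL that was already recycled. */
bool
toy_wal_missing(const ToyState *st)
{
	return st->restart_lsn < st->removed_upto;
}
```

Calling toy_set_restart_lsn(&st, A), then toy_checkpoint(&st), then toy_set_restart_lsn(&st, B) with B < A leaves toy_wal_missing() returning true. In the real system that corresponds to a slot pointing at WAL that has already been removed.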
From 81981f60949ab4c0fb9418b17dfcc37c0aa063f1 Mon Sep 17 00:00:00 2001
From: tomas <tomas>
Date: Wed, 6 Nov 2024 12:46:16 +0100
Subject: [PATCH v14 2/2] WIP: change asserts to elog + defense

When updating LSNs, use Max() with the preceding value to protect
against moves backwards.
---
 src/backend/replication/logical/logical.c | 57 ++++++++++++-----------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 16fed53063b..988f4add977 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1609,11 +1609,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			 xmin, LSN_FORMAT_ARGS(current_lsn),
 			 LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 
 		/* our candidate can directly be used */
 		updated_xmin = true;
@@ -1629,11 +1630,12 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 			 xmin, LSN_FORMAT_ARGS(current_lsn),
 			 LSN_FORMAT_ARGS(slot->candidate_xmin_lsn));
 
-		/* don't allow the LSN to go backwards */
-		Assert(slot->candidate_xmin_lsn <= current_lsn);
+		if (slot->candidate_xmin_lsn > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseXminForSlot: slot->candidate_xmin_lsn > current_lsn");
 
+		/* XXX doesn't the Max() put the fields out of sync? */
 		slot->candidate_catalog_xmin = xmin;
-		slot->candidate_xmin_lsn = current_lsn;
+		slot->candidate_xmin_lsn = Max(slot->candidate_xmin_lsn, current_lsn);
 	}
 
 	SpinLockRelease(&slot->mutex);
@@ -1679,15 +1681,17 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			 LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/* also don't even consider going back for actual restart_lsn */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
 		/* our candidate can directly be used */
 		updated_lsn = true;
@@ -1705,18 +1709,18 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 			 LSN_FORMAT_ARGS(current_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_valid),
 			 LSN_FORMAT_ARGS(restart_lsn), LSN_FORMAT_ARGS(slot->candidate_restart_lsn));
 
-		/* don't allow LSNs to go back */
-		Assert(slot->candidate_restart_valid <= current_lsn);
-		Assert(slot->candidate_restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_valid > current_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_valid > current_lsn");
 
-		/*
-		 * also don't even consider going back for actual restart_lsn
-		 * XXX This is the first assert we hit, called from snapbuild.c: LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
-		 */
-		Assert(slot->data.restart_lsn <= restart_lsn);
+		if (slot->candidate_restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->candidate_restart_lsn > restart_lsn");
+
+		if (slot->data.restart_lsn > restart_lsn)
+			elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > restart_lsn");
+
+		slot->candidate_restart_valid = Max(slot->candidate_restart_valid, current_lsn);
+		slot->candidate_restart_lsn = Max(slot->data.restart_lsn, Max(slot->candidate_restart_lsn, restart_lsn));
 
-		slot->candidate_restart_valid = current_lsn;
-		slot->candidate_restart_lsn = restart_lsn;
 		SpinLockRelease(&slot->mutex);
 
 		elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
@@ -1802,10 +1806,11 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 				 LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_valid),
 				 LSN_FORMAT_ARGS(MyReplicationSlot->candidate_restart_lsn));
 
-			/* don't allow the LSN to go backwards */
-			Assert(MyReplicationSlot->candidate_restart_lsn >= MyReplicationSlot->data.restart_lsn);
+			if (MyReplicationSlot->candidate_restart_lsn < MyReplicationSlot->data.restart_lsn)
+				elog(LOG, "ASSERT: LogicalIncreaseRestartDecodingForSlot: slot->data.restart_lsn > MyReplicationSlot->candidate_restart_lsn");
 
-			MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
+			MyReplicationSlot->data.restart_lsn = Max(MyReplicationSlot->data.restart_lsn,
+													  MyReplicationSlot->candidate_restart_lsn);
 			MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
 			MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
 			updated_restart = true;
-- 
2.39.5
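The WIP patch clamps each LSN update with Max(), and its XXX comment asks whether that puts candidate_catalog_xmin and candidate_xmin_lsn out of sync. A minimal sketch of that concern follows, assuming simplified hypothetical types (ToyLSN, ToyXid, ToyCandidate) and a local TOY_MAX macro in place of the Max() macro from PostgreSQL's c.h. The xmin is overwritten unconditionally while the LSN is clamped, so after a backwards update the pair holds an xmin and an LSN that were never reported together.

```c
#include <stdint.h>

/* Hypothetical stand-ins for XLogRecPtr and TransactionId. */
typedef uint64_t ToyLSN;
typedef uint32_t ToyXid;

/* Local equivalent of the Max() macro used in the patch. */
#define TOY_MAX(a, b) ((a) > (b) ? (a) : (b))

/* Hypothetical pair that is meant to describe a single candidate. */
typedef struct ToyCandidate
{
	ToyXid		catalog_xmin;	/* plays the role of candidate_catalog_xmin */
	ToyLSN		xmin_lsn;		/* plays the role of candidate_xmin_lsn */
} ToyCandidate;

/*
 * Same shape as the defensive update in the WIP patch: the xmin is taken
 * as-is, while the LSN is clamped so it cannot move backwards.
 */
void
toy_update_candidate(ToyCandidate *c, ToyXid xmin, ToyLSN current_lsn)
{
	c->catalog_xmin = xmin;
	c->xmin_lsn = TOY_MAX(c->xmin_lsn, current_lsn);
}
```

For example, starting from {100, 0x500} and applying toy_update_candidate(&c, 90, 0x400) yields {90, 0x500}: the LSN stayed put but the xmin moved back, a combination no caller passed in. Skipping the whole update when current_lsn is lower would keep the pair consistent at the cost of dropping that candidate; this sketch does not settle which behaviour the real slot needs.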