Thanks for noticing it. Sorry. Attached. On Thu, Nov 14, 2024 at 12:04 PM Michael Paquier <mich...@paquier.xyz> wrote: > > On Thu, Nov 14, 2024 at 11:45:56AM +0530, Ashutosh Bapat wrote: > > Here's a quick and dirty patch which describes the idea. I didn't get > > time to implement code to move SnapBuild::restart_lsn if > > SnapBuild::start_decoding_at moves forward while building initial > > snapshot. I am not sure whether that's necessary either. > > > > I have added three elogs to see if the logic is working as expected. I > > see two of the elogs in patch in the server log when I run tests from > > tests/subscription and tests/recovery. But I do not see the third one. > > That either means that the situation causing the bug is not covered by > > those tests or the fix is not triggered. If you run your reproduction > > and still see the crashes please provide the output of those elog > > messages along with the rest of the elogs you have added. > > Forgot the attachment, perhaps? > -- > Michael
-- Best Wishes, Ashutosh Bapat
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3fe1774a1e9..e42fe52de39 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -147,7 +147,7 @@ CheckLogicalDecodingRequirements(void) * CreateDecodingContext() performing common tasks. */ static LogicalDecodingContext * -StartupDecodingContext(List *output_plugin_options, +StartupDecodingContext(List *output_plugin_options, XLogRecPtr restart_lsn, XLogRecPtr start_lsn, TransactionId xmin_horizon, bool need_full_snapshot, @@ -212,7 +212,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = - AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, + AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, restart_lsn, start_lsn, need_full_snapshot, in_create, slot->data.two_phase_at); ctx->reorder->private_data = ctx; @@ -438,7 +438,7 @@ CreateInitDecodingContext(const char *plugin, ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, + ctx = StartupDecodingContext(NIL, restart_lsn, restart_lsn, xmin_horizon, need_full_snapshot, false, true, xl_routine, prepare_write, do_write, update_progress); @@ -591,7 +591,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, start_lsn = slot->data.confirmed_flush; } - ctx = StartupDecodingContext(output_plugin_options, + ctx = StartupDecodingContext(output_plugin_options, slot->data.restart_lsn, start_lsn, InvalidTransactionId, false, fast_forward, false, xl_routine, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index a6a4da32668..62f92b51917 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -184,6 +184,7 @@ static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char * 
SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, + XLogRecPtr restart_lsn, XLogRecPtr start_lsn, bool need_full_snapshot, bool in_slot_creation, @@ -217,6 +218,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; + builder->restart_lsn = restart_lsn; builder->in_slot_creation = in_slot_creation; builder->building_full_snapshot = need_full_snapshot; builder->two_phase_at = two_phase_at; @@ -1163,7 +1165,17 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact * anything because we hadn't reached a consistent state yet. */ if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr) - LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); + { + elog(LOG, "running xact record at %X/%X, builder::restart_lsn = %X/%X, restart_decoding_lsn of txn (%d): %X/%X", + LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(builder->restart_lsn), txn->xid, LSN_FORMAT_ARGS(txn->restart_decoding_lsn)); + + if (txn->restart_decoding_lsn >= builder->restart_lsn) + LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); + else + elog(LOG, "skipped candidates generated by running transaction record at %X/%X", + LSN_FORMAT_ARGS(lsn)); + + } /* * No in-progress transaction, can reuse the last serialized snapshot if @@ -1172,8 +1184,18 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact else if (txn == NULL && builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && builder->last_serialized_snapshot != InvalidXLogRecPtr) - LogicalIncreaseRestartDecodingForSlot(lsn, + { + + elog(LOG, "running xact record at %X/%X, last serialized snapshot (LSN) %X/%X, builder::restart_lsn %X/%X, builder::rb::current_restart_decoding_lsn %X/%X", + LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(builder->last_serialized_snapshot), LSN_FORMAT_ARGS(builder->restart_lsn), 
LSN_FORMAT_ARGS(builder->reorder->current_restart_decoding_lsn)); + + if (builder->last_serialized_snapshot >= builder->restart_lsn) + LogicalIncreaseRestartDecodingForSlot(lsn, builder->last_serialized_snapshot); + else + elog(LOG, "skipped candidates generated by running transaction record at %X/%X", + LSN_FORMAT_ARGS(lsn)); + } } diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 3c1454df993..f9a5cc7bab2 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -64,7 +64,8 @@ struct xl_running_xacts; extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder, - TransactionId xmin_horizon, XLogRecPtr start_lsn, + TransactionId xmin_horizon, XLogRecPtr restart_lsn, + XLogRecPtr start_lsn, bool need_full_snapshot, bool in_slot_creation, XLogRecPtr two_phase_at); diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h index 1e295c75076..9d912ea87bb 100644 --- a/src/include/replication/snapbuild_internal.h +++ b/src/include/replication/snapbuild_internal.h @@ -43,6 +43,13 @@ struct SnapBuild */ XLogRecPtr start_decoding_at; + /* + * LSN from which the WAL reader will start reading WAL. Ignore + * candidate_restart_lsn values before this LSN that are produced by running + * transaction log records. + */ + XLogRecPtr restart_lsn; + /* * LSN at which two-phase decoding was enabled or LSN at which we found a * consistent point at the time of slot creation.