Thanks for noticing it. Sorry. Attached.

On Thu, Nov 14, 2024 at 12:04 PM Michael Paquier <mich...@paquier.xyz> wrote:
>
> On Thu, Nov 14, 2024 at 11:45:56AM +0530, Ashutosh Bapat wrote:
> > Here's a quick and dirty patch which describes the idea. I didn't have
> > time to implement the code to move SnapBuild::restart_lsn if
> > SnapBuild::start_decoding_at moves forward while building the initial
> > snapshot. I am not sure whether that's necessary either.
> >
> > I have added three elogs to see if the logic is working as expected. I
> > see two of the elogs from the patch in the server log when I run tests from
> > tests/subscription and tests/recovery, but I do not see the third one.
> > That means either that the situation causing the bug is not covered by
> > those tests or that the fix is not triggered. If you run your reproduction
> > and still see the crashes, please provide the output of those elog
> > messages along with the rest of the elogs you have added.
>
> Forgot the attachment, perhaps?
> --
> Michael



-- 
Best Wishes,
Ashutosh Bapat
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3fe1774a1e9..e42fe52de39 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -147,7 +147,7 @@ CheckLogicalDecodingRequirements(void)
  * CreateDecodingContext() performing common tasks.
  */
 static LogicalDecodingContext *
-StartupDecodingContext(List *output_plugin_options,
+StartupDecodingContext(List *output_plugin_options, XLogRecPtr restart_lsn,
 					   XLogRecPtr start_lsn,
 					   TransactionId xmin_horizon,
 					   bool need_full_snapshot,
@@ -212,7 +212,7 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
-		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
+		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, restart_lsn, start_lsn,
 								need_full_snapshot, in_create, slot->data.two_phase_at);
 
 	ctx->reorder->private_data = ctx;
@@ -438,7 +438,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
-	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
+	ctx = StartupDecodingContext(NIL, restart_lsn, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false, true,
 								 xl_routine, prepare_write, do_write,
 								 update_progress);
@@ -591,7 +591,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 		start_lsn = slot->data.confirmed_flush;
 	}
 
-	ctx = StartupDecodingContext(output_plugin_options,
+	ctx = StartupDecodingContext(output_plugin_options, slot->data.restart_lsn,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, false, xl_routine, prepare_write,
 								 do_write, update_progress);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index a6a4da32668..62f92b51917 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -184,6 +184,7 @@ static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *
 SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
 						TransactionId xmin_horizon,
+						XLogRecPtr restart_lsn,
 						XLogRecPtr start_lsn,
 						bool need_full_snapshot,
 						bool in_slot_creation,
@@ -217,6 +218,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
+	builder->restart_lsn = restart_lsn;
 	builder->in_slot_creation = in_slot_creation;
 	builder->building_full_snapshot = need_full_snapshot;
 	builder->two_phase_at = two_phase_at;
@@ -1163,7 +1165,17 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * anything because we hadn't reached a consistent state yet.
 	 */
 	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
-		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+	{
+		elog(LOG, "running xact record at %X/%X, builder::restart_lsn = %X/%X, restart_decoding_lsn of txn (%d): %X/%X",
+			LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(builder->restart_lsn), txn->xid, LSN_FORMAT_ARGS(txn->restart_decoding_lsn));
+
+		if (txn->restart_decoding_lsn >= builder->restart_lsn)
+			LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+		else
+			elog(LOG, "skipped candidates generated by running transaction record at %X/%X",
+				LSN_FORMAT_ARGS(lsn));
+
+	}
 
 	/*
 	 * No in-progress transaction, can reuse the last serialized snapshot if
@@ -1172,8 +1184,18 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	else if (txn == NULL &&
 			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
-		LogicalIncreaseRestartDecodingForSlot(lsn,
+	{
+
+		elog(LOG, "running xact record at %X/%X, last serialized snapshot (LSN) %X/%X, builder::restart_lsn %X/%X, builder::rb::current_restart_decoding_lsn %X/%X",
+			LSN_FORMAT_ARGS(lsn), LSN_FORMAT_ARGS(builder->last_serialized_snapshot), LSN_FORMAT_ARGS(builder->restart_lsn), LSN_FORMAT_ARGS(builder->reorder->current_restart_decoding_lsn));
+
+		if (builder->last_serialized_snapshot >= builder->restart_lsn)
+			LogicalIncreaseRestartDecodingForSlot(lsn,
 											  builder->last_serialized_snapshot);
+		else
+			elog(LOG, "skipped candidates generated by running transaction record at %X/%X",
+					LSN_FORMAT_ARGS(lsn));
+	}
 }
 
 
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 3c1454df993..f9a5cc7bab2 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -64,7 +64,8 @@ struct xl_running_xacts;
 extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
-										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
+										  TransactionId xmin_horizon, XLogRecPtr restart_lsn,
+										  XLogRecPtr start_lsn,
 										  bool need_full_snapshot,
 										  bool in_slot_creation,
 										  XLogRecPtr two_phase_at);
diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h
index 1e295c75076..9d912ea87bb 100644
--- a/src/include/replication/snapbuild_internal.h
+++ b/src/include/replication/snapbuild_internal.h
@@ -43,6 +43,13 @@ struct SnapBuild
 	 */
 	XLogRecPtr	start_decoding_at;
 
+	/*
+	 * LSN from which the WAL reader will start reading WAL. Candidate
+	 * restart LSNs produced by running-transactions records are ignored if
+	 * they fall before this value.
+	 */
+	XLogRecPtr restart_lsn;
+
 	/*
 	 * LSN at which two-phase decoding was enabled or LSN at which we found a
 	 * consistent point at the time of slot creation.

Reply via email to