From a3220cf4d879c3cd042e5f754ecfa9259a1f562b Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 28 Mar 2024 20:19:22 +0800
Subject: [PATCH v4] advance the restart_lsn of synced slots using logical
 decoding

Since we don't perform logical decoding for the synced slots when syncing the
lsn/xmin of slot, no logical snapshots will be serialized to disk. So, when
user starts to use these synced slots after promotion, it needs to re-build the
consistent snapshot from the restart_lsn if the WAL(xl_running_xacts) at
restart_lsn position indicates that there are running transactions. This
however could cause the data that before the consistent point to be missed.

To address the issue, if there is no existing serialized snapshot at the
syncing restart_lsn, we progress the lsn/xmin of the slot on the standby using
fast-forward logical decoding. This approach allows us to determine if the
synced slot can reach a consistent state, ensuring that the required snapshot
is saved to disk during decoding. Slots that achieve consistency are marked as
sync-ready.
---
 src/backend/replication/logical/logical.c     | 148 +++++++++++++++++-
 src/backend/replication/logical/slotsync.c    | 125 ++++++++++-----
 src/backend/replication/logical/snapbuild.c   |  23 +++
 src/backend/replication/slotfuncs.c           | 118 +-------------
 src/include/replication/logical.h             |   2 +
 src/include/replication/snapbuild.h           |   2 +
 .../t/040_standby_failover_slots_sync.pl      |  23 ++-
 7 files changed, 271 insertions(+), 170 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c0..bbc7cdaf50 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/slotsync.h"
 #include "replication/snapbuild.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -516,17 +517,24 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot use physical replication slot for logical decoding")));
 
-	if (slot->data.database != MyDatabaseId)
+	/*
+	 * Do not allow decoding if the replication slot belongs to a different
+	 * database unless we are in fast-forward mode. In fast-forward mode, we
+	 * ignore storage-level changes and do not need to access the database
+	 * object.
+	 */
+	if (slot->data.database != MyDatabaseId && !fast_forward)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("replication slot \"%s\" was not created in this database",
 						NameStr(slot->data.name))));
 
 	/*
-	 * Do not allow consumption of a "synchronized" slot until the standby
-	 * gets promoted.
+	 * Do not allow consumption of a "synchronized" slot until the standby gets
+	 * promoted unless we are syncing replication slots, in which case we need
+	 * to advance the LSN and xmin of the slot during decoding.
 	 */
-	if (RecoveryInProgress() && slot->data.synced)
+	if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
 		ereport(ERROR,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2042,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
 
 	return has_pending_wal;
 }
+
+/*
+ * Helper function for advancing our logical replication slot forward.
+ *
+ * The slot's restart_lsn is used as start point for reading records, while
+ * confirmed_flush is used as base point for the decoding context.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
+ *
+ * *ready_for_decoding will be set to true if the logical decoding reaches
+ * the consistent point; Otherwise, it will be set to false.
+ */
+XLogRecPtr
+LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto,
+											   bool *ready_for_decoding)
+{
+	LogicalDecodingContext *ctx;
+	ResourceOwner old_resowner = CurrentResourceOwner;
+	XLogRecPtr	retlsn;
+
+	Assert(moveto != InvalidXLogRecPtr);
+
+	if (ready_for_decoding)
+		*ready_for_decoding = false;
+
+	PG_TRY();
+	{
+		/*
+		 * Create our decoding context in fast_forward mode, passing start_lsn
+		 * as InvalidXLogRecPtr, so that we start processing from my slot's
+		 * confirmed_flush.
+		 */
+		ctx = CreateDecodingContext(InvalidXLogRecPtr,
+									NIL,
+									true,	/* fast_forward */
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
+									NULL, NULL, NULL);
+
+		/*
+		 * Wait for specified streaming replication standby servers (if any)
+		 * to confirm receipt of WAL up to moveto lsn.
+		 */
+		WaitForStandbyConfirmation(moveto);
+
+		/*
+		 * Start reading at the slot's restart_lsn, which we know to point to
+		 * a valid record.
+		 */
+		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+		/* invalidate non-timetravel entries */
+		InvalidateSystemCaches();
+
+		/* Decode records until we reach the requested target */
+		while (ctx->reader->EndRecPtr < moveto)
+		{
+			char	   *errm = NULL;
+			XLogRecord *record;
+
+			/*
+			 * Read records.  No changes are generated in fast_forward mode,
+			 * but snapbuilder/slot statuses are updated properly.
+			 */
+			record = XLogReadRecord(ctx->reader, &errm);
+			if (errm)
+				elog(ERROR, "could not find record while advancing replication slot: %s",
+					 errm);
+
+			/*
+			 * Process the record.  Storage-level changes are ignored in
+			 * fast_forward mode, but other modules (such as snapbuilder)
+			 * might still have critical updates to do.
+			 */
+			if (record)
+				LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		if (DecodingContextReady(ctx) && ready_for_decoding)
+			*ready_for_decoding = true;
+
+		/*
+		 * Logical decoding could have clobbered CurrentResourceOwner during
+		 * transaction management, so restore the executor's value.  (This is
+		 * a kluge, but it's not worth cleaning up right now.)
+		 */
+		CurrentResourceOwner = old_resowner;
+
+		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+		{
+			LogicalConfirmReceivedLocation(moveto);
+
+			/*
+			 * If only the confirmed_flush LSN has changed the slot won't get
+			 * marked as dirty by the above. Callers on the walsender
+			 * interface are expected to keep track of their own progress and
+			 * don't need it written out. But SQL-interface users cannot
+			 * specify their own start positions and it's harder for them to
+			 * keep track of their progress, so we should make more of an
+			 * effort to save it for them.
+			 *
+			 * Dirty the slot so it is written out at the next checkpoint. The
+			 * LSN position advanced to may still be lost on a crash but this
+			 * makes the data consistent after a clean shutdown.
+			 */
+			ReplicationSlotMarkDirty();
+		}
+
+		retlsn = MyReplicationSlot->data.confirmed_flush;
+
+		/* free context, call shutdown callback */
+		FreeDecodingContext(ctx);
+
+		InvalidateSystemCaches();
+	}
+	PG_CATCH();
+	{
+		/* clear all timetravel entries */
+		InvalidateSystemCaches();
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return retlsn;
+}
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..886a9fcc7e 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -25,6 +25,12 @@
  * which slot sync worker can perform the sync periodically or user can call
  * pg_sync_replication_slots() periodically to perform the syncs.
  *
+ * If synchronized slots fail to build a consistent snapshot from the
+ * restart_lsn, they would become unreliable after promotion due to potential
+ * data loss from changes before reaching a consistent point. So, we mark such
+ * slots as RS_TEMPORARY. Once they successfully reach the consistent point,
+ * they will be marked to RS_PERSISTENT.
+ *
  * The slot sync worker waits for some time before the next synchronization,
  * with the duration varying based on whether any slots were updated during
  * the last cycle. Refer to the comments above wait_for_slot_activity() for
@@ -49,8 +55,9 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
-#include "replication/slot.h"
+#include "replication/logical.h"
 #include "replication/slotsync.h"
+#include "replication/snapbuild.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -147,50 +154,83 @@ static void slotsync_failure_callback(int code, Datum arg);
  *
  * If no update was needed (the data of the remote slot is the same as the
  * local slot) return false, otherwise true.
+ *
+ * If the LSN of the slot is modified, the ready_for_decoding will be set to
+ * true if the slot can reach a consistent point; otherwise, it will be set to
+ * false.
  */
 static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+						 bool *ready_for_decoding)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
-	bool		xmin_changed;
-	bool		restart_lsn_changed;
-	NameData	plugin_name;
+	bool		slot_updated = false;
 
 	Assert(slot->data.invalidated == RS_INVAL_NONE);
 
-	xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
-	restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+	if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
+		remote_slot->restart_lsn != slot->data.restart_lsn ||
+		remote_slot->catalog_xmin != slot->data.catalog_xmin)
+	{
+		if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
+		{
+			/*
+			 * Update the slot info directly if there is a serialized snapshot
+			 * at the restart_lsn, as the slot can quickly reach consistency at
+			 * restart_lsn by restoring the snapshot.
+			 */
+			SpinLockAcquire(&slot->mutex);
+			slot->data.restart_lsn = remote_slot->restart_lsn;
+			slot->data.confirmed_flush = remote_slot->confirmed_lsn;
+			slot->data.catalog_xmin = remote_slot->catalog_xmin;
+			slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+			SpinLockRelease(&slot->mutex);
 
-	if (!xmin_changed &&
-		!restart_lsn_changed &&
-		remote_dbid == slot->data.database &&
-		remote_slot->two_phase == slot->data.two_phase &&
-		remote_slot->failover == slot->data.failover &&
-		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
-		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
-		return false;
+			if (ready_for_decoding)
+				*ready_for_decoding = true;
+		}
+		else
+		{
+			/*
+			 * By advancing the restart_lsn, confirmed_lsn, and xmin using
+			 * fast-forward logical decoding, we can verify whether a
+			 * consistent snapshot can be built. This process also involves
+			 * saving necessary snapshots to disk during decoding, ensuring
+			 * that logical decoding efficiently reaches a consistent point at
+			 * the restart_lsn without the potential loss of data during
+			 * snapshot creation.
+			 */
+			LogicalSlotAdvanceAndCheckReadynessForDecoding(remote_slot->confirmed_lsn,
+														   ready_for_decoding);
+		}
 
-	/* Avoid expensive operations while holding a spinlock. */
-	namestrcpy(&plugin_name, remote_slot->plugin);
-
-	SpinLockAcquire(&slot->mutex);
-	slot->data.plugin = plugin_name;
-	slot->data.database = remote_dbid;
-	slot->data.two_phase = remote_slot->two_phase;
-	slot->data.failover = remote_slot->failover;
-	slot->data.restart_lsn = remote_slot->restart_lsn;
-	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
-	slot->data.catalog_xmin = remote_slot->catalog_xmin;
-	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
-	SpinLockRelease(&slot->mutex);
-
-	if (xmin_changed)
 		ReplicationSlotsComputeRequiredXmin(false);
-
-	if (restart_lsn_changed)
 		ReplicationSlotsComputeRequiredLSN();
 
-	return true;
+		slot_updated = true;
+	}
+
+	if (remote_dbid != slot->data.database ||
+		remote_slot->two_phase != slot->data.two_phase ||
+		remote_slot->failover != slot->data.failover ||
+		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+	{
+		NameData	plugin_name;
+
+		/* Avoid expensive operations while holding a spinlock. */
+		namestrcpy(&plugin_name, remote_slot->plugin);
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.plugin = plugin_name;
+		slot->data.database = remote_dbid;
+		slot->data.two_phase = remote_slot->two_phase;
+		slot->data.failover = remote_slot->failover;
+		SpinLockRelease(&slot->mutex);
+
+		slot_updated = true;
+	}
+
+	return slot_updated;
 }
 
 /*
@@ -413,6 +453,7 @@ static bool
 update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool	ready_for_decoding = false;
 
 	/*
 	 * Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +484,19 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		return false;
 	}
 
-	/* First time slot update, the function must return true */
-	if (!update_local_synced_slot(remote_slot, remote_dbid))
-		elog(ERROR, "failed to update slot");
+	(void) update_local_synced_slot(remote_slot, remote_dbid,
+									&ready_for_decoding);
+
+	/*
+	 * Don't persist the slot if it cannot reach the consistent point from the
+	 * restart_lsn.
+	 */
+	if (!ready_for_decoding)
+	{
+		elog(DEBUG1, "The synced slot could not find consistent point from %X/%X",
+			 LSN_FORMAT_ARGS(slot->data.restart_lsn));
+		return false;
+	}
 
 	ReplicationSlotPersist();
 
@@ -578,7 +629,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 					 LSN_FORMAT_ARGS(remote_slot->restart_lsn));
 
 			/* Make sure the slot changes persist across server restart */
-			if (update_local_synced_slot(remote_slot, remote_dbid))
+			if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
 			{
 				ReplicationSlotMarkDirty();
 				ReplicationSlotSave();
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ac24b51860..e37e22f441 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2134,3 +2134,26 @@ CheckPointSnapBuild(void)
 	}
 	FreeDir(snap_dir);
 }
+
+/*
+ * Check if a logical snapshot at the specified point has been serialized.
+ */
+bool
+SnapBuildSnapshotExists(XLogRecPtr lsn)
+{
+	char		path[MAXPGPATH];
+	int			ret;
+	struct stat stat_buf;
+
+	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
+			LSN_FORMAT_ARGS(lsn));
+
+	ret = stat(path, &stat_buf);
+
+	if (ret != 0 && errno != ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m", path)));
+
+	return ret == 0;
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index da57177c25..4ef93ea05b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -492,125 +492,13 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 }
 
 /*
- * Helper function for advancing our logical replication slot forward.
- *
- * The slot's restart_lsn is used as start point for reading records, while
- * confirmed_flush is used as base point for the decoding context.
- *
- * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
- * because we need to digest WAL to advance restart_lsn allowing to recycle
- * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
- * mode, no changes are generated anyway.
+ * Advance our logical replication slot forward. See
+ * LogicalSlotAdvanceAndCheckReadynessForDecoding for details.
  */
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
-	LogicalDecodingContext *ctx;
-	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn;
-
-	Assert(moveto != InvalidXLogRecPtr);
-
-	PG_TRY();
-	{
-		/*
-		 * Create our decoding context in fast_forward mode, passing start_lsn
-		 * as InvalidXLogRecPtr, so that we start processing from my slot's
-		 * confirmed_flush.
-		 */
-		ctx = CreateDecodingContext(InvalidXLogRecPtr,
-									NIL,
-									true,	/* fast_forward */
-									XL_ROUTINE(.page_read = read_local_xlog_page,
-											   .segment_open = wal_segment_open,
-											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
-
-		/*
-		 * Wait for specified streaming replication standby servers (if any)
-		 * to confirm receipt of WAL up to moveto lsn.
-		 */
-		WaitForStandbyConfirmation(moveto);
-
-		/*
-		 * Start reading at the slot's restart_lsn, which we know to point to
-		 * a valid record.
-		 */
-		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
-
-		/* invalidate non-timetravel entries */
-		InvalidateSystemCaches();
-
-		/* Decode records until we reach the requested target */
-		while (ctx->reader->EndRecPtr < moveto)
-		{
-			char	   *errm = NULL;
-			XLogRecord *record;
-
-			/*
-			 * Read records.  No changes are generated in fast_forward mode,
-			 * but snapbuilder/slot statuses are updated properly.
-			 */
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
-				elog(ERROR, "could not find record while advancing replication slot: %s",
-					 errm);
-
-			/*
-			 * Process the record.  Storage-level changes are ignored in
-			 * fast_forward mode, but other modules (such as snapbuilder)
-			 * might still have critical updates to do.
-			 */
-			if (record)
-				LogicalDecodingProcessRecord(ctx, ctx->reader);
-
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/*
-		 * Logical decoding could have clobbered CurrentResourceOwner during
-		 * transaction management, so restore the executor's value.  (This is
-		 * a kluge, but it's not worth cleaning up right now.)
-		 */
-		CurrentResourceOwner = old_resowner;
-
-		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
-		{
-			LogicalConfirmReceivedLocation(moveto);
-
-			/*
-			 * If only the confirmed_flush LSN has changed the slot won't get
-			 * marked as dirty by the above. Callers on the walsender
-			 * interface are expected to keep track of their own progress and
-			 * don't need it written out. But SQL-interface users cannot
-			 * specify their own start positions and it's harder for them to
-			 * keep track of their progress, so we should make more of an
-			 * effort to save it for them.
-			 *
-			 * Dirty the slot so it is written out at the next checkpoint. The
-			 * LSN position advanced to may still be lost on a crash but this
-			 * makes the data consistent after a clean shutdown.
-			 */
-			ReplicationSlotMarkDirty();
-		}
-
-		retlsn = MyReplicationSlot->data.confirmed_flush;
-
-		/* free context, call shutdown callback */
-		FreeDecodingContext(ctx);
-
-		InvalidateSystemCaches();
-	}
-	PG_CATCH();
-	{
-		/* clear all timetravel entries */
-		InvalidateSystemCaches();
-
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
-
-	return retlsn;
+	return LogicalSlotAdvanceAndCheckReadynessForDecoding(moveto, NULL);
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dc2df4ce92..f0abac2c75 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -149,5 +149,7 @@ extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
 extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+extern XLogRecPtr LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto,
+																 bool *ready_for_decoding);
 
 #endif
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index fbdf362396..a3360a1c5e 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -91,4 +91,6 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
 										 struct xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
 
+extern bool SnapBuildSnapshotExists(XLogRecPtr lsn);
+
 #endif							/* SNAPBUILD_H */
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..e7021050fc 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -280,11 +280,11 @@ is( $standby1->safe_psql(
 	'logical slot is re-synced');
 
 # Reset the log_min_messages to the default value.
-$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
-$primary->reload;
+#$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
+#$primary->reload;
 
-$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
-$standby1->reload;
+#$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
+#$standby1->reload;
 
 ##################################################
 # Test that a synchronized slot can not be decoded, altered or dropped by the
@@ -479,8 +479,8 @@ $subscriber1->safe_psql(
 
 $subscriber1->wait_for_subscription_sync;
 
-# Do not allow any further advancement of the restart_lsn and
-# confirmed_flush_lsn for the lsub1_slot.
+# Do not allow any further advancement of the confirmed_flush_lsn for the
+# lsub1_slot.
 $subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
 
 # Wait for the replication slot to become inactive on the publisher
@@ -489,20 +489,15 @@ $primary->poll_query_until(
 	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
 	1);
 
-# Get the restart_lsn for the logical slot lsub1_slot on the primary
-my $primary_restart_lsn = $primary->safe_psql('postgres',
-	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
-
 # Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
 my $primary_flush_lsn = $primary->safe_psql('postgres',
 	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
 
-# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced
-# to the standby
+# Confirm that confirmed_flush_lsn of lsub1_slot slot are synced to the standby
 ok( $standby1->poll_query_until(
 		'postgres',
-		"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
-	'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
+		"SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
+	'confirmed_flush_lsn of slot lsub1_slot synced to standby');
 
 ##################################################
 # Test that logical failover replication slots wait for the specified
-- 
2.30.0.windows.2

