From 15ccf4a8c4dff76fd729dd6922162be34f57c69c Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 28 Mar 2024 20:19:22 +0800
Subject: [PATCH v4] advance the restart_lsn of synced slots using logical
 decoding

---
 src/backend/replication/logical/logical.c     | 148 +++++++++++++++++-
 src/backend/replication/logical/slotsync.c    |  75 ++++++---
 src/backend/replication/slotfuncs.c           | 118 +-------------
 src/include/replication/logical.h             |   2 +
 .../t/040_standby_failover_slots_sync.pl      |  23 ++-
 5 files changed, 208 insertions(+), 158 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c0..bbc7cdaf50 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/slotsync.h"
 #include "replication/snapbuild.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -516,17 +517,24 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot use physical replication slot for logical decoding")));
 
-	if (slot->data.database != MyDatabaseId)
+	/*
+	 * Do not allow decoding if the replication slot belongs to a different
+	 * database unless we are in fast-forward mode. In fast-forward mode, we
+	 * ignore storage-level changes and do not need to access the database
+	 * object.
+	 */
+	if (slot->data.database != MyDatabaseId && !fast_forward)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("replication slot \"%s\" was not created in this database",
 						NameStr(slot->data.name))));
 
 	/*
-	 * Do not allow consumption of a "synchronized" slot until the standby
-	 * gets promoted.
+	 * Do not allow consumption of a "synchronized" slot until the standby gets
+	 * promoted unless we are syncing replication slots, in which case we need
+	 * to advance the LSN and xmin of the slot during decoding.
 	 */
-	if (RecoveryInProgress() && slot->data.synced)
+	if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
 		ereport(ERROR,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2042,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
 
 	return has_pending_wal;
 }
+
+/*
+ * Helper function for advancing our logical replication slot forward.
+ *
+ * The slot's restart_lsn is used as start point for reading records, while
+ * confirmed_flush is used as base point for the decoding context.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
+ *
+ * *ready_for_decoding will be set to true if the logical decoding reaches
+ * the consistent point; Otherwise, it will be set to false.
+ */
+XLogRecPtr
+LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto,
+											   bool *ready_for_decoding)
+{
+	LogicalDecodingContext *ctx;
+	ResourceOwner old_resowner = CurrentResourceOwner;
+	XLogRecPtr	retlsn;
+
+	Assert(moveto != InvalidXLogRecPtr);
+
+	if (ready_for_decoding)
+		*ready_for_decoding = false;
+
+	PG_TRY();
+	{
+		/*
+		 * Create our decoding context in fast_forward mode, passing start_lsn
+		 * as InvalidXLogRecPtr, so that we start processing from my slot's
+		 * confirmed_flush.
+		 */
+		ctx = CreateDecodingContext(InvalidXLogRecPtr,
+									NIL,
+									true,	/* fast_forward */
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
+									NULL, NULL, NULL);
+
+		/*
+		 * Wait for specified streaming replication standby servers (if any)
+		 * to confirm receipt of WAL up to moveto lsn.
+		 */
+		WaitForStandbyConfirmation(moveto);
+
+		/*
+		 * Start reading at the slot's restart_lsn, which we know to point to
+		 * a valid record.
+		 */
+		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+		/* invalidate non-timetravel entries */
+		InvalidateSystemCaches();
+
+		/* Decode records until we reach the requested target */
+		while (ctx->reader->EndRecPtr < moveto)
+		{
+			char	   *errm = NULL;
+			XLogRecord *record;
+
+			/*
+			 * Read records.  No changes are generated in fast_forward mode,
+			 * but snapbuilder/slot statuses are updated properly.
+			 */
+			record = XLogReadRecord(ctx->reader, &errm);
+			if (errm)
+				elog(ERROR, "could not find record while advancing replication slot: %s",
+					 errm);
+
+			/*
+			 * Process the record.  Storage-level changes are ignored in
+			 * fast_forward mode, but other modules (such as snapbuilder)
+			 * might still have critical updates to do.
+			 */
+			if (record)
+				LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		if (DecodingContextReady(ctx) && ready_for_decoding)
+			*ready_for_decoding = true;
+
+		/*
+		 * Logical decoding could have clobbered CurrentResourceOwner during
+		 * transaction management, so restore the executor's value.  (This is
+		 * a kluge, but it's not worth cleaning up right now.)
+		 */
+		CurrentResourceOwner = old_resowner;
+
+		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+		{
+			LogicalConfirmReceivedLocation(moveto);
+
+			/*
+			 * If only the confirmed_flush LSN has changed the slot won't get
+			 * marked as dirty by the above. Callers on the walsender
+			 * interface are expected to keep track of their own progress and
+			 * don't need it written out. But SQL-interface users cannot
+			 * specify their own start positions and it's harder for them to
+			 * keep track of their progress, so we should make more of an
+			 * effort to save it for them.
+			 *
+			 * Dirty the slot so it is written out at the next checkpoint. The
+			 * LSN position advanced to may still be lost on a crash but this
+			 * makes the data consistent after a clean shutdown.
+			 */
+			ReplicationSlotMarkDirty();
+		}
+
+		retlsn = MyReplicationSlot->data.confirmed_flush;
+
+		/* free context, call shutdown callback */
+		FreeDecodingContext(ctx);
+
+		InvalidateSystemCaches();
+	}
+	PG_CATCH();
+	{
+		/* clear all timetravel entries */
+		InvalidateSystemCaches();
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return retlsn;
+}
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..df8b76c8ab 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -25,6 +25,12 @@
  * which slot sync worker can perform the sync periodically or user can call
  * pg_sync_replication_slots() periodically to perform the syncs.
  *
+ * If synchronized slots fail to build a consistent snapshot from the
+ * restart_lsn, they would become unreliable after promotion due to potential
+ * data loss from changes before reaching a consistent point. So, we mark such
+ * slots as RS_TEMPORARY. Once they successfully reach the consistent point,
+ * they will be marked to RS_PERSISTENT.
+ *
  * The slot sync worker waits for some time before the next synchronization,
  * with the duration varying based on whether any slots were updated during
  * the last cycle. Refer to the comments above wait_for_slot_activity() for
@@ -49,7 +55,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
-#include "replication/slot.h"
+#include "replication/logical.h"
 #include "replication/slotsync.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -147,28 +153,46 @@ static void slotsync_failure_callback(int code, Datum arg);
  *
  * If no update was needed (the data of the remote slot is the same as the
  * local slot) return false, otherwise true.
+ *
+ * If the LSN of the slot is modified, the ready_for_decoding will be set to
+ * true if the slot can reach a consistent point; otherwise, it will be set to
+ * false.
  */
 static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+						 bool *ready_for_decoding)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
-	bool		xmin_changed;
-	bool		restart_lsn_changed;
 	NameData	plugin_name;
+	bool		updated_lsn = false;
 
 	Assert(slot->data.invalidated == RS_INVAL_NONE);
 
-	xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
-	restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+	if (remote_slot->confirmed_lsn != slot->data.confirmed_flush)
+	{
+		/*
+		 * By advancing the restart_lsn, confirmed_lsn, and xmin using
+		 * fast-forward logical decoding, we can verify whether a consistent
+		 * snapshot can be built. This process also involves saving necessary
+		 * snapshots to disk during decoding, ensuring that logical decoding
+		 * efficiently reaches a consistent point at the restart_lsn without
+		 * the potential loss of data during snapshot creation.
+		 *
+		 * XXX we could optimize this by skipping logical decoding advancement
+		 * if a logical snapshot at restart_lsn is already saved on disk.
+		 */
+		LogicalSlotAdvanceAndCheckReadynessForDecoding(remote_slot->confirmed_lsn,
+													   ready_for_decoding);
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+		updated_lsn = true;
+	}
 
-	if (!xmin_changed &&
-		!restart_lsn_changed &&
-		remote_dbid == slot->data.database &&
+	if (remote_dbid == slot->data.database &&
 		remote_slot->two_phase == slot->data.two_phase &&
 		remote_slot->failover == slot->data.failover &&
-		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
 		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
-		return false;
+		return updated_lsn;
 
 	/* Avoid expensive operations while holding a spinlock. */
 	namestrcpy(&plugin_name, remote_slot->plugin);
@@ -178,18 +202,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	slot->data.database = remote_dbid;
 	slot->data.two_phase = remote_slot->two_phase;
 	slot->data.failover = remote_slot->failover;
-	slot->data.restart_lsn = remote_slot->restart_lsn;
-	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
-	slot->data.catalog_xmin = remote_slot->catalog_xmin;
-	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
 	SpinLockRelease(&slot->mutex);
 
-	if (xmin_changed)
-		ReplicationSlotsComputeRequiredXmin(false);
-
-	if (restart_lsn_changed)
-		ReplicationSlotsComputeRequiredLSN();
-
 	return true;
 }
 
@@ -413,6 +427,7 @@ static bool
 update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool	ready_for_decoding = false;
 
 	/*
 	 * Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +458,19 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		return false;
 	}
 
-	/* First time slot update, the function must return true */
-	if (!update_local_synced_slot(remote_slot, remote_dbid))
-		elog(ERROR, "failed to update slot");
+	(void) update_local_synced_slot(remote_slot, remote_dbid,
+									&ready_for_decoding);
+
+	/*
+	 * Don't persist the slot if it cannot reach the consistent point from the
+	 * restart_lsn.
+	 */
+	if (!ready_for_decoding)
+	{
+		elog(DEBUG1, "The synced slot could not find consistent point from %X/%X",
+			 LSN_FORMAT_ARGS(slot->data.restart_lsn));
+		return false;
+	}
 
 	ReplicationSlotPersist();
 
@@ -578,7 +603,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 					 LSN_FORMAT_ARGS(remote_slot->restart_lsn));
 
 			/* Make sure the slot changes persist across server restart */
-			if (update_local_synced_slot(remote_slot, remote_dbid))
+			if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
 			{
 				ReplicationSlotMarkDirty();
 				ReplicationSlotSave();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index da57177c25..4ef93ea05b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -492,125 +492,13 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 }
 
 /*
- * Helper function for advancing our logical replication slot forward.
- *
- * The slot's restart_lsn is used as start point for reading records, while
- * confirmed_flush is used as base point for the decoding context.
- *
- * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
- * because we need to digest WAL to advance restart_lsn allowing to recycle
- * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
- * mode, no changes are generated anyway.
+ * Advance our logical replication slot forward. See
+ * LogicalSlotAdvanceAndCheckReadynessForDecoding for details.
  */
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
-	LogicalDecodingContext *ctx;
-	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn;
-
-	Assert(moveto != InvalidXLogRecPtr);
-
-	PG_TRY();
-	{
-		/*
-		 * Create our decoding context in fast_forward mode, passing start_lsn
-		 * as InvalidXLogRecPtr, so that we start processing from my slot's
-		 * confirmed_flush.
-		 */
-		ctx = CreateDecodingContext(InvalidXLogRecPtr,
-									NIL,
-									true,	/* fast_forward */
-									XL_ROUTINE(.page_read = read_local_xlog_page,
-											   .segment_open = wal_segment_open,
-											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
-
-		/*
-		 * Wait for specified streaming replication standby servers (if any)
-		 * to confirm receipt of WAL up to moveto lsn.
-		 */
-		WaitForStandbyConfirmation(moveto);
-
-		/*
-		 * Start reading at the slot's restart_lsn, which we know to point to
-		 * a valid record.
-		 */
-		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
-
-		/* invalidate non-timetravel entries */
-		InvalidateSystemCaches();
-
-		/* Decode records until we reach the requested target */
-		while (ctx->reader->EndRecPtr < moveto)
-		{
-			char	   *errm = NULL;
-			XLogRecord *record;
-
-			/*
-			 * Read records.  No changes are generated in fast_forward mode,
-			 * but snapbuilder/slot statuses are updated properly.
-			 */
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
-				elog(ERROR, "could not find record while advancing replication slot: %s",
-					 errm);
-
-			/*
-			 * Process the record.  Storage-level changes are ignored in
-			 * fast_forward mode, but other modules (such as snapbuilder)
-			 * might still have critical updates to do.
-			 */
-			if (record)
-				LogicalDecodingProcessRecord(ctx, ctx->reader);
-
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/*
-		 * Logical decoding could have clobbered CurrentResourceOwner during
-		 * transaction management, so restore the executor's value.  (This is
-		 * a kluge, but it's not worth cleaning up right now.)
-		 */
-		CurrentResourceOwner = old_resowner;
-
-		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
-		{
-			LogicalConfirmReceivedLocation(moveto);
-
-			/*
-			 * If only the confirmed_flush LSN has changed the slot won't get
-			 * marked as dirty by the above. Callers on the walsender
-			 * interface are expected to keep track of their own progress and
-			 * don't need it written out. But SQL-interface users cannot
-			 * specify their own start positions and it's harder for them to
-			 * keep track of their progress, so we should make more of an
-			 * effort to save it for them.
-			 *
-			 * Dirty the slot so it is written out at the next checkpoint. The
-			 * LSN position advanced to may still be lost on a crash but this
-			 * makes the data consistent after a clean shutdown.
-			 */
-			ReplicationSlotMarkDirty();
-		}
-
-		retlsn = MyReplicationSlot->data.confirmed_flush;
-
-		/* free context, call shutdown callback */
-		FreeDecodingContext(ctx);
-
-		InvalidateSystemCaches();
-	}
-	PG_CATCH();
-	{
-		/* clear all timetravel entries */
-		InvalidateSystemCaches();
-
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
-
-	return retlsn;
+	return LogicalSlotAdvanceAndCheckReadynessForDecoding(moveto, NULL);
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dc2df4ce92..f0abac2c75 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -149,5 +149,7 @@ extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
 extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+extern XLogRecPtr LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto,
+																 bool *ready_for_decoding);
 
 #endif
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..e7021050fc 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -280,11 +280,11 @@ is( $standby1->safe_psql(
 	'logical slot is re-synced');
 
 # Reset the log_min_messages to the default value.
-$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
-$primary->reload;
+#$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
+#$primary->reload;
 
-$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
-$standby1->reload;
+#$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
+#$standby1->reload;
 
 ##################################################
 # Test that a synchronized slot can not be decoded, altered or dropped by the
@@ -479,8 +479,8 @@ $subscriber1->safe_psql(
 
 $subscriber1->wait_for_subscription_sync;
 
-# Do not allow any further advancement of the restart_lsn and
-# confirmed_flush_lsn for the lsub1_slot.
+# Do not allow any further advancement of the confirmed_flush_lsn for the
+# lsub1_slot.
 $subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
 
 # Wait for the replication slot to become inactive on the publisher
@@ -489,20 +489,15 @@ $primary->poll_query_until(
 	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
 	1);
 
-# Get the restart_lsn for the logical slot lsub1_slot on the primary
-my $primary_restart_lsn = $primary->safe_psql('postgres',
-	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
-
 # Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
 my $primary_flush_lsn = $primary->safe_psql('postgres',
 	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
 
-# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced
-# to the standby
+# Confirm that confirmed_flush_lsn of lsub1_slot slot are synced to the standby
 ok( $standby1->poll_query_until(
 		'postgres',
-		"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
-	'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
+		"SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
+	'confirmed_flush_lsn of slot lsub1_slot synced to standby');
 
 ##################################################
 # Test that logical failover replication slots wait for the specified
-- 
2.34.1

