From 12200bc2f81cf59bb2231a0330a05e376d957d88 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 11 Apr 2024 16:15:43 +0800
Subject: [PATCH v5 2/2] write the changed xmin to disk before computing global
 oldest xmin

Ensure that when updating the catalog_xmin of the synced slots, it is first
written to disk before changing the in-memory value (effective_catalog_xmin).
This is to prevent a scenario where the in-memory value change triggers vacuum
to remove catalog tuples before the catalog_xmin is written to disk, In the
event of a crash before the catalog_xmin is persisted, we would not know that
some catalog tuples have been removed.
---
 src/backend/replication/logical/slotsync.c | 49 +++++++++++++++-------
 1 file changed, 34 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 0d4735fbfa..361fc7e602 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -168,7 +168,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 						 bool *found_consistent_snapshot)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
-	bool		slot_updated = false;
+	bool		updated_xmin_or_lsn = false;
+	bool		updated_config = false;
 
 	Assert(slot->data.invalidated == RS_INVAL_NONE);
 
@@ -203,7 +204,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 			slot->data.restart_lsn = remote_slot->restart_lsn;
 			slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 			slot->data.catalog_xmin = remote_slot->catalog_xmin;
-			slot->effective_catalog_xmin = remote_slot->catalog_xmin;
 			SpinLockRelease(&slot->mutex);
 
 			if (found_consistent_snapshot)
@@ -224,10 +224,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 										   LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
 		}
 
-		ReplicationSlotsComputeRequiredXmin(false);
-		ReplicationSlotsComputeRequiredLSN();
-
-		slot_updated = true;
+		updated_xmin_or_lsn = true;
 	}
 
 	if (remote_dbid != slot->data.database ||
@@ -247,10 +244,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		slot->data.failover = remote_slot->failover;
 		SpinLockRelease(&slot->mutex);
 
-		slot_updated = true;
+		updated_config = true;
 	}
 
-	return slot_updated;
+	/*
+	 * We have to write the changed xmin to disk *before* we change the
+	 * in-memory value, otherwise after a crash we wouldn't know that some
+	 * catalog tuples might have been removed already.
+	 */
+	if (updated_config || updated_xmin_or_lsn)
+	{
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+	}
+
+	/*
+	 * Now the new xmin is safely on disk, we can let the global value
+	 * advance. We do not take ProcArrayLock or similar since we only advance
+	 * xmin here and there's not much harm done by a concurrent computation
+	 * missing that.
+	 */
+	if (updated_xmin_or_lsn)
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+		SpinLockRelease(&slot->mutex);
+
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
+
+	return updated_config || updated_xmin_or_lsn;
 }
 
 /*
@@ -667,13 +691,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 										   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
 
 			/* Make sure the slot changes persist across server restart */
-			if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
-			{
-				ReplicationSlotMarkDirty();
-				ReplicationSlotSave();
-
-				slot_updated = true;
-			}
+			slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
+													NULL);
 		}
 	}
 	/* Otherwise create the slot first. */
-- 
2.31.1

