From 4dfb1607c255e351e0e60b3c4175c7299ec6dbf0 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Thu, 4 Sep 2025 12:50:59 +0800
Subject: [PATCH v1 1/2] Fix conflict-relevant data retention for prepared
 transactions

Previously, conflict-relevant data could be prematurely removed before applying
prepared transactions on the publisher that are in the commit phase. This
occurred because GetOldestActiveTransactionId() was called on the publisher,
which failed to account for the backend executing COMMIT PREPARED, as this
backend does not have an xid stored in PGPROC.

This commit fixes the issue by introducing a new function to traverse global
transactions, identifying prepared transactions in the commit phase.

Additionally, an injection point has been added to ensure that data is not
prematurely removed when a concurrent prepared transaction is being committed on
the publisher.
---
 src/backend/access/transam/twophase.c | 56 +++++++++++++++++++++++++++
 src/backend/replication/walsender.c   | 12 ++++++
 src/include/access/twophase.h         |  2 +
 3 files changed, 70 insertions(+)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 7918176fc58..e14e7551129 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2809,3 +2809,59 @@ LookupGXactBySubid(Oid subid)
 
 	return found;
 }
+
+/*
+ * TwoPhaseGetXidByLockingProc
+ *		Return the oldest transaction ID from prepared transactions that are
+ *		currently in the commit phase.
+ *
+ * This function only considers transactions in the currently connected
+ * database. If no matching transactions are found, it returns
+ * InvalidTransactionId.
+ */
+TransactionId
+TwoPhaseGetOldestXidInCommit(void)
+{
+	TransactionId oldestRunningXid = InvalidTransactionId;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+		PGPROC	   *proc = GetPGProcByNumber(gxact->pgprocno);
+		PGPROC	   *commitproc;
+		TransactionId xid;
+
+		if (!gxact->valid)
+			continue;
+
+		if (MyDatabaseId != proc->databaseId)
+			continue;
+
+		if (gxact->locking_backend == INVALID_PROC_NUMBER)
+			continue;
+
+		/*
+		 * Get the backend that is handling the transaction. It's safe to
+		 * access this backend while holding TwoPhaseStateLock, as the backend
+		 * can only be destroyed after either removing or unlocking the
+		 * current global transaction, both of which require an exclusive
+		 * TwoPhaseStateLock.
+		 */
+		commitproc = GetPGProcByNumber(gxact->locking_backend);
+
+		if ((commitproc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
+			continue;
+
+		xid = XidFromFullTransactionId(gxact->fxid);
+
+		if (!TransactionIdIsValid(oldestRunningXid) ||
+			TransactionIdPrecedes(xid, oldestRunningXid))
+			oldestRunningXid = xid;
+	}
+
+	LWLockRelease(TwoPhaseStateLock);
+
+	return oldestRunningXid;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e3dce9dc68d..59822f22b8d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -51,6 +51,7 @@
 
 #include "access/timeline.h"
 #include "access/transam.h"
+#include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
@@ -2719,6 +2720,7 @@ ProcessStandbyPSRequestMessage(void)
 {
 	XLogRecPtr	lsn = InvalidXLogRecPtr;
 	TransactionId oldestXidInCommit;
+	TransactionId oldestGXidInCommit;
 	FullTransactionId nextFullXid;
 	FullTransactionId fullOldestXidInCommit;
 	WalSnd	   *walsnd = MyWalSnd;
@@ -2746,6 +2748,16 @@ ProcessStandbyPSRequestMessage(void)
 	 * ones replicated.
 	 */
 	oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+	oldestGXidInCommit = TwoPhaseGetOldestXidInCommit();
+
+	/*
+	 * Update the oldest xid for standby transmission if an older prepared
+	 * transaction exists and is currently in commit phase.
+	 */
+	if (TransactionIdIsValid(oldestGXidInCommit) &&
+		TransactionIdPrecedes(oldestGXidInCommit, oldestXidInCommit))
+		oldestXidInCommit = oldestGXidInCommit;
+
 	nextFullXid = ReadNextFullTransactionId();
 	fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
 															 oldestXidInCommit);
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 509bdad9a5d..64463e9f4af 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -68,4 +68,6 @@ extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res,
 								   int szgid);
 extern bool LookupGXactBySubid(Oid subid);
 
+extern TransactionId TwoPhaseGetOldestXidInCommit(void);
+
 #endif							/* TWOPHASE_H */
-- 
2.51.0.windows.1

