From 6a0b0e58d43c185c391eaeda73ba00bccd1dff95 Mon Sep 17 00:00:00 2001
From: alterego665 <824662526@qq.com>
Date: Mon, 14 Jul 2025 22:31:15 +0800
Subject: [PATCH v6] Replace polling with waiting in XactLockTableWait on 
 standby servers

Previously, XactLockTableWait() and ConditionalXactLockTableWait() used
inefficient polling when waiting for a transaction to complete on a
standby server, causing excessive CPU usage and poor response latency.

The polling was necessary because standby servers do not maintain the
primary's lock table, so these functions fell back to repeatedly checking
TransactionIdIsInProgress() and sleeping until the target transaction
completed.  This case commonly arises during logical replication slot
creation on a hot standby.

Replace the polling with per-XID event-driven waiting built on a hash
table of condition variables.  When a transaction completes during a
KnownAssignedXids update, WakeXidWaiters() immediately wakes exactly the
processes waiting on that XID, eliminating polling while keeping the
notifications precise.

The changes add a new XACT_COMPLETE wait event, a partitioned XidWaitHash
table mapping transaction IDs to condition variables, and two new
functions, StandbyXidWait() and WakeXidWaiters().  XactLockTableWait()
now uses per-XID waiting on standby servers instead of falling back to
inefficient polling.
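
In outline, the new functions follow PostgreSQL's standard
condition-variable protocol.  A simplified sketch of the waiter and waker
sides (hash-entry lookup, waiter accounting, and error paths omitted; see
the patch below for the full logic):

    /* Waiter (StandbyXidWait): sleep until the XID is no longer running */
    ConditionVariablePrepareToSleep(&entry->cv);
    while (TransactionIdIsInProgress(xid))
        ConditionVariableSleep(&entry->cv, WAIT_EVENT_XACT_COMPLETE);
    ConditionVariableCancelSleep();

    /* Waker (WakeXidWaiters): called as the XID is removed from
     * KnownAssignedXids; wakes every backend sleeping on this XID */
    ConditionVariableBroadcast(&entry->cv);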
---
 src/backend/storage/ipc/procarray.c           | 157 ++++++++++++++++++
 src/backend/storage/lmgr/lmgr.c               |  34 +++
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/storage/procarray.h               |   3 +
 4 files changed, 195 insertions(+)

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5b945a9ee3..448e5b1d491 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -306,6 +306,32 @@ static GlobalVisState GlobalVisTempRels;
  */
 static TransactionId ComputeXidHorizonsResultLastXmin;
 
+/*
+ * XID waiter hash table partition count
+ */
+#define NUM_XID_WAIT_PARTITIONS  16   /* must be a power of 2 */
+
+/*
+ * Hash table entry for per-XID waiting on standby servers.
+ */
+typedef struct XidWaitEntry
+{
+	TransactionId xid;				/* transaction ID being waited for */
+	ConditionVariable cv;			/* condition variable for this XID */
+	pg_atomic_uint32 waiter_count;	/* number of backends waiting */
+	bool		initialized;		/* true when entry is fully set up */
+} XidWaitEntry;
+
+/*
+ * Global hash table for XID waiting.
+ *
+ * Maps transaction IDs to XidWaitEntry structures, enabling efficient
+ * per-XID waiting during hot standby recovery.  The table is partitioned
+ * to reduce lock contention, in the same style as the lock manager's
+ * shared hash tables.
+ */
+static HTAB *XidWaitHash = NULL;
+
 #ifdef XIDCACHE_DEBUG
 
 /* counters for XidCache measurement */
@@ -368,6 +394,7 @@ static void MaintainLatestCompletedXidRecovery(TransactionId latestXid);
 static inline FullTransactionId FullXidRelativeTo(FullTransactionId rel,
 												  TransactionId xid);
 static void GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons);
+static void WakeXidWaiters(TransactionId xid);
 
 /*
  * Report shared-memory space needed by ProcArrayShmemInit
@@ -406,6 +433,12 @@ ProcArrayShmemSize(void)
 								 TOTAL_MAX_CACHED_SUBXIDS));
 		size = add_size(size,
 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
+
+		/*
+		 * XID waiter hash table sizing: allocate memory for the maximum
+		 * theoretical capacity but start with a smaller initial size.
+		 */
+		size = add_size(size, hash_estimate_size(TOTAL_MAX_CACHED_SUBXIDS, sizeof(XidWaitEntry)));
 	}
 
 	return size;
@@ -458,6 +491,25 @@ ProcArrayShmemInit(void)
 			ShmemInitStruct("KnownAssignedXidsValid",
 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
 							&found);
+
+		/* Initialize XID waiter hash table for standby XID waiting */
+		{
+			HASHCTL info;
+
+			info.keysize = sizeof(TransactionId);
+			info.entrysize = sizeof(XidWaitEntry);
+			info.num_partitions = NUM_XID_WAIT_PARTITIONS;
+
+			/*
+			 * Start with 50% of theoretical capacity as baseline, but allow
+			 * growth up to full capacity if needed.
+			 */
+			XidWaitHash = ShmemInitHash("XID Wait Hash",
+									   TOTAL_MAX_CACHED_SUBXIDS / 2,  /* init_size */
+									   TOTAL_MAX_CACHED_SUBXIDS,      /* max_size */
+									   &info,
+									   HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
+		}
 	}
 }
 
@@ -5001,6 +5053,9 @@ KnownAssignedXidsRemove(TransactionId xid)
 	 * So, just ignore the search result.
 	 */
 	(void) KnownAssignedXidsSearch(xid, true);
+
+	/* Wake waiters for this specific XID as we expire it */
+	WakeXidWaiters(xid);
 }
 
 /*
@@ -5069,6 +5124,8 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 			if (!StandbyTransactionIdIsPrepared(knownXid))
 			{
 				KnownAssignedXidsValid[i] = false;
+				/* Wake waiters for this specific XID as we expire it */
+				WakeXidWaiters(knownXid);
 				count++;
 			}
 		}
@@ -5265,3 +5322,103 @@ KnownAssignedXidsReset(void)
 
 	LWLockRelease(ProcArrayLock);
 }
+
+/*
+ * Wait for XID completion using condition variables.
+ *
+ * This function implements efficient waiting for transaction completion
+ * on standby servers by using a hash table of condition variables keyed
+ * by transaction ID. This replaces polling-based approaches with direct
+ * event notification.
+ *
+ * The function handles the complete lifecycle of waiting: finding or
+ * creating the hash entry, managing waiter counts, and cleaning up
+ * when the last waiter finishes.
+ *
+ * Note: This function is only meaningful during hot standby recovery.
+ * Primary servers should use the lock-based waiting mechanisms.
+ */
+void
+StandbyXidWait(TransactionId xid)
+{
+	XidWaitEntry *entry;
+	bool		found;
+	uint32		hashcode;
+
+	/* Quick exit if not in hot standby or the transaction already completed */
+	if (!InHotStandby || !TransactionIdIsInProgress(xid))
+		return;
+
+	Assert(XidWaitHash);
+
+	/* Get hash code for partition locking */
+	hashcode = get_hash_value(XidWaitHash, &xid);
+
+	/* Find or create hash entry */
+	entry = hash_search_with_hash_value(XidWaitHash, &xid, hashcode,
+									   HASH_ENTER, &found);
+
+	if (!found)
+	{
+		/* Initialize new entry */
+		entry->xid = xid;
+		ConditionVariableInit(&entry->cv);
+		pg_atomic_init_u32(&entry->waiter_count, 0);
+		entry->initialized = true;
+	}
+
+	/* Increment waiter count */
+	pg_atomic_fetch_add_u32(&entry->waiter_count, 1);
+
+	/* Standard PostgreSQL condition variable waiting pattern */
+	ConditionVariablePrepareToSleep(&entry->cv);
+
+	/* Wait loop with condition re-checking */
+	while (TransactionIdIsInProgress(xid))
+	{
+		ConditionVariableSleep(&entry->cv, WAIT_EVENT_XACT_COMPLETE);
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/* Standard cleanup - PostgreSQL's exception system handles errors */
+	ConditionVariableCancelSleep();
+
+	/* Decrement waiter count and cleanup if last waiter */
+	if (pg_atomic_fetch_sub_u32(&entry->waiter_count, 1) == 1)
+	{
+		hash_search_with_hash_value(XidWaitHash, &xid, hashcode,
+								   HASH_REMOVE, NULL);
+	}
+}
+
+/*
+ * Wake waiters for a specific XID.
+ *
+ * This function is called during replay when a transaction is removed
+ * from KnownAssignedXids, i.e. when it has completed on the primary,
+ * and any standby backends waiting on that XID need to be woken.
+ *
+ * Uses the hash table to locate waiters for the specified XID and
+ * broadcasts on the associated condition variable to wake all waiting
+ * backends simultaneously.
+ */
+static void
+WakeXidWaiters(TransactionId xid)
+{
+	XidWaitEntry *entry;
+	uint32		hashcode;
+
+	/* Quick exit if not in hot standby mode yet or hash table not available */
+	if (!InHotStandby || !XidWaitHash)
+		return;
+
+	hashcode = get_hash_value(XidWaitHash, &xid);
+
+	entry = hash_search_with_hash_value(XidWaitHash, &xid, hashcode,
+									   HASH_FIND, NULL);
+	if (entry && entry->initialized)
+	{
+		/* Wake all waiters for this specific XID */
+		ConditionVariableBroadcast(&entry->cv);
+	}
+}
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 3f6bf70bd3c..24d502e9e57 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -652,6 +652,10 @@ XactLockTableDelete(TransactionId xid)
  * is specified, an error context callback is set up.  If 'oper' is passed as
  * None, no error context callback is set up.
  *
+ * On standby servers, uses per-XID condition variable waiting instead of
+ * lock acquisition.  On primary servers, uses the standard lock table
+ * approach.
+ *
  * Note that this does the right thing for subtransactions: if we wait on a
  * subtransaction, we will exit as soon as it aborts or its top parent commits.
  * It takes some extra work to ensure this, because to save on shared memory
@@ -687,6 +691,21 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
 		error_context_stack = &callback;
 	}
 
+	/* Try per-XID wait on standby */
+	if (unlikely(RecoveryInProgress()))
+	{
+		Assert(TransactionIdIsValid(xid));
+		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+		StandbyXidWait(xid);
+
+		if (oper != XLTW_None)
+			error_context_stack = callback.previous;
+
+		return;
+	}
+
+	/* Try lock table wait on primary */
 	for (;;)
 	{
 		Assert(TransactionIdIsValid(xid));
@@ -734,6 +753,9 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
  *
  * As above, but only lock if we can get the lock without blocking.
  * Returns true if the lock was acquired.
+ *
+ * On standby servers, returns true only if the transaction is no longer
+ * in progress.  On primary servers, uses conditional lock acquisition.
  */
 bool
 ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
@@ -741,6 +763,18 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 	LOCKTAG		tag;
 	bool		first = true;
 
+	if (unlikely(RecoveryInProgress()))
+	{
+		Assert(TransactionIdIsValid(xid));
+		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+		/* If the transaction is not in progress, we can return true immediately. */
+		if (!TransactionIdIsInProgress(xid))
+			return true;
+
+		return false;
+	}
+
 	for (;;)
 	{
 		Assert(TransactionIdIsValid(xid));
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..fdf3652cf46 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -160,6 +160,7 @@ WAL_BUFFER_INIT	"Waiting on WAL buffer to be initialized."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
+XACT_COMPLETE	"Waiting for a transaction to complete."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at transaction end."
 
 ABI_compatibility:
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index ef0b733ebe8..5166d97c424 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -100,4 +100,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 											TransactionId *catalog_xmin);
 
+/* Per-XID waiting support for standby servers */
+extern void StandbyXidWait(TransactionId xid);
+
 #endif							/* PROCARRAY_H */
-- 
2.49.0

