Just rebased the patch.

-------
regards
Yura Sokolov aka funny-falcon
From 080c9e0de5e6e10751347e1ff50b65df424744cb Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Mon, 3 Feb 2025 11:58:33 +0300
Subject: [PATCH v2] sinvaladt.c: use atomic operations on maxMsgNum

The msgnumLock spinlock can be highly contended.
The comment states that it is needed only to provide a memory barrier.
Let's use atomic operations with explicit memory barriers instead.

Note: the patch uses pg_read_barrier()/pg_write_barrier() instead of
pg_atomic_read_membarrier_u32()/pg_atomic_write_membarrier_u32() since
full-barrier semantics are not required, and explicit read/write barriers
are cheaper, at least on x86_64.
---
 src/backend/storage/ipc/sinvaladt.c | 74 ++++++++++++-----------------
 1 file changed, 30 insertions(+), 44 deletions(-)

diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 2da91738c32..c3b89afce9f 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -86,19 +86,10 @@
  * has no need to touch anyone's ProcState, except in the infrequent cases
  * when SICleanupQueue is needed.  The only point of overlap is that
  * the writer wants to change maxMsgNum while readers need to read it.
- * We deal with that by having a spinlock that readers must take for just
- * long enough to read maxMsgNum, while writers take it for just long enough
- * to write maxMsgNum.  (The exact rule is that you need the spinlock to
- * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
- * spinlock to write maxMsgNum unless you are holding both locks.)
- *
- * Note: since maxMsgNum is an int and hence presumably atomically readable/
- * writable, the spinlock might seem unnecessary.  The reason it is needed
- * is to provide a memory barrier: we need to be sure that messages written
- * to the array are actually there before maxMsgNum is increased, and that
- * readers will see that data after fetching maxMsgNum.  Multiprocessors
- * that have weak memory-ordering guarantees can fail without the memory
- * barrier instructions that are included in the spinlock sequences.
+ * We deal with that by using atomic operations with proper memory barriers.
+ * (The exact rule is that you need a read barrier after atomically reading
+ * maxMsgNum if you are not holding SInvalWriteLock, and you need a write
+ * barrier before writing maxMsgNum unless you are holding both locks.)
  */
 
 
@@ -139,7 +130,7 @@ typedef struct ProcState
 	/* procPid is zero in an inactive ProcState array entry. */
 	pid_t		procPid;		/* PID of backend, for signaling */
 	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
-	int			nextMsgNum;		/* next message number to read */
+	uint32		nextMsgNum;		/* next message number to read */
 	bool		resetState;		/* backend needs to reset its state */
 	bool		signaled;		/* backend has been sent catchup signal */
 	bool		hasMessages;	/* backend has unread messages */
@@ -167,11 +158,9 @@ typedef struct SISeg
 	/*
 	 * General state information
 	 */
-	int			minMsgNum;		/* oldest message still needed */
-	int			maxMsgNum;		/* next message number to be assigned */
-	int			nextThreshold;	/* # of messages to call SICleanupQueue */
-
-	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */
+	uint32		minMsgNum;		/* oldest message still needed */
+	pg_atomic_uint32 maxMsgNum; /* next message number to be assigned */
+	uint32		nextThreshold;	/* # of messages to call SICleanupQueue */
 
 	/*
 	 * Circular buffer holding shared-inval messages
@@ -243,9 +232,8 @@ SharedInvalShmemInit(void)
 
 	/* Clear message counters, save size of procState array, init spinlock */
 	shmInvalBuffer->minMsgNum = 0;
-	shmInvalBuffer->maxMsgNum = 0;
+	pg_atomic_init_u32(&shmInvalBuffer->maxMsgNum, 0);
 	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
-	SpinLockInit(&shmInvalBuffer->msgnumLock);
 
 	/* The buffer[] array is initially all unused, so we need not fill it */
 
@@ -303,7 +291,7 @@ SharedInvalBackendInit(bool sendOnly)
 
 	/* mark myself active, with all extant messages already read */
 	stateP->procPid = MyProcPid;
-	stateP->nextMsgNum = segP->maxMsgNum;
+	stateP->nextMsgNum = pg_atomic_read_u32(&segP->maxMsgNum);
 	stateP->resetState = false;
 	stateP->signaled = false;
 	stateP->hasMessages = false;
@@ -382,8 +370,8 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 	while (n > 0)
 	{
 		int			nthistime = Min(n, WRITE_QUANTUM);
-		int			numMsgs;
-		int			max;
+		uint32		numMsgs;
+		uint32		max;
 		int			i;
 
 		n -= nthistime;
@@ -399,7 +387,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 		 */
 		for (;;)
 		{
-			numMsgs = segP->maxMsgNum - segP->minMsgNum;
+			numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
 			if (numMsgs + nthistime > MAXNUMMESSAGES ||
 				numMsgs >= segP->nextThreshold)
 				SICleanupQueue(true, nthistime);
@@ -410,17 +398,16 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 		/*
 		 * Insert new message(s) into proper slot of circular buffer
 		 */
-		max = segP->maxMsgNum;
+		max = pg_atomic_read_u32(&segP->maxMsgNum);
 		while (nthistime-- > 0)
 		{
 			segP->buffer[max % MAXNUMMESSAGES] = *data++;
 			max++;
 		}
 
-		/* Update current value of maxMsgNum using spinlock */
-		SpinLockAcquire(&segP->msgnumLock);
-		segP->maxMsgNum = max;
-		SpinLockRelease(&segP->msgnumLock);
+		/* Update maxMsgNum with an atomic write, after a write barrier */
+		pg_write_barrier();
+		pg_atomic_write_u32(&segP->maxMsgNum, max);
 
 		/*
 		 * Now that the maxMsgNum change is globally visible, we give everyone
@@ -473,7 +460,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 {
 	SISeg	   *segP;
 	ProcState  *stateP;
-	int			max;
+	uint32		max;
 	int			n;
 
 	segP = shmInvalBuffer;
@@ -506,10 +493,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 	 */
 	stateP->hasMessages = false;
 
-	/* Fetch current value of maxMsgNum using spinlock */
-	SpinLockAcquire(&segP->msgnumLock);
-	max = segP->maxMsgNum;
-	SpinLockRelease(&segP->msgnumLock);
+	/* Fetch maxMsgNum with an atomic read, followed by a read barrier */
+	max = pg_atomic_read_u32(&segP->maxMsgNum);
+	pg_read_barrier();
 
 	if (stateP->resetState)
 	{
@@ -576,11 +562,11 @@ void
 SICleanupQueue(bool callerHasWriteLock, int minFree)
 {
 	SISeg	   *segP = shmInvalBuffer;
-	int			min,
+	uint32		min,
 				minsig,
 				lowbound,
-				numMsgs,
-				i;
+				numMsgs;
+	int			i;
 	ProcState  *needSig = NULL;
 
 	/* Lock out all writers and readers */
@@ -595,14 +581,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
 	 * backends here it is possible for them to keep sending messages without
 	 * a problem even when they are the only active backend.
 	 */
-	min = segP->maxMsgNum;
-	minsig = min - SIG_THRESHOLD;
-	lowbound = min - MAXNUMMESSAGES + minFree;
+	min = pg_atomic_read_u32(&segP->maxMsgNum);
+	minsig = min - Min(min, SIG_THRESHOLD);
+	lowbound = min - Min(min, MAXNUMMESSAGES - minFree);
 
 	for (i = 0; i < segP->numProcs; i++)
 	{
 		ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
-		int			n = stateP->nextMsgNum;
+		uint32		n = stateP->nextMsgNum;
 
 		/* Ignore if already in reset state */
 		Assert(stateP->procPid != 0);
@@ -641,7 +627,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
 	if (min >= MSGNUMWRAPAROUND)
 	{
 		segP->minMsgNum -= MSGNUMWRAPAROUND;
-		segP->maxMsgNum -= MSGNUMWRAPAROUND;
+		pg_atomic_sub_fetch_u32(&segP->maxMsgNum, MSGNUMWRAPAROUND);
 		for (i = 0; i < segP->numProcs; i++)
 			segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
 	}
@@ -650,7 +636,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
 	 * Determine how many messages are still in the queue, and set the
 	 * threshold at which we should repeat SICleanupQueue().
 	 */
-	numMsgs = segP->maxMsgNum - segP->minMsgNum;
+	numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
 	if (numMsgs < CLEANUP_MIN)
 		segP->nextThreshold = CLEANUP_MIN;
 	else
-- 
2.43.0

Reply via email to