Good day,

While investigating performance issues for a client, our engineers found
msgnumLock to be contended.

Looking closer, it is obvious the lock is not needed at all: it is used
only as a memory barrier. This is even stated in the comment at the top of
the file:

* We deal with that by having a spinlock that readers must take for just
* long enough to read maxMsgNum, while writers take it for just long enough
* to write maxMsgNum.  (The exact rule is that you need the spinlock to
* read maxMsgNum if you are not holding SInvalWriteLock, and you need the
* spinlock to write maxMsgNum unless you are holding both locks.)
*
* Note: since maxMsgNum is an int and hence presumably atomically readable/
* writable, the spinlock might seem unnecessary.  The reason it is needed
* is to provide a memory barrier: we need to be sure that messages written
* to the array are actually there before maxMsgNum is increased, and that
* readers will see that data after fetching maxMsgNum.

So we changed maxMsgNum to be a pg_atomic_uint32 and added the appropriate
memory barriers:
- pg_write_barrier() before the write to maxMsgNum (when only SInvalWriteLock
is held)
- pg_read_barrier() after the read of maxMsgNum (when only SInvalReadLock is
held)
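In short, the pairing looks like this (a condensed sketch of the logic in
SIInsertDataEntries()/SIGetDataEntries(); the full diff is below):

    /* writer (holds SInvalWriteLock) */
    segP->buffer[max % MAXNUMMESSAGES] = *data++;  /* fill the array... */
    pg_write_barrier();              /* ...before advancing the counter */
    pg_atomic_write_u32(&segP->maxMsgNum, max);

    /* reader (holds SInvalReadLock) */
    max = pg_atomic_read_u32(&segP->maxMsgNum);
    pg_read_barrier();               /* counter first, then array contents */
    /* ... copy segP->buffer entries up to max ... */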

This improved the situation for our client.

Note: pg_(write|read)_barrier() was chosen instead of
pg_atomic_(write|read)_membarrier_u32() because it is certainly cheaper, at
least on x86_64, where it translates to just a compiler barrier (an empty
asm statement). Moreover, pg_atomic_read_membarrier_u32() is currently
implemented as a write operation (a fetch_add of 0), which is undesirable
in a contended spot.
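For reference, here is roughly what the two options boil down to on x86_64
(paraphrased from src/include/port/atomics/, not verbatim):

    /* x86_64 is TSO: read and write barriers are only compiler barriers */
    #define pg_read_barrier()   __asm__ __volatile__("" ::: "memory")
    #define pg_write_barrier()  __asm__ __volatile__("" ::: "memory")

    /* generic fallback for the membarrier read: a locked read-modify-write */
    static inline uint32
    pg_atomic_read_membarrier_u32(volatile pg_atomic_uint32 *ptr)
    {
        /* fetch_add(0) implies a full barrier but dirties the cache line */
        return pg_atomic_fetch_add_u32(ptr, 0);
    }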

-----

regards
Yura Sokolov aka funny-falcon
From 351b3ecb5d085559beb13ffd6dd4416dab4b4a84 Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Mon, 3 Feb 2025 11:58:33 +0300
Subject: [PATCH v0] sinvaladt.c: use atomic operations on maxMsgNum

The msgnumLock spinlock could be highly contended.
The comment states it was needed only as a memory barrier.
Let's use atomic operations with explicit memory barriers instead.

Note: patch uses pg_read_barrier()/pg_write_barrier() instead of
pg_atomic_read_membarrier_u32()/pg_atomic_write_membarrier_u32() since
no full barrier semantic is required, and explicit read/write barriers
are cheaper at least on x86_64.
---
 src/backend/storage/ipc/sinvaladt.c | 58 +++++++++++++-----------------
 1 file changed, 24 insertions(+), 34 deletions(-)

diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 2da91738c32..7d4912c6419 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -86,19 +86,10 @@
  * has no need to touch anyone's ProcState, except in the infrequent cases
  * when SICleanupQueue is needed.  The only point of overlap is that
  * the writer wants to change maxMsgNum while readers need to read it.
- * We deal with that by having a spinlock that readers must take for just
- * long enough to read maxMsgNum, while writers take it for just long enough
- * to write maxMsgNum.  (The exact rule is that you need the spinlock to
- * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
- * spinlock to write maxMsgNum unless you are holding both locks.)
- *
- * Note: since maxMsgNum is an int and hence presumably atomically readable/
- * writable, the spinlock might seem unnecessary.  The reason it is needed
- * is to provide a memory barrier: we need to be sure that messages written
- * to the array are actually there before maxMsgNum is increased, and that
- * readers will see that data after fetching maxMsgNum.  Multiprocessors
- * that have weak memory-ordering guarantees can fail without the memory
- * barrier instructions that are included in the spinlock sequences.
+ * We deal with that by using atomic operations with proper memory barriers.
+ * (The exact rule: you need a read barrier after the atomic read of
+ * maxMsgNum if you are not holding SInvalWriteLock, and a write barrier
+ * before the atomic write of maxMsgNum unless you are holding both locks.)
  */
 
 
@@ -168,11 +159,9 @@ typedef struct SISeg
 	 * General state information
 	 */
 	int			minMsgNum;		/* oldest message still needed */
-	int			maxMsgNum;		/* next message number to be assigned */
+	pg_atomic_uint32 maxMsgNum; /* next message number to be assigned */
 	int			nextThreshold;	/* # of messages to call SICleanupQueue */
 
-	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */
-
 	/*
 	 * Circular buffer holding shared-inval messages
 	 */
@@ -243,9 +232,8 @@ SharedInvalShmemInit(void)
 
-	/* Clear message counters, save size of procState array, init spinlock */
+	/* Clear message counters and save size of procState array */
 	shmInvalBuffer->minMsgNum = 0;
-	shmInvalBuffer->maxMsgNum = 0;
+	pg_atomic_init_u32(&shmInvalBuffer->maxMsgNum, 0);
 	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
-	SpinLockInit(&shmInvalBuffer->msgnumLock);
 
 	/* The buffer[] array is initially all unused, so we need not fill it */
 
@@ -303,7 +291,7 @@ SharedInvalBackendInit(bool sendOnly)
 
 	/* mark myself active, with all extant messages already read */
 	stateP->procPid = MyProcPid;
-	stateP->nextMsgNum = segP->maxMsgNum;
+	stateP->nextMsgNum = pg_atomic_read_u32(&segP->maxMsgNum);
 	stateP->resetState = false;
 	stateP->signaled = false;
 	stateP->hasMessages = false;
@@ -399,7 +387,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 		 */
 		for (;;)
 		{
-			numMsgs = segP->maxMsgNum - segP->minMsgNum;
+			numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
 			if (numMsgs + nthistime > MAXNUMMESSAGES ||
 				numMsgs >= segP->nextThreshold)
 				SICleanupQueue(true, nthistime);
@@ -410,17 +398,16 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
 		/*
 		 * Insert new message(s) into proper slot of circular buffer
 		 */
-		max = segP->maxMsgNum;
+		max = pg_atomic_read_u32(&segP->maxMsgNum);
 		while (nthistime-- > 0)
 		{
 			segP->buffer[max % MAXNUMMESSAGES] = *data++;
 			max++;
 		}
 
-		/* Update current value of maxMsgNum using spinlock */
-		SpinLockAcquire(&segP->msgnumLock);
-		segP->maxMsgNum = max;
-		SpinLockRelease(&segP->msgnumLock);
+		/* Update current value of maxMsgNum: write barrier, then atomic write */
+		pg_write_barrier();
+		pg_atomic_write_u32(&segP->maxMsgNum, max);
 
 		/*
 		 * Now that the maxMsgNum change is globally visible, we give everyone
@@ -506,10 +493,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
 	 */
 	stateP->hasMessages = false;
 
-	/* Fetch current value of maxMsgNum using spinlock */
-	SpinLockAcquire(&segP->msgnumLock);
-	max = segP->maxMsgNum;
-	SpinLockRelease(&segP->msgnumLock);
+	/* Fetch current value of maxMsgNum: atomic read, then read barrier */
+	max = pg_atomic_read_u32(&segP->maxMsgNum);
+	pg_read_barrier();
 
 	if (stateP->resetState)
 	{
@@ -595,7 +581,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
 	 * backends here it is possible for them to keep sending messages without
 	 * a problem even when they are the only active backend.
 	 */
-	min = segP->maxMsgNum;
+	min = pg_atomic_read_u32(&segP->maxMsgNum);
 	minsig = min - SIG_THRESHOLD;
 	lowbound = min - MAXNUMMESSAGES + minFree;
 
@@ -640,8 +626,12 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
 	 */
 	if (min >= MSGNUMWRAPAROUND)
 	{
-		segP->minMsgNum -= MSGNUMWRAPAROUND;
-		segP->maxMsgNum -= MSGNUMWRAPAROUND;
+		/*
+		 * We don't need a memory barrier here, but using sub_fetch is just
+		 * simpler than a read_u32 + write_u32 pair, and this place is not
+		 * contended.
+		 */
+		pg_atomic_sub_fetch_u32(&segP->maxMsgNum, MSGNUMWRAPAROUND);
 		for (i = 0; i < segP->numProcs; i++)
 			segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
 	}
@@ -650,7 +640,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
 	 * Determine how many messages are still in the queue, and set the
 	 * threshold at which we should repeat SICleanupQueue().
 	 */
-	numMsgs = segP->maxMsgNum - segP->minMsgNum;
+	numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
 	if (numMsgs < CLEANUP_MIN)
 		segP->nextThreshold = CLEANUP_MIN;
 	else
-- 
2.43.0
