Just rebased the patch.
-------
regards
Yura Sokolov aka funny-falcon
From 080c9e0de5e6e10751347e1ff50b65df424744cb Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Mon, 3 Feb 2025 11:58:33 +0300
Subject: [PATCH v2] sinvaladt.c: use atomic operations on maxMsgNum
The msgnumLock spinlock can be highly contended.
The existing comment states that it is needed only to provide a memory barrier.
Let's use atomic operations with explicit memory barriers directly instead.
Note: the patch uses pg_read_barrier()/pg_write_barrier() instead of
pg_atomic_read_membarrier_u32()/pg_atomic_write_membarrier_u32() since
full barrier semantics are not required, and explicit read/write barriers
are cheaper, at least on x86_64.
---
src/backend/storage/ipc/sinvaladt.c | 74 ++++++++++++-----------------
1 file changed, 30 insertions(+), 44 deletions(-)
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
index 2da91738c32..c3b89afce9f 100644
--- a/src/backend/storage/ipc/sinvaladt.c
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -86,19 +86,10 @@
* has no need to touch anyone's ProcState, except in the infrequent cases
* when SICleanupQueue is needed. The only point of overlap is that
* the writer wants to change maxMsgNum while readers need to read it.
- * We deal with that by having a spinlock that readers must take for just
- * long enough to read maxMsgNum, while writers take it for just long enough
- * to write maxMsgNum. (The exact rule is that you need the spinlock to
- * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
- * spinlock to write maxMsgNum unless you are holding both locks.)
- *
- * Note: since maxMsgNum is an int and hence presumably atomically readable/
- * writable, the spinlock might seem unnecessary. The reason it is needed
- * is to provide a memory barrier: we need to be sure that messages written
- * to the array are actually there before maxMsgNum is increased, and that
- * readers will see that data after fetching maxMsgNum. Multiprocessors
- * that have weak memory-ordering guarantees can fail without the memory
- * barrier instructions that are included in the spinlock sequences.
+ * We deal with that by using atomic operations with proper memory barriers.
+ * (The exact rule is that you need a read barrier after atomically reading
+ * maxMsgNum if you are not holding SInvalWriteLock, and you need a write
+ * barrier before writing maxMsgNum unless you are holding both locks.)
*/
@@ -139,7 +130,7 @@ typedef struct ProcState
/* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
- int nextMsgNum; /* next message number to read */
+ uint32 nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */
bool signaled; /* backend has been sent catchup signal */
bool hasMessages; /* backend has unread messages */
@@ -167,11 +158,9 @@ typedef struct SISeg
/*
* General state information
*/
- int minMsgNum; /* oldest message still needed */
- int maxMsgNum; /* next message number to be assigned */
- int nextThreshold; /* # of messages to call SICleanupQueue */
-
- slock_t msgnumLock; /* spinlock protecting maxMsgNum */
+ uint32 minMsgNum; /* oldest message still needed */
+ pg_atomic_uint32 maxMsgNum; /* next message number to be assigned */
+ uint32 nextThreshold; /* # of messages to call SICleanupQueue */
/*
* Circular buffer holding shared-inval messages
@@ -243,9 +232,8 @@ SharedInvalShmemInit(void)
/* Clear message counters, save size of procState array, init spinlock */
shmInvalBuffer->minMsgNum = 0;
- shmInvalBuffer->maxMsgNum = 0;
+ pg_atomic_init_u32(&shmInvalBuffer->maxMsgNum, 0);
shmInvalBuffer->nextThreshold = CLEANUP_MIN;
- SpinLockInit(&shmInvalBuffer->msgnumLock);
/* The buffer[] array is initially all unused, so we need not fill it */
@@ -303,7 +291,7 @@ SharedInvalBackendInit(bool sendOnly)
/* mark myself active, with all extant messages already read */
stateP->procPid = MyProcPid;
- stateP->nextMsgNum = segP->maxMsgNum;
+ stateP->nextMsgNum = pg_atomic_read_u32(&segP->maxMsgNum);
stateP->resetState = false;
stateP->signaled = false;
stateP->hasMessages = false;
@@ -382,8 +370,8 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
while (n > 0)
{
int nthistime = Min(n, WRITE_QUANTUM);
- int numMsgs;
- int max;
+ uint32 numMsgs;
+ uint32 max;
int i;
n -= nthistime;
@@ -399,7 +387,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
*/
for (;;)
{
- numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
if (numMsgs + nthistime > MAXNUMMESSAGES ||
numMsgs >= segP->nextThreshold)
SICleanupQueue(true, nthistime);
@@ -410,17 +398,16 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
/*
* Insert new message(s) into proper slot of circular buffer
*/
- max = segP->maxMsgNum;
+ max = pg_atomic_read_u32(&segP->maxMsgNum);
while (nthistime-- > 0)
{
segP->buffer[max % MAXNUMMESSAGES] = *data++;
max++;
}
- /* Update current value of maxMsgNum using spinlock */
- SpinLockAcquire(&segP->msgnumLock);
- segP->maxMsgNum = max;
- SpinLockRelease(&segP->msgnumLock);
+ /* Update maxMsgNum with an atomic write preceded by a write barrier */
+ pg_write_barrier();
+ pg_atomic_write_u32(&segP->maxMsgNum, max);
/*
* Now that the maxMsgNum change is globally visible, we give everyone
@@ -473,7 +460,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
SISeg *segP;
ProcState *stateP;
- int max;
+ uint32 max;
int n;
segP = shmInvalBuffer;
@@ -506,10 +493,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
*/
stateP->hasMessages = false;
- /* Fetch current value of maxMsgNum using spinlock */
- SpinLockAcquire(&segP->msgnumLock);
- max = segP->maxMsgNum;
- SpinLockRelease(&segP->msgnumLock);
+ /* Fetch maxMsgNum with an atomic read followed by a read barrier */
+ max = pg_atomic_read_u32(&segP->maxMsgNum);
+ pg_read_barrier();
if (stateP->resetState)
{
@@ -576,11 +562,11 @@ void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
SISeg *segP = shmInvalBuffer;
- int min,
+ uint32 min,
minsig,
lowbound,
- numMsgs,
- i;
+ numMsgs;
+ int i;
ProcState *needSig = NULL;
/* Lock out all writers and readers */
@@ -595,14 +581,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
* backends here it is possible for them to keep sending messages without
* a problem even when they are the only active backend.
*/
- min = segP->maxMsgNum;
- minsig = min - SIG_THRESHOLD;
- lowbound = min - MAXNUMMESSAGES + minFree;
+ min = pg_atomic_read_u32(&segP->maxMsgNum);
+ minsig = min - Min(min, SIG_THRESHOLD);
+ lowbound = min - Min(min, MAXNUMMESSAGES - minFree);
for (i = 0; i < segP->numProcs; i++)
{
ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
- int n = stateP->nextMsgNum;
+ uint32 n = stateP->nextMsgNum;
/* Ignore if already in reset state */
Assert(stateP->procPid != 0);
@@ -641,7 +627,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
if (min >= MSGNUMWRAPAROUND)
{
segP->minMsgNum -= MSGNUMWRAPAROUND;
- segP->maxMsgNum -= MSGNUMWRAPAROUND;
+ pg_atomic_sub_fetch_u32(&segP->maxMsgNum, MSGNUMWRAPAROUND);
for (i = 0; i < segP->numProcs; i++)
segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
}
@@ -650,7 +636,7 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
* Determine how many messages are still in the queue, and set the
* threshold at which we should repeat SICleanupQueue().
*/
- numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ numMsgs = pg_atomic_read_u32(&segP->maxMsgNum) - segP->minMsgNum;
if (numMsgs < CLEANUP_MIN)
segP->nextThreshold = CLEANUP_MIN;
else
--
2.43.0