Hi, We usually want to release lwlocks, and definitely spinlocks, before calling SetLatch(), to avoid putting a system call into the locked region so that we minimise the time held. There are a few places where we don't do that, possibly because it's not just a simple latch to hold a pointer to but rather a set of them that needs to be collected from some data structure and we don't have infrastructure to help with that. There are also cases where we semi-reliably create lock contention, because the backends that wake up immediately try to acquire the very same lock.
One example is heavyweight lock wakeups. If you run BEGIN; LOCK TABLE t; ... and then N other sessions wait in SELECT * FROM t;, and then you run ... COMMIT;, you'll see the first session wake all the others while it still holds the partition lock itself. They'll all wake up and begin to re-acquire the same partition lock in exclusive mode, immediately go back to sleep on *that* wait list, and then wake each other up one at a time in a chain. We could avoid the first double-bounce by not setting the latches until after we've released the partition lock. We could avoid the rest of them by not re-acquiring the partition lock at all, which ... if I'm reading right ... shouldn't actually be necessary in modern PostgreSQL? Or if there is another reason to re-acquire then maybe the comment should be updated. Presumably no one really does that repeatedly while there is a long queue of non-conflicting waiters, so I'm not claiming it's a major improvement, but it's at least a micro-optimisation. There are some other simpler mechanical changes including synchronous replication, SERIALIZABLE DEFERRABLE and condition variables (this one inspired by Yura Sokolov's patches[1]). Actually I'm not at all sure about the CV implementation, I feel like a more ambitious change is needed to make our CVs perform. See attached sketch patches. I guess the main thing that may not be good enough is the use of a fixed sized latch buffer. Memory allocation in don't-throw-here environments like the guts of lock code might be an issue, which is why it just gives up and flushes when full; maybe it should try to allocate and fall back to flushing only if that fails. These sketch patches aren't proposals, just observations in need of more study. [1] https://postgr.es/m/1edbb61981fe1d99c3f20e3d56d6c88999f4227c.camel%40postgrespro.ru
From b64d5782e2c3a2e34274a3bf9df4449afaee94dc Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Wed, 26 Oct 2022 15:51:45 +1300 Subject: [PATCH 1/8] Provide SetLatches() for batched deferred latches. If we have a way to buffer a set of wakeup targets and process them at a later time, we can: * move SetLatch() system calls out from under LWLocks, so that locks can be released faster; this is especially interesting in cases where the target backends will immediately try to acquire the same lock, or generally when the lock is heavily contended * possibly gain some micro-opimization from issuing only two memory barriers for the whole batch of latches, not two for each latch to be set * provide the opportunity for potential future latch implementation mechanisms to deliver wakeups in a single system call Individual users of this facility will follow in separate patches. --- src/backend/storage/ipc/latch.c | 187 ++++++++++++++++++------------- src/include/storage/latch.h | 13 +++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 123 insertions(+), 78 deletions(-) diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index eb3a569aae..71fdc388c8 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -576,105 +576,136 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, } /* - * Sets a latch and wakes up anyone waiting on it. - * - * This is cheap if the latch is already set, otherwise not so much. - * - * NB: when calling this in a signal handler, be sure to save and restore - * errno around it. (That's standard practice in most signal handlers, of - * course, but we used to omit it in handlers that only set a flag.) - * - * NB: this function is called from critical sections and signal handlers so - * throwing an error is not a good idea. + * Set multiple latches at the same time. + * Note: modifies input array. */ -void -SetLatch(Latch *latch) +static void +SetLatchV(Latch **latches, int nlatches) { -#ifndef WIN32 - pid_t owner_pid; -#else - HANDLE handle; -#endif - - /* - * The memory barrier has to be placed here to ensure that any flag - * variables possibly changed by this process have been flushed to main - * memory, before we check/set is_set. - */ + /* Flush any other changes out to main memory just once. */ pg_memory_barrier(); - /* Quick exit if already set */ - if (latch->is_set) - return; + /* Keep only latches that are not already set, and set them. */ + for (int i = 0; i < nlatches; ++i) + { + Latch *latch = latches[i]; - latch->is_set = true; + if (!latch->is_set) + latch->is_set = true; + else + latches[i] = NULL; + } pg_memory_barrier(); - if (!latch->maybe_sleeping) - return; + /* Wake the remaining latches that might be sleeping. */ #ifndef WIN32 - - /* - * See if anyone's waiting for the latch. It can be the current process if - * we're in a signal handler. We use the self-pipe or SIGURG to ourselves - * to wake up WaitEventSetWaitBlock() without races in that case. If it's - * another process, send a signal. - * - * Fetch owner_pid only once, in case the latch is concurrently getting - * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't - * guaranteed to be true! In practice, the effective range of pid_t fits - * in a 32 bit integer, and so should be atomic. In the worst case, we - * might end up signaling the wrong process. Even then, you're very - * unlucky if a process with that bogus pid exists and belongs to - * Postgres; and PG database processes should handle excess SIGUSR1 - * interrupts without a problem anyhow. - * - * Another sort of race condition that's possible here is for a new - * process to own the latch immediately after we look, so we don't signal - * it. This is okay so long as all callers of ResetLatch/WaitLatch follow - * the standard coding convention of waiting at the bottom of their loops, - * not the top, so that they'll correctly process latch-setting events - * that happen before they enter the loop. - */ - owner_pid = latch->owner_pid; - if (owner_pid == 0) - return; - else if (owner_pid == MyProcPid) + for (int i = 0; i < nlatches; ++i) { + Latch *latch = latches[i]; + pid_t owner_pid; + + if (!latch || !latch->maybe_sleeping) + continue; + + /* + * See if anyone's waiting for the latch. It can be the current process + * if we're in a signal handler. We use the self-pipe or SIGURG to + * ourselves to wake up WaitEventSetWaitBlock() without races in that + * case. If it's another process, send a signal. + * + * Fetch owner_pid only once, in case the latch is concurrently getting + * owned or disowned. XXX: This assumes that pid_t is atomic, which + * isn't guaranteed to be true! In practice, the effective range of + * pid_t fits in a 32 bit integer, and so should be atomic. In the + * worst case, we might end up signaling the wrong process. Even then, + * you're very unlucky if a process with that bogus pid exists and + * belongs to Postgres; and PG database processes should handle excess + * SIGURG interrupts without a problem anyhow. + * + * Another sort of race condition that's possible here is for a new + * process to own the latch immediately after we look, so we don't + * signal it. This is okay so long as all callers of + * ResetLatch/WaitLatch follow the standard coding convention of + * waiting at the bottom of their loops, not the top, so that they'll + * correctly process latch-setting events that happen before they enter + * the loop. + */ + owner_pid = latch->owner_pid; + + if (owner_pid == MyProcPid) + { + if (waiting) + { #if defined(WAIT_USE_SELF_PIPE) - if (waiting) - sendSelfPipeByte(); + sendSelfPipeByte(); #else - if (waiting) - kill(MyProcPid, SIGURG); + kill(MyProcPid, SIGURG); #endif + } + } + else + kill(owner_pid, SIGURG); } - else - kill(owner_pid, SIGURG); - #else - - /* - * See if anyone's waiting for the latch. It can be the current process if - * we're in a signal handler. - * - * Use a local variable here just in case somebody changes the event field - * concurrently (which really should not happen). - */ - handle = latch->event; - if (handle) + for (int i = 0; i < nlatches; ++i) { - SetEvent(handle); + Latch *latch = latches[i]; - /* - * Note that we silently ignore any errors. We might be in a signal - * handler or other critical path where it's not safe to call elog(). - */ + if (latch && latch->maybe_sleeping) + { + HANDLE event = latch->event; + + if (event) + SetEvent(event); + } } #endif } +/* + * Sets a latch and wakes up anyone waiting on it. + * + * This is cheap if the latch is already set, otherwise not so much. + * + * NB: when calling this in a signal handler, be sure to save and restore + * errno around it. (That's standard practice in most signal handlers, of + * course, but we used to omit it in handlers that only set a flag.) + * + * NB: this function is called from critical sections and signal handlers so + * throwing an error is not a good idea. + */ +void +SetLatch(Latch *latch) +{ + SetLatchV(&latch, 1); +} + +/* + * Add a latch to a batch, to be set later as a group. + */ +void +AddLatch(LatchBatch *batch, Latch *latch) +{ + if (batch->size == lengthof(batch->latches)) + SetLatches(batch); + + batch->latches[batch->size++] = latch; +} + +/* + * Set all the latches accumulated in 'batch'. + */ +void +SetLatches(LatchBatch *batch) +{ + if (batch->size > 0) + { + SetLatchV(batch->latches, batch->size); + batch->size = 0; + } +} + /* * Clear the latch. Calling WaitLatch after this will sleep, unless * the latch is set again before the WaitLatch call. diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 68ab740f16..0edf364637 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -118,6 +118,17 @@ typedef struct Latch #endif } Latch; +#define LATCH_BATCH_SIZE 64 + +/* + * Container for setting multiple latches at a time. + */ +typedef struct LatchBatch +{ + int size; + Latch *latches[LATCH_BATCH_SIZE]; +} LatchBatch; + /* * Bitmasks for events that may wake-up WaitLatch(), WaitLatchOrSocket(), or * WaitEventSetWait(). @@ -163,6 +174,8 @@ extern void InitSharedLatch(Latch *latch); extern void OwnLatch(Latch *latch); extern void DisownLatch(Latch *latch); extern void SetLatch(Latch *latch); +extern void AddLatch(LatchBatch * batch, Latch *latch); +extern void SetLatches(LatchBatch * batch); extern void ResetLatch(Latch *latch); extern void ShutdownLatchSupport(void); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 2f02cc8f42..f05c41043d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1389,6 +1389,7 @@ LagTracker LargeObjectDesc LastAttnumInfo Latch +LatchBatch LerpFunc LexDescr LexemeEntry -- 2.35.1
From 7d68b9bad4e172fc5d56df8630af2c22986b4f81 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Wed, 26 Oct 2022 17:36:34 +1300 Subject: [PATCH 2/8] Use SetLatches() for condition variables. Drain condition variable wait lists in larger batches, not one at a time, while broadcasting. This is an idea from Yura Sokolov, which I've now combined with SetLatches() for slightly more efficiency. Since we're now performing loops, change the internal spinlock to an lwlock. XXX There is probably a better data structure/arrangement that would allow us to hold the lock for a shorter time. This patch is not very satisfying yet. Discussion: https://postgr.es/m/1edbb61981fe1d99c3f20e3d56d6c88999f4227c.camel%40postgrespro.ru --- src/backend/storage/lmgr/condition_variable.c | 97 +++++++++---------- src/backend/storage/lmgr/lwlock.c | 2 + src/include/storage/condition_variable.h | 4 +- src/include/storage/lwlock.h | 1 + 4 files changed, 49 insertions(+), 55 deletions(-) diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c index de65dac3ae..4b9749ecdd 100644 --- a/src/backend/storage/lmgr/condition_variable.c +++ b/src/backend/storage/lmgr/condition_variable.c @@ -36,7 +36,7 @@ static ConditionVariable *cv_sleep_target = NULL; void ConditionVariableInit(ConditionVariable *cv) { - SpinLockInit(&cv->mutex); + LWLockInitialize(&cv->mutex, LWTRANCHE_CONDITION_VARIABLE); proclist_init(&cv->wakeup); } @@ -74,9 +74,9 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv) cv_sleep_target = cv; /* Add myself to the wait queue. */ - SpinLockAcquire(&cv->mutex); + LWLockAcquire(&cv->mutex, LW_EXCLUSIVE); proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink); - SpinLockRelease(&cv->mutex); + LWLockRelease(&cv->mutex); } /* @@ -180,13 +180,13 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, * by something other than ConditionVariableSignal; though we don't * guarantee not to return spuriously, we'll avoid this obvious case. */ - SpinLockAcquire(&cv->mutex); + LWLockAcquire(&cv->mutex, LW_EXCLUSIVE); if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink)) { done = true; proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink); } - SpinLockRelease(&cv->mutex); + LWLockRelease(&cv->mutex); /* * Check for interrupts, and return spuriously if that caused the @@ -233,12 +233,12 @@ ConditionVariableCancelSleep(void) if (cv == NULL) return; - SpinLockAcquire(&cv->mutex); + LWLockAcquire(&cv->mutex, LW_EXCLUSIVE); if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink)) proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink); else signaled = true; - SpinLockRelease(&cv->mutex); + LWLockRelease(&cv->mutex); /* * If we've received a signal, pass it on to another waiting process, if @@ -265,10 +265,10 @@ ConditionVariableSignal(ConditionVariable *cv) PGPROC *proc = NULL; /* Remove the first process from the wakeup queue (if any). */ - SpinLockAcquire(&cv->mutex); + LWLockAcquire(&cv->mutex, LW_EXCLUSIVE); if (!proclist_is_empty(&cv->wakeup)) proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink); - SpinLockRelease(&cv->mutex); + LWLockRelease(&cv->mutex); /* If we found someone sleeping, set their latch to wake them up. */ if (proc != NULL) @@ -287,6 +287,7 @@ ConditionVariableBroadcast(ConditionVariable *cv) { int pgprocno = MyProc->pgprocno; PGPROC *proc = NULL; + bool inserted_sentinel = false; bool have_sentinel = false; /* @@ -313,52 +314,42 @@ ConditionVariableBroadcast(ConditionVariable *cv) if (cv_sleep_target != NULL) ConditionVariableCancelSleep(); - /* - * Inspect the state of the queue. If it's empty, we have nothing to do. - * If there's exactly one entry, we need only remove and signal that - * entry. Otherwise, remove the first entry and insert our sentinel. - */ - SpinLockAcquire(&cv->mutex); - /* While we're here, let's assert we're not in the list. */ - Assert(!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink)); - - if (!proclist_is_empty(&cv->wakeup)) + do { - proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink); - if (!proclist_is_empty(&cv->wakeup)) - { - proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink); - have_sentinel = true; - } - } - SpinLockRelease(&cv->mutex); - - /* Awaken first waiter, if there was one. */ - if (proc != NULL) - SetLatch(&proc->procLatch); + int max_batch_size = Min(LATCH_BATCH_SIZE, 8); /* XXX */ + LatchBatch batch = {0}; - while (have_sentinel) - { - /* - * Each time through the loop, remove the first wakeup list entry, and - * signal it unless it's our sentinel. Repeat as long as the sentinel - * remains in the list. - * - * Notice that if someone else removes our sentinel, we will waken one - * additional process before exiting. That's intentional, because if - * someone else signals the CV, they may be intending to waken some - * third process that added itself to the list after we added the - * sentinel. Better to give a spurious wakeup (which should be - * harmless beyond wasting some cycles) than to lose a wakeup. - */ - proc = NULL; - SpinLockAcquire(&cv->mutex); - if (!proclist_is_empty(&cv->wakeup)) + LWLockAcquire(&cv->mutex, LW_EXCLUSIVE); + while (!proclist_is_empty(&cv->wakeup)) + { + if (batch.size == max_batch_size) + { + if (!inserted_sentinel) + { + /* First batch of many. Add sentinel. */ + proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink); + inserted_sentinel = true; + have_sentinel = true; + } + break; + } proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink); - have_sentinel = proclist_contains(&cv->wakeup, pgprocno, cvWaitLink); - SpinLockRelease(&cv->mutex); + if (proc == MyProc) + { + /* We hit our sentinel. We're done. */ + have_sentinel = false; + break; + } + else if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink)) + { + /* Someone else hit our sentinel. We're done. */ + have_sentinel = false; + } + AddLatch(&batch, &proc->procLatch); + } + LWLockRelease(&cv->mutex); - if (proc != NULL && proc != MyProc) - SetLatch(&proc->procLatch); - } + /* Awaken this batch of waiters, if there were some. */ + SetLatches(&batch); + } while (have_sentinel); } diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index d274c9b1dc..03186da0a3 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -183,6 +183,8 @@ static const char *const BuiltinTrancheNames[] = { "PgStatsHash", /* LWTRANCHE_PGSTATS_DATA: */ "PgStatsData", + /* LWTRANCHE_CONDITION_VARIABLE: */ + "ConditionVariable", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h index e89175ebd5..82b8eeabf5 100644 --- a/src/include/storage/condition_variable.h +++ b/src/include/storage/condition_variable.h @@ -22,12 +22,12 @@ #ifndef CONDITION_VARIABLE_H #define CONDITION_VARIABLE_H +#include "storage/lwlock.h" #include "storage/proclist_types.h" -#include "storage/spin.h" typedef struct { - slock_t mutex; /* spinlock protecting the wakeup list */ + LWLock mutex; /* lock protecting the wakeup list */ proclist_head wakeup; /* list of wake-able processes */ } ConditionVariable; diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index ca4eca76f4..dcbffe11a2 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -193,6 +193,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DSA, LWTRANCHE_PGSTATS_HASH, LWTRANCHE_PGSTATS_DATA, + LWTRANCHE_CONDITION_VARIABLE, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; -- 2.35.1
From 1dca369e595f1259736a015efcf13b1041d7780d Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Wed, 26 Oct 2022 16:43:31 +1300 Subject: [PATCH 3/8] Use SetLatches() for heavyweight locks. Collect wakeups into a LatchBatch to be set at once after the LockManager's internal partition lock is released. This avoids holding busy locks while issuing system calls. Currently, waiters immediately try to acquire that lock themselves, so deferring until it's released makes sense to avoid contention (a later patch may remove that lock acquisition though). --- src/backend/storage/lmgr/deadlock.c | 4 ++-- src/backend/storage/lmgr/lock.c | 36 +++++++++++++++++++---------- src/backend/storage/lmgr/proc.c | 29 ++++++++++++++--------- src/include/storage/lock.h | 5 ++-- src/include/storage/proc.h | 5 ++-- 5 files changed, 49 insertions(+), 30 deletions(-) diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index cd9c0418ec..1b7454a8fe 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -214,7 +214,7 @@ InitDeadLockChecking(void) * and (b) we are typically invoked inside a signal handler. */ DeadLockState -DeadLockCheck(PGPROC *proc) +DeadLockCheck(PGPROC *proc, LatchBatch *wakeups) { int i, j; @@ -272,7 +272,7 @@ DeadLockCheck(PGPROC *proc) #endif /* See if any waiters for the lock can be woken up now */ - ProcLockWakeup(GetLocksMethodTable(lock), lock); + ProcLockWakeup(GetLocksMethodTable(lock), lock, wakeups); } /* Return code tells caller if we had to escape a deadlock or not */ diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 3d1049cf75..f59ae20069 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -374,14 +374,14 @@ static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode); static void FinishStrongLockAcquire(void); -static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner); +static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, LatchBatch *wakeups); static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock); static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent); static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, PROCLOCK *proclock, LockMethod lockMethodTable); static void CleanUpLock(LOCK *lock, PROCLOCK *proclock, LockMethod lockMethodTable, uint32 hashcode, - bool wakeupNeeded); + bool wakeupNeeded, LatchBatch *wakeups); static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, LOCKTAG *locktag, LOCKMODE lockmode, bool decrement_strong_lock_count); @@ -787,6 +787,7 @@ LockAcquireExtended(const LOCKTAG *locktag, LWLock *partitionLock; bool found_conflict; bool log_lock = false; + LatchBatch wakeups = {0}; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); @@ -1098,7 +1099,7 @@ LockAcquireExtended(const LOCKTAG *locktag, locktag->locktag_type, lockmode); - WaitOnLock(locallock, owner); + WaitOnLock(locallock, owner, &wakeups); TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1, locktag->locktag_field2, @@ -1138,6 +1139,8 @@ LockAcquireExtended(const LOCKTAG *locktag, LWLockRelease(partitionLock); + SetLatches(&wakeups); + /* * Emit a WAL record if acquisition of this lock needs to be replayed in a * standby server. @@ -1634,7 +1637,7 @@ UnGrantLock(LOCK *lock, LOCKMODE lockmode, static void CleanUpLock(LOCK *lock, PROCLOCK *proclock, LockMethod lockMethodTable, uint32 hashcode, - bool wakeupNeeded) + bool wakeupNeeded, LatchBatch *wakeups) { /* * If this was my last hold on this lock, delete my entry in the proclock @@ -1674,7 +1677,7 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock, else if (wakeupNeeded) { /* There are waiters on this lock, so wake them up. */ - ProcLockWakeup(lockMethodTable, lock); + ProcLockWakeup(lockMethodTable, lock, wakeups); } } @@ -1811,7 +1814,7 @@ MarkLockClear(LOCALLOCK *locallock) * The appropriate partition lock must be held at entry. */ static void -WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) +WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, LatchBatch *wakeups) { LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock); LockMethod lockMethodTable = LockMethods[lockmethodid]; @@ -1856,7 +1859,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) */ PG_TRY(); { - if (ProcSleep(locallock, lockMethodTable) != PROC_WAIT_STATUS_OK) + if (ProcSleep(locallock, lockMethodTable, wakeups) != PROC_WAIT_STATUS_OK) { /* * We failed as a result of a deadlock, see CheckDeadLock(). Quit @@ -1915,7 +1918,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) * NB: this does not clean up any locallock object that may exist for the lock. */ void -RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode) +RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode, LatchBatch *wakeups) { LOCK *waitLock = proc->waitLock; PROCLOCK *proclock = proc->waitProcLock; @@ -1957,7 +1960,7 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode) */ CleanUpLock(waitLock, proclock, LockMethods[lockmethodid], hashcode, - true); + true, wakeups); } /* @@ -1982,6 +1985,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) PROCLOCK *proclock; LWLock *partitionLock; bool wakeupNeeded; + LatchBatch wakeups = {0}; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); @@ -2160,10 +2164,12 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) CleanUpLock(lock, proclock, lockMethodTable, locallock->hashcode, - wakeupNeeded); + wakeupNeeded, &wakeups); LWLockRelease(partitionLock); + SetLatches(&wakeups); + RemoveLocalLock(locallock); return true; } @@ -2188,6 +2194,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) PROCLOCK *proclock; int partition; bool have_fast_path_lwlock = false; + LatchBatch wakeups = {0}; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); @@ -2434,10 +2441,12 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) CleanUpLock(lock, proclock, lockMethodTable, LockTagHashCode(&lock->tag), - wakeupNeeded); + wakeupNeeded, &wakeups); } /* loop over PROCLOCKs within this partition */ LWLockRelease(partitionLock); + + SetLatches(&wakeups); } /* loop over partitions */ #ifdef LOCK_DEBUG @@ -3137,6 +3146,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, uint32 proclock_hashcode; LWLock *partitionLock; bool wakeupNeeded; + LatchBatch wakeups = {0}; hashcode = LockTagHashCode(locktag); partitionLock = LockHashPartitionLock(hashcode); @@ -3190,10 +3200,12 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, CleanUpLock(lock, proclock, lockMethodTable, hashcode, - wakeupNeeded); + wakeupNeeded, &wakeups); LWLockRelease(partitionLock); + SetLatches(&wakeups); + /* * Decrement strong lock count. This logic is needed only for 2PC. */ diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 13fa07b0ff..f0e19b74a7 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -700,6 +700,7 @@ LockErrorCleanup(void) { LWLock *partitionLock; DisableTimeoutParams timeouts[2]; + LatchBatch wakeups = {0}; HOLD_INTERRUPTS(); @@ -733,7 +734,7 @@ LockErrorCleanup(void) if (MyProc->links.next != NULL) { /* We could not have been granted the lock yet */ - RemoveFromWaitQueue(MyProc, lockAwaited->hashcode); + RemoveFromWaitQueue(MyProc, lockAwaited->hashcode, &wakeups); } else { @@ -750,6 +751,7 @@ LockErrorCleanup(void) lockAwaited = NULL; LWLockRelease(partitionLock); + SetLatches(&wakeups); RESUME_INTERRUPTS(); } @@ -1042,7 +1044,7 @@ ProcQueueInit(PROC_QUEUE *queue) * NOTES: The process queue is now a priority queue for locking. */ ProcWaitStatus -ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) +ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups) { LOCKMODE lockmode = locallock->tag.mode; LOCK *lock = locallock->lock; @@ -1188,7 +1190,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) */ if (early_deadlock) { - RemoveFromWaitQueue(MyProc, hashcode); + RemoveFromWaitQueue(MyProc, hashcode, wakeups); return PROC_WAIT_STATUS_ERROR; } @@ -1204,6 +1206,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * LockErrorCleanup will clean up if cancel/die happens. */ LWLockRelease(partitionLock); + SetLatches(wakeups); /* * Also, now that we will successfully clean up after an ereport, it's @@ -1662,8 +1665,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * to twiddle the lock's request counts too --- see RemoveFromWaitQueue. * Hence, in practice the waitStatus parameter must be PROC_WAIT_STATUS_OK. */ -PGPROC * -ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) +static PGPROC * +ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus, LatchBatch *wakeups) { PGPROC *retProc; @@ -1686,8 +1689,8 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) proc->waitStatus = waitStatus; pg_atomic_write_u64(&MyProc->waitStart, 0); - /* And awaken it */ - SetLatch(&proc->procLatch); + /* Schedule it to be awoken */ + AddLatch(wakeups, &proc->procLatch); return retProc; } @@ -1700,7 +1703,7 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) * The appropriate lock partition lock must be held by caller. */ void -ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) +ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock, LatchBatch *wakeups) { PROC_QUEUE *waitQueue = &(lock->waitProcs); int queue_size = waitQueue->size; @@ -1728,7 +1731,7 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) { /* OK to waken */ GrantLock(lock, proc->waitProcLock, lockmode); - proc = ProcWakeup(proc, PROC_WAIT_STATUS_OK); + proc = ProcWakeup(proc, PROC_WAIT_STATUS_OK, wakeups); /* * ProcWakeup removes proc from the lock's waiting process queue @@ -1762,6 +1765,7 @@ static void CheckDeadLock(void) { int i; + LatchBatch wakeups = {0}; /* * Acquire exclusive lock on the entire shared lock data structures. Must @@ -1796,7 +1800,7 @@ CheckDeadLock(void) #endif /* Run the deadlock check, and set deadlock_state for use by ProcSleep */ - deadlock_state = DeadLockCheck(MyProc); + deadlock_state = DeadLockCheck(MyProc, &wakeups); if (deadlock_state == DS_HARD_DEADLOCK) { @@ -1813,7 +1817,8 @@ CheckDeadLock(void) * return from the signal handler. */ Assert(MyProc->waitLock != NULL); - RemoveFromWaitQueue(MyProc, LockTagHashCode(&(MyProc->waitLock->tag))); + RemoveFromWaitQueue(MyProc, LockTagHashCode(&(MyProc->waitLock->tag)), + &wakeups); /* * We're done here. Transaction abort caused by the error that @@ -1837,6 +1842,8 @@ CheckDeadLock(void) check_done: for (i = NUM_LOCK_PARTITIONS; --i >= 0;) LWLockRelease(LockHashPartitionLockByIndex(i)); + + SetLatches(&wakeups); } /* diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index e4e1495b24..163bf41293 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -19,6 +19,7 @@ #endif #include "storage/backendid.h" +#include "storage/latch.h" #include "storage/lockdefs.h" #include "storage/lwlock.h" #include "storage/shmem.h" @@ -575,7 +576,7 @@ extern bool LockCheckConflicts(LockMethod lockMethodTable, LOCK *lock, PROCLOCK *proclock); extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode); extern void GrantAwaitedLock(void); -extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode); +extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode, LatchBatch *wakeups); extern Size LockShmemSize(void); extern LockData *GetLockStatusData(void); extern BlockedProcsData *GetBlockerStatusData(int blocked_pid); @@ -592,7 +593,7 @@ extern void lock_twophase_postabort(TransactionId xid, uint16 info, extern void lock_twophase_standby_recover(TransactionId xid, uint16 info, void *recdata, uint32 len); -extern DeadLockState DeadLockCheck(PGPROC *proc); +extern DeadLockState DeadLockCheck(PGPROC *proc, LatchBatch *wakeups); extern PGPROC *GetBlockingAutoVacuumPgproc(void); extern void DeadLockReport(void) pg_attribute_noreturn(); extern void RememberSimpleDeadLock(PGPROC *proc1, diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 8d096fdeeb..4dba46cfcb 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -449,9 +449,8 @@ extern bool HaveNFreeProcs(int n); extern void ProcReleaseLocks(bool isCommit); extern void ProcQueueInit(PROC_QUEUE *queue); -extern ProcWaitStatus ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable); -extern PGPROC *ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus); -extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); +extern ProcWaitStatus ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups); +extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock, LatchBatch *wakeups); extern void CheckDeadLockAlert(void); extern bool IsWaitingForLock(void); extern void LockErrorCleanup(void); -- 2.35.1
From 2cef9eb9236a19845f967de3f392124016bbbba9 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Thu, 27 Oct 2022 10:56:39 +1300 Subject: [PATCH 4/8] Don't re-acquire LockManager partition lock after wakeup. Change the contract of WaitOnLock() and ProcSleep() to return with the partition lock released. ProcSleep() re-acquired the lock to prevent interrupts, which was a problem at the time the ancestor of that code was committed in fe548629c50, because then we had signal handlers that longjmp'd out of there. Now, that can happen only at points where we explicitly run CHECK_FOR_INTERRUPTS(), so we can remove the lock, which leads to the idea of making the function always release. While an earlier commit fixed the problem that the backend woke us up before it had even released the lock itself, this lock reacquisition point was still contended when multiple backends woke at the same time. XXX Right? Or what am I missing? --- src/backend/storage/lmgr/lock.c | 18 ++++++++++++------ src/backend/storage/lmgr/proc.c | 20 ++++++++++++-------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index f59ae20069..42c244f5fb 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -787,6 +787,7 @@ LockAcquireExtended(const LOCKTAG *locktag, LWLock *partitionLock; bool found_conflict; bool log_lock = false; + bool partitionLocked = false; LatchBatch wakeups = {0}; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) @@ -995,6 +996,7 @@ LockAcquireExtended(const LOCKTAG *locktag, partitionLock = LockHashPartitionLock(hashcode); LWLockAcquire(partitionLock, LW_EXCLUSIVE); + partitionLocked = true; /* * Find or create lock and proclock entries with this tag @@ -1099,7 +1101,10 @@ LockAcquireExtended(const LOCKTAG *locktag, locktag->locktag_type, lockmode); + Assert(LWLockHeldByMeInMode(partitionLock, LW_EXCLUSIVE)); WaitOnLock(locallock, owner, &wakeups); + Assert(!LWLockHeldByMeInMode(partitionLock, LW_EXCLUSIVE)); + partitionLocked = false; TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1, locktag->locktag_field2, @@ -1124,7 +1129,6 @@ LockAcquireExtended(const LOCKTAG *locktag, PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); /* Should we retry ? */ - LWLockRelease(partitionLock); elog(ERROR, "LockAcquire failed"); } PROCLOCK_PRINT("LockAcquire: granted", proclock); @@ -1137,9 +1141,11 @@ LockAcquireExtended(const LOCKTAG *locktag, */ FinishStrongLockAcquire(); - LWLockRelease(partitionLock); - - SetLatches(&wakeups); + if (partitionLocked) + { + LWLockRelease(partitionLock); + SetLatches(&wakeups); + } /* * Emit a WAL record if acquisition of this lock needs to be replayed in a @@ -1811,7 +1817,8 @@ MarkLockClear(LOCALLOCK *locallock) * Caller must have set MyProc->heldLocks to reflect locks already held * on the lockable object by this process. * - * The appropriate partition lock must be held at entry. + * The appropriate partition lock must be held at entry. It is not held on + * exit. */ static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, LatchBatch *wakeups) @@ -1868,7 +1875,6 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, LatchBatch *wakeups) awaitedLock = NULL; LOCK_PRINT("WaitOnLock: aborting on lock", locallock->lock, locallock->tag.mode); - LWLockRelease(LockHashPartitionLock(locallock->hashcode)); /* * Now that we aren't holding the partition lock, we can give an diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index f0e19b74a7..2a03cd4b1f 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -1033,7 +1033,7 @@ ProcQueueInit(PROC_QUEUE *queue) * Caller must have set MyProc->heldLocks to reflect locks already held * on the lockable object by this process (under all XIDs). * - * The lock table's partition lock must be held at entry, and will be held + * The lock table's partition lock must be held at entry, and will be released * at exit. * * Result: PROC_WAIT_STATUS_OK if we acquired the lock, PROC_WAIT_STATUS_ERROR if not (deadlock). @@ -1062,6 +1062,12 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups) PGPROC *leader = MyProc->lockGroupLeader; int i; + /* + * Every way out of this function will release the partition lock and send + * buffered latch wakeups. + */ + Assert(LWLockHeldByMeInMode(partitionLock, LW_EXCLUSIVE)); + /* * If group locking is in use, locks held by members of my locking group * need to be included in myHeldLocks. This is not required for relation @@ -1148,6 +1154,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups) /* Skip the wait and just grant myself the lock. */ GrantLock(lock, proclock, lockmode); GrantAwaitedLock(); + LWLockRelease(partitionLock); + SetLatches(wakeups); return PROC_WAIT_STATUS_OK; } /* Break out of loop to put myself before him */ @@ -1191,6 +1199,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups) if (early_deadlock) { RemoveFromWaitQueue(MyProc, hashcode, wakeups); + LWLockRelease(partitionLock); + SetLatches(wakeups); return PROC_WAIT_STATUS_ERROR; } @@ -1525,6 +1535,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups) } LWLockRelease(partitionLock); + SetLatches(wakeups); if (deadlock_state == DS_SOFT_DEADLOCK) ereport(LOG, @@ -1626,13 +1637,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable, LatchBatch *wakeups) standbyWaitStart, GetCurrentTimestamp(), NULL, false); - /* - * Re-acquire the lock table's partition lock. We have to do this to hold - * off cancel/die interrupts before we can mess with lockAwaited (else we - * might have a missed or duplicated locallock update). - */ - LWLockAcquire(partitionLock, LW_EXCLUSIVE); - /* * We no longer want LockErrorCleanup to do anything. */ -- 2.35.1
From e1ab9a2f56127c3731c0d5e0aa4fd0fc8243cdfa Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Thu, 27 Oct 2022 12:40:12 +1300 Subject: [PATCH 5/8] Use SetLatches() for SERIALIZABLE DEFERRABLE wakeups. Don't issue SetLatch()'s system call while holding a highly contended LWLock. Collect them in a LatchBatch to be set after the lock is released. Once woken, other backends will immediately try to acquire that lock anyway so it's better to wake after releasing. Take this opportunity to retire the confusingly named ProcSendSignal() and replace it with something that gives the latch pointer we need. There was only one other caller, in bufmgr.c, which is easily changed. --- src/backend/storage/buffer/bufmgr.c | 2 +- src/backend/storage/ipc/procsignal.c | 2 -- src/backend/storage/lmgr/predicate.c | 5 ++++- src/backend/storage/lmgr/proc.c | 12 ------------ src/include/storage/proc.h | 4 +++- 5 files changed, 8 insertions(+), 17 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 6b95381481..4d754caa30 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1918,7 +1918,7 @@ UnpinBuffer(BufferDesc *buf) buf_state &= ~BM_PIN_COUNT_WAITER; UnlockBufHdr(buf, buf_state); - ProcSendSignal(wait_backend_pgprocno); + SetLatch(GetProcLatchByNumber(wait_backend_pgprocno)); } else UnlockBufHdr(buf, buf_state); diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 7767657f27..280e4b91dd 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -254,8 +254,6 @@ CleanupProcSignalState(int status, Datum arg) * * On success (a signal was sent), zero is returned. * On error, -1 is returned, and errno is set (typically to ESRCH or EPERM). - * - * Not to be confused with ProcSendSignal */ int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId) diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index e8120174d6..a9e9fa01e7 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -3339,6 +3339,7 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) nextConflict, possibleUnsafeConflict; SERIALIZABLEXACT *roXact; + LatchBatch wakeups = {0}; /* * We can't trust XactReadOnly here, because a transaction which started @@ -3672,7 +3673,7 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) */ if (SxactIsDeferrableWaiting(roXact) && (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact))) - ProcSendSignal(roXact->pgprocno); + AddLatch(&wakeups, GetProcLatchByNumber(roXact->pgprocno)); possibleUnsafeConflict = nextConflict; } @@ -3698,6 +3699,8 @@ ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe) LWLockRelease(SerializableXactHashLock); + SetLatches(&wakeups); + LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE); /* Add this to the list of transactions to check for later cleanup. */ diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 2a03cd4b1f..c644e7351d 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -1890,18 +1890,6 @@ ProcWaitForSignal(uint32 wait_event_info) CHECK_FOR_INTERRUPTS(); } -/* - * ProcSendSignal - set the latch of a backend identified by pgprocno - */ -void -ProcSendSignal(int pgprocno) -{ - if (pgprocno < 0 || pgprocno >= ProcGlobal->allProcCount) - elog(ERROR, "pgprocno out of range"); - - SetLatch(&ProcGlobal->allProcs[pgprocno].procLatch); -} - /* * BecomeLockGroupLeader - designate process as lock group leader * diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 4dba46cfcb..735e863ed1 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -413,6 +413,9 @@ extern PGDLLIMPORT PGPROC *PreparedXactProcs; /* Accessor for PGPROC given a pgprocno. */ #define GetPGProcByNumber(n) (&ProcGlobal->allProcs[(n)]) +/* Accessor for procLatch given a pgprocno. */ +#define GetProcLatchByNumber(n) (&(GetPGProcByNumber(n))->procLatch) + /* * We set aside some extra PGPROC structures for auxiliary processes, * ie things that aren't full-fledged backends but need shmem access. @@ -456,7 +459,6 @@ extern bool IsWaitingForLock(void); extern void LockErrorCleanup(void); extern void ProcWaitForSignal(uint32 wait_event_info); -extern void ProcSendSignal(int pgprocno); extern PGPROC *AuxiliaryPidGetProc(int pid); -- 2.35.1
From 351ce81b4d1b2a61fcfcc9902d7122e5410c8b63 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Thu, 27 Oct 2022 12:59:45 +1300 Subject: [PATCH 6/8] Use SetLatches() for synchronous replication wakeups. Don't issue SetLatch() system calls while holding the central synchronous replication lock. --- src/backend/replication/syncrep.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 1a022b11a0..31cf05ed2f 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -100,7 +100,7 @@ static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); -static int SyncRepWakeQueue(bool all, int mode); +static int SyncRepWakeQueue(bool all, int mode, LatchBatch *wakeups); static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, @@ -449,6 +449,7 @@ SyncRepReleaseWaiters(void) int numwrite = 0; int numflush = 0; int numapply = 0; + LatchBatch wakeups = {0}; /* * If this WALSender is serving a standby that is not on the list of @@ -518,21 +519,23 @@ SyncRepReleaseWaiters(void) if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr) { walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr; - numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); + numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE, &wakeups); } if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr) { walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr; - numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); + numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH, &wakeups); } if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr) { walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr; - numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY, &wakeups); } LWLockRelease(SyncRepLock); + SetLatches(&wakeups); + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X", numwrite, LSN_FORMAT_ARGS(writePtr), numflush, LSN_FORMAT_ARGS(flushPtr), @@ -876,7 +879,7 @@ SyncRepGetStandbyPriority(void) * The caller must hold SyncRepLock in exclusive mode. */ static int -SyncRepWakeQueue(bool all, int mode) +SyncRepWakeQueue(bool all, int mode, LatchBatch *wakeups) { volatile WalSndCtlData *walsndctl = WalSndCtl; PGPROC *proc = NULL; @@ -929,7 +932,7 @@ SyncRepWakeQueue(bool all, int mode) /* * Wake only when we have set state and removed from queue. */ - SetLatch(&(thisproc->procLatch)); + AddLatch(wakeups, &thisproc->procLatch); numprocs++; } @@ -951,6 +954,8 @@ SyncRepUpdateSyncStandbysDefined(void) if (sync_standbys_defined != WalSndCtl->sync_standbys_defined) { + LatchBatch wakeups = {0}; + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* @@ -963,7 +968,7 @@ SyncRepUpdateSyncStandbysDefined(void) int i; for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); + SyncRepWakeQueue(true, i, &wakeups); } /* @@ -976,6 +981,8 @@ SyncRepUpdateSyncStandbysDefined(void) WalSndCtl->sync_standbys_defined = sync_standbys_defined; LWLockRelease(SyncRepLock); + + SetLatches(&wakeups); } } -- 2.35.1