Hi, On Sat, Oct 4, 2025 at 10:25 AM Xuneng Zhou <[email protected]> wrote: > > Hi, > > On Fri, Oct 3, 2025 at 9:50 PM Xuneng Zhou <[email protected]> wrote: > > > > Hi Michael, > > > > Thanks for your review. > > > > On Fri, Oct 3, 2025 at 2:24 PM Michael Paquier <[email protected]> wrote: > > > > > > On Thu, Oct 02, 2025 at 11:06:14PM +0800, Xuneng Zhou wrote: > > > > v5-0002 separates the waitlsn_cmp() comparator function into two > > > > distinct > > > > functions (waitlsn_replay_cmp and waitlsn_flush_cmp) for the replay > > > > and flush heaps, respectively. > > > > > > The primary goal that you want to achieve here is a replacement of the > > > wait/sleep logic of read_local_xlog_page_guts() with a condition > > > variable, and design a new facility to make the callback more > > > responsive on polls. That's a fine idea in itself. However I would > > > suggest to implement something that does not depend entirely on WAIT > > > FOR, which is how your patch is presented. Instead of having your > > > patch depend on an entirely different feature, it seems to me that you > > > should try to extract from this other feature the basics that you are > > > looking for, and make them shared between the WAIT FOR patch and what > > > you are trying to achieve here. You should not need something as > > > complex as what the other feature needs for a page read callback in > > > the backend. > > > > > > At the end, I suspect that you will reuse a slight portion of it (or > > > perhaps nothing at all, actually, but I did not look at the full scope > > > of it). You should try to present your patch so as it is in a > > > reviewable state, so as others would be able to read it and understand > > > it. WAIT FOR is much more complex than what you want to do here > > > because it covers larger areas of the code base and needs to worry > > > about more cases. So, you should implement things so as the basic > > > pieces you want to build on top of are simpler, not more complicated. > > > Simpler means easier to review and easier to catch problems, designed > > > in a way that depends on how you want to fix your problem, not > > > designed in a way that depends on how a completely different feature > > > deals with its own problems. > > > > The core infrastructure shared by both this patch and the WAIT FOR > > command patch is primarily in xlogwait.c, which provides the mechanism > > for waiting until a given LSN is reached. Other parts of the code in > > the WAIT FOR patch—covering the SQL command implementation, > > documentation, and tests—is not relevant for the current patch. What > > we need is only the infrastructure in xlogwait.c, on which we can > > implement the optimization for read_local_xlog_page_guts. > > > > Regarding complexity: the initial optimization idea was to introduce > > condition-variable based waiting, as Heikki suggested in his comment: > > > > /* > > * Loop waiting for xlog to be available if necessary > > * > > * TODO: The walsender has its own version of this function, which uses a > > * condition variable to wake up whenever WAL is flushed. We could use the > > * same infrastructure here, instead of the check/sleep/repeat style of > > * loop. > > */ > > > > I reviewed the relevant code in WalSndWakeup and WalSndWait. While > > these mechanisms reduce polling overhead, they don’t prevent false > > wakeups. Addressing that would likely require a request queue that > > maps waiters to their target LSNs and issues targeted wakeups—a much > > more complex design. Given that read_local_xlog_page_guts is not as > > performance-sensitive as its equivalents, this added complexity may > > not be justified. So I implemented the initial version of the > > optimization like WalSndWakeup and WalSndWait. > > > > After this, I came across the WAIT FOR patch in the mailing list and > > noticed that the infrastructure in xlogwait.c aligns well with our > > needs. Based on that, I built the current patch using this shared > > infra. > > > > If we prioritise simplicity and can tolerate occasional false wakeups, > > then waiting in read_local_xlog_page_guts can be implemented in a much > > simpler way than the current version. At the same time, the WAIT FOR > > command seems to be a valuable feature in its own right, and both > > patches can naturally share the same infrastructure. We could also > > extract the infra and implement the current patch on it, then Wait for > > could utilize it as well. Personally, I don’t have a strong preference > > between the two approaches. > > > > Another potential use for this infra could be static XLogRecPtr > WalSndWaitForWal(XLogRecPtr loc), I'm planning to hack a version as > well. > > Best, > Xuneng
v6 refactors and extends the infrastructure from the WAIT FOR command patch, applying it to read_local_xlog_page_guts. I'm also thinking of creating a standalone patch/commit for the extended infra in xlogwait, so it can be reused in different threads. Best, Xuneng
From a6b3ce216180b8d9f2570d46dc9a767b626d5cda Mon Sep 17 00:00:00 2001 From: alterego655 <[email protected]> Date: Sat, 4 Oct 2025 14:52:11 +0800 Subject: [PATCH v6] Replace inefficient polling loops in read_local_xlog_page_guts with latch-based waiting when WAL data is not yet available. This eliminates CPU-intensive busy waiting and improves responsiveness by waking processes immediately when their target LSN becomes available. The queue of waiters is stored in the shared memory as an LSN-ordered pairing heap, where the waiter with the nearest LSN stays on the top. During the replay or flush of WAL, waiters whose LSNs have already been replayed or flushed are deleted from the shared memory pairing heap and woken up by setting their latches. Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru Author: Xuneng Zhou <[email protected]> Author: Kartyshov Ivan <[email protected]> Author: Alexander Korotkov <[email protected]> Reviewed-by: Michael Paquier <[email protected]> Reviewed-by: Peter Eisentraut <[email protected]> Reviewed-by: Dilip Kumar <[email protected]> Reviewed-by: Amit Kapila <[email protected]> Reviewed-by: Alexander Lakhin <[email protected]> Reviewed-by: Bharath Rupireddy <[email protected]> Reviewed-by: Euler Taveira <[email protected]> Reviewed-by: Heikki Linnakangas <[email protected]> Reviewed-by: Kyotaro Horiguchi <[email protected]> --- src/backend/access/transam/Makefile | 3 +- src/backend/access/transam/meson.build | 1 + src/backend/access/transam/xact.c | 6 + src/backend/access/transam/xlog.c | 25 + src/backend/access/transam/xlogrecovery.c | 11 + src/backend/access/transam/xlogutils.c | 48 +- src/backend/access/transam/xlogwait.c | 551 ++++++++++++++++++ src/backend/lib/pairingheap.c | 18 +- src/backend/replication/walsender.c | 4 - src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/lmgr/proc.c | 6 + .../utils/activity/wait_event_names.txt | 3 + src/include/access/xlogwait.h | 113 ++++ src/include/lib/pairingheap.h | 3 + src/include/storage/lwlocklist.h | 1 + 15 files changed, 781 insertions(+), 15 deletions(-) create mode 100644 src/backend/access/transam/xlogwait.c create mode 100644 src/include/access/xlogwait.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 661c55a9db7..a32f473e0a2 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -36,7 +36,8 @@ OBJS = \ xlogreader.o \ xlogrecovery.o \ xlogstats.o \ - xlogutils.o + xlogutils.o \ + xlogwait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build index e8ae9b13c8e..74a62ab3eab 100644 --- a/src/backend/access/transam/meson.build +++ b/src/backend/access/transam/meson.build @@ -24,6 +24,7 @@ backend_sources += files( 'xlogrecovery.c', 'xlogstats.c', 'xlogutils.c', + 'xlogwait.c', ) # used by frontend programs to build a frontend xlogreader diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2cf3d4e92b7..092e197eba3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -31,6 +31,7 @@ #include "access/xloginsert.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" @@ -2843,6 +2844,11 @@ AbortTransaction(void) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Clear wait information and command progress indicator */ pgstat_report_wait_end(); pgstat_progress_end_command(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index eceab341255..cff53106f76 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -62,6 +62,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" @@ -2912,6 +2913,15 @@ XLogFlush(XLogRecPtr record) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSNState && + (LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN))) + WaitLSNWakeupFlush(LogwrtResult.Flush); + /* * If we still haven't flushed to the request point then we have a * problem; most likely, the requested flush point is past end of XLOG. @@ -3094,6 +3104,15 @@ XLogBackgroundFlush(void) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + /* + * If we flushed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSNState && + (LogwrtResult.Flush >= pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN))) + WaitLSNWakeupFlush(LogwrtResult.Flush); + /* * Great, done. To take some work off the critical path, try to initialize * as many of the no-longer-needed WAL buffers for future use as we can. @@ -6225,6 +6244,12 @@ StartupXLOG(void) UpdateControlFile(); LWLockRelease(ControlFileLock); + /* + * Wake up all waiters for replay LSN. They need to report an error that + * recovery was ended before reaching the target LSN. + */ + WaitLSNWakeupReplay(InvalidXLogRecPtr); + /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions() (see notes in lock_twophase_recover()) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 52ff4d119e6..1859d2084e8 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -40,6 +40,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/pg_control.h" #include "commands/tablespace.h" @@ -1838,6 +1839,16 @@ PerformWalRecovery(void) break; } + /* + * If we replayed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSNState && + (XLogRecoveryCtl->lastReplayedEndRecPtr >= + pg_atomic_read_u64(&waitLSNState->minWaitedReplayLSN))) + WaitLSNWakeupReplay(XLogRecoveryCtl->lastReplayedEndRecPtr); + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 38176d9688e..0ea02a45c6b 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -23,6 +23,7 @@ #include "access/xlogrecovery.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "storage/fd.h" #include "storage/smgr.h" @@ -880,12 +881,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, loc = targetPagePtr + reqLen; /* - * Loop waiting for xlog to be available if necessary - * - * TODO: The walsender has its own version of this function, which uses a - * condition variable to wake up whenever WAL is flushed. We could use the - * same infrastructure here, instead of the check/sleep/repeat style of - * loop. + * Waiting for xlog to be available if necessary. */ while (1) { @@ -927,7 +923,6 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, if (state->currTLI == currTLI) { - if (loc <= read_upto) break; @@ -947,7 +942,44 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, } CHECK_FOR_INTERRUPTS(); - pg_usleep(1000L); + + /* + * Wait for LSN using appropriate method based on server state. + */ + if (!RecoveryInProgress()) + { + /* Primary: wait for flush */ + WaitForLSNFlush(loc); + } + else + { + /* Standby: wait for replay */ + WaitLSNResult result = WaitForLSNReplay(loc, 0); + + switch (result) + { + case WAIT_LSN_RESULT_SUCCESS: + /* LSN was replayed, loop back to recheck timeline */ + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + /* + * Promoted while waiting. This is the tricky case. + * We're now a primary, so loop back and use flush + * logic instead of replay logic. + */ + break; + + default: + /* Shouldn't happen without timeout */ + elog(ERROR, "unexpected wait result"); + } + } + + /* + * Loop back to recheck everything. + * Timeline might have changed during our wait. + */ } else { diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c new file mode 100644 index 00000000000..b4d5e9354ef --- /dev/null +++ b/src/backend/access/transam/xlogwait.c @@ -0,0 +1,551 @@ +/*------------------------------------------------------------------------- + * + * xlogwait.c + * Implements waiting for WAL operations to reach specific LSNs. + * Used by internal WAL reading operations. + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/xlogwait.c + * + * NOTES + * This file implements waiting for WAL operations to reach specific LSNs + * on both physical standby and primary servers. The core idea is simple: + * every process that wants to wait publishes the LSN it needs to the + * shared memory, and the appropriate process (startup on standby, or + * WAL writer/backend on primary) wakes it once that LSN has been reached. + * + * The shared memory used by this module comprises a procInfos + * per-backend array with the information of the awaited LSN for each + * of the backend processes. The elements of that array are organized + * into a pairing heap waitersHeap, which allows for very fast finding + * of the least awaited LSN. + * + * In addition, the least-awaited LSN is cached as minWaitedLSN. The + * waiter process publishes information about itself to the shared + * memory and waits on the latch before it wakens up by the appropriate + * process, timeout is reached, standby is promoted, or the postmaster + * dies. Then, it cleans information about itself in the shared memory. + * + * On standby servers: After replaying a WAL record, the startup process + * first performs a fast path check minWaitedLSN > replayLSN. If this + * check is negative, it checks waitersHeap and wakes up the backend + * whose awaited LSNs are reached. + * + * On primary servers: After flushing WAL, the WAL writer or backend + * process performs a similar check against the flush LSN and wakes up + * waiters whose target flush LSNs have been reached. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <float.h> +#include <math.h> + +#include "access/xlog.h" +#include "access/xlogrecovery.h" +#include "access/xlogwait.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/latch.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "utils/fmgrprotos.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" + + +static int waitlsn_replay_cmp(const pairingheap_node *a, const pairingheap_node *b, + void *arg); + +static int waitlsn_flush_cmp(const pairingheap_node *a, const pairingheap_node *b, + void *arg); + +struct WaitLSNState *waitLSNState = NULL; + +/* Report the amount of shared memory space needed for WaitLSNState. */ +Size +WaitLSNShmemSize(void) +{ + Size size; + + size = offsetof(WaitLSNState, procInfos); + size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo))); + return size; +} + +/* Initialize the WaitLSNState in the shared memory. */ +void +WaitLSNShmemInit(void) +{ + bool found; + + waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState", + WaitLSNShmemSize(), + &found); + if (!found) + { + /* Initialize replay heap and tracking */ + pg_atomic_init_u64(&waitLSNState->minWaitedReplayLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSNState->replayWaitersHeap, waitlsn_replay_cmp, (void *)(uintptr_t)WAIT_LSN_REPLAY); + + /* Initialize flush heap and tracking */ + pg_atomic_init_u64(&waitLSNState->minWaitedFlushLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSNState->flushWaitersHeap, waitlsn_flush_cmp, (void *)(uintptr_t)WAIT_LSN_FLUSH); + + /* Initialize process info array */ + memset(&waitLSNState->procInfos, 0, + (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo)); + } +} + +/* + * Comparison function for replay waiters heaps. Waiting processes are + * ordered by LSN, so that the waiter with smallest LSN is at the top. + */ +static int +waitlsn_replay_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) +{ + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, replayHeapNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, replayHeapNode, b); + + if (aproc->waitLSN < bproc->waitLSN) + return 1; + else if (aproc->waitLSN > bproc->waitLSN) + return -1; + else + return 0; +} + +/* + * Comparison function for flush waiters heaps. Waiting processes are + * ordered by LSN, so that the waiter with smallest LSN is at the top. + */ +static int +waitlsn_flush_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) +{ + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, flushHeapNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, flushHeapNode, b); + + if (aproc->waitLSN < bproc->waitLSN) + return 1; + else if (aproc->waitLSN > bproc->waitLSN) + return -1; + else + return 0; +} + +/* + * Update minimum waited LSN for the specified operation type + */ +static void +updateMinWaitedLSN(WaitLSNOperation operation) +{ + XLogRecPtr minWaitedLSN = PG_UINT64_MAX; + + if (operation == WAIT_LSN_REPLAY) + { + if (!pairingheap_is_empty(&waitLSNState->replayWaitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSNState->replayWaitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, replayHeapNode, node); + minWaitedLSN = procInfo->waitLSN; + } + pg_atomic_write_u64(&waitLSNState->minWaitedReplayLSN, minWaitedLSN); + } + else /* WAIT_LSN_FLUSH */ + { + if (!pairingheap_is_empty(&waitLSNState->flushWaitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSNState->flushWaitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, flushHeapNode, node); + minWaitedLSN = procInfo->waitLSN; + } + pg_atomic_write_u64(&waitLSNState->minWaitedFlushLSN, minWaitedLSN); + } +} + +/* + * Add current process to appropriate waiters heap based on operation type + */ +static void +addLSNWaiter(XLogRecPtr lsn, WaitLSNOperation operation) +{ + WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; + + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + procInfo->procno = MyProcNumber; + procInfo->waitLSN = lsn; + + if (operation == WAIT_LSN_REPLAY) + { + Assert(!procInfo->inReplayHeap); + pairingheap_add(&waitLSNState->replayWaitersHeap, &procInfo->replayHeapNode); + procInfo->inReplayHeap = true; + updateMinWaitedLSN(WAIT_LSN_REPLAY); + } + else /* WAIT_LSN_FLUSH */ + { + Assert(!procInfo->inFlushHeap); + pairingheap_add(&waitLSNState->flushWaitersHeap, &procInfo->flushHeapNode); + procInfo->inFlushHeap = true; + updateMinWaitedLSN(WAIT_LSN_FLUSH); + } + + LWLockRelease(WaitLSNLock); +} + +/* + * Remove current process from appropriate waiters heap based on operation type + */ +static void +deleteLSNWaiter(WaitLSNOperation operation) +{ + WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; + + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + if (operation == WAIT_LSN_REPLAY && procInfo->inReplayHeap) + { + pairingheap_remove(&waitLSNState->replayWaitersHeap, &procInfo->replayHeapNode); + procInfo->inReplayHeap = false; + updateMinWaitedLSN(WAIT_LSN_REPLAY); + } + else if (operation == WAIT_LSN_FLUSH && procInfo->inFlushHeap) + { + pairingheap_remove(&waitLSNState->flushWaitersHeap, &procInfo->flushHeapNode); + procInfo->inFlushHeap = false; + updateMinWaitedLSN(WAIT_LSN_FLUSH); + } + + LWLockRelease(WaitLSNLock); +} + +/* + * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated + * on the stack. It should be enough to take single iteration for most cases. + */ +#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16) + +/* + * Remove waiters whose LSN has been reached from the heap and set their + * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap + * and set latches for all waiters. + * + * This function first accumulates waiters to wake up into an array, then + * wakes them up without holding a WaitLSNLock. The array size is static and + * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough + * to wake up all the waiters at once in the vast majority of cases. However, + * if there are more waiters, this function will loop to process them in + * multiple chunks. + */ +static void +wakeupWaiters(WaitLSNOperation operation, XLogRecPtr currentLSN) +{ + ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE]; + int numWakeUpProcs; + int i; + pairingheap *heap; + + /* Select appropriate heap */ + heap = (operation == WAIT_LSN_REPLAY) ? + &waitLSNState->replayWaitersHeap : + &waitLSNState->flushWaitersHeap; + + do + { + numWakeUpProcs = 0; + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + /* + * Iterate the waiters heap until we find LSN not yet reached. + * Record process numbers to wake up, but send wakeups after releasing lock. + */ + while (!pairingheap_is_empty(heap)) + { + pairingheap_node *node = pairingheap_first(heap); + WaitLSNProcInfo *procInfo; + + /* Get procInfo using appropriate heap node */ + if (operation == WAIT_LSN_REPLAY) + procInfo = pairingheap_container(WaitLSNProcInfo, replayHeapNode, node); + else + procInfo = pairingheap_container(WaitLSNProcInfo, flushHeapNode, node); + + if (!XLogRecPtrIsInvalid(currentLSN) && procInfo->waitLSN > currentLSN) + break; + + Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE); + wakeUpProcs[numWakeUpProcs++] = procInfo->procno; + (void) pairingheap_remove_first(heap); + + /* Update appropriate flag */ + if (operation == WAIT_LSN_REPLAY) + procInfo->inReplayHeap = false; + else + procInfo->inFlushHeap = false; + + if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE) + break; + } + + updateMinWaitedLSN(operation); + LWLockRelease(WaitLSNLock); + + /* + * Set latches for processes, whose waited LSNs are already reached. + * As the time consuming operations, we do this outside of + * WaitLSNLock. This is actually fine because procLatch isn't ever + * freed, so we just can potentially set the wrong process' (or no + * process') latch. + */ + for (i = 0; i < numWakeUpProcs; i++) + SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch); + + } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE); +} + +/* + * Wake up processes waiting for replay LSN to reach currentLSN + */ +void +WaitLSNWakeupReplay(XLogRecPtr currentLSN) +{ + /* Fast path check */ + if (pg_atomic_read_u64(&waitLSNState->minWaitedReplayLSN) > currentLSN) + return; + + wakeupWaiters(WAIT_LSN_REPLAY, currentLSN); +} + +/* + * Wake up processes waiting for flush LSN to reach currentLSN + */ +void +WaitLSNWakeupFlush(XLogRecPtr currentLSN) +{ + /* Fast path check */ + if (pg_atomic_read_u64(&waitLSNState->minWaitedFlushLSN) > currentLSN) + return; + + wakeupWaiters(WAIT_LSN_FLUSH, currentLSN); +} + +/* + * Clean up LSN waiters for exiting process + */ +void +WaitLSNCleanup(void) +{ + if (waitLSNState) + { + /* + * We do a fast-path check of the heap flags without the lock. These + * flags are set to true only by the process itself. So, it's only possible + * to get a false positive. But that will be eliminated by a recheck + * inside deleteLSNWaiter(). + */ + if (waitLSNState->procInfos[MyProcNumber].inReplayHeap) + deleteLSNWaiter(WAIT_LSN_REPLAY); + if (waitLSNState->procInfos[MyProcNumber].inFlushHeap) + deleteLSNWaiter(WAIT_LSN_FLUSH); + } +} + +/* + * Wait using MyLatch till the given LSN is replayed, a timeout happens, the + * replica gets promoted, or the postmaster dies. + * + * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was replayed. Returns + * WAIT_LSN_RESULT_TIMEOUT if the timeout was reached before the target LSN + * replayed. Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery, + * or replica got promoted before the target LSN replayed. + */ +WaitLSNResult +WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) +{ + XLogRecPtr currentLSN; + TimestampTz endtime = 0; + int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; + + /* Shouldn't be called when shmem isn't initialized */ + Assert(waitLSNState); + + /* Should have a valid proc number */ + Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends); + + if (!RecoveryInProgress()) + { + /* + * Recovery is not in progress. Given that we detected this in the + * very first check, this procedure was mistakenly called on primary. + * However, it's possible that standby was promoted concurrently to + * the procedure call, while target LSN is replayed. So, we still + * check the last replay LSN before reporting an error. + */ + if (PromoteIsTriggered() && targetLSN <= GetXLogReplayRecPtr(NULL)) + return WAIT_LSN_RESULT_SUCCESS; + return WAIT_LSN_RESULT_NOT_IN_RECOVERY; + } + else + { + /* If target LSN is already replayed, exit immediately */ + if (targetLSN <= GetXLogReplayRecPtr(NULL)) + return WAIT_LSN_RESULT_SUCCESS; + } + + if (timeout > 0) + { + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); + wake_events |= WL_TIMEOUT; + } + + /* + * Add our process to the replay waiters heap. It might happen that + * target LSN gets replayed before we do. Another check at the beginning + * of the loop below prevents the race condition. + */ + addLSNWaiter(targetLSN, WAIT_LSN_REPLAY); + + for (;;) + { + int rc; + long delay_ms = 0; + + /* Recheck that recovery is still in-progress */ + if (!RecoveryInProgress()) + { + /* + * Recovery was ended, but recheck if target LSN was already + * replayed. See the comment regarding deleteLSNWaiter() below. + */ + deleteLSNWaiter(WAIT_LSN_REPLAY); + currentLSN = GetXLogReplayRecPtr(NULL); + if (PromoteIsTriggered() && targetLSN <= currentLSN) + return WAIT_LSN_RESULT_SUCCESS; + return WAIT_LSN_RESULT_NOT_IN_RECOVERY; + } + else + { + /* Check if the waited LSN has been replayed */ + currentLSN = GetXLogReplayRecPtr(NULL); + if (targetLSN <= currentLSN) + break; + } + + /* + * If the timeout value is specified, calculate the number of + * milliseconds before the timeout. Exit if the timeout is already + * reached. + */ + if (timeout > 0) + { + delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime); + if (delay_ms <= 0) + break; + } + + CHECK_FOR_INTERRUPTS(); + + rc = WaitLatch(MyLatch, wake_events, delay_ms, + WAIT_EVENT_WAIT_FOR_WAL_REPLAY); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (rc & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"), + errcontext("while waiting for LSN replay"))); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * Delete our process from the shared memory replay heap. We might + * already be deleted by the startup process. The 'inReplayHeap' flag prevents + * us from the double deletion. + */ + deleteLSNWaiter(WAIT_LSN_REPLAY); + + /* + * If we didn't reach the target LSN, we must be exited by timeout. + */ + if (targetLSN > currentLSN) + return WAIT_LSN_RESULT_TIMEOUT; + + return WAIT_LSN_RESULT_SUCCESS; +} + +/* + * Wait until targetLSN has been flushed on a primary server. + * Returns only after the condition is satisfied or on FATAL exit. + */ +void +WaitForLSNFlush(XLogRecPtr targetLSN) +{ + XLogRecPtr currentLSN; + int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH; + + /* Shouldn't be called when shmem isn't initialized */ + Assert(waitLSNState); + + /* Should have a valid proc number */ + Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS); + + /* We can only wait for flush when we are not in recovery */ + Assert(!RecoveryInProgress()); + + /* Quick exit if already flushed */ + currentLSN = GetFlushRecPtr(NULL); + if (targetLSN <= currentLSN) + return; + + /* Add to flush waiters */ + addLSNWaiter(targetLSN, WAIT_LSN_FLUSH); + + /* Wait loop */ + for (;;) + { + int rc; + + /* Check if the waited LSN has been flushed */ + currentLSN = GetFlushRecPtr(NULL); + if (targetLSN <= currentLSN) + break; + + CHECK_FOR_INTERRUPTS(); + + rc = WaitLatch(MyLatch, wake_events, -1, + WAIT_EVENT_WAIT_FOR_WAL_FLUSH); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (rc & WL_POSTMASTER_DEATH) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating connection due to unexpected postmaster exit"), + errcontext("while waiting for LSN flush"))); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * Delete our process from the shared memory flush heap. We might + * already be deleted by the waker process. The 'inFlushHeap' flag prevents + * us from the double deletion. + */ + deleteLSNWaiter(WAIT_LSN_FLUSH); + + return; +} diff --git a/src/backend/lib/pairingheap.c b/src/backend/lib/pairingheap.c index 0aef8a88f1b..fa8431f7946 100644 --- a/src/backend/lib/pairingheap.c +++ b/src/backend/lib/pairingheap.c @@ -44,12 +44,26 @@ pairingheap_allocate(pairingheap_comparator compare, void *arg) pairingheap *heap; heap = (pairingheap *) palloc(sizeof(pairingheap)); + pairingheap_initialize(heap, compare, arg); + + return heap; +} + +/* + * pairingheap_initialize + * + * Same as pairingheap_allocate(), but initializes the pairing heap in-place + * rather than allocating a new chunk of memory. Useful to store the pairing + * heap in a shared memory. + */ +void +pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, + void *arg) +{ heap->ph_compare = compare; heap->ph_arg = arg; heap->ph_root = NULL; - - return heap; } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f22b8d..9955e829190 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1022,10 +1022,6 @@ StartReplication(StartReplicationCmd *cmd) /* * XLogReaderRoutine->page_read callback for logical decoding contexts, as a * walsender process. - * - * Inside the walsender we can do better than read_local_xlog_page, - * which has to do a plain sleep/busy loop, because the walsender's latch gets - * set every time WAL is flushed. */ static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2fa045e6b0f..10ffce8d174 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -24,6 +24,7 @@ #include "access/twophase.h" #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" +#include "access/xlogwait.h" #include "commands/async.h" #include "miscadmin.h" #include "pgstat.h" @@ -150,6 +151,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, InjectionPointShmemSize()); size = add_size(size, SlotSyncShmemSize()); size = add_size(size, AioShmemSize()); + size = add_size(size, WaitLSNShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -343,6 +345,7 @@ CreateOrAttachShmemStructs(void) WaitEventCustomShmemInit(); InjectionPointShmemInit(); AioShmemInit(); + WaitLSNShmemInit(); } /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 96f29aafc39..26b201eadb8 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -36,6 +36,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -947,6 +948,11 @@ ProcKill(int code, Datum arg) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Cancel any pending condition variable sleep, too */ ConditionVariableCancelSleep(); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 7553f6eacef..c1ac71ff7f2 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -89,6 +89,8 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." +WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary." +WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." @@ -355,6 +357,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry." InjectionPoint "Waiting to read or update information related to injection points." SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." +WaitLSN "Waiting to read or update shared Wait-for-LSN state." # # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h new file mode 100644 index 00000000000..f9c303a8c7f --- /dev/null +++ b/src/include/access/xlogwait.h @@ -0,0 +1,113 @@ +/*------------------------------------------------------------------------- + * + * xlogwait.h + * Declarations for LSN replay waiting routines. + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * src/include/access/xlogwait.h + * + *------------------------------------------------------------------------- + */ +#ifndef XLOG_WAIT_H +#define XLOG_WAIT_H + +#include "lib/pairingheap.h" +#include "port/atomics.h" +#include "postgres.h" +#include "storage/procnumber.h" +#include "storage/spin.h" +#include "tcop/dest.h" + +/* + * Result statuses for WaitForLSNReplay(). + */ +typedef enum +{ + WAIT_LSN_RESULT_SUCCESS, /* Target LSN is reached */ + WAIT_LSN_RESULT_TIMEOUT, /* Timeout occurred */ + WAIT_LSN_RESULT_NOT_IN_RECOVERY, /* Recovery ended before or during our + * wait */ +} WaitLSNResult; + +/* + * Wait operation types for LSN waiting facility. + */ +typedef enum WaitLSNOperation +{ + WAIT_LSN_REPLAY, /* Waiting for replay on standby */ + WAIT_LSN_FLUSH /* Waiting for flush on primary */ +} WaitLSNOperation; + +/* + * WaitLSNProcInfo - the shared memory structure representing information + * about the single process, which may wait for LSN operations. An item of + * waitLSNState->procInfos array. + */ +typedef struct WaitLSNProcInfo +{ + /* LSN, which this process is waiting for */ + XLogRecPtr waitLSN; + + /* Process to wake up once the waitLSN is reached */ + ProcNumber procno; + + /* Type-safe heap membership flags */ + bool inReplayHeap; /* In replay waiters heap */ + bool inFlushHeap; /* In flush waiters heap */ + + /* Separate heap nodes for type safety */ + pairingheap_node replayHeapNode; + pairingheap_node flushHeapNode; +} WaitLSNProcInfo; + +/* + * WaitLSNState - the shared memory state for the LSN waiting facility. + */ +typedef struct WaitLSNState +{ + /* + * The minimum replay LSN value some process is waiting for. Used for the + * fast-path checking if we need to wake up any waiters after replaying a + * WAL record. Could be read lock-less. Update protected by WaitLSNLock. + */ + pg_atomic_uint64 minWaitedReplayLSN; + + /* + * A pairing heap of replay waiting processes ordered by LSN values (least LSN is + * on top). Protected by WaitLSNLock. + */ + pairingheap replayWaitersHeap; + + /* + * The minimum flush LSN value some process is waiting for. Used for the + * fast-path checking if we need to wake up any waiters after flushing + * WAL. Could be read lock-less. Update protected by WaitLSNLock. + */ + pg_atomic_uint64 minWaitedFlushLSN; + + /* + * A pairing heap of flush waiting processes ordered by LSN values (least LSN is + * on top). Protected by WaitLSNLock. + */ + pairingheap flushWaitersHeap; + + /* + * An array with per-process information, indexed by the process number. + * Protected by WaitLSNLock. + */ + WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]; +} WaitLSNState; + + +extern PGDLLIMPORT WaitLSNState *waitLSNState; + +extern Size WaitLSNShmemSize(void); +extern void WaitLSNShmemInit(void); +extern void WaitLSNWakeupReplay(XLogRecPtr currentLSN); +extern void WaitLSNWakeupFlush(XLogRecPtr currentLSN); +extern void WaitLSNCleanup(void); +extern WaitLSNResult WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout); +extern void WaitForLSNFlush(XLogRecPtr targetLSN); + +#endif /* XLOG_WAIT_H */ diff --git a/src/include/lib/pairingheap.h b/src/include/lib/pairingheap.h index 3c57d3fda1b..567586f2ecf 100644 --- a/src/include/lib/pairingheap.h +++ b/src/include/lib/pairingheap.h @@ -77,6 +77,9 @@ typedef struct pairingheap extern pairingheap *pairingheap_allocate(pairingheap_comparator compare, void *arg); +extern void pairingheap_initialize(pairingheap *heap, + pairingheap_comparator compare, + void *arg); extern void pairingheap_free(pairingheap *heap); extern void pairingheap_add(pairingheap *heap, pairingheap_node *node); extern pairingheap_node *pairingheap_first(pairingheap *heap); diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index 06a1ffd4b08..5b0ce383408 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -85,6 +85,7 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) +PG_LWLOCK(54, WaitLSN) /* * There also exist several built-in LWLock tranches. As with the predefined -- 2.51.0
