On Mon, Oct 20, 2025, at 00:06, Joel Jacobson wrote:
> Attachments:
> * 0001-optimize_listen_notify-v20.patch
> * 0002-optimize_listen_notify-v20-alt1.txt
> * 0002-optimize_listen_notify-v20-alt3.txt
> * 0002-optimize_listen_notify-v20-alt2.txt

My apologies, I forgot to attach 0002-optimize_listen_notify-v20.patch.

/Joel
0001-optimize_listen_notify-v20.patch
Description: Binary data
0002-optimize_listen_notify-v20.patch
Description: Binary data
From afff0f3f8b01cfde369c564025313e6acc9a610a Mon Sep 17 00:00:00 2001 From: Joel Jacobson <[email protected]> Date: Sun, 19 Oct 2025 08:08:05 +0200 Subject: [PATCH] Implements idea #1: advisoryPos --- src/backend/commands/async.c | 63 +++++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4e6556fb8d1..6a02f5e3acc 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -264,6 +264,11 @@ typedef struct QueuePosition (x).page != (y).page ? (x) : \ (x).offset > (y).offset ? (x) : (y)) +/* returns true if x comes before y in queue order */ +#define QUEUE_POS_PRECEDES(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) || \ + ((x).page == (y).page && (x).offset < (y).offset)) + /* * Parameter determining how often we try to advance the tail pointer: * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is @@ -286,6 +291,7 @@ typedef struct QueueBackendStatus Oid dboid; /* backend's database OID, or InvalidOid */ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ QueuePosition pos; /* backend has read queue up to here */ + QueuePosition advisoryPos; /* backend could skip queue to here */ bool wakeupPending; /* signal sent but not yet processed */ } QueueBackendStatus; @@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL; #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) +#define QUEUE_BACKEND_ADVISORY_POS(i) (asyncQueueControl->backend[i].advisoryPos) #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) /* @@ -674,6 +681,7 @@ AsyncShmemInit(void) QUEUE_BACKEND_DBOID(i) = InvalidOid; QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + 
SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0); QUEUE_BACKEND_WAKEUP_PENDING(i) = false; } } @@ -1312,6 +1320,7 @@ Exec_ListenPreCommit(void) prevListener = i; } QUEUE_BACKEND_POS(MyProcNumber) = max; + QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) = max; QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; /* Insert backend into list of listeners at correct position */ @@ -2031,9 +2040,13 @@ SignalBackends(void) * Even though we may take and release NotifyQueueLock multiple times * while writing, the heavyweight lock guarantees this region contains * only our messages. Therefore, any backend still positioned at the - * queue head from before our write can be safely advanced to the current + * queue head from before our write can be advised to skip to the current * queue head without waking it. * + * We use the advisoryPos field rather than directly modifying pos. + * The backend controls its own pos field and will check advisoryPos + * when it's safe to do so. + * * False-positive possibility: if a backend was previously signaled but * hasn't yet awoken, we'll skip advancing it (because wakeupPending is * true). This is safe - the backend will advance its pointer when it @@ -2048,6 +2061,7 @@ SignalBackends(void) i = QUEUE_NEXT_LISTENER(i)) { QueuePosition pos; + QueuePosition advisoryPos; int64 lag; int32 pid; @@ -2055,15 +2069,31 @@ SignalBackends(void) continue; pos = QUEUE_BACKEND_POS(i); + advisoryPos = QUEUE_BACKEND_ADVISORY_POS(i); - /* Direct advancement for idle backends at the old head */ + /* + * Direct advancement for idle backends at the old head. + * + * We check advisoryPos rather than pos to allow accumulating advances + * from multiple consecutive notifying backends. If we checked pos, + * only the first notifier could advance idle backends; subsequent + * notifiers would find pos unchanged (since the backend hasn't woken + * up yet) and fail to advance further. 
+ */ if (pendingNotifies != NULL && - QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite)) + QUEUE_POS_EQUAL(advisoryPos, queueHeadBeforeWrite)) { - QUEUE_BACKEND_POS(i) = queueHeadAfterWrite; - pos = queueHeadAfterWrite; + QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite; + advisoryPos = queueHeadAfterWrite; } + /* + * For lag calculation, use whichever position is further ahead. + * This ensures we don't spuriously wake a backend that has been + * directly advanced. + */ + pos = QUEUE_POS_MAX(pos, advisoryPos); + /* Signal backends that have fallen too far behind */ lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(pos)); @@ -2302,6 +2332,7 @@ static void asyncQueueReadAllNotifications(void) { volatile QueuePosition pos; + QueuePosition advisoryPos; QueuePosition head; Snapshot snapshot; @@ -2319,6 +2350,21 @@ asyncQueueReadAllNotifications(void) QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; pos = QUEUE_BACKEND_POS(MyProcNumber); head = QUEUE_HEAD; + + /* + * Check if another backend has set an advisory position for us. + * If so, and if we haven't yet read past that point, we can safely + * adopt the advisory position and skip the intervening notifications. + */ + advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber); + + if (!QUEUE_POS_EQUAL(advisoryPos, pos) && + QUEUE_POS_PRECEDES(pos, advisoryPos)) + { + pos = advisoryPos; + QUEUE_BACKEND_POS(MyProcNumber) = pos; + } + LWLockRelease(NotifyQueueLock); if (QUEUE_POS_EQUAL(pos, head)) @@ -2440,6 +2486,13 @@ asyncQueueReadAllNotifications(void) /* Update shared state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyProcNumber) = pos; + /* + * Advance advisoryPos to our current position if it has fallen behind, + * but preserve any newer advisory position that may have been set by + * another backend while we were processing notifications. 
+ */ + QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) = + QUEUE_POS_MAX(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)); LWLockRelease(NotifyQueueLock); } PG_END_TRY(); -- 2.50.1
From c403098ae4e4d06f109eb6292a67c6577e123010 Mon Sep 17 00:00:00 2001 From: Joel Jacobson <[email protected]> Date: Sun, 19 Oct 2025 08:35:44 +0200 Subject: [PATCH] Implement idea #3 --- src/backend/commands/async.c | 150 ++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 65 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4e6556fb8d1..b34e4a2247b 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -264,6 +264,11 @@ typedef struct QueuePosition (x).page != (y).page ? (x) : \ (x).offset > (y).offset ? (x) : (y)) +/* returns true if x comes before y in queue order */ +#define QUEUE_POS_PRECEDES(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) || \ + ((x).page == (y).page && (x).offset < (y).offset)) + /* * Parameter determining how often we try to advance the tail pointer: * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is @@ -2304,6 +2309,7 @@ asyncQueueReadAllNotifications(void) volatile QueuePosition pos; QueuePosition head; Snapshot snapshot; + bool reachedStop; /* page_buffer must be adequately aligned, so use a union */ union @@ -2372,77 +2378,69 @@ asyncQueueReadAllNotifications(void) * It is possible that we fail while trying to send a message to our * frontend (for example, because of encoding conversion failure). If * that happens it is critical that we not try to send the same message - * over and over again. Therefore, we place a PG_TRY block here that will - * forcibly advance our queue position before we lose control to an error. - * (We could alternatively retake NotifyQueueLock and move the position - * before handling each individual message, but that seems like too much - * lock traffic.) + * over and over again. Therefore, we must advance our queue position + * regularly as we process messages. + * + * We must also be careful about concurrency: SignalBackends() can + * directly advance our position while we're reading. 
To preserve such + * advancement, asyncQueueProcessPageEntries updates our position in + * shared memory for each message, only writing if our position is ahead. + * Shared lock is sufficient since we're only updating our own position. */ - PG_TRY(); + do { - bool reachedStop; + int64 curpage = QUEUE_POS_PAGE(pos); + int curoffset = QUEUE_POS_OFFSET(pos); + int slotno; + int copysize; - do + /* + * We copy the data from SLRU into a local buffer, so as to avoid + * holding the SLRU lock while we are examining the entries and + * possibly transmitting them to our frontend. Copy only the part + * of the page we will actually inspect. + */ + slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, + InvalidTransactionId); + if (curpage == QUEUE_POS_PAGE(head)) { - int64 curpage = QUEUE_POS_PAGE(pos); - int curoffset = QUEUE_POS_OFFSET(pos); - int slotno; - int copysize; + /* we only want to read as far as head */ + copysize = QUEUE_POS_OFFSET(head) - curoffset; + if (copysize < 0) + copysize = 0; /* just for safety */ + } + else + { + /* fetch all the rest of the page */ + copysize = QUEUE_PAGESIZE - curoffset; + } + memcpy(page_buffer.buf + curoffset, + NotifyCtl->shared->page_buffer[slotno] + curoffset, + copysize); + /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ + LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); - /* - * We copy the data from SLRU into a local buffer, so as to avoid - * holding the SLRU lock while we are examining the entries and - * possibly transmitting them to our frontend. Copy only the part - * of the page we will actually inspect. 
- */ - slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, - InvalidTransactionId); - if (curpage == QUEUE_POS_PAGE(head)) - { - /* we only want to read as far as head */ - copysize = QUEUE_POS_OFFSET(head) - curoffset; - if (copysize < 0) - copysize = 0; /* just for safety */ - } - else - { - /* fetch all the rest of the page */ - copysize = QUEUE_PAGESIZE - curoffset; - } - memcpy(page_buffer.buf + curoffset, - NotifyCtl->shared->page_buffer[slotno] + curoffset, - copysize); - /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ - LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); + /* + * Process messages up to the stop position, end of page, or an + * uncommitted message. + * + * Our stop position is what we found to be the head's position + * when we entered this function. It might have changed already. + * But if it has, we will receive (or have already received and + * queued) another signal and come here again. + * + * We are not holding NotifyQueueLock here! The queue can only + * extend beyond the head pointer (see above). + * asyncQueueProcessPageEntries will update our backend's position + * for each message to ensure we don't reprocess messages if we fail + * partway through, and to preserve any direct advancement that + * SignalBackends() might perform concurrently. + */ + reachedStop = asyncQueueProcessPageEntries(&pos, head, + page_buffer.buf, + snapshot); - /* - * Process messages up to the stop position, end of page, or an - * uncommitted message. - * - * Our stop position is what we found to be the head's position - * when we entered this function. It might have changed already. - * But if it has, we will receive (or have already received and - * queued) another signal and come here again. - * - * We are not holding NotifyQueueLock here! The queue can only - * extend beyond the head pointer (see above) and we leave our - * backend's pointer where it is so nobody will truncate or - * rewrite pages under us. 
Especially we don't want to hold a lock - * while sending the notifications to the frontend. - */ - reachedStop = asyncQueueProcessPageEntries(&pos, head, - page_buffer.buf, - snapshot); - } while (!reachedStop); - } - PG_FINALLY(); - { - /* Update shared state */ - LWLockAcquire(NotifyQueueLock, LW_SHARED); - QUEUE_BACKEND_POS(MyProcNumber) = pos; - LWLockRelease(NotifyQueueLock); - } - PG_END_TRY(); + } while (!reachedStop); /* Done with snapshot */ UnregisterSnapshot(snapshot); @@ -2490,6 +2488,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, */ reachedEndOfPage = asyncQueueAdvance(current, qe->length); + /* + * Update our position in shared memory immediately after advancing, + * before we attempt to process the message. This ensures we won't + * reprocess this message if NotifyMyFrontEnd fails. + * + * Only write if our position is ahead of the shared position. + * If the shared position is already ahead (due to direct advancement + * by SignalBackends), preserve it by not overwriting. + */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + { + QueuePosition sharedPos = QUEUE_BACKEND_POS(MyProcNumber); + + if (QUEUE_POS_PRECEDES(sharedPos, *current)) + QUEUE_BACKEND_POS(MyProcNumber) = *current; + } + LWLockRelease(NotifyQueueLock); + /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { @@ -2515,6 +2531,10 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, * messages. */ *current = thisentry; + /* Update shared memory to reflect the backed-up position */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + QUEUE_BACKEND_POS(MyProcNumber) = *current; + LWLockRelease(NotifyQueueLock); reachedStop = true; break; } -- 2.50.1
From 928cc032706ac154153279adbdfba95f6af2fae4 Mon Sep 17 00:00:00 2001 From: Joel Jacobson <[email protected]> Date: Sun, 19 Oct 2025 08:12:47 +0200 Subject: [PATCH] Implement idea #2: donePos --- src/backend/commands/async.c | 57 +++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4e6556fb8d1..c81807107d1 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -285,7 +285,8 @@ typedef struct QueueBackendStatus int32 pid; /* either a PID or InvalidPid */ Oid dboid; /* backend's database OID, or InvalidOid */ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ - QueuePosition pos; /* backend has read queue up to here */ + QueuePosition pos; /* next position to read from */ + QueuePosition donePos; /* backend has definitively processed up to here */ bool wakeupPending; /* signal sent but not yet processed */ } QueueBackendStatus; @@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL; #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) +#define QUEUE_BACKEND_DONEPOS(i) (asyncQueueControl->backend[i].donePos) #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) /* @@ -674,6 +676,7 @@ AsyncShmemInit(void) QUEUE_BACKEND_DBOID(i) = InvalidOid; QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0); QUEUE_BACKEND_WAKEUP_PENDING(i) = false; } } @@ -1312,6 +1315,7 @@ Exec_ListenPreCommit(void) prevListener = i; } QUEUE_BACKEND_POS(MyProcNumber) = max; + QUEUE_BACKEND_DONEPOS(MyProcNumber) = max; QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; /* Insert backend into list of listeners at correct 
position */ @@ -2048,6 +2052,7 @@ SignalBackends(void) i = QUEUE_NEXT_LISTENER(i)) { QueuePosition pos; + QueuePosition donePos; int64 lag; int32 pid; @@ -2055,6 +2060,7 @@ SignalBackends(void) continue; pos = QUEUE_BACKEND_POS(i); + donePos = QUEUE_BACKEND_DONEPOS(i); /* Direct advancement for idle backends at the old head */ if (pendingNotifies != NULL && @@ -2064,9 +2070,17 @@ SignalBackends(void) pos = queueHeadAfterWrite; } - /* Signal backends that have fallen too far behind */ + /* + * Signal backends that have fallen too far behind. + * + * We use donePos rather than pos for the lag check because donePos + * is what matters for queue truncation (see asyncQueueAdvanceTail). + * A backend may have been directly advanced (pos is recent) while + * donePos is still far behind, holding back the tail. We need to + * wake such backends so they can advance their donePos. + */ lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), - QUEUE_POS_PAGE(pos)); + QUEUE_POS_PAGE(donePos)); if (lag >= QUEUE_CLEANUP_DELAY) { @@ -2319,14 +2333,25 @@ asyncQueueReadAllNotifications(void) QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; pos = QUEUE_BACKEND_POS(MyProcNumber); head = QUEUE_HEAD; - LWLockRelease(NotifyQueueLock); if (QUEUE_POS_EQUAL(pos, head)) { - /* Nothing to do, we have read all notifications already. */ + /* + * Nothing to do, we have read all notifications already. + * + * Update donePos to match pos before returning. This is important + * when our position was advanced via direct advancement: we need to + * update donePos so the queue tail can advance. Without this, + * backends that have been directly advanced would hold back queue + * truncation indefinitely. + */ + QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos; + LWLockRelease(NotifyQueueLock); return; } + LWLockRelease(NotifyQueueLock); + /*---------- * Get snapshot we'll use to decide which xacts are still in progress. * This is trickier than it might seem, because of race conditions. 
@@ -2437,9 +2462,19 @@ asyncQueueReadAllNotifications(void) } PG_FINALLY(); { - /* Update shared state */ + /* + * Update shared state. + * + * We update donePos to what we actually read (the local pos variable), + * as this is used for truncation safety. For the read position (pos), + * we use the maximum of our local position and the current shared + * position, in case another backend used direct advancement to skip us + * ahead while we were reading. This prevents us from going backwards + * and potentially pointing to a truncated page. + */ LWLockAcquire(NotifyQueueLock, LW_SHARED); - QUEUE_BACKEND_POS(MyProcNumber) = pos; + QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos; + QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber)); LWLockRelease(NotifyQueueLock); } PG_END_TRY(); @@ -2589,7 +2624,13 @@ asyncQueueAdvanceTail(void) for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) { Assert(QUEUE_BACKEND_PID(i) != InvalidPid); - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + /* + * Use donePos rather than pos for truncation safety. The donePos + * field represents what the backend has definitively processed, while + * pos can be advanced by other backends via direct advancement. This + * prevents truncating pages that a backend is still reading from. + */ + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i)); } QUEUE_TAIL = min; oldtailpage = QUEUE_STOP_PAGE; -- 2.50.1
