On Mon, Oct 20, 2025, at 00:06, Joel Jacobson wrote:
> Attachments:
> * 0001-optimize_listen_notify-v20.patch
> * 0002-optimize_listen_notify-v20-alt1.txt
> * 0002-optimize_listen_notify-v20-alt3.txt
> * 0002-optimize_listen_notify-v20-alt2.txt

My apologies, I forgot to attach 0002-optimize_listen_notify-v20.patch.

/Joel

Attachment: 0001-optimize_listen_notify-v20.patch
Description: Binary data

Attachment: 0002-optimize_listen_notify-v20.patch
Description: Binary data

From afff0f3f8b01cfde369c564025313e6acc9a610a Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 08:08:05 +0200
Subject: [PATCH] Implement idea #1: advisoryPos

---
 src/backend/commands/async.c | 63 +++++++++++++++++++++++++++++++++---
 1 file changed, 58 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4e6556fb8d1..6a02f5e3acc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
         (x).page != (y).page ? (x) : \
         (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+       (asyncQueuePagePrecedes((x).page, (y).page) || \
+        ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
@@ -286,6 +291,7 @@ typedef struct QueueBackendStatus
        Oid                     dboid;                  /* backend's database OID, or InvalidOid */
        ProcNumber      nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
        QueuePosition pos;                      /* backend has read queue up to here */
+       QueuePosition advisoryPos;      /* backend could skip queue to here */
        bool            wakeupPending;  /* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)         (asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)         (asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)           (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)  (asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)        (asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -674,6 +681,7 @@ AsyncShmemInit(void)
                        QUEUE_BACKEND_DBOID(i) = InvalidOid;
                        QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
                        SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+                       SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
                        QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
                }
        }
@@ -1312,6 +1320,7 @@ Exec_ListenPreCommit(void)
                        prevListener = i;
        }
        QUEUE_BACKEND_POS(MyProcNumber) = max;
+       QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) = max;
        QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
        QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
        /* Insert backend into list of listeners at correct position */
@@ -2031,9 +2040,13 @@ SignalBackends(void)
         * Even though we may take and release NotifyQueueLock multiple times
         * while writing, the heavyweight lock guarantees this region contains
         * only our messages.  Therefore, any backend still positioned at the
-        * queue head from before our write can be safely advanced to the current
+        * queue head from before our write can be advised to skip to the current
         * queue head without waking it.
         *
+        * We use the advisoryPos field rather than directly modifying pos.
+        * The backend controls its own pos field and will check advisoryPos
+        * when it's safe to do so.
+        *
         * False-positive possibility: if a backend was previously signaled but
         * hasn't yet awoken, we'll skip advancing it (because wakeupPending is
         * true).  This is safe - the backend will advance its pointer when it
@@ -2048,6 +2061,7 @@ SignalBackends(void)
                 i = QUEUE_NEXT_LISTENER(i))
        {
                QueuePosition pos;
+               QueuePosition advisoryPos;
                int64           lag;
                int32           pid;
 
@@ -2055,15 +2069,31 @@ SignalBackends(void)
                        continue;
 
                pos = QUEUE_BACKEND_POS(i);
+               advisoryPos = QUEUE_BACKEND_ADVISORY_POS(i);
 
-               /* Direct advancement for idle backends at the old head */
+               /*
+                * Direct advancement for idle backends at the old head.
+                *
+                * We check advisoryPos rather than pos to allow accumulating advances
+                * from multiple consecutive notifying backends.  If we checked pos,
+                * only the first notifier could advance idle backends; subsequent
+                * notifiers would find pos unchanged (since the backend hasn't woken
+                * up yet) and fail to advance further.
+                */
                if (pendingNotifies != NULL &&
-                       QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+                       QUEUE_POS_EQUAL(advisoryPos, queueHeadBeforeWrite))
                {
-                       QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-                       pos = queueHeadAfterWrite;
+                       QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
+                       advisoryPos = queueHeadAfterWrite;
                }
 
+               /*
+                * For lag calculation, use whichever position is further ahead.
+                * This ensures we don't spuriously wake a backend that has been
+                * directly advanced.
+                */
+               pos = QUEUE_POS_MAX(pos, advisoryPos);
+
                /* Signal backends that have fallen too far behind */
                lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
                                                                 QUEUE_POS_PAGE(pos));
@@ -2302,6 +2332,7 @@ static void
 asyncQueueReadAllNotifications(void)
 {
        volatile QueuePosition pos;
+       QueuePosition advisoryPos;
        QueuePosition head;
        Snapshot        snapshot;
 
@@ -2319,6 +2350,21 @@ asyncQueueReadAllNotifications(void)
        QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
        pos = QUEUE_BACKEND_POS(MyProcNumber);
        head = QUEUE_HEAD;
+
+       /*
+        * Check if another backend has set an advisory position for us.
+        * If so, and if we haven't yet read past that point, we can safely
+        * adopt the advisory position and skip the intervening notifications.
+        */
+       advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+
+       if (!QUEUE_POS_EQUAL(advisoryPos, pos) &&
+               QUEUE_POS_PRECEDES(pos, advisoryPos))
+       {
+               pos = advisoryPos;
+               QUEUE_BACKEND_POS(MyProcNumber) = pos;
+       }
+
        LWLockRelease(NotifyQueueLock);
 
        if (QUEUE_POS_EQUAL(pos, head))
@@ -2440,6 +2486,13 @@ asyncQueueReadAllNotifications(void)
                /* Update shared state */
                LWLockAcquire(NotifyQueueLock, LW_SHARED);
                QUEUE_BACKEND_POS(MyProcNumber) = pos;
+               /*
+                * Advance advisoryPos to our current position if it has fallen behind,
+                * but preserve any newer advisory position that may have been set by
+                * another backend while we were processing notifications.
+                */
+               QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) =
+                       QUEUE_POS_MAX(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber));
                LWLockRelease(NotifyQueueLock);
        }
        PG_END_TRY();
-- 
2.50.1
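
To make the handoff easier to follow outside the diff, here's a minimal
standalone sketch of the advisoryPos idea. The types and helpers below are
invented for illustration (no shared memory, no locking, no page wraparound
handling), so treat it as a summary of the mechanism rather than the patch's
actual code:

/*
 * Sketch of idea #1: a notifier may bump an idle listener's advisoryPos
 * without signaling it; the listener remains the only writer of its own
 * pos and adopts the advice the next time it reads the queue.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct
{
	int64_t		page;
	int			offset;
} Pos;

static bool
pos_precedes(Pos x, Pos y)
{
	/* like QUEUE_POS_PRECEDES, but with plain page comparison for brevity */
	return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

typedef struct
{
	Pos			pos;			/* what the listener has read so far */
	Pos			advisoryPos;	/* where a notifier says it may skip to */
} Listener;

/* Notifier side: only touch the advisory position of an idle listener. */
static void
advise_skip(Listener *l, Pos head_before, Pos head_after)
{
	if (l->advisoryPos.page == head_before.page &&
		l->advisoryPos.offset == head_before.offset)
		l->advisoryPos = head_after;
}

/* Listener side: adopt the advice if it is ahead of our own position. */
static void
adopt_advice(Listener *l)
{
	if (pos_precedes(l->pos, l->advisoryPos))
		l->pos = l->advisoryPos;
}

int
main(void)
{
	Listener	l = {{0, 0}, {0, 0}};
	Pos			head_before = {0, 0};
	Pos			head_after = {0, 128};

	advise_skip(&l, head_before, head_after);	/* notifier, no wakeup sent */
	adopt_advice(&l);							/* listener, next time it reads */
	printf("listener now at page %lld offset %d\n",
		   (long long) l.pos.page, l.pos.offset);
	return 0;
}

The point of the split is that the notifier never writes the listener's pos;
that field stays under the listener's control, which is what lets this variant
keep using pos for truncation decisions while still accumulating advances from
several consecutive notifiers in advisoryPos.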

From c403098ae4e4d06f109eb6292a67c6577e123010 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 08:35:44 +0200
Subject: [PATCH] Implement idea #3

---
 src/backend/commands/async.c | 150 ++++++++++++++++++++---------------
 1 file changed, 85 insertions(+), 65 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4e6556fb8d1..b34e4a2247b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
         (x).page != (y).page ? (x) : \
         (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+       (asyncQueuePagePrecedes((x).page, (y).page) || \
+        ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
@@ -2304,6 +2309,7 @@ asyncQueueReadAllNotifications(void)
        volatile QueuePosition pos;
        QueuePosition head;
        Snapshot        snapshot;
+       bool            reachedStop;
 
        /* page_buffer must be adequately aligned, so use a union */
        union
@@ -2372,77 +2378,69 @@ asyncQueueReadAllNotifications(void)
         * It is possible that we fail while trying to send a message to our
         * frontend (for example, because of encoding conversion failure).  If
         * that happens it is critical that we not try to send the same message
-        * over and over again.  Therefore, we place a PG_TRY block here that will
-        * forcibly advance our queue position before we lose control to an error.
-        * (We could alternatively retake NotifyQueueLock and move the position
-        * before handling each individual message, but that seems like too much
-        * lock traffic.)
+        * over and over again.  Therefore, we must advance our queue position
+        * regularly as we process messages.
+        *
+        * We must also be careful about concurrency: SignalBackends() can
+        * directly advance our position while we're reading.  To preserve such
+        * advancement, asyncQueueProcessPageEntries updates our position in
+        * shared memory for each message, only writing if our position is ahead.
+        * Shared lock is sufficient since we're only updating our own position.
         */
-       PG_TRY();
+       do
        {
-               bool            reachedStop;
+               int64           curpage = QUEUE_POS_PAGE(pos);
+               int                     curoffset = QUEUE_POS_OFFSET(pos);
+               int                     slotno;
+               int                     copysize;
 
-               do
+               /*
+                * We copy the data from SLRU into a local buffer, so as to avoid
+                * holding the SLRU lock while we are examining the entries and
+                * possibly transmitting them to our frontend.  Copy only the part
+                * of the page we will actually inspect.
+                */
+               slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+                                                                                       InvalidTransactionId);
+               if (curpage == QUEUE_POS_PAGE(head))
                {
-                       int64           curpage = QUEUE_POS_PAGE(pos);
-                       int                     curoffset = QUEUE_POS_OFFSET(pos);
-                       int                     slotno;
-                       int                     copysize;
+                       /* we only want to read as far as head */
+                       copysize = QUEUE_POS_OFFSET(head) - curoffset;
+                       if (copysize < 0)
+                               copysize = 0;   /* just for safety */
+               }
+               else
+               {
+                       /* fetch all the rest of the page */
+                       copysize = QUEUE_PAGESIZE - curoffset;
+               }
+               memcpy(page_buffer.buf + curoffset,
+                          NotifyCtl->shared->page_buffer[slotno] + curoffset,
+                          copysize);
+               /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+               LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
 
-                       /*
-                        * We copy the data from SLRU into a local buffer, so as to avoid
-                        * holding the SLRU lock while we are examining the entries and
-                        * possibly transmitting them to our frontend.  Copy only the part
-                        * of the page we will actually inspect.
-                        */
-                       slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-                                                                                               InvalidTransactionId);
-                       if (curpage == QUEUE_POS_PAGE(head))
-                       {
-                               /* we only want to read as far as head */
-                               copysize = QUEUE_POS_OFFSET(head) - curoffset;
-                               if (copysize < 0)
-                                       copysize = 0;   /* just for safety */
-                       }
-                       else
-                       {
-                               /* fetch all the rest of the page */
-                               copysize = QUEUE_PAGESIZE - curoffset;
-                       }
-                       memcpy(page_buffer.buf + curoffset,
-                                  NotifyCtl->shared->page_buffer[slotno] + curoffset,
-                                  copysize);
-                       /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-                       LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+               /*
+                * Process messages up to the stop position, end of page, or an
+                * uncommitted message.
+                *
+                * Our stop position is what we found to be the head's position
+                * when we entered this function. It might have changed already.
+                * But if it has, we will receive (or have already received and
+                * queued) another signal and come here again.
+                *
+                * We are not holding NotifyQueueLock here! The queue can only
+                * extend beyond the head pointer (see above).
+                * asyncQueueProcessPageEntries will update our backend's position
+                * for each message to ensure we don't reprocess messages if we fail
+                * partway through, and to preserve any direct advancement that
+                * SignalBackends() might perform concurrently.
+                */
+               reachedStop = asyncQueueProcessPageEntries(&pos, head,
+                                                                                                  page_buffer.buf,
+                                                                                                  snapshot);
 
-                       /*
-                        * Process messages up to the stop position, end of page, or an
-                        * uncommitted message.
-                        *
-                        * Our stop position is what we found to be the head's position
-                        * when we entered this function. It might have changed already.
-                        * But if it has, we will receive (or have already received and
-                        * queued) another signal and come here again.
-                        *
-                        * We are not holding NotifyQueueLock here! The queue can only
-                        * extend beyond the head pointer (see above) and we leave our
-                        * backend's pointer where it is so nobody will truncate or
-                        * rewrite pages under us. Especially we don't want to hold a lock
-                        * while sending the notifications to the frontend.
-                        */
-                       reachedStop = asyncQueueProcessPageEntries(&pos, head,
-                                                                                                          page_buffer.buf,
-                                                                                                          snapshot);
-               } while (!reachedStop);
-       }
-       PG_FINALLY();
-       {
-               /* Update shared state */
-               LWLockAcquire(NotifyQueueLock, LW_SHARED);
-               QUEUE_BACKEND_POS(MyProcNumber) = pos;
-               LWLockRelease(NotifyQueueLock);
-       }
-       PG_END_TRY();
+       } while (!reachedStop);
 
        /* Done with snapshot */
        UnregisterSnapshot(snapshot);
@@ -2490,6 +2488,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
                 */
                reachedEndOfPage = asyncQueueAdvance(current, qe->length);
 
+               /*
+                * Update our position in shared memory immediately after advancing,
+                * before we attempt to process the message. This ensures we won't
+                * reprocess this message if NotifyMyFrontEnd fails.
+                *
+                * Only write if our position is ahead of the shared position.
+                * If the shared position is already ahead (due to direct advancement
+                * by SignalBackends), preserve it by not overwriting.
+                */
+               LWLockAcquire(NotifyQueueLock, LW_SHARED);
+               {
+                       QueuePosition sharedPos = QUEUE_BACKEND_POS(MyProcNumber);
+
+                       if (QUEUE_POS_PRECEDES(sharedPos, *current))
+                               QUEUE_BACKEND_POS(MyProcNumber) = *current;
+               }
+               LWLockRelease(NotifyQueueLock);
+
                /* Ignore messages destined for other databases */
                if (qe->dboid == MyDatabaseId)
                {
@@ -2515,6 +2531,10 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                 * messages.
                                 */
                                *current = thisentry;
+                               /* Update shared memory to reflect the backed-up position */
+                               LWLockAcquire(NotifyQueueLock, LW_SHARED);
+                               QUEUE_BACKEND_POS(MyProcNumber) = *current;
+                               LWLockRelease(NotifyQueueLock);
                                reachedStop = true;
                                break;
                        }
-- 
2.50.1
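
The core of this restructuring, stripped of the SLRU and locking details, is
that the shared read position is published after each message instead of once
in a PG_FINALLY block. Here's a self-contained sketch; the Entry type, the
deliver() helper and the integer positions are invented for illustration and
are not the code in async.c:

/*
 * Sketch of idea #3: advance the published position past each message
 * before attempting delivery, and only ever move it forward, so a failure
 * while sending to the client never causes the same message to be
 * reprocessed.
 */
#include <stdbool.h>
#include <stdio.h>

typedef struct
{
	int			pos;			/* queue position of this entry */
	const char *payload;
} Entry;

static int	shared_pos = 0;		/* stand-in for QUEUE_BACKEND_POS(MyProcNumber) */

/* Pretend delivery to the frontend; fails on one payload to show the effect. */
static bool
deliver(const Entry *e)
{
	if (e->payload[0] == '!')
		return false;			/* e.g. encoding conversion failure */
	printf("delivered: %s\n", e->payload);
	return true;
}

static void
read_notifications(const Entry *entries, int n)
{
	for (int i = 0; i < n; i++)
	{
		int			next = entries[i].pos + 1;

		/* Publish the advanced position before delivery, only if ahead. */
		if (next > shared_pos)
			shared_pos = next;

		if (!deliver(&entries[i]))
			return;				/* shared_pos already points past the bad entry */
	}
}

int
main(void)
{
	Entry		q[] = {{0, "hello"}, {1, "!boom"}, {2, "world"}};

	read_notifications(q, 3);
	read_notifications(q + shared_pos, 3 - shared_pos);	/* retry starts after the failure */
	printf("final shared_pos = %d\n", shared_pos);
	return 0;
}

The "only if ahead" check is the piece that preserves direct advancement: if
SignalBackends has already pushed the shared position past where this backend
is reading, the per-message update leaves it alone.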

From 928cc032706ac154153279adbdfba95f6af2fae4 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 08:12:47 +0200
Subject: [PATCH] Implement idea #2: donePos

---
 src/backend/commands/async.c | 57 +++++++++++++++++++++++++++++++-----
 1 file changed, 49 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4e6556fb8d1..c81807107d1 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -285,7 +285,8 @@ typedef struct QueueBackendStatus
        int32           pid;                    /* either a PID or InvalidPid */
        Oid                     dboid;                  /* backend's database OID, or InvalidOid */
        ProcNumber      nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
-       QueuePosition pos;                      /* backend has read queue up to here */
+       QueuePosition pos;                      /* next position to read from */
+       QueuePosition donePos;          /* backend has definitively processed up to here */
        bool            wakeupPending;  /* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)         (asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)         (asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)           (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DONEPOS(i)       (asyncQueueControl->backend[i].donePos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)        (asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -674,6 +676,7 @@ AsyncShmemInit(void)
                        QUEUE_BACKEND_DBOID(i) = InvalidOid;
                        QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
                        SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+                       SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0);
                        QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
                }
        }
@@ -1312,6 +1315,7 @@ Exec_ListenPreCommit(void)
                        prevListener = i;
        }
        QUEUE_BACKEND_POS(MyProcNumber) = max;
+       QUEUE_BACKEND_DONEPOS(MyProcNumber) = max;
        QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
        QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
        /* Insert backend into list of listeners at correct position */
@@ -2048,6 +2052,7 @@ SignalBackends(void)
                 i = QUEUE_NEXT_LISTENER(i))
        {
                QueuePosition pos;
+               QueuePosition donePos;
                int64           lag;
                int32           pid;
 
@@ -2055,6 +2060,7 @@ SignalBackends(void)
                        continue;
 
                pos = QUEUE_BACKEND_POS(i);
+               donePos = QUEUE_BACKEND_DONEPOS(i);
 
                /* Direct advancement for idle backends at the old head */
                if (pendingNotifies != NULL &&
@@ -2064,9 +2070,17 @@ SignalBackends(void)
                        pos = queueHeadAfterWrite;
                }
 
-               /* Signal backends that have fallen too far behind */
+               /*
+                * Signal backends that have fallen too far behind.
+                *
+                * We use donePos rather than pos for the lag check because donePos
+                * is what matters for queue truncation (see asyncQueueAdvanceTail).
+                * A backend may have been directly advanced (pos is recent) while
+                * donePos is still far behind, holding back the tail.  We need to
+                * wake such backends so they can advance their donePos.
+                */
                lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-                                                                QUEUE_POS_PAGE(pos));
+                                                                QUEUE_POS_PAGE(donePos));
 
                if (lag >= QUEUE_CLEANUP_DELAY)
                {
@@ -2319,14 +2333,25 @@ asyncQueueReadAllNotifications(void)
        QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
        pos = QUEUE_BACKEND_POS(MyProcNumber);
        head = QUEUE_HEAD;
-       LWLockRelease(NotifyQueueLock);
 
        if (QUEUE_POS_EQUAL(pos, head))
        {
-               /* Nothing to do, we have read all notifications already. */
+               /*
+                * Nothing to do, we have read all notifications already.
+                *
+                * Update donePos to match pos before returning.  This is important
+                * when our position was advanced via direct advancement: we need to
+                * update donePos so the queue tail can advance.  Without this,
+                * backends that have been directly advanced would hold back queue
+                * truncation indefinitely.
+                */
+               QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos;
+               LWLockRelease(NotifyQueueLock);
                return;
        }
 
+       LWLockRelease(NotifyQueueLock);
+
        /*----------
         * Get snapshot we'll use to decide which xacts are still in progress.
         * This is trickier than it might seem, because of race conditions.
@@ -2437,9 +2462,19 @@ asyncQueueReadAllNotifications(void)
        }
        PG_FINALLY();
        {
-               /* Update shared state */
+               /*
+                * Update shared state.
+                *
+                * We update donePos to what we actually read (the local pos variable),
+                * as this is used for truncation safety.  For the read position (pos),
+                * we use the maximum of our local position and the current shared
+                * position, in case another backend used direct advancement to skip us
+                * ahead while we were reading.  This prevents us from going backwards
+                * and potentially pointing to a truncated page.
+                */
                LWLockAcquire(NotifyQueueLock, LW_SHARED);
-               QUEUE_BACKEND_POS(MyProcNumber) = pos;
+               QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos;
+               QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber));
                LWLockRelease(NotifyQueueLock);
        }
        PG_END_TRY();
@@ -2589,7 +2624,13 @@ asyncQueueAdvanceTail(void)
        for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
        {
                Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-               min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+               /*
+                * Use donePos rather than pos for truncation safety.  The donePos
+                * field represents what the backend has definitively processed, while
+                * pos can be advanced by other backends via direct advancement.  This
+                * prevents truncating pages that a backend is still reading from.
+                */
+               min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i));
        }
        QUEUE_TAIL = min;
        oldtailpage = QUEUE_STOP_PAGE;
-- 
2.50.1
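
Since the pos/donePos split is the whole point of this variant, here's a
minimal sketch of the invariant it maintains. Integer positions and the
ListenerState/compute_tail names are invented for illustration; they are not
the shared-memory structures in async.c:

/*
 * Sketch of idea #2: pos is the next position to read (and may be bumped
 * by a notifier via direct advancement), while donePos only moves once the
 * listener has actually processed up to there.  Queue truncation keys off
 * donePos, never pos.
 */
#include <stdio.h>

#define NUM_LISTENERS 3

typedef struct
{
	int			pos;			/* next position to read from */
	int			donePos;		/* definitively processed up to here */
} ListenerState;

static ListenerState listeners[NUM_LISTENERS];

/* Tail advance: only donePos limits how far the queue may be truncated. */
static int
compute_tail(void)
{
	int			min = listeners[0].donePos;

	for (int i = 1; i < NUM_LISTENERS; i++)
		if (listeners[i].donePos < min)
			min = listeners[i].donePos;
	return min;
}

int
main(void)
{
	/* listener 0 was directly advanced (pos = 100) but hasn't woken up yet */
	listeners[0] = (ListenerState) {100, 10};
	listeners[1] = (ListenerState) {100, 100};
	listeners[2] = (ListenerState) {40, 40};

	/* prints 10: truncation is held back by donePos, not by pos (min 40) */
	printf("tail may advance to %d\n", compute_tail());
	return 0;
}

This is also why the lag check in SignalBackends switches to donePos: a
listener like the first one above looks fully caught up if you only consider
pos, but it still pins the tail until it wakes up and advances donePos.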
