On Fri, Oct 24, 2025, at 11:55, Arseniy Mukhin wrote:
> On Wed, Oct 22, 2025 at 3:02 AM Joel Jacobson <[email protected]> wrote:
>> How about doing some more work in vac_update_datfrozenxid()?
>>
>> Pseudo-code sketch:
>>
...
> I agree we need to add something like this. Looks like with v10 it's
> possible for the listen/notify queue to block datfrozenxid advancing
> even without extreme circumstances (without hanging listeners etc).

Attached are two implementations of the sketched-out idea, either of
which can be applied on top of the v10 patch. The two should be
functionally equivalent.

vacuum_notify_queue_cleanup-with-code-dup.txt:

This version makes no attempt to avoid code duplication:
asyncQueueWakeAllListeners is very similar to SignalBackends, and
asyncQueueAdvanceTailNoListeners is very similar to
asyncQueueAdvanceTail. I think this version might be preferable if the
channel hash optimization that we're working on in the other thread
also solves these fundamental problems as a bonus, since this added
safety functionality could then simply be removed again. I think it's
quite likely we can achieve that, but it's not certain.

vacuum_notify_queue_cleanup-without-code-dup.txt:

This version instead gives SignalBackends and asyncQueueAdvanceTail
each a new boolean input parameter (all_databases and force_to_head,
respectively) to control their behavior. Passing false gives the
current behavior, and is what the existing call sites do. Vacuum
passes true, which makes SignalBackends signal all non-caught-up
backends in all databases, and makes asyncQueueAdvanceTail forcibly
advance the tail to the head when there are no listening backends.

I have no strong preference for one or the other.

/Joel
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 7c9d7831c9f..1805ddb36ce 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -444,13 +444,11 @@ static void asyncQueueNotificationToEntry(Notification 
*n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
-static void SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                                                
 QueuePosition stop,
                                                                                
 char *page_buffer,
                                                                                
 Snapshot snapshot);
-static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
 static void AddEventToPendingNotifies(Notification *n);
@@ -1011,7 +1009,7 @@ AtCommit_Notify(void)
         * PreCommit_Notify().
         */
        if (pendingNotifies != NULL)
-               SignalBackends();
+               SignalBackends(false);
 
        /*
         * If it's time to try to advance the global tail pointer, do that.
@@ -1025,7 +1023,7 @@ AtCommit_Notify(void)
        if (tryAdvanceTail)
        {
                tryAdvanceTail = false;
-               asyncQueueAdvanceTail();
+               asyncQueueAdvanceTail(false);
        }
 
        /* And clean up */
@@ -1483,7 +1481,7 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS)
        double          usage;
 
        /* Advance the queue tail so we don't report a too-large result */
-       asyncQueueAdvanceTail();
+       asyncQueueAdvanceTail(false);
 
        LWLockAcquire(NotifyQueueLock, LW_SHARED);
        usage = asyncQueueUsage();
@@ -1576,9 +1574,16 @@ asyncQueueFillWarning(void)
  *
  * This is called during CommitTransaction(), so it's important for it
  * to have very low probability of failure.
+ *
+ * If all_databases is false (normal NOTIFY), we signal listeners in our own
+ * database unless they're caught up, and listeners in other databases only
+ * if they are far behind (QUEUE_CLEANUP_DELAY pages).
+ *
+ * If all_databases is true (VACUUM cleanup), we signal all listeners across
+ * all databases that aren't already caught up, with no distance filtering.
  */
-static void
-SignalBackends(void)
+void
+SignalBackends(bool all_databases)
 {
        int32      *pids;
        ProcNumber *procnos;
@@ -1604,25 +1609,21 @@ SignalBackends(void)
 
                Assert(pid != InvalidPid);
                pos = QUEUE_BACKEND_POS(i);
-               if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
-               {
-                       /*
-                        * Always signal listeners in our own database, unless 
they're
-                        * already caught up (unlikely, but possible).
-                        */
-                       if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
-                               continue;
-               }
-               else
-               {
-                       /*
-                        * Listeners in other databases should be signaled only 
if they
-                        * are far behind.
-                        */
-                       if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-                                                                  
QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
-                               continue;
-               }
+
+               /*
+                * Always skip backends that are already caught up.
+                */
+               if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+                       continue;
+
+               /*
+                * Skip if we're not signaling all databases AND this is a 
different
+                * database AND the listener is not far behind.
+                */
+               if (!all_databases && QUEUE_BACKEND_DBOID(i) != MyDatabaseId &&
+                       asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+                                                          QUEUE_POS_PAGE(pos)) 
< QUEUE_CLEANUP_DELAY)
+                       continue;
                /* OK, need to signal this one */
                pids[count] = pid;
                procnos[count] = i;
@@ -2188,15 +2189,38 @@ GetOldestQueuedNotifyXid(void)
        return oldestXid;
 }
 
+/*
+ * Check if there are any active listeners in the notification queue.
+ *
+ * Returns true if at least one backend is registered as a listener,
+ * false otherwise.
+ */
+bool
+asyncQueueHasListeners(void)
+{
+       bool            hasListeners;
+
+       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+       hasListeners = (QUEUE_FIRST_LISTENER != INVALID_PROC_NUMBER);
+       LWLockRelease(NotifyQueueLock);
+
+       return hasListeners;
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
  *
- * This is (usually) called during CommitTransaction(), so it's important for
- * it to have very low probability of failure.
+ * If force_to_head is false (normal case), we compute the new tail as the
+ * minimum of all listener positions.  This is (usually) called during
+ * CommitTransaction(), so it's important for it to have very low probability
+ * of failure.
+ *
+ * If force_to_head is true (VACUUM cleanup), we advance the tail directly to
+ * the head, discarding all notifications, but only if there are no listeners.
  */
-static void
-asyncQueueAdvanceTail(void)
+void
+asyncQueueAdvanceTail(bool force_to_head)
 {
        QueuePosition min;
        int64           oldtailpage;
@@ -2224,12 +2248,38 @@ asyncQueueAdvanceTail(void)
         * to access the pages we are in the midst of truncating.
         */
        LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-       min = QUEUE_HEAD;
-       for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = 
QUEUE_NEXT_LISTENER(i))
+
+       if (force_to_head)
        {
-               Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-               min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+               /*
+                * Verify that there are still no listeners.  It's possible
+                * that a listener appeared since VACUUM checked.
+                */
+               if (QUEUE_FIRST_LISTENER != INVALID_PROC_NUMBER)
+               {
+                       LWLockRelease(NotifyQueueLock);
+                       LWLockRelease(NotifyQueueTailLock);
+                       return;
+               }
+
+               /*
+                * Advance the logical tail to the head, discarding all 
notifications.
+                */
+               min = QUEUE_HEAD;
        }
+       else
+       {
+               /*
+                * Normal case: compute minimum position from all listeners.
+                */
+               min = QUEUE_HEAD;
+               for (ProcNumber i = QUEUE_FIRST_LISTENER; i != 
INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+               {
+                       Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
+                       min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+               }
+       }
+
        QUEUE_TAIL = min;
        oldtailpage = QUEUE_STOP_PAGE;
        LWLockRelease(NotifyQueueLock);
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 6c601ce81aa..e4bc292ebd9 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1739,11 +1739,24 @@ vac_update_datfrozenxid(void)
         * Also consider the oldest XID in the notification queue, since 
backends
         * will need to call TransactionIdDidCommit() on those XIDs when
         * processing the notifications.
+        *
+        * If the queue is blocking datfrozenxid advancement, attempt to clean 
it
+        * up.  If listeners exist, wake them to process their pending
+        * notifications.  If no listeners exist, discard all notifications.
+        * Either way, we back off datfrozenxid for this VACUUM cycle; the next
+        * VACUUM will benefit from the cleanup we've triggered.
         */
        oldestNotifyXid = GetOldestQueuedNotifyXid();
        if (TransactionIdIsValid(oldestNotifyXid) &&
                TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+       {
+               if (asyncQueueHasListeners())
+                       SignalBackends(true);
+               else
+                       asyncQueueAdvanceTail(true);
+
                newFrozenXid = oldestNotifyXid;
+       }
 
        Assert(TransactionIdIsNormal(newFrozenXid));
        Assert(MultiXactIdIsValid(newMinMulti));
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index ac323ada492..bb442940c29 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -29,6 +29,11 @@ extern void NotifyMyFrontEnd(const char *channel,
 /* get oldest XID in the notification queue for vacuum freeze */
 extern TransactionId GetOldestQueuedNotifyXid(void);
 
+/* functions for vacuum to manage notification queue */
+extern bool asyncQueueHasListeners(void);
+extern void SignalBackends(bool all_databases);
+extern void asyncQueueAdvanceTail(bool force_to_head);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 7c9d7831c9f..c8c0ab66b66 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -2188,6 +2188,164 @@ GetOldestQueuedNotifyXid(void)
        return oldestXid;
 }
 
+/*
+ * Check if there are any active listeners in the notification queue.
+ *
+ * Returns true if at least one backend is registered as a listener,
+ * false otherwise.
+ */
+bool
+asyncQueueHasListeners(void)
+{
+       bool            hasListeners;
+
+       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+       hasListeners = (QUEUE_FIRST_LISTENER != INVALID_PROC_NUMBER);
+       LWLockRelease(NotifyQueueLock);
+
+       return hasListeners;
+}
+
+/*
+ * Wake all listening backends to process notifications.
+ *
+ * This is called by VACUUM when it needs to advance datfrozenxid but the
+ * notification queue has old XIDs.  We signal all listeners across all
+ * databases that aren't already caught up, so they can process their
+ * pending notifications and advance the queue tail.
+ */
+void
+asyncQueueWakeAllListeners(void)
+{
+       int32      *pids;
+       ProcNumber *procnos;
+       int                     count;
+
+       /*
+        * Identify backends that we need to signal.  We don't want to send
+        * signals while holding the NotifyQueueLock, so this loop just builds a
+        * list of target PIDs.
+        *
+        * XXX in principle these pallocs could fail, which would be bad. Maybe
+        * preallocate the arrays?  They're not that large, though.
+        */
+       pids = (int32 *) palloc(MaxBackends * sizeof(int32));
+       procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+       count = 0;
+
+       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+       for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = 
QUEUE_NEXT_LISTENER(i))
+       {
+               int32           pid = QUEUE_BACKEND_PID(i);
+               QueuePosition pos;
+
+               Assert(pid != InvalidPid);
+               pos = QUEUE_BACKEND_POS(i);
+
+               /*
+                * Signal listeners unless they're already caught up.
+                */
+               if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+                       continue;
+
+               /* OK, need to signal this one */
+               pids[count] = pid;
+               procnos[count] = i;
+               count++;
+       }
+       LWLockRelease(NotifyQueueLock);
+
+       /* Now send signals */
+       for (int i = 0; i < count; i++)
+       {
+               int32           pid = pids[i];
+
+               /*
+                * If we are signaling our own process, no need to involve the 
kernel;
+                * just set the flag directly.
+                */
+               if (pid == MyProcPid)
+               {
+                       notifyInterruptPending = true;
+                       continue;
+               }
+
+               /*
+                * Note: assuming things aren't broken, a signal failure here 
could
+                * only occur if the target backend exited since we released
+                * NotifyQueueLock; which is unlikely but certainly possible. 
So we
+                * just log a low-level debug message if it happens.
+                */
+               if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 
0)
+                       elog(DEBUG3, "could not signal backend with PID %d: 
%m", pid);
+       }
+
+       pfree(pids);
+       pfree(procnos);
+}
+
+/*
+ * Discard all notifications in the queue when there are no listeners.
+ *
+ * This is called by VACUUM when the notification queue has old XIDs but no
+ * active listeners exist.  We advance the tail to the head, effectively
+ * discarding all queued notifications, and truncate the SLRU segments.
+ */
+void
+asyncQueueAdvanceTailNoListeners(void)
+{
+       QueuePosition min;
+       int64           oldtailpage;
+       int64           newtailpage;
+       int64           boundary;
+
+       /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
+       LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
+       LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+       /*
+        * Verify that there are still no listeners.
+        */
+       if (QUEUE_FIRST_LISTENER != INVALID_PROC_NUMBER)
+       {
+               LWLockRelease(NotifyQueueLock);
+               LWLockRelease(NotifyQueueTailLock);
+               return;
+       }
+
+       /*
+        * Advance the logical tail to the head, discarding all notifications.
+        */
+       min = QUEUE_HEAD;
+       QUEUE_TAIL = min;
+       oldtailpage = QUEUE_STOP_PAGE;
+       LWLockRelease(NotifyQueueLock);
+
+       /*
+        * We can truncate something if the global tail advanced across an SLRU
+        * segment boundary.
+        *
+        * XXX it might be better to truncate only once every several segments, 
to
+        * reduce the number of directory scans.
+        */
+       newtailpage = QUEUE_POS_PAGE(min);
+       boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
+       if (asyncQueuePagePrecedes(oldtailpage, boundary))
+       {
+               /*
+                * SimpleLruTruncate() will ask for SLRU bank locks but will 
also
+                * release the lock again.
+                */
+               SimpleLruTruncate(NotifyCtl, newtailpage);
+
+               LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+               QUEUE_STOP_PAGE = newtailpage;
+               LWLockRelease(NotifyQueueLock);
+       }
+
+       LWLockRelease(NotifyQueueTailLock);
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 6c601ce81aa..f93f82e9040 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1739,11 +1739,24 @@ vac_update_datfrozenxid(void)
         * Also consider the oldest XID in the notification queue, since 
backends
         * will need to call TransactionIdDidCommit() on those XIDs when
         * processing the notifications.
+        *
+        * If the queue is blocking datfrozenxid advancement, attempt to clean 
it
+        * up.  If listeners exist, wake them to process their pending
+        * notifications.  If no listeners exist, discard all notifications.
+        * Either way, we back off datfrozenxid for this VACUUM cycle; the next
+        * VACUUM will benefit from the cleanup we've triggered.
         */
        oldestNotifyXid = GetOldestQueuedNotifyXid();
        if (TransactionIdIsValid(oldestNotifyXid) &&
                TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+       {
+               if (asyncQueueHasListeners())
+                       asyncQueueWakeAllListeners();
+               else
+                       asyncQueueAdvanceTailNoListeners();
+
                newFrozenXid = oldestNotifyXid;
+       }
 
        Assert(TransactionIdIsNormal(newFrozenXid));
        Assert(MultiXactIdIsValid(newMinMulti));
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index ac323ada492..f9dccf342b5 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -29,6 +29,11 @@ extern void NotifyMyFrontEnd(const char *channel,
 /* get oldest XID in the notification queue for vacuum freeze */
 extern TransactionId GetOldestQueuedNotifyXid(void);
 
+/* functions for vacuum to manage notification queue */
+extern bool asyncQueueHasListeners(void);
+extern void asyncQueueWakeAllListeners(void);
+extern void asyncQueueAdvanceTailNoListeners(void);
+
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
 extern void Async_Listen(const char *channel);

Reply via email to