On 06/11/2025 17:13, Arseniy Mukhin wrote:
Let's say we have the queue:

(tail ... pos1 ... bad_entry_pos ... head)

bad_entry_pos - position of the entry where TransactionIdDidCommit fails.

We have the listener L1 with pos = pos1. It means every new listener
should process the queue from pos1 (as it's max(listener's pos)) up to
the queue head and when they try to do it, they will fail on
'bad_entry'.

Gotcha.

Another small change we could make is to check for listenChannels == NIL
before calling TransactionIdDidCommit.

There is a comment that describes why we need this initial reading in
Exec_ListenPreCommit:

     /*
      * This is our first LISTEN, so establish our pointer.
      *
      * We set our pointer to the global tail pointer and then move it forward
      * over already-committed notifications.  This ensures we cannot miss any
      * not-yet-committed notifications.  We might get a few more but that
      * doesn't hurt.

It seems that with such a check we will skip not-yet-committed
notifications and always get to the HEAD. If we decide that it's ok,
maybe we can just set 'pos' of every new listener to the HEAD without
reading?

Right, I didn't mean skipping asyncQueueReadAllNotifications() altogether. Just the TransactionIdDidCommit() calls in it, when listenChannels == NIL, see attached patch. The idea is that when 'listenChannels == NIL', we're not going to send the notification to the frontend regardless of what TransactionIdDidCommit() says. If we don't call TransactionIdDidCommit(), we won't fail on bad entries.
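
To illustrate the idea, here is a toy model, not the actual async.c code. ToyEntry, toy_did_commit() and toy_read_queue() are made-up names, and the "corrupt" flag simply stands in for an entry on which the real TransactionIdDidCommit() would raise an error. The have_channels argument plays the role of 'listenChannels != NIL'.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins; none of these are PostgreSQL's real types or functions. */
typedef struct ToyEntry
{
    unsigned int xid;       /* XID recorded in the queue entry */
    bool         corrupt;   /* true if the commit-status lookup would fail */
} ToyEntry;

/*
 * Hypothetical commit-status lookup.  Returns -1 where the real
 * TransactionIdDidCommit() would raise an error, otherwise 1 (committed).
 */
static int
toy_did_commit(const ToyEntry *e)
{
    return e->corrupt ? -1 : 1;
}

/*
 * Advance from 'pos' towards 'head' and return the position reached.
 * The reader stops in front of an entry whose status lookup fails.  With
 * have_channels == false the lookup is never made, which mirrors the
 * proposed listenChannels == NIL check.
 */
static size_t
toy_read_queue(const ToyEntry *queue, size_t pos, size_t head,
               bool have_channels)
{
    assert(pos <= head);

    while (pos < head)
    {
        if (have_channels && toy_did_commit(&queue[pos]) < 0)
            break;          /* stuck: the real code would ERROR here */
        pos++;              /* entry skipped or processed */
    }
    return pos;
}
```

With a bad entry at index 2 of a four-entry queue, a reader with no channels reaches the head, while a reader that already listens on something stops at index 2. So the check only helps the first LISTEN in a session, as the patch comment says.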

I'm not sure how much this matters. We really shouldn't have bad entries in the queue to begin with. And if we do, a broken entry could cause all kinds of trouble. For example, if the broken entry's (bogus) XID is higher than the current XID, XidInMVCCSnapshot() will return true, the entry will look like it belongs to a still-running transaction, and we'll get stuck on it.
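
A rough sketch of that second failure mode, again as a toy model rather than the real code: toy_xid_in_snapshot() and toy_advance() are invented names, and the comparison is a deliberate simplification that ignores XID wraparound and the snapshot's list of in-progress XIDs. It only captures the rule that an XID at or beyond the snapshot's upper bound is treated as still running.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Simplified stand-in for the "still in progress?" test: any XID at or
 * beyond the snapshot's upper bound counts as running.  Wraparound and
 * the in-progress XID list are deliberately ignored.
 */
static bool
toy_xid_in_snapshot(unsigned int xid, unsigned int snapshot_xmax)
{
    return xid >= snapshot_xmax;
}

/*
 * Advance over entries whose transactions are no longer running and
 * return the position reached.  The reader must stop at the first entry
 * that still looks in-progress, because its outcome is not known yet.
 */
static size_t
toy_advance(const unsigned int *xids, size_t pos, size_t head,
            unsigned int snapshot_xmax)
{
    assert(pos <= head);

    while (pos < head && !toy_xid_in_snapshot(xids[pos], snapshot_xmax))
        pos++;
    return pos;
}
```

In this model an entry with a bogus XID far in the future blocks the reader on every attempt, until the snapshot's upper bound eventually passes the bogus value. Skipping the TransactionIdDidCommit() call would not help with that.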

- Heikki
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..58656b53e5d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -2066,7 +2066,20 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                reachedStop = true;
                                break;
                        }
-                       else if (TransactionIdDidCommit(qe->xid))
+
+                       /*
+                        * Quick check for the case that we're not listening on any
+                        * channels, before calling TransactionIdDidCommit().  This might
+                        * be marginally faster, but perhaps more importantly, it ensures
+                        * that if there's a bad entry in the queue for some reason for
+                        * which TransactionIdDidCommit() fails, we can skip over it on
+                        * the first LISTEN in a session, and not get stuck on it
+                        * indefinitely.
+                        */
+                       if (listenChannels == NIL)
+                               continue;
+
+                       if (TransactionIdDidCommit(qe->xid))
                        {
                                /* qe->data is the null-terminated channel name */
                                char       *channel = qe->data;
