On Tue Sep 23, 2025 at 1:11 PM -03, Arseniy Mukhin wrote:
>> Thanks for the explanation! I'm just not sure if I understand why do we
>> need the LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE) on
>> PreCommit_Notify() if we already have the
>> LockSharedObject(DatabaseRelationId, InvalidOid, 0, AccessExclusiveLock);
>>
>
> Good question. It seems that it would be enough to hold
> NotifyQueueLock only during the head update, so I don't understand it
> either.
>
IIUC correctly we acquire the LockSharedObject(DatabaseRelationId,
InvalidOid, 0, AccessExclusiveLock) to make other COMMIT's wait and the
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE) to make listener backends
wait while we are adding the entries on the queue.

> Thank you for the new version!
>
> The fix looks exactly how I thought about it. But while thinking about
> why we need to hold NotifyQueueLock in PreCommit_Notify I realized
> there is a concurrency bug in the 'head resetting' approach. I thought
> that listeners hold NotifyQueueLock lock while reading notifications
> from the queue, but now I see that they hold it only while reading the
> current head position. So we can end up in the next situation:
>
> There are 2 backends:  listener and writer (backend with notifications)
>
>
>         listener                                   writer
>   ----------------------------------------------------------------------
>                                          writer wants to commit, so it
>                                          adds notifications to the queue
>                                          in PreCommit_Notify() and advances
>                                          queue head.
>
>   ----------------------------------------------------------------------
> listener wants to read notifications. It
> gets into asyncQueueReadAllNotifications()
> and reads the current head (it already
> contains writer's notifications):
>
>
> asyncQueueReadAllNotifications()
>           ...
>     LWLockAcquire(NotifyQueueLock, LW_SHARED);
>     head = QUEUE_HEAD;
>     LWLockRelease(NotifyQueueLock);
>           ...
>
>   ----------------------------------------------------------------------
>                                          writer failed to commit, so it
>                                          resets queue head in AtAbort_Notify()
>                                          and completes the transaction.
>
>   ----------------------------------------------------------------------
> listener gets a snapshot where the writer
> is not in progress.
>
>         ...
>     snapshot = RegisterSnapshot(GetLatestSnapshot());
>         ...
>
>
> This way the listener reads the head that includes all writer's
> notifications and a snapshot where the writer is not in progress, so
> nothing stops the listener from sending these notifications and it's
> even possible to have the listener's position that is after the queue
> head, so yes, it's bad :( Sorry about that.
>
Yeah, this is bad. I'm wondering if we could reproduce such race
conditions scenarios with some TAP tests.

> Probably we can fix it (swap GetLatestSnapshot() with the head
> position reading for example) but now I think that 'committed' field
> approach is a more robust way to fix the bug. What do you think?
>
I also agree that the committed field seems a more safe approach.

> BTW one thing about 'committed' field fix version [0]. It seems that
> instead of remembering all notification positions, we can remember the
> head position after we acquire global lock and before we write
> anything to the queue and in case of abort we can just start from the
> saved position and mark all notifications until the end of the queue
> as 'committed = false'. The reason is the same as with resetting the
> head approach - as we hold global lock, no writers can add
> notifications to the queue, so when we in AtAbort_Notify() all
> notifications after the saved position are ours.
>
See the new attached version which implements this idea of using the
committed field approach. I was just a bit concenerd about a race
condition situation where the QUEUE_HEAD is changed by another publisher
process and just iterating over all entries from the saved previous
QUEUE_HEAD position until the current QUEUE_HEAD position we could mark
successfully committed notifications as not committed by mistake, so in
this new patch version I save the QUEUE_HEAD position before we add the
entries on the shared queue and also the QUEUE_HEAD position after these
entries are added so we ensure that we only process the entries of this
range although we have the global lock
LockSharedObject(DatabaseRelationId) that may prevent this situation.

What do you think?

--
Matheus Alcantara

From 8c1267711f26a7b32b8e2b9693469b3e407bb8a8 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths....@pm.me>
Date: Sat, 6 Sep 2025 11:29:02 -0300
Subject: [PATCH v3] Make AsyncQueueEntry's self contained

Previously the asyncQueueProcessPageEntries() use the
TransactionIdDidCommit() to check if the transaction that a notification
belongs is committed or not. Although this work for almost all scenarios
we may have some cases where if a notification is keep for to long on
the queue and the VACUUM FREEZE is executed during this time it may
remove clog files that is needed to check the transaction status of
these notifications which will cause errors to listener backends when
reading the async queue.

This commit fix this issue by making the AsyncQueueEntry self contained
by adding the "committed" boolean field so asyncQueueProcessPageEntries()
can use this to check if the transaction of the notification is
committed or not.

We set committed as true when adding the entry on the SLRU page buffer
cache when PreCommit_Notify() is called and if an error occur before
AtCommit_Notify() the AtAbort_Notify() will be called which will mark
the committed field as false. We do this by remembering the QUEUE_HEAD
position before the PreCommit_Notify() start adding entries on the
shared queue, and if the transaction crash we iterate from this saved
position until the new QUEUE_HEAD position marking the entries as not
committed.

Also this commit include TAP tests to exercise the VACUUM FREEZE issue
and also the scenario of an error being occur between the
PreCommit_Notify() and AtCommit_Notify() calls.

Co-authored-by: Arseniy Mukhin <arseniy.mukhin....@gmail.com>
---
 src/backend/commands/async.c                  | 168 +++++++++++++++++-
 src/backend/storage/lmgr/lmgr.c               |  19 ++
 src/include/storage/lmgr.h                    |   3 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 src/test/modules/test_listen_notify/Makefile  |  17 ++
 .../modules/test_listen_notify/meson.build    |  14 ++
 .../test_listen_notify/t/001_xid_freeze.pl    |  66 +++++++
 .../t/002_aborted_tx_notifies.pl              |  66 +++++++
 src/tools/pgindent/typedefs.list              |   1 +
 10 files changed, 355 insertions(+), 1 deletion(-)
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl
 create mode 100644 
src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..49c9c8b0b5c 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -180,6 +180,8 @@ typedef struct AsyncQueueEntry
        Oid                     dboid;                  /* sender's database 
OID */
        TransactionId xid;                      /* sender's XID */
        int32           srcPid;                 /* sender's PID */
+       bool            committed;              /* Is transaction that the 
entry belongs
+                                                                * committed? */
        char            data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
 } AsyncQueueEntry;
 
@@ -401,8 +403,27 @@ struct NotificationHash
        Notification *event;            /* => the actual Notification struct */
 };
 
+/*  Information needed by At_AbortNotify() to remove entries from the queue 
for crashed transactions. */
+typedef struct AtAbortNotifyInfo
+{
+       /*
+        * head position before the transaction start adding entries on the 
shared
+        * queue
+        */
+       QueuePosition previousHead;
+
+       /*
+        * head position after the entries from the in-progress to commit
+        * transaction were added.
+        */
+       QueuePosition head;
+
+} AtAbortNotifyInfo;
+
 static NotificationList *pendingNotifies = NULL;
 
+static AtAbortNotifyInfo *atAbortInfo = NULL;
+
 /*
  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
  * called from inside a signal handler. That just sets the
@@ -457,6 +478,7 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int     notification_match(const void *key1, const void *key2, Size 
keysize);
 static void ClearPendingActionsAndNotifies(void);
+static void asyncQueueRollbackNotifications(void);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -922,6 +944,18 @@ PreCommit_Notify(void)
                LockSharedObject(DatabaseRelationId, InvalidOid, 0,
                                                 AccessExclusiveLock);
 
+               /*
+                * Before start adding entries on the shared queue, save the 
current
+                * QUEUE_HEAD so if the current in-progress to commit 
transaction
+                * crash we can mark the notifications added by this crashed
+                * transaction as not committed. See AtAbortt_Notify() for more 
info.
+                */
+               Assert(atAbortInfo == NULL);
+               atAbortInfo = palloc(sizeof(AtAbortNotifyInfo));
+               LWLockAcquire(NotifyQueueLock, LW_SHARED);
+               atAbortInfo->previousHead = QUEUE_HEAD;
+               LWLockRelease(NotifyQueueLock);
+
                /* Now push the notifications into the queue */
                nextNotify = list_head(pendingNotifies->events);
                while (nextNotify != NULL)
@@ -948,6 +982,17 @@ PreCommit_Notify(void)
                        LWLockRelease(NotifyQueueLock);
                }
 
+               /*
+                * Save the new QUEUE_HEAD position so if another publisher add
+                * entries on the shared queue and successfully commit the 
transaction
+                * we don't change the committed status of these notifications 
while
+                * marking the notification from a crashed transaction as not
+                * committed.
+                */
+               LWLockAcquire(NotifyQueueLock, LW_SHARED);
+               atAbortInfo->head = QUEUE_HEAD;
+               LWLockRelease(NotifyQueueLock);
+
                /* Note that we don't clear pendingNotifies; AtCommit_Notify 
will. */
        }
 }
@@ -1402,6 +1447,13 @@ asyncQueueAddEntries(ListCell *nextNotify)
                /* Construct a valid queue entry in local variable qe */
                asyncQueueNotificationToEntry(n, &qe);
 
+               /*
+                * Mark the entry as committed. If the transaction that this
+                * notification belongs fails to commit the AtAbort_Notify() 
will mark
+                * this entry as not committed.
+                */
+               qe.committed = true;
+
                offset = QUEUE_POS_OFFSET(queue_head);
 
                /* Check whether the entry really fits on the current page */
@@ -1678,6 +1730,16 @@ AtAbort_Notify(void)
        if (amRegisteredListener && listenChannels == NIL)
                asyncQueueUnregister();
 
+       /*
+        * AtAbort_Notify information is set when we are adding entries on the
+        * global shared queue at PreCommit_Notify(), so in case of a crash on 
the
+        * transaction between the PreCommit_Notify() and AtCommit_Notify() we 
use
+        * this information to mark the entries from the crashed transaction as
+        * not committed.
+        */
+       if (atAbortInfo != NULL)
+               asyncQueueRollbackNotifications();
+
        /* And clean up */
        ClearPendingActionsAndNotifies();
 }
@@ -2066,7 +2128,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition 
*current,
                                reachedStop = true;
                                break;
                        }
-                       else if (TransactionIdDidCommit(qe->xid))
+                       else if (qe->committed)
                        {
                                /* qe->data is the null-terminated channel name 
*/
                                char       *channel = qe->data;
@@ -2385,6 +2447,7 @@ ClearPendingActionsAndNotifies(void)
         */
        pendingActions = NULL;
        pendingNotifies = NULL;
+       atAbortInfo = NULL;
 }
 
 /*
@@ -2395,3 +2458,106 @@ check_notify_buffers(int *newval, void **extra, 
GucSource source)
 {
        return check_slru_buffers("notify_buffers", newval);
 }
+
+
+/*
+ *  Mark notifications added on an in-progress to commit transaction as not 
committed.
+ *
+ * Notifications added on the shared global queue are added with committed =
+ * true during PreCommit_Notify() call. If an error occur between the
+ * PreCommit_Notify() and AtCommit_Notify() the AtAbort_Notify() will be called
+ * and we need to mark these notifications added on the shared queue by the
+ * crashed transaction as not committed so that listener backends can skip
+ * these notifications when reading the queue.
+ *
+ * We previously rely on TransactionDidCommit() to check this but if a
+ * notification is keep for too long on the queue and the VACUUM FREEZE is
+ * executed during this period it can remove clog files that is needed to check
+ * the transaction status of this notification, so we make the notification
+ * entries self contained to skip this problem.
+ *
+ */
+static void
+asyncQueueRollbackNotifications(void)
+{
+       QueuePosition current = atAbortInfo->previousHead;
+       QueuePosition head = atAbortInfo->head;
+
+       /*
+        * Iterates from the position saved at the beginning of the transaction
+        * (previousHead) to the current head of the queue. We do this to mark 
all
+        * entries within this range as uncommitted in case of a transaction
+        * crash.
+        */
+       for (;;)
+       {
+               int64           curpage = QUEUE_POS_PAGE(current);
+               int                     curoffset = QUEUE_POS_OFFSET(current);
+               LWLock     *lock = SimpleLruGetBankLock(NotifyCtl, curpage);
+               int                     slotno;
+
+               /*
+                * If we have reached the head, all entries from this 
transaction have
+                * been marked as not committed so break the loop.
+                */
+               if (QUEUE_POS_EQUAL(current, head))
+                       break;
+
+               /*
+                * Acquire an exclusive lock on the current SLRU page to ensure 
no
+                * other process can read or write to it while we are marking 
the
+                * entries.
+                */
+               LWLockAcquire(lock, LW_EXCLUSIVE);
+
+               /* Fetch the page from SLRU to mark entries as not committed. */
+               slotno = SimpleLruReadPage(NotifyCtl, curpage, true, 
InvalidTransactionId);
+
+               /*
+                * Loop through all entries on the current page. The loop will
+                * continue until we reach the end of the page or the current 
head.
+                */
+               for (;;)
+               {
+                       AsyncQueueEntry *qe;
+                       bool            reachedEndOfPage;
+
+                       /*
+                        * Check again to stop processing the entries on the 
current page.
+                        */
+                       if (QUEUE_POS_EQUAL(current, head))
+                               break;
+
+                       /*
+                        * Get a pointer to the current entry within the shared 
page
+                        * buffer.
+                        */
+                       qe = (AsyncQueueEntry *) 
(NotifyCtl->shared->page_buffer[slotno] + curoffset);
+
+                       /*
+                        * Just for sanity, all entries on the shared queue 
should be
+                        * marked as not committed.
+                        */
+                       Assert(qe->committed);
+
+                       /*
+                        * Mark the entry as uncommitted so listener backends 
can skip
+                        * this notification.
+                        */
+                       qe->committed = false;
+
+                       /* Advance our position. */
+                       reachedEndOfPage = asyncQueueAdvance(&current, 
qe->length);
+                       if (reachedEndOfPage)
+                               break;
+
+                       /*
+                        * Update the offset for the next iteration within the 
same page.
+                        */
+                       curoffset = QUEUE_POS_OFFSET(current);
+               }
+
+               /* Release the exclusive lock on the page. */
+               LWLockRelease(lock);
+       }
+}
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 4798eb79003..12a21c51452 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -357,6 +357,25 @@ CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, 
bool orstronger)
        return LockHeldByMe(&tag, lockmode, orstronger);
 }
 
+/*
+ *  CheckSharedObjectLockedByMe
+ *
+ *  Like CheckRelationLockedByMe, but it checks for shared objects.
+ */
+bool
+CheckSharedObjectLockedByMe(Oid classid, LOCKMODE lockmode, bool orstronger)
+{
+       LOCKTAG         tag;
+
+       SET_LOCKTAG_OBJECT(tag,
+                                          InvalidOid,
+                                          classid,
+                                          InvalidOid,
+                                          0);
+
+       return LockHeldByMe(&tag, lockmode, orstronger);
+}
+
 /*
  *             LockHasWaitersRelation
  *
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index b7abd18397d..c119c8f4ded 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -50,6 +50,9 @@ extern bool CheckRelationLockedByMe(Relation relation, 
LOCKMODE lockmode,
                                                                        bool 
orstronger);
 extern bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode,
                                                                           bool 
orstronger);
+extern bool CheckSharedObjectLockedByMe(Oid classid, LOCKMODE lockmode,
+                                                                          bool 
orstronger);
+
 extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
 
 extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 902a7954101..a015c961d35 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -29,6 +29,7 @@ SUBDIRS = \
                  test_int128 \
                  test_integerset \
                  test_json_parser \
+                 test_listen_notify \
                  test_lfind \
                  test_lwlock_tranches \
                  test_misc \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 14fc761c4cf..6af33448d7b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -28,6 +28,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
+subdir('test_listen_notify')
 subdir('test_lfind')
 subdir('test_lwlock_tranches')
 subdir('test_misc')
diff --git a/src/test/modules/test_listen_notify/Makefile 
b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..da1bf5bb1b7
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,17 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY 
support"
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build 
b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..a68052cd353
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,14 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+      't/002_aborted_tx_notifies.pl'
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl 
b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..0a5130a042e
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,66 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+       'CREATE TABLE t AS SELECT g AS a, g+2 AS b from 
generate_series(1,100000) g;'
+);
+$node->safe_psql('postgres',
+       'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2, multiple notify's, and commit ---
+for my $i (1 .. 10)
+{
+       $node->safe_psql(
+               'postgres', "
+               BEGIN;
+               NOTIFY s, '$i';
+               COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+
+# Execute update so the frozen xid of "t" table is updated to a xid greater
+# than consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember current datfrozenxid before vacuum freeze to ensure that it is 
advanced.
+my $datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from 
pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+       "vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after vacuum freeze to ensure that is advanced but
+# we can still get the notification status of the notification
+my $datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid 
from pg_database where datname = 'postgres'");
+ok($datafronzenxid_freeze > $datafronzenxid, 'datfrozenxid is advanced');
+
+# On Session 1, commit and ensure that the all notifications is received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+       $notifications_count++;
+       like($i, qr/Asynchronous notification "s" with payload 
"$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+done_testing();
diff --git a/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl 
b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
new file mode 100644
index 00000000000..17fcb4b786e
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
@@ -0,0 +1,66 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Test checks that listeners do not receive notifications from aborted
+# transaction even if notifications have been added to the listen/notify
+# queue. To reproduce it we use the fact that serializable conflicts
+# are checked after tx adds notifications to the queue.
+
+# Setup
+$node->safe_psql('postgres', 'CREATE TABLE t1 (a bigserial);');
+
+# Listener
+my $psql_listener = $node->background_psql('postgres');
+$psql_listener->query_safe('LISTEN ch;');
+
+# Session1. Start SERIALIZABLE tx and add a notification.
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe("
+       BEGIN ISOLATION LEVEL SERIALIZABLE;
+       SELECT * FROM t1;
+       INSERT INTO t1 DEFAULT VALUES;
+       NOTIFY ch,'committed';
+");
+
+# Session2. Start SERIALIZABLE tx, add a notification and introduce a conflict
+# with session1.
+my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0);
+$psql_session2->query_safe("
+       BEGIN ISOLATION LEVEL SERIALIZABLE;
+       SELECT * FROM t1;
+       INSERT INTO t1 DEFAULT VALUES;
+       NOTIFY ch,'aborted';
+");
+
+# Session1 should be committed successfully. Listeners must receive session1
+# notifications.
+$psql_session1->query_safe("COMMIT;");
+
+# Session2 should be aborted due to the conflict with session1. Transaction
+# is aborted after adding notifications to the listen/notify queue, but
+# listeners should not receive session2 notifications.
+$psql_session2->query("COMMIT;");
+
+# send another notification after aborted
+$node->safe_psql('postgres', "NOTIFY ch, 'next_committed';");
+
+# fetch notifications
+my $res = $psql_listener->query_safe('begin; commit;');
+
+# check received notifications
+my @lines = split('\n', $res);
+is(@lines, 2, 'received all committed notifications');
+like($lines[0], qr/Asynchronous notification "ch" with payload "committed" 
received/);
+like($lines[1], qr/Asynchronous notification "ch" with payload 
"next_committed" received/);
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 3c80d49b67e..967c9cdf704 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -159,6 +159,7 @@ ArrayType
 AsyncQueueControl
 AsyncQueueEntry
 AsyncRequest
+AtAbortNotifyInfo
 AttInMetadata
 AttStatsSlot
 AttoptCacheEntry
-- 
2.39.5 (Apple Git-154)

Reply via email to