On 03/11/2025 23:45, Joel Jacobson wrote:
On Mon, Nov 3, 2025, at 12:02, Heikki Linnakangas wrote:
I wrote another little stand-alone performance test program for this,
attached. It launches N connections that send NOTIFYs to a single
channel as fast as possible, and M threads that listen for the
notifications. I ran it with different combinations of N and M, on
'master' and on REL_14_STABLE (which didn't have SLRU banks) and I
cannot discern any performance difference from these patches. So it
seems that holding the SLRU (bank) lock across the
TransactionIdDidCommit() calls is fine.

Nice! Thanks for the benchmark code! I took the liberty of hacking a bit
on it, and added support for multiple channels, with separate listener
and notifier threads per channel. Each notification now carries the
notifier ID, a sequence number, and a send timestamp. Listeners verify
that sequence numbers arrive in order and record delivery latency. The
program collects latency measurements into fixed buckets and reports
them once per second together with total and per-second send/receive
counts.

Also added a short delay before starting notifiers so that listeners
have time to issue their LISTEN commands, and a new --channels option.
The --listeners and --notifiers options now apply per channel.

Also fixed the code so that it can be compiled outside of the PostgreSQL
source tree, in case you want to build it as a stand-alone tool.

I've benchmarked master vs 0001+0002 and can't notice any differences;
see attached output from benchmark runs.

Thanks. After some further testing, I was able to find a scenario where this patch significantly reduces performance: if the listening backends subscribe to a massive number of channels, like 10000, they spend a lot of time scanning the linked list of subscribed channels in IsListeningOn(). With the patch, those checks were performed while holding the SLRU lock, and it started to show up as lock contention between notifiers and listeners. To demonstrate that, attached is another version of the test program that adds an --extra-channels=N argument. If you set it to e.g. 10000, each listener backend calls LISTEN on 10000 additional channels that are never notified. They just make the listenChannels list longer. With that and the patches I posted previously, I'm getting:

$ PGHOST=localhost PGDB=postgres://localhost/postgres ./async-notify-test-3 --listeners=50 --notifiers=4 --channels=1 --extra-channels=10000
10 s: 12716 sent (1274/s), 635775 received (63737/s)
 0.00-0.01ms                0 (0.0%) avg: 0.000ms
 0.01-0.10ms                0 (0.0%) avg: 0.000ms
 0.10-1.00ms     #          1915 (0.3%) avg: 0.807ms
 1.00-10.00ms    #########  633550 (99.7%) avg: 3.502ms
 10.00-100.00ms  #          310 (0.0%) avg: 11.423ms
>100.00ms                  0 (0.0%) avg: 0.000ms
^C

Whereas on 'master', I see about 2-3x more notifies/s:

$ PGHOST=localhost PGDB=postgres://localhost/postgres ./async-notify-test-3 --listeners=50 --notifiers=4 --channels=1 --extra-channels=10000
10 s: 32057 sent (3296/s), 1602995 received (164896/s)
 0.00-0.01ms                0 (0.0%) avg: 0.000ms
 0.01-0.10ms     #          11574 (0.7%) avg: 0.078ms
 0.10-1.00ms     ######     1082960 (67.6%) avg: 0.577ms
 1.00-10.00ms    ###        508199 (31.7%) avg: 1.489ms
 10.00-100.00ms  #          262 (0.0%) avg: 16.178ms
>100.00ms                  0 (0.0%) avg: 0.000ms
^C

Fortunately that's easy to fix: We can move the IsListeningOn() check after releasing the lock. See attached.
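
To illustrate the idea outside of the server, here is a minimal stand-alone sketch of the "copy under the lock, filter after releasing it" pattern. It is not the attached patch: SharedPage, ListenList, is_listening_on() and process_page() are hypothetical names made up for this example, and a plain pthread mutex stands in for the SLRU bank lock.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

#define MAX_ENTRIES 64
#define NAME_LEN 32

/* Hypothetical stand-in for a shared queue page protected by a lock. */
typedef struct
{
	pthread_mutex_t lock;
	int			nentries;
	char		channels[MAX_ENTRIES][NAME_LEN];
} SharedPage;

/* Hypothetical stand-in for the backend-local list of subscribed channels. */
typedef struct
{
	int			nchannels;
	const char **names;
} ListenList;

/* O(n) scan over the subscriptions; this is the expensive part. */
int
is_listening_on(const ListenList *l, const char *channel)
{
	for (int i = 0; i < l->nchannels; i++)
	{
		if (strcmp(l->names[i], channel) == 0)
			return 1;
	}
	return 0;
}

/*
 * Copy the entries out while holding the lock, then do the O(n)
 * membership checks only after the lock has been released.
 * Returns the number of entries that matched a subscribed channel.
 */
int
process_page(SharedPage *page, const ListenList *l)
{
	char		local[MAX_ENTRIES][NAME_LEN];
	int			n;
	int			delivered = 0;

	pthread_mutex_lock(&page->lock);
	n = page->nentries;
	assert(n >= 0 && n <= MAX_ENTRIES);
	memcpy(local, page->channels, (size_t) n * NAME_LEN);
	pthread_mutex_unlock(&page->lock);

	/* The lock is no longer held here, so other threads are not blocked. */
	for (int i = 0; i < n; i++)
	{
		if (is_listening_on(l, local[i]))
			delivered++;
	}
	return delivered;
}
```

The time spent holding the lock is then bounded by the size of the page, no matter how many channels the listener has subscribed to.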

The elephant in the room of course is that a lookup in a linked list is O(n) and it would be very straightforward to replace it with e.g. a hash table. We should do that irrespective of this bug fix. But I'm inclined to do it as a separate followup patch.
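
For illustration only, a rough stand-alone sketch of what a hash-based membership check could look like. This is not a proposed patch: ChannelSet, channel_hash(), channel_set_add() and channel_set_contains() are hypothetical names, the bucket count is arbitrary, and in the server one would presumably use the existing dynahash facility instead of a hand-rolled table.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define NBUCKETS 1024			/* power of two, arbitrary for this sketch */

typedef struct ChannelNode
{
	struct ChannelNode *next;
	char		name[];			/* null-terminated channel name */
} ChannelNode;

/* A zero-initialized ChannelSet is an empty set. */
typedef struct
{
	ChannelNode *buckets[NBUCKETS];
} ChannelSet;

/* 32-bit FNV-1a hash of a null-terminated string. */
uint32_t
channel_hash(const char *s)
{
	uint32_t	h = 2166136261u;

	while (*s)
	{
		h ^= (unsigned char) *s++;
		h *= 16777619u;
	}
	return h;
}

/* Expected O(1) lookup: only one bucket chain is scanned. */
int
channel_set_contains(const ChannelSet *set, const char *name)
{
	const ChannelNode *n = set->buckets[channel_hash(name) & (NBUCKETS - 1)];

	for (; n != NULL; n = n->next)
	{
		if (strcmp(n->name, name) == 0)
			return 1;
	}
	return 0;
}

/* Returns 1 if added, 0 if already present, -1 if out of memory. */
int
channel_set_add(ChannelSet *set, const char *name)
{
	size_t		len = strlen(name) + 1;
	uint32_t	b = channel_hash(name) & (NBUCKETS - 1);
	ChannelNode *n;

	if (channel_set_contains(set, name))
		return 0;
	n = malloc(sizeof(ChannelNode) + len);
	if (n == NULL)
		return -1;
	memcpy(n->name, name, len);
	n->next = set->buckets[b];
	set->buckets[b] = n;
	return 1;
}
```

With 10000 subscribed channels and 1024 buckets, a lookup scans about ten names on average instead of up to 10000.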

- Heikki
From 7c342e6efffc8d59c2e7658f6f2f3b138d02e0bb Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Tue, 4 Nov 2025 13:22:08 +0200
Subject: [PATCH v2 1/2] Fix bug where we truncated CLOG that was still needed
 by LISTEN/NOTIFY
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The async notification queue contains the XID of the sender, and when
processing notifications we call TransactionIdDidCommit() on the
XID. But we had no safeguards to prevent the CLOG segments containing
those XIDs from being truncated away. As a result, if a backend didn't
for some reason process its notifications for a long time, or when a
new backend issued LISTEN, you could get an error like:

test=# listen c21;
ERROR:  58P01: could not access status of transaction 14279685
DETAIL:  Could not open file "pg_xact/000D": No such file or directory.
LOCATION:  SlruReportIOError, slru.c:1087

Note: This commit is not a full fix. A race condition remains, where a
backend is executing asyncQueueReadAllNotifications() and has just
made a local copy of an async SLRU page which contains old XIDs, while
vacuum concurrently truncates the CLOG covering those XIDs. When the
backend then calls TransactionIdDidCommit() on those XIDs from the
local copy, you still get the error. The next commit will fix that
remaining race condition.

This was first reported by Sergey Zhuravlev in 2021, with many other
people hitting the same issue later. Thanks to:
- Alexandra Wang, Daniil Davydov, Andrei Varashen and Jacques Combrink for
  investigating and providing reproducible test cases,
- Matheus Alcantara and Arseniy Mukhin for earlier proposed patches to
  fix this,
- Álvaro Herrera and Masahiko Sawada for reviewing said earlier patches,
- Yura Sokolov aka funny-falcon for the idea of marking transactions as
  committed in the notification queue, and
- Joel Jacobson for the final patch version. I hope I didn't forget anyone.

Backpatch to all supported versions. I believe the bug goes back all
the way to commit d1e027221d, which introduced the SLRU-based async
notification queue.

Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/cak98qz3wzle-rzjn_y%[email protected]
---
 src/backend/commands/async.c                  | 114 ++++++++++++++++++
 src/backend/commands/vacuum.c                 |   7 ++
 src/include/commands/async.h                  |   3 +
 src/test/isolation/expected/async-notify.out  |  33 ++++-
 src/test/isolation/specs/async-notify.spec    |  24 ++++
 src/test/modules/xid_wraparound/meson.build   |   1 +
 .../xid_wraparound/t/004_notify_freeze.pl     |  75 ++++++++++++
 7 files changed, 256 insertions(+), 1 deletion(-)
 create mode 100644 src/test/modules/xid_wraparound/t/004_notify_freeze.pl

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..ba06234dc8e 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -2168,6 +2168,120 @@ asyncQueueAdvanceTail(void)
 	LWLockRelease(NotifyQueueTailLock);
 }
 
+/*
+ * AsyncNotifyFreezeXids
+ *
+ * Prepare the async notification queue for CLOG truncation by freezing
+ * transaction IDs that are about to become inaccessible.
+ *
+ * This function is called by VACUUM before advancing datfrozenxid. It scans
+ * the notification queue and replaces XIDs that would become inaccessible
+ * after CLOG truncation with special markers:
+ * - Committed transactions are set to FrozenTransactionId
+ * - Aborted/crashed transactions are set to InvalidTransactionId
+ *
+ * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
+ * pages will be truncated. If XID < newFrozenXid, it cannot still be running
+ * (or it would have held back newFrozenXid through ProcArray).
+ * Therefore, if TransactionIdDidCommit returns false, we know the transaction
+ * either aborted explicitly or crashed, and we can safely mark it invalid.
+ */
+void
+AsyncNotifyFreezeXids(TransactionId newFrozenXid)
+{
+	QueuePosition pos;
+	QueuePosition head;
+	int64		curpage = -1;
+	int			slotno = -1;
+	char	   *page_buffer = NULL;
+	bool		page_dirty = false;
+
+	/*
+	 * Acquire locks in the correct order to avoid deadlocks. As per the
+	 * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
+	 * bank locks.
+	 *
+	 * We only need SHARED mode since we're just reading the head/tail
+	 * positions, not modifying them.
+	 */
+	LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+
+	pos = QUEUE_TAIL;
+	head = QUEUE_HEAD;
+
+	/* Release NotifyQueueLock early, we only needed to read the positions */
+	LWLockRelease(NotifyQueueLock);
+
+	/*
+	 * Scan the queue from tail to head, freezing XIDs as needed. We hold
+	 * NotifyQueueTailLock throughout to ensure the tail doesn't move while
+	 * we're working.
+	 */
+	while (!QUEUE_POS_EQUAL(pos, head))
+	{
+		AsyncQueueEntry *qe;
+		TransactionId xid;
+		int64		pageno = QUEUE_POS_PAGE(pos);
+		int			offset = QUEUE_POS_OFFSET(pos);
+
+		/* If we need a different page, release old lock and get new one */
+		if (pageno != curpage)
+		{
+			LWLock	   *lock;
+
+			/* Release previous page if any */
+			if (slotno >= 0)
+			{
+				if (page_dirty)
+				{
+					NotifyCtl->shared->page_dirty[slotno] = true;
+					page_dirty = false;
+				}
+				LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+			}
+
+			lock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			LWLockAcquire(lock, LW_EXCLUSIVE);
+			slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+									   InvalidTransactionId);
+			page_buffer = NotifyCtl->shared->page_buffer[slotno];
+			curpage = pageno;
+		}
+
+		qe = (AsyncQueueEntry *) (page_buffer + offset);
+		xid = qe->xid;
+
+		if (TransactionIdIsNormal(xid) &&
+			TransactionIdPrecedes(xid, newFrozenXid))
+		{
+			if (TransactionIdDidCommit(xid))
+			{
+				qe->xid = FrozenTransactionId;
+				page_dirty = true;
+			}
+			else
+			{
+				qe->xid = InvalidTransactionId;
+				page_dirty = true;
+			}
+		}
+
+		/* Advance to next entry */
+		asyncQueueAdvance(&pos, qe->length);
+	}
+
+	/* Release final page lock if we acquired one */
+	if (slotno >= 0)
+	{
+		if (page_dirty)
+			NotifyCtl->shared->page_dirty[slotno] = true;
+		LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+	}
+
+	LWLockRelease(NotifyQueueTailLock);
+}
+
 /*
  * ProcessIncomingNotify
  *
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..e785dd55ce5 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
+#include "commands/async.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
@@ -1941,6 +1942,12 @@ vac_truncate_clog(TransactionId frozenXID,
 		return;
 	}
 
+	/*
+	 * Freeze any old transaction IDs in the async notification queue before
+	 * CLOG truncation.
+	 */
+	AsyncNotifyFreezeXids(frozenXID);
+
 	/*
 	 * Advance the oldest value for commit timestamps before truncating, so
 	 * that if a user requests a timestamp for a transaction we're truncating
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..aaec7314c10 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -46,4 +46,7 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* freeze old transaction IDs in notify queue (called by VACUUM) */
+extern void AsyncNotifyFreezeXids(TransactionId newFrozenXid);
+
 #endif							/* ASYNC_H */
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..20d5763f319 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 4 sessions
 
 starting permutation: listenc notify1 notify2 notify3 notifyf
 step listenc: LISTEN c1; LISTEN c2;
@@ -104,6 +104,37 @@ step l2commit: COMMIT;
 listener2: NOTIFY "c1" with payload "" from notifier
 step l2stop: UNLISTEN *;
 
+starting permutation: llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck
+step llisten: LISTEN c1; LISTEN c2;
+step n1begins: BEGIN ISOLATION LEVEL SERIALIZABLE;
+step n1select: SELECT * FROM t1;
+a
+-
+(0 rows)
+
+step n1insert: INSERT INTO t1 DEFAULT VALUES;
+step notify1: NOTIFY c1;
+step n2begins: BEGIN ISOLATION LEVEL SERIALIZABLE;
+step n2select: SELECT * FROM t1;
+a
+-
+(0 rows)
+
+step n2insert: INSERT INTO t1 DEFAULT VALUES;
+step n2notify1: NOTIFY c1, 'n2_payload';
+step n1commit: COMMIT;
+step n2commit: COMMIT;
+ERROR:  could not serialize access due to read/write dependencies among transactions
+step notify1: NOTIFY c1;
+step lcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+listener: NOTIFY "c1" with payload "" from notifier
+listener: NOTIFY "c1" with payload "" from notifier
+
 starting permutation: llisten lbegin usage bignotify usage
 step llisten: LISTEN c1; LISTEN c2;
 step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..51b7ad43849 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -5,6 +5,10 @@
 # Note we assume that each step is delivered to the backend as a single Query
 # message so it will run as one transaction.
 
+# t1 table is used for serializable conflict
+setup { CREATE TABLE t1 (a bigserial); }
+teardown { DROP TABLE t1; }
+
 session notifier
 step listenc	{ LISTEN c1; LISTEN c2; }
 step notify1	{ NOTIFY c1; }
@@ -33,8 +37,21 @@ step notifys1	{
 }
 step usage		{ SELECT pg_notification_queue_usage() > 0 AS nonzero; }
 step bignotify	{ SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
+step n1begins   { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step n1select   { SELECT * FROM t1; }
+step n1insert   { INSERT INTO t1 DEFAULT VALUES; }
+step n1commit   { COMMIT; }
 teardown		{ UNLISTEN *; }
 
+# notifier2 session is used to reproduce serializable conflict with notifier
+
+session notifier2
+step n2begins    { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step n2select    { SELECT * FROM t1; }
+step n2insert    { INSERT INTO t1 DEFAULT VALUES; }
+step n2commit    { COMMIT; }
+step n2notify1   { NOTIFY c1, 'n2_payload';  }
+
 # The listener session is used for cross-backend notify checks.
 
 session listener
@@ -73,6 +90,13 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
 # and notify queue is not empty
 permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
 
+# Check that listeners ignore notifications from an aborted
+# transaction even if the notifications have been added to the
+# listen/notify queue. To reproduce this, we use the fact that
+# serializable conflicts are checked after the tx has queued its notifications.
+
+permutation llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck
+
 # Verify that pg_notification_queue_usage correctly reports a non-zero result,
 # after submitting notifications while another connection is listening for
 # those notifications and waiting inside an active transaction.  We have to
diff --git a/src/test/modules/xid_wraparound/meson.build b/src/test/modules/xid_wraparound/meson.build
index f7dada67f67..3aec430df8c 100644
--- a/src/test/modules/xid_wraparound/meson.build
+++ b/src/test/modules/xid_wraparound/meson.build
@@ -30,6 +30,7 @@ tests += {
       't/001_emergency_vacuum.pl',
       't/002_limits.pl',
       't/003_wraparounds.pl',
+      't/004_notify_freeze.pl',
     ],
   },
 }
diff --git a/src/test/modules/xid_wraparound/t/004_notify_freeze.pl b/src/test/modules/xid_wraparound/t/004_notify_freeze.pl
new file mode 100644
index 00000000000..e0386afe26a
--- /dev/null
+++ b/src/test/modules/xid_wraparound/t/004_notify_freeze.pl
@@ -0,0 +1,75 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test freezing XIDs in the async notification queue. This isn't
+# really wraparound-related, but depends on the consume_xids() helper
+# function.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+if (!$ENV{PG_TEST_EXTRA} || $ENV{PG_TEST_EXTRA} !~ /\bxid_wraparound\b/)
+{
+	plan skip_all => "test xid_wraparound not enabled in PG_TEST_EXTRA";
+}
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+	'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# Send multiple notifications from other sessions
+for my $i (1 .. 10)
+{
+	$node->safe_psql(
+		'postgres', "
+		BEGIN;
+		NOTIFY s, '$i';
+		COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation, and one more with
+# 'txid_current' to bump up the freeze horizon.
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+$node->safe_psql('postgres', 'select txid_current()');
+
+# Remember current datfrozenxid before vacuum freeze so that we can
+# check that it is advanced. (Taking the min() this way assumes that
+# XID wraparound doesn't happen.)
+my $datfrozenxid = $node->safe_psql('postgres',
+	"select min(datfrozenxid::text::bigint) from pg_database");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+# Check that vacuumdb advanced datfrozenxid
+my $datfrozenxid_freeze = $node->safe_psql('postgres',
+	"select min(datfrozenxid::text::bigint) from pg_database");
+ok($datfrozenxid_freeze > $datfrozenxid, 'datfrozenxid advanced');
+
+# On Session 1, commit and ensure that all the notifications are
+# received. This depends on correctly freezing the XIDs in the pending
+# notification entries.
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+	$notifications_count++;
+	like($i,
+		qr/Asynchronous notification "s" with payload "$notifications_count" received/
+	);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+done_testing();
-- 
2.47.3

From 6b416d7812ca8db0830307f29cd151af95e3fd92 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Tue, 4 Nov 2025 13:24:30 +0200
Subject: [PATCH v2 2/2] Fix remaining race condition with CLOG truncation and
 LISTEN/NOTIFY

The previous commit fixed a bug where VACUUM could truncate the CLOG
that's still needed to check the commit status of XIDs in the async
notify queue, but as mentioned in its commit message, it wasn't a full
fix. If a backend is executing asyncQueueReadAllNotifications() and
has just made a local copy of an async SLRU page which contains old
XIDs, vacuum can concurrently truncate the CLOG covering those XIDs,
and we will still get an error when the backend then calls
TransactionIdDidCommit() on those XIDs in the local copy. This commit
fixes that race condition.

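To fix, hold the SLRU bank lock across the TransactionIdDidCommit()
calls in NOTIFY processing. Entries from committed transactions are
copied to a local buffer while holding the lock; the IsListeningOn()
checks and the delivery to the frontend happen after it is released.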

Per Tom Lane's idea. Backpatch to all supported versions.

Reviewed-by: Joel Jacobson
Discussion: https://www.postgresql.org/message-id/[email protected]
---
 src/backend/commands/async.c | 109 +++++++++++++++--------------------
 1 file changed, 48 insertions(+), 61 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index ba06234dc8e..8ac7d989641 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -448,7 +448,6 @@ static void SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
-										 char *page_buffer,
 										 Snapshot snapshot);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
@@ -1854,13 +1853,6 @@ asyncQueueReadAllNotifications(void)
 	QueuePosition head;
 	Snapshot	snapshot;
 
-	/* page_buffer must be adequately aligned, so use a union */
-	union
-	{
-		char		buf[QUEUE_PAGESIZE];
-		AsyncQueueEntry align;
-	}			page_buffer;
-
 	/* Fetch current state */
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
 	/* Assert checks that we have a valid state entry */
@@ -1932,37 +1924,6 @@ asyncQueueReadAllNotifications(void)
 
 		do
 		{
-			int64		curpage = QUEUE_POS_PAGE(pos);
-			int			curoffset = QUEUE_POS_OFFSET(pos);
-			int			slotno;
-			int			copysize;
-
-			/*
-			 * We copy the data from SLRU into a local buffer, so as to avoid
-			 * holding the SLRU lock while we are examining the entries and
-			 * possibly transmitting them to our frontend.  Copy only the part
-			 * of the page we will actually inspect.
-			 */
-			slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-												InvalidTransactionId);
-			if (curpage == QUEUE_POS_PAGE(head))
-			{
-				/* we only want to read as far as head */
-				copysize = QUEUE_POS_OFFSET(head) - curoffset;
-				if (copysize < 0)
-					copysize = 0;	/* just for safety */
-			}
-			else
-			{
-				/* fetch all the rest of the page */
-				copysize = QUEUE_PAGESIZE - curoffset;
-			}
-			memcpy(page_buffer.buf + curoffset,
-				   NotifyCtl->shared->page_buffer[slotno] + curoffset,
-				   copysize);
-			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-			LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
-
 			/*
 			 * Process messages up to the stop position, end of page, or an
 			 * uncommitted message.
@@ -1978,9 +1939,7 @@ asyncQueueReadAllNotifications(void)
 			 * rewrite pages under us. Especially we don't want to hold a lock
 			 * while sending the notifications to the frontend.
 			 */
-			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
+			reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -2000,13 +1959,6 @@ asyncQueueReadAllNotifications(void)
  * Fetch notifications from the shared queue, beginning at position current,
  * and deliver relevant ones to my frontend.
  *
- * The current page must have been fetched into page_buffer from shared
- * memory.  (We could access the page right in shared memory, but that
- * would imply holding the SLRU bank lock throughout this routine.)
- *
- * We stop if we reach the "stop" position, or reach a notification from an
- * uncommitted transaction, or reach the end of the page.
- *
  * The function returns true once we have reached the stop position or an
  * uncommitted notification, and false if we have finished with the page.
  * In other words: once it returns true there is no need to look further.
@@ -2015,16 +1967,34 @@ asyncQueueReadAllNotifications(void)
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
-							 char *page_buffer,
 							 Snapshot snapshot)
 {
+	int64		curpage = QUEUE_POS_PAGE(*current);
+	int			slotno;
+	char	   *page_buffer;
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
-	AsyncQueueEntry *qe;
+
+	/*
+	 * We copy the entries into a local buffer to avoid holding the SLRU lock
+	 * while we transmit them to our frontend.  The local buffer must be
+	 * adequately aligned, so use a union.
+	 */
+	union
+	{
+		char		buf[QUEUE_PAGESIZE];
+		AsyncQueueEntry align;
+	}			local_buf;
+	char	   *local_buf_end = local_buf.buf;
+
+	slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+										InvalidTransactionId);
+	page_buffer = NotifyCtl->shared->page_buffer[slotno];
 
 	do
 	{
 		QueuePosition thisentry = *current;
+		AsyncQueueEntry *qe;
 
 		if (QUEUE_POS_EQUAL(thisentry, stop))
 			break;
@@ -2068,16 +2038,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 			}
 			else if (TransactionIdDidCommit(qe->xid))
 			{
-				/* qe->data is the null-terminated channel name */
-				char	   *channel = qe->data;
-
-				if (IsListeningOn(channel))
-				{
-					/* payload follows channel name */
-					char	   *payload = qe->data + strlen(channel) + 1;
-
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
-				}
+				memcpy(local_buf_end, qe, qe->length);
+				local_buf_end += qe->length;
 			}
 			else
 			{
@@ -2091,6 +2053,31 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		/* Loop back if we're not at end of page */
 	} while (!reachedEndOfPage);
 
+	/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+	LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+
+	/*
+	 * Now that we have let go of the SLRU bank lock, send the notifications
+	 * to our frontend
+	 */
+	Assert(local_buf_end - local_buf.buf <= BLCKSZ);
+	for (char *p = local_buf.buf; p < local_buf_end;)
+	{
+		AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
+		/* qe->data is the null-terminated channel name */
+		char	   *channel = qe->data;
+
+		if (IsListeningOn(channel))
+		{
+			/* payload follows channel name */
+			char	   *payload = qe->data + strlen(channel) + 1;
+
+			NotifyMyFrontEnd(channel, payload, qe->srcPid);
+		}
+
+		p += qe->length;
+	}
+
 	if (QUEUE_POS_EQUAL(*current, stop))
 		reachedStop = true;
 
-- 
2.47.3

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <getopt.h>
#include <pthread.h>
#include <math.h>

#include "libpq-fe.h"

#define PG_TIME_GET_DOUBLE(t) (0.000001 * (t))
#define CONNECTION_STRING "postgresql:///postgres"
#define MAX_NOTIFIERS 256  /* Maximum notifiers per channel for sequence tracking */

/* Latency histogram buckets */
#define NUM_BUCKETS 6
static uint64_t bucket_counts[NUM_BUCKETS];
static uint64_t bucket_totals[NUM_BUCKETS];  /* Total latency in microseconds */
static pthread_mutex_t histogram_mutex = PTHREAD_MUTEX_INITIALIZER;

static uint32_t num_notifies_sent;
static uint32_t num_notifies_received;
static volatile int start_notifying = 0;  /* Signal for notifiers to start */

/* --extra-channels argument */
static int			num_extra_channels = 0;

/* Thread arguments structure */
struct thread_args {
	int channel_id;
	int notifier_id;  /* ID of notifier within channel (0 to num_notify_threads-1) */
	int num_notifiers;  /* Total number of notifiers per channel */
	uint64_t *seq_counter;  /* Pointer to this notifier's sequence counter */
};

typedef int64_t pg_time_usec_t;

static inline pg_time_usec_t
pg_time_now(void)
{
	struct timeval tv;

	gettimeofday(&tv, NULL);

	/* Widen before multiplying so the product cannot overflow a 32-bit time_t */
	return (pg_time_usec_t) tv.tv_sec * 1000000 + tv.tv_usec;
}

static void
exit_nicely(PGconn *conn)
{
	PQfinish(conn);
	exit(1);
}

static int
get_latency_bucket(double latency_ms)
{
	/* Buckets: 0-0.01ms, 0.01-0.1ms, 0.1-1ms, 1-10ms, 10-100ms, >100ms */
	if (latency_ms < 0.01)
		return 0;
	else if (latency_ms < 0.1)
		return 1;
	else if (latency_ms < 1.0)
		return 2;
	else if (latency_ms < 10.0)
		return 3;
	else if (latency_ms < 100.0)
		return 4;
	else
		return 5;
}

static void
update_histogram(uint64_t latency_usec)
{
	double latency_ms = latency_usec / 1000.0;
	int bucket = get_latency_bucket(latency_ms);

	pthread_mutex_lock(&histogram_mutex);
	bucket_counts[bucket]++;
	bucket_totals[bucket] += latency_usec;
	pthread_mutex_unlock(&histogram_mutex);
}

static void *
notify_thread_main(void *arg)
{
	struct thread_args *args = (struct thread_args *)arg;
	int channel_id = args->channel_id;
	int notifier_id = args->notifier_id;
	uint64_t *seq_counter = args->seq_counter;
	PGconn	   *conn;
	PGresult   *res;
	char		channel_name[32];

	/* Generate channel name from channel_id */
	snprintf(channel_name, sizeof(channel_name), "%d", channel_id);

	/* Make a connection to the database */
	conn = PQconnectdb(CONNECTION_STRING);

	/* Check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		exit_nicely(conn);
	}

	/* Wait for signal to start notifying */
	while (!start_notifying)
		usleep(10000);  /* Sleep 10ms */

	for(;;)
	{
		char buf[128];
		pg_time_usec_t send_time;
		uint64_t seq;

		/* Get timestamp before sending */
		send_time = pg_time_now();

		/* Atomically get and increment this notifier's sequence counter */
		seq = __sync_fetch_and_add(seq_counter, 1);

		/* Send notification with notifier_id, sequence number, and timestamp */
		snprintf(buf, sizeof(buf), "NOTIFY \"%s\", '%d %lld %lld'",
				 channel_name, notifier_id, (long long)seq, (long long)send_time);
		res = PQexec(conn, buf);
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "NOTIFY command failed: %s", PQerrorMessage(conn));
			PQclear(res);
			exit_nicely(conn);
		}
		PQclear(res);

		__sync_fetch_and_add(&num_notifies_sent, 1);
	}
}

static void *
listen_thread_main(void *arg)
{
	struct thread_args *args = (struct thread_args *)arg;
	int channel_id = args->channel_id;
	int num_notifiers = args->num_notifiers;
	PGconn	   *conn;
	PGresult   *res;
	PGnotify   *notify;
	uint64_t	expected_seq[MAX_NOTIFIERS];
	char		channel_name[32];
	char		listen_cmd[64];

	/* Initialize expected sequence for each notifier */
	for (int i = 0; i < MAX_NOTIFIERS; i++)
		expected_seq[i] = 0;

	/* Generate channel name from channel_id */
	snprintf(channel_name, sizeof(channel_name), "%d", channel_id);

	/* Make a connection to the database */
	conn = PQconnectdb(CONNECTION_STRING);

	/* Check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		exit_nicely(conn);
	}


	/*
	 * Issue LISTEN command for "extra" channels. The extra channels are never
	 * notified, they're used just to bloat the list of channels that notify
	 * processing needs to traverse.
	 */
	for (int i = 0; i < num_extra_channels; i++)
	{
		snprintf(listen_cmd, sizeof(listen_cmd), "LISTEN \"extra%d\"", i);
		res = PQexec(conn, listen_cmd);
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			fprintf(stderr, "LISTEN command failed: %s", PQerrorMessage(conn));
			PQclear(res);
			exit_nicely(conn);
		}
		PQclear(res);
	}

	/*
	 * Issue LISTEN command for the channel that the notifiers send to.
	 */
	snprintf(listen_cmd, sizeof(listen_cmd), "LISTEN \"%s\"", channel_name);
	res = PQexec(conn, listen_cmd);
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		fprintf(stderr, "LISTEN command failed: %s", PQerrorMessage(conn));
		PQclear(res);
		exit_nicely(conn);
	}
	PQclear(res);

	for (;;)
	{
		/*
		 * Sleep until something happens on the connection.  We use select(2)
		 * to wait for input, but you could also use poll() or similar
		 * facilities.
		 */
		int			sock;
		fd_set		input_mask;

		sock = PQsocket(conn);

		if (sock < 0)
			break;				/* shouldn't happen */

		FD_ZERO(&input_mask);
		FD_SET(sock, &input_mask);

		if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
		{
			fprintf(stderr, "select() failed: %s\n", strerror(errno));
			exit_nicely(conn);
		}

		/* Now check for input */
		PQconsumeInput(conn);
		while ((notify = PQnotifies(conn)) != NULL)
		{
			pg_time_usec_t recv_time;
			pg_time_usec_t send_time;
			int notifier_id;
			uint64_t seq;
			uint64_t latency_usec;

			/* Get receive timestamp */
			recv_time = pg_time_now();

			/* Parse notifier_id, sequence number, and send timestamp from payload */
			if (notify->extra && notify->extra[0])
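			{
				/* Scan into long long temporaries rather than casting pointers */
				long long	seq_in;
				long long	send_in;

				if (sscanf(notify->extra, "%d %lld %lld", &notifier_id, &seq_in, &send_in) == 3)
				{
					seq = (uint64_t) seq_in;
					send_time = (pg_time_usec_t) send_in;
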
					/* Validate notifier_id */
					if (notifier_id < 0 || notifier_id >= num_notifiers)
					{
						fprintf(stderr, "\nERROR: Channel %d received invalid notifier_id %d (expected 0-%d)\n",
								channel_id, notifier_id, num_notifiers - 1);
						abort();
					}

					/* Verify sequence number for this notifier */
					if (seq != expected_seq[notifier_id])
					{
						fprintf(stderr, "\nERROR: Channel %d notifier %d sequence gap! Expected %lld, received %lld\n",
								channel_id, notifier_id, (long long)expected_seq[notifier_id], (long long)seq);
						abort();
					}
					expected_seq[notifier_id]++;

					latency_usec = recv_time - send_time;

					/* Update histogram */
					update_histogram(latency_usec);
				}
			}

			PQfreemem(notify);
			PQconsumeInput(conn);

			__sync_fetch_and_add(&num_notifies_received, 1);
		}
	}

	return NULL;
}

int
main(int argc, char **argv)
{
	int			num_threads = 0;
	pthread_t  *threads;
	pg_time_usec_t start;
	static struct option long_options[] = {
		/* systematic long/short named options */
		{"listeners", required_argument, NULL, 1},
		{"notifiers", required_argument, NULL, 2},
		{"channels", required_argument, NULL, 3},
		{"extra-channels", required_argument, NULL, 4},
		{NULL, 0, NULL, 0}
	};
	int			num_listen_threads = 1;
	int			num_notify_threads = 1;
	int			num_channels = 1;
	int			optindex;
	int			c;

	while ((c = getopt_long(argc, argv, "", long_options, &optindex)) != -1)
	{
		switch (c)
		{
			case 1:				/* listeners */
				num_listen_threads = atoi(optarg);
				if (num_listen_threads < 1)
				{
					fprintf(stderr, "invalid --listeners argument\n");
					exit(1);
				}
				break;

			case 2:				/* notifiers */
				num_notify_threads = atoi(optarg);
				if (num_notify_threads < 1)
				{
					fprintf(stderr, "invalid --notifiers argument\n");
					exit(1);
				}
				break;

			case 3:				/* channels */
				num_channels = atoi(optarg);
				if (num_channels < 1)
				{
					fprintf(stderr, "invalid --channels argument\n");
					exit(1);
				}
				break;

			case 4:				/* extra-channels */
				num_extra_channels = atoi(optarg);
				if (num_extra_channels < 0)
				{
					fprintf(stderr, "invalid --extra-channels argument\n");
					exit(1);
				}
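				break;

			default:
				/* getopt_long() has already complained about the option */
				fprintf(stderr,
						"usage: %s [--listeners=N] [--notifiers=N] [--channels=N] [--extra-channels=N]\n",
						argv[0]);
				exit(1);
		}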
	}

	int total_threads = num_channels * (num_notify_threads + num_listen_threads);
	threads = malloc(total_threads * sizeof(pthread_t));
	struct thread_args *thread_args_array = malloc(total_threads * sizeof(struct thread_args));

	/* Allocate sequence counters for each notifier thread (initialized to 0) */
	uint64_t *notifier_seqs = calloc(num_channels * num_notify_threads, sizeof(uint64_t));

	if (threads == NULL || thread_args_array == NULL || notifier_seqs == NULL)
	{
		fprintf(stderr, "out of memory\n");
		exit(1);
	}

	/* Spawn threads for each channel */
	for (int channel_id = 0; channel_id < num_channels; channel_id++)
	{
		/* Spawn notifier threads for this channel */
		for (int i = 0; i < num_notify_threads; i++)
		{
			int			s;
			int			notifier_index = channel_id * num_notify_threads + i;

			thread_args_array[num_threads].channel_id = channel_id;
			thread_args_array[num_threads].notifier_id = i;
			thread_args_array[num_threads].num_notifiers = num_notify_threads;
			thread_args_array[num_threads].seq_counter = &notifier_seqs[notifier_index];
			s = pthread_create(&threads[num_threads], NULL,
							   &notify_thread_main, &thread_args_array[num_threads]);
			if (s != 0)
			{
				fprintf(stderr, "pthread_create failed: %s\n", strerror(s));
				exit(1);
			}
			num_threads++;
		}

		/* Spawn listener threads for this channel */
		for (int i = 0; i < num_listen_threads; i++)
		{
			int			s;

			thread_args_array[num_threads].channel_id = channel_id;
			thread_args_array[num_threads].notifier_id = -1;  /* Not used for listeners */
			thread_args_array[num_threads].num_notifiers = num_notify_threads;
			thread_args_array[num_threads].seq_counter = NULL;  /* Not used for listeners */
			s = pthread_create(&threads[num_threads], NULL,
							   &listen_thread_main, &thread_args_array[num_threads]);
			if (s != 0)
			{
				fprintf(stderr, "pthread_create failed: %s\n", strerror(s));
				exit(1);
			}
			num_threads++;
		}
	}

	/* Give listeners time to establish LISTEN before notifiers start sending */
	sleep(5);

	/* Signal notifiers to start */
	start_notifying = 1;

	start = pg_time_now();

	uint32_t prev_sent = 0;
	uint32_t prev_received = 0;
	int first_iteration = 1;

	for (;;)
	{
		double		elapsed_sec;
		uint32_t	curr_sent;
		uint32_t	curr_received;
		uint32_t	sent_per_sec;
		uint32_t	received_per_sec;

		sleep(1);

		/* Move cursor back up before printing (except first time) */
		if (!first_iteration)
			fprintf(stderr, "\033[%dA\r", NUM_BUCKETS + 1);
		first_iteration = 0;

		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);

		curr_sent = num_notifies_sent;
		curr_received = num_notifies_received;
		sent_per_sec = curr_sent - prev_sent;
		received_per_sec = curr_received - prev_received;

		/* Print stats on same line */
		fprintf(stderr, "\r%.0f s: %u sent (%u/s), %u received (%u/s)    ",
				elapsed_sec, curr_sent, sent_per_sec, curr_received, received_per_sec);

		/* Print histogram */
		pthread_mutex_lock(&histogram_mutex);

		uint64_t total_measured = 0;
		for (int i = 0; i < NUM_BUCKETS; i++)
			total_measured += bucket_counts[i];

		if (total_measured > 0)
		{
			const char *bucket_labels[] = {
				" 0.00-0.01ms   ",
				" 0.01-0.10ms   ",
				" 0.10-1.00ms   ",
				" 1.00-10.00ms  ",
				" 10.00-100.00ms",
				">100.00ms     "
			};

			fprintf(stderr, "\n");
			for (int i = 0; i < NUM_BUCKETS; i++)
			{
				uint64_t count = bucket_counts[i];
				double percentage = (count * 100.0) / total_measured;
				double avg_latency_ms = 0.0;

				if (count > 0)
					avg_latency_ms = (bucket_totals[i] / 1000.0) / count;

				/* Draw bar chart (max 10 chars) */
				int bar_length = (int)((count * 10) / total_measured);
				if (bar_length == 0 && count > 0)
					bar_length = 1;

				fprintf(stderr, "%s  ", bucket_labels[i]);
				for (int j = 0; j < bar_length; j++)
					fprintf(stderr, "#");
				for (int j = bar_length; j < 10; j++)
					fprintf(stderr, " ");

				fprintf(stderr, " %llu (%.1f%%) avg: %.3fms\n",
						(unsigned long long)count, percentage, avg_latency_ms);
			}
		}

		pthread_mutex_unlock(&histogram_mutex);
		fflush(stderr);

		prev_sent = curr_sent;
		prev_received = curr_received;
	}

	return 0;
}
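
The comment in the listener loop notes that poll() could be used in place of select(2) to wait for input. A minimal sketch of that variant follows. The helper name wait_readable() and its return convention are assumptions made for illustration; they are not part of the attached program.

```c
#include <errno.h>
#include <poll.h>

/*
 * Hypothetical helper: wait until fd becomes readable.
 *
 * Returns 1 if poll() reported an event on fd (readable, hung up, or in
 * error, so that the caller's next read can report the condition), 0 if
 * the timeout expired, and -1 if poll() itself failed.  A negative
 * timeout_ms waits indefinitely.  An interrupted call is simply retried
 * with the full timeout.
 */
static int
wait_readable(int fd, int timeout_ms)
{
	struct pollfd pfd;
	int			rc;

	pfd.fd = fd;
	pfd.events = POLLIN;
	pfd.revents = 0;

	do
	{
		rc = poll(&pfd, 1, timeout_ms);
	} while (rc < 0 && errno == EINTR);

	if (rc < 0)
		return -1;
	return rc > 0 ? 1 : 0;
}
```

In the listener loop, a call such as wait_readable(PQsocket(conn), -1) would stand in for the FD_ZERO/FD_SET/select() sequence, followed by PQconsumeInput() and PQnotifies() as before. Unlike select(), poll() is not limited to descriptors below FD_SETSIZE, which could matter when the program is run with a very large number of threads.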
