On 05/11/2025 21:02, Matheus Alcantara wrote:
On Wed Nov 5, 2025 at 9:59 AM -03, Heikki Linnakangas wrote:
In case of an error on TransactionIdDidCommit() I think that we will
have the same behavior as we advance the "current" position before of
DidCommit call the PG_FINALLY block will set the backend position past
the failing notification entry.
With my patch, if TransactionIdDidCommit() fails, we will lose all the
notifications that we have buffered in the local buffer but haven't
passed to NotifyMyFrontEnd() yet. On 'master', you only lose a single
notification, the one where TransactionIdDidCommit() failed.
Yeah, that's true.
How bad would be to store the notification entries within a list and
store the next position with the notification entry and then wrap the
NotifyMyFrontEnd() call within a PG_TRY and update the "current" to the
saved "next position" on PG_CATCH? Something like this:
[ ...]
That addresses the failure on NotifyMyFrontEnd, but not on
TransactionIdDidCommit.
IMHO we should just make these errors FATAL. TransactionIdDidCommit()
should not really fail, after fixing the bug we're discussing.
NotifyMyFrontEnd() could fail on OOM, but that seems pretty unlikely on
an otherwise idle connection. Or it could fail if the client connection
is lost, but then the backend is about to die anyway. And arguably
closing the connection is better than losing even a single notification,
anyway.
My only concern with making these errors FATAL is that if a notification
entry causes a different, recoverable error, all subsequent messages
will be lost. This is because if backend die and the user open a new
connection and execute LISTEN on the channel it will not see these
notifications past the one that caused the error. I'm not sure if we are
completely safe from this case of a recoverable error, what do you
think?
I did some more testing of the current behavior, using the encoding
conversion to cause an error:
In backend A:
SET client_encoding='latin1';
LISTEN foo;
Backend b:
NOTIFY foo, 'ハ';
Observations:
- If the connection is idle when the notification is received, the ERROR
is turned into FATAL anyway:
postgres=# SET client_encoding='latin1';
SET
postgres=# LISTEN foo;
LISTEN
postgres=# select 1; -- do the NOTIFY in another connection before this
ERROR: character with byte sequence 0xe3 0x83 0x8f in encoding "UTF8"
has no equivalent in encoding "LATIN1"
FATAL: terminating connection because protocol synchronization was lost
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
- If there are multiple notifications pending, we stop processing the
subsequent notifications on the first error. The subsequent
notifications will only be processed when another notify interrupt is
received, i.e. when a backend sends yet another notification.
I'm getting more and more convinced that escalating all ERRORs to FATALs
during notify processing is the right way to go. Attached is a new patch
set that does that.
- Heikki
From c1e72ebc3ca840cb75d3fd004abba1944a028304 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Thu, 6 Nov 2025 11:21:39 +0200
Subject: [PATCH v3 1/3] Escalate ERRORs during async notify processing to
FATAL
Previously, if async notify processing encountered an error, we would
report the error to the client and advance our read position past the
offending entry to prevent trying to process it over and over
again. Trying to continue after an error has a few problems however:
- We have no way of telling the client that a notification was
lost. It's not clear if keeping the connection alive after losing a
notification is a good thing. Depending on the application logic,
missing a notification could for example cause the application to
get stuck waiting.
- If the connection is idle, PqCommReadingMsg is set and any ERROR is
turned into FATAL anyway.
- We bailed out of the notification processing loop on first error
without processing any subsequent notifications, until another
notify interrupt arrives. For example, if there were two
notifications pending, and processing the first one caused an ERROR,
the second notification would not be processed until someone sent a
new NOTIFY.
This commit changes the behavior so that any ERROR while processing
async notifications is turned into FATAL, causing the client
connection to be dropped. That makes the behavior more consistent as
that's what happened in idle state already, and dropping the
connection is a clear signal to the application that it might've
missed some notifications.
The reason to do this now is that the next commits will change the
notification processing code in a way that would make it harder to
skip over just the offending notification entry on error.
Discussion: https://www.postgresql.org/message-id/[email protected]
---
src/backend/commands/async.c | 34 +++++++++++++++++++++-------------
1 file changed, 21 insertions(+), 13 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..6b844808ef3 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -446,7 +446,7 @@ static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
-static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
+static bool asyncQueueProcessPageEntries(QueuePosition *current,
QueuePosition stop,
char *page_buffer,
Snapshot snapshot);
@@ -1850,7 +1850,7 @@ ProcessNotifyInterrupt(bool flush)
static void
asyncQueueReadAllNotifications(void)
{
- volatile QueuePosition pos;
+ QueuePosition pos;
QueuePosition head;
Snapshot snapshot;
@@ -1920,16 +1920,25 @@ asyncQueueReadAllNotifications(void)
* It is possible that we fail while trying to send a message to our
* frontend (for example, because of encoding conversion failure). If
* that happens it is critical that we not try to send the same message
- * over and over again. Therefore, we place a PG_TRY block here that will
- * forcibly advance our queue position before we lose control to an error.
- * (We could alternatively retake NotifyQueueLock and move the position
- * before handling each individual message, but that seems like too much
- * lock traffic.)
+ * over and over again. Therefore, we set ExitOnAnyError to upgrade any
+ * ERRORs to FATAL, causing the client connection to be closed on error.
+ *
+ * We used to only skip over only the offending message and try to soldier
+ * on, but it was a little questionable to lose a notification and give
+ * the client ERRORs instead. A client application would not be prepared
+ * for that and can't tell that a notification was missed. It was also
+ * not very useful in practice because notifications are often processed
+ * while a connection is idle and reading a message from the client, and
+ * in that state, any error is upgraded to FATAL anyway. Closing the
+ * connection is a clear signal to the application that it might have
+ * missed notifications.
*/
- PG_TRY();
{
+ bool save_ExitOnAnyError = ExitOnAnyError;
bool reachedStop;
+ ExitOnAnyError = true;
+
do
{
int64 curpage = QUEUE_POS_PAGE(pos);
@@ -1982,15 +1991,14 @@ asyncQueueReadAllNotifications(void)
page_buffer.buf,
snapshot);
} while (!reachedStop);
- }
- PG_FINALLY();
- {
+
/* Update shared state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyProcNumber) = pos;
LWLockRelease(NotifyQueueLock);
+
+ ExitOnAnyError = save_ExitOnAnyError;
}
- PG_END_TRY();
/* Done with snapshot */
UnregisterSnapshot(snapshot);
@@ -2013,7 +2021,7 @@ asyncQueueReadAllNotifications(void)
* The QueuePosition *current is advanced past all processed messages.
*/
static bool
-asyncQueueProcessPageEntries(volatile QueuePosition *current,
+asyncQueueProcessPageEntries(QueuePosition *current,
QueuePosition stop,
char *page_buffer,
Snapshot snapshot)
--
2.47.3
From 57217873c5758c7bd0c6a82050cbbbea14445c43 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Tue, 4 Nov 2025 13:22:08 +0200
Subject: [PATCH v3 2/3] Fix bug where we truncated CLOG that was still needed
by LISTEN/NOTIFY
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The async notification queue contains the XID of the sender, and when
processing notifications we call TransactionIdDidCommit() on the
XID. But we had no safeguards to prevent the CLOG segments containing
those XIDs from being truncated away. As a result, if a backend didn't
for some reason process its notifications for a long time, or when a
new backend issued LISTEN, you could get an error like:
test=# listen c21;
ERROR: 58P01: could not access status of transaction 14279685
DETAIL: Could not open file "pg_xact/000D": No such file or directory.
LOCATION: SlruReportIOError, slru.c:1087
Note: This commit is not a full fix. A race condition remains, where a
backend is executing asyncQueueReadAllNotifications() and has just
made a local copy of an async SLRU page which contains old XIDs, while
vacuum concurrently truncates the CLOG covering those XIDs. When the
backend then calls TransactionIdDidCommit() on those XIDs from the
local copy, you still get the error. The next commit will fix that
remaining race condition.
This was first reported by Sergey Zhuravlev in 2021, with many other
people hitting the same issue later. Thanks to:
- Alexandra Wang, Daniil Davydov, Andrei Varashen and Jacques Combrink for
investigating and providing reproducable test cases,
- Matheus Alcantara and Arseniy Mukhin for earlier proposed patches to
fix this,
- Álvaro Herrera and Masahiko Sawada for reviewing said earlier patches,
- Yura Sokolov aka funny-falcon for the idea of marking transactions as
committed in the notification queue, and
- Joel Jacobson for the final patch version. I hope I didn't forget anyone.
Backpatch to all supported versions. I believe the bug goes back all
the way to commit d1e027221d, which introduced the SLRU-based async
notification queue.
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/[email protected]
Discussion: https://www.postgresql.org/message-id/cak98qz3wzle-rzjn_y%[email protected]
---
src/backend/commands/async.c | 114 ++++++++++++++++++
src/backend/commands/vacuum.c | 7 ++
src/include/commands/async.h | 3 +
src/test/isolation/expected/async-notify.out | 33 ++++-
src/test/isolation/specs/async-notify.spec | 24 ++++
src/test/modules/xid_wraparound/meson.build | 1 +
.../xid_wraparound/t/004_notify_freeze.pl | 75 ++++++++++++
7 files changed, 256 insertions(+), 1 deletion(-)
create mode 100644 src/test/modules/xid_wraparound/t/004_notify_freeze.pl
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6b844808ef3..5b81e34e340 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -2176,6 +2176,120 @@ asyncQueueAdvanceTail(void)
LWLockRelease(NotifyQueueTailLock);
}
+/*
+ * AsyncNotifyFreezeXids
+ *
+ * Prepare the async notification queue for CLOG truncation by freezing
+ * transaction IDs that are about to become inaccessible.
+ *
+ * This function is called by VACUUM before advancing datfrozenxid. It scans
+ * the notification queue and replaces XIDs that would become inaccessible
+ * after CLOG truncation with special markers:
+ * - Committed transactions are set to FrozenTransactionId
+ * - Aborted/crashed transactions are set to InvalidTransactionId
+ *
+ * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
+ * pages will be truncated. If XID < newFrozenXid, it cannot still be running
+ * (or it would have held back newFrozenXid through ProcArray).
+ * Therefore, if TransactionIdDidCommit returns false, we know the transaction
+ * either aborted explicitly or crashed, and we can safely mark it invalid.
+ */
+void
+AsyncNotifyFreezeXids(TransactionId newFrozenXid)
+{
+ QueuePosition pos;
+ QueuePosition head;
+ int64 curpage = -1;
+ int slotno = -1;
+ char *page_buffer = NULL;
+ bool page_dirty = false;
+
+ /*
+ * Acquire locks in the correct order to avoid deadlocks. As per the
+ * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
+ * bank locks.
+ *
+ * We only need SHARED mode since we're just reading the head/tail
+ * positions, not modifying them.
+ */
+ LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
+ LWLockAcquire(NotifyQueueLock, LW_SHARED);
+
+ pos = QUEUE_TAIL;
+ head = QUEUE_HEAD;
+
+ /* Release NotifyQueueLock early, we only needed to read the positions */
+ LWLockRelease(NotifyQueueLock);
+
+ /*
+ * Scan the queue from tail to head, freezing XIDs as needed. We hold
+ * NotifyQueueTailLock throughout to ensure the tail doesn't move while
+ * we're working.
+ */
+ while (!QUEUE_POS_EQUAL(pos, head))
+ {
+ AsyncQueueEntry *qe;
+ TransactionId xid;
+ int64 pageno = QUEUE_POS_PAGE(pos);
+ int offset = QUEUE_POS_OFFSET(pos);
+
+ /* If we need a different page, release old lock and get new one */
+ if (pageno != curpage)
+ {
+ LWLock *lock;
+
+ /* Release previous page if any */
+ if (slotno >= 0)
+ {
+ if (page_dirty)
+ {
+ NotifyCtl->shared->page_dirty[slotno] = true;
+ page_dirty = false;
+ }
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+ }
+
+ lock = SimpleLruGetBankLock(NotifyCtl, pageno);
+ LWLockAcquire(lock, LW_EXCLUSIVE);
+ slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+ InvalidTransactionId);
+ page_buffer = NotifyCtl->shared->page_buffer[slotno];
+ curpage = pageno;
+ }
+
+ qe = (AsyncQueueEntry *) (page_buffer + offset);
+ xid = qe->xid;
+
+ if (TransactionIdIsNormal(xid) &&
+ TransactionIdPrecedes(xid, newFrozenXid))
+ {
+ if (TransactionIdDidCommit(xid))
+ {
+ qe->xid = FrozenTransactionId;
+ page_dirty = true;
+ }
+ else
+ {
+ qe->xid = InvalidTransactionId;
+ page_dirty = true;
+ }
+ }
+
+ /* Advance to next entry */
+ asyncQueueAdvance(&pos, qe->length);
+ }
+
+ /* Release final page lock if we acquired one */
+ if (slotno >= 0)
+ {
+ if (page_dirty)
+ NotifyCtl->shared->page_dirty[slotno] = true;
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+ }
+
+ LWLockRelease(NotifyQueueTailLock);
+}
+
/*
* ProcessIncomingNotify
*
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..e785dd55ce5 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_inherits.h"
+#include "commands/async.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
#include "commands/progress.h"
@@ -1941,6 +1942,12 @@ vac_truncate_clog(TransactionId frozenXID,
return;
}
+ /*
+ * Freeze any old transaction IDs in the async notification queue before
+ * CLOG truncation.
+ */
+ AsyncNotifyFreezeXids(frozenXID);
+
/*
* Advance the oldest value for commit timestamps before truncating, so
* that if a user requests a timestamp for a transaction we're truncating
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..aaec7314c10 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -46,4 +46,7 @@ extern void HandleNotifyInterrupt(void);
/* process interrupts */
extern void ProcessNotifyInterrupt(bool flush);
+/* freeze old transaction IDs in notify queue (called by VACUUM) */
+extern void AsyncNotifyFreezeXids(TransactionId newFrozenXid);
+
#endif /* ASYNC_H */
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..20d5763f319 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 4 sessions
starting permutation: listenc notify1 notify2 notify3 notifyf
step listenc: LISTEN c1; LISTEN c2;
@@ -104,6 +104,37 @@ step l2commit: COMMIT;
listener2: NOTIFY "c1" with payload "" from notifier
step l2stop: UNLISTEN *;
+starting permutation: llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck
+step llisten: LISTEN c1; LISTEN c2;
+step n1begins: BEGIN ISOLATION LEVEL SERIALIZABLE;
+step n1select: SELECT * FROM t1;
+a
+-
+(0 rows)
+
+step n1insert: INSERT INTO t1 DEFAULT VALUES;
+step notify1: NOTIFY c1;
+step n2begins: BEGIN ISOLATION LEVEL SERIALIZABLE;
+step n2select: SELECT * FROM t1;
+a
+-
+(0 rows)
+
+step n2insert: INSERT INTO t1 DEFAULT VALUES;
+step n2notify1: NOTIFY c1, 'n2_payload';
+step n1commit: COMMIT;
+step n2commit: COMMIT;
+ERROR: could not serialize access due to read/write dependencies among transactions
+step notify1: NOTIFY c1;
+step lcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+listener: NOTIFY "c1" with payload "" from notifier
+listener: NOTIFY "c1" with payload "" from notifier
+
starting permutation: llisten lbegin usage bignotify usage
step llisten: LISTEN c1; LISTEN c2;
step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..51b7ad43849 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -5,6 +5,10 @@
# Note we assume that each step is delivered to the backend as a single Query
# message so it will run as one transaction.
+# t1 table is used for serializable conflict
+setup { CREATE TABLE t1 (a bigserial); }
+teardown { DROP TABLE t1; }
+
session notifier
step listenc { LISTEN c1; LISTEN c2; }
step notify1 { NOTIFY c1; }
@@ -33,8 +37,21 @@ step notifys1 {
}
step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
+step n1begins { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step n1select { SELECT * FROM t1; }
+step n1insert { INSERT INTO t1 DEFAULT VALUES; }
+step n1commit { COMMIT; }
teardown { UNLISTEN *; }
+# notifier2 session is used to reproduce serializable conflict with notifier
+
+session notifier2
+step n2begins { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step n2select { SELECT * FROM t1; }
+step n2insert { INSERT INTO t1 DEFAULT VALUES; }
+step n2commit { COMMIT; }
+step n2notify1 { NOTIFY c1, 'n2_payload'; }
+
# The listener session is used for cross-backend notify checks.
session listener
@@ -73,6 +90,13 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
# and notify queue is not empty
permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
+# Test checks that listeners ignore notifications from aborted
+# transaction even if notifications have been added to the listen/notify
+# queue. To reproduce it we use the fact that serializable conflicts
+# are checked after tx adds notifications to the queue.
+
+permutation llisten n1begins n1select n1insert notify1 n2begins n2select n2insert n2notify1 n1commit n2commit notify1 lcheck
+
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
# after submitting notifications while another connection is listening for
# those notifications and waiting inside an active transaction. We have to
diff --git a/src/test/modules/xid_wraparound/meson.build b/src/test/modules/xid_wraparound/meson.build
index f7dada67f67..3aec430df8c 100644
--- a/src/test/modules/xid_wraparound/meson.build
+++ b/src/test/modules/xid_wraparound/meson.build
@@ -30,6 +30,7 @@ tests += {
't/001_emergency_vacuum.pl',
't/002_limits.pl',
't/003_wraparounds.pl',
+ 't/004_notify_freeze.pl',
],
},
}
diff --git a/src/test/modules/xid_wraparound/t/004_notify_freeze.pl b/src/test/modules/xid_wraparound/t/004_notify_freeze.pl
new file mode 100644
index 00000000000..e0386afe26a
--- /dev/null
+++ b/src/test/modules/xid_wraparound/t/004_notify_freeze.pl
@@ -0,0 +1,75 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test freezing XIDs in the async notification queue. This isn't
+# really wraparound-related, but depends on the consume_xids() helper
+# function.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+if (!$ENV{PG_TEST_EXTRA} || $ENV{PG_TEST_EXTRA} !~ /\bxid_wraparound\b/)
+{
+ plan skip_all => "test xid_wraparound not enabled in PG_TEST_EXTRA";
+}
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+ 'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# Send multiple notify's from other sessions
+for my $i (1 .. 10)
+{
+ $node->safe_psql(
+ 'postgres', "
+ BEGIN;
+ NOTIFY s, '$i';
+ COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation, and one more with
+# 'txid_current' to bump up the freeze horizon.
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+$node->safe_psql('postgres', 'select txid_current()');
+
+# Remember current datfrozenxid before vacuum freeze so that we can
+# check that it is advanced. (Taking the min() this way assumes that
+# XID wraparound doesn't happen.)
+my $datafronzenxid = $node->safe_psql('postgres',
+ "select min(datfrozenxid::text::bigint) from pg_database");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+ "vacuumdb --all --freeze");
+
+# Check that vacuumdb advanced datfrozenxid
+my $datafronzenxid_freeze = $node->safe_psql('postgres',
+ "select min(datfrozenxid::text::bigint) from pg_database");
+ok($datafronzenxid_freeze > $datafronzenxid, 'datfrozenxid advanced');
+
+# On Session 1, commit and ensure that the all the notifications are
+# received. This depends on correctly freezing the XIDs in the pending
+# notification entries.
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+ $notifications_count++;
+ like($i,
+ qr/Asynchronous notification "s" with payload "$notifications_count" received/
+ );
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+done_testing();
--
2.47.3
From 4403f2f2119fb3caed9e4daf26785aed221924f0 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Tue, 4 Nov 2025 13:24:30 +0200
Subject: [PATCH v3 3/3] Fix remaining race condition with CLOG truncation and
LISTEN/NOTIFY
Previous commit fixed a bug where VACUUM can truncate the CLOG that's
still needed to check the commit status of XIDs in the async notify
queue, but as mentioned in the commit message, it wasn't a full
fix. If a backend is executing asyncQueueReadAllNotifications() and
has just made a local copy of an async SLRU page which contains old
XIDs, vacuum can concurrently truncate the CLOG covering those XIDs,
and we will still get an error when backend then calls
TransactionIdDidCommit() on those XIDs in the local copy. This commit
fixes that race condition.
To fix, hold the SLRU bank lock across the TransactionIdDidCommit()
calls in NOTIFY processing.
Per Tom Lane's idea. Backpatch to all supported versions.
Reviewed-by: Joel Jacobson
Discussion: https://www.postgresql.org/message-id/[email protected]
---
src/backend/commands/async.c | 109 +++++++++++++++--------------------
1 file changed, 48 insertions(+), 61 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 5b81e34e340..3a6765a55fa 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -448,7 +448,6 @@ static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(QueuePosition *current,
QueuePosition stop,
- char *page_buffer,
Snapshot snapshot);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
@@ -1854,13 +1853,6 @@ asyncQueueReadAllNotifications(void)
QueuePosition head;
Snapshot snapshot;
- /* page_buffer must be adequately aligned, so use a union */
- union
- {
- char buf[QUEUE_PAGESIZE];
- AsyncQueueEntry align;
- } page_buffer;
-
/* Fetch current state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
@@ -1941,37 +1933,6 @@ asyncQueueReadAllNotifications(void)
do
{
- int64 curpage = QUEUE_POS_PAGE(pos);
- int curoffset = QUEUE_POS_OFFSET(pos);
- int slotno;
- int copysize;
-
- /*
- * We copy the data from SLRU into a local buffer, so as to avoid
- * holding the SLRU lock while we are examining the entries and
- * possibly transmitting them to our frontend. Copy only the part
- * of the page we will actually inspect.
- */
- slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
- InvalidTransactionId);
- if (curpage == QUEUE_POS_PAGE(head))
- {
- /* we only want to read as far as head */
- copysize = QUEUE_POS_OFFSET(head) - curoffset;
- if (copysize < 0)
- copysize = 0; /* just for safety */
- }
- else
- {
- /* fetch all the rest of the page */
- copysize = QUEUE_PAGESIZE - curoffset;
- }
- memcpy(page_buffer.buf + curoffset,
- NotifyCtl->shared->page_buffer[slotno] + curoffset,
- copysize);
- /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
- LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
-
/*
* Process messages up to the stop position, end of page, or an
* uncommitted message.
@@ -1987,9 +1948,7 @@ asyncQueueReadAllNotifications(void)
* rewrite pages under us. Especially we don't want to hold a lock
* while sending the notifications to the frontend.
*/
- reachedStop = asyncQueueProcessPageEntries(&pos, head,
- page_buffer.buf,
- snapshot);
+ reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
} while (!reachedStop);
/* Update shared state */
@@ -2008,13 +1967,6 @@ asyncQueueReadAllNotifications(void)
* Fetch notifications from the shared queue, beginning at position current,
* and deliver relevant ones to my frontend.
*
- * The current page must have been fetched into page_buffer from shared
- * memory. (We could access the page right in shared memory, but that
- * would imply holding the SLRU bank lock throughout this routine.)
- *
- * We stop if we reach the "stop" position, or reach a notification from an
- * uncommitted transaction, or reach the end of the page.
- *
* The function returns true once we have reached the stop position or an
* uncommitted notification, and false if we have finished with the page.
* In other words: once it returns true there is no need to look further.
@@ -2023,16 +1975,34 @@ asyncQueueReadAllNotifications(void)
static bool
asyncQueueProcessPageEntries(QueuePosition *current,
QueuePosition stop,
- char *page_buffer,
Snapshot snapshot)
{
+ int64 curpage = QUEUE_POS_PAGE(*current);
+ int slotno;
+ char *page_buffer;
bool reachedStop = false;
bool reachedEndOfPage;
- AsyncQueueEntry *qe;
+
+ /*
+ * We copy the entries into a local buffer to avoid holding the SLRU lock
+ * while we transmit them to our frontend. The local buffer must be
+ * adequately aligned, so use a union.
+ */
+ union
+ {
+ char buf[QUEUE_PAGESIZE];
+ AsyncQueueEntry align;
+ } local_buf;
+ char *local_buf_end = local_buf.buf;
+
+ slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+ InvalidTransactionId);
+ page_buffer = NotifyCtl->shared->page_buffer[slotno];
do
{
QueuePosition thisentry = *current;
+ AsyncQueueEntry *qe;
if (QUEUE_POS_EQUAL(thisentry, stop))
break;
@@ -2076,16 +2046,8 @@ asyncQueueProcessPageEntries(QueuePosition *current,
}
else if (TransactionIdDidCommit(qe->xid))
{
- /* qe->data is the null-terminated channel name */
- char *channel = qe->data;
-
- if (IsListeningOn(channel))
- {
- /* payload follows channel name */
- char *payload = qe->data + strlen(channel) + 1;
-
- NotifyMyFrontEnd(channel, payload, qe->srcPid);
- }
+ memcpy(local_buf_end, qe, qe->length);
+ local_buf_end += qe->length;
}
else
{
@@ -2099,6 +2061,31 @@ asyncQueueProcessPageEntries(QueuePosition *current,
/* Loop back if we're not at end of page */
} while (!reachedEndOfPage);
+ /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+
+ /*
+ * Now that we have let go of the SLRU bank lock, send the notifications
+ * to our backend
+ */
+ Assert(local_buf_end - local_buf.buf <= BLCKSZ);
+ for (char *p = local_buf.buf; p < local_buf_end;)
+ {
+ AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
+ /* qe->data is the null-terminated channel name */
+ char *channel = qe->data;
+
+ if (IsListeningOn(channel))
+ {
+ /* payload follows channel name */
+ char *payload = qe->data + strlen(channel) + 1;
+
+ NotifyMyFrontEnd(channel, payload, qe->srcPid);
+ }
+
+ p += qe->length;
+ }
+
if (QUEUE_POS_EQUAL(*current, stop))
reachedStop = true;
--
2.47.3