From b8db6014fa8cbf9c095e7149c0061ea4215a9ecc Mon Sep 17 00:00:00 2001
From: rbagga <bangalorian@gmail.com>
Date: Sun, 7 Sep 2025 16:55:57 -0700
Subject: [PATCH] Implement WAL-based async notifications for improved
 throughput

- Added WAL logging for async notifications to improve scalability
- Implemented async resource manager for WAL-based notification handling
- Added new async descriptor files for pg_waldump support
- Updated makefiles and build configuration for new components
- Pin WAL files if they contain unread notifications
- Ensure space in queue before entering critical section
- Added TAP test to ensure error if queue is full
- Added TAP test to ensure WAL records necessary for notifications are pinned
- Modified existing tests to not expect zeroing of new notify pages for a few notifications
- Allow notifications to be consumed on standby
---
 src/backend/access/rmgrdesc/Makefile          |    1 +
 src/backend/access/rmgrdesc/asyncdesc.c       |   47 +
 src/backend/access/rmgrdesc/meson.build       |    1 +
 src/backend/access/rmgrdesc/xactdesc.c        |   13 +
 src/backend/access/transam/rmgr.c             |    1 +
 src/backend/access/transam/xact.c             |   52 +-
 src/backend/access/transam/xlog.c             |   44 +-
 src/backend/commands/async.c                  | 1207 +++++++++++------
 src/backend/tcop/utility.c                    |   17 +-
 src/bin/pg_rewind/parsexlog.c                 |    1 +
 src/bin/pg_waldump/rmgrdesc.c                 |    2 +
 src/bin/pg_waldump/t/001_basic.pl             |    3 +-
 src/include/access/async_xlog.h               |   43 +
 src/include/access/rmgrlist.h                 |    1 +
 src/include/access/xact.h                     |    9 +
 src/include/commands/async.h                  |   30 +
 src/include/storage/proc.h                    |    3 +
 src/test/isolation/expected/async-notify.out  |    6 +-
 src/test/isolation/expected/stats.out         |   22 +-
 src/test/isolation/expected/stats_1.out       |   22 +-
 src/test/isolation/specs/async-notify.spec    |    2 +-
 src/test/modules/test_listen_notify/Makefile  |   17 +
 .../modules/test_listen_notify/meson.build    |   13 +
 .../test_listen_notify/t/002_queue_full.pl    |   78 ++
 .../test_listen_notify/t/003_wal_pin_test.pl  |  102 ++
 25 files changed, 1254 insertions(+), 483 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c
 create mode 100644 src/include/access/async_xlog.h
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/002_queue_full.pl
 create mode 100644 src/test/modules/test_listen_notify/t/003_wal_pin_test.pl

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f1..6e6e75b12bd 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	asyncdesc.o \
 	brindesc.o \
 	clogdesc.o \
 	committsdesc.o \
diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c
new file mode 100644
index 00000000000..7f322849ff1
--- /dev/null
+++ b/src/backend/access/rmgrdesc/asyncdesc.c
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * asyncdesc.c
+ *	  rmgr descriptor routines for access/async.c
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/asyncdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/async_xlog.h"
+
+void
+async_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_ASYNC_NOTIFY_DATA)
+	{
+		xl_async_notify_data *xlrec = (xl_async_notify_data *) rec;
+
+		appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u",
+						 xlrec->dbid, xlrec->xid, xlrec->srcPid, xlrec->nnotifications);
+	}
+}
+
+const char *
+async_identify(uint8 info)
+{
+	const char *id = NULL;
+
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			id = "NOTIFY_DATA";
+			break;
+	}
+
+	return id;
+}
\ No newline at end of file
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 96c98e800c2..38bef2e87f6 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -2,6 +2,7 @@
 
 # used by frontend programs like pg_waldump
 rmgr_desc_sources = files(
+  'asyncdesc.c',
   'brindesc.c',
   'clogdesc.c',
   'committsdesc.c',
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index f0f696855b9..4f32f7fc591 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -135,6 +135,19 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		xl_xact_notify xl_notify;
+
+		/* no alignment is guaranteed, so copy onto stack */
+		memcpy(&xl_notify, data, sizeof(xl_notify));
+
+		parsed->notify_lsn = xl_notify.notify_lsn;
+
+		data += sizeof(xl_xact_notify);
+	}
+
 }
 
 void
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 1b7499726eb..f8c25e6597a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -19,6 +19,7 @@
 
 /* includes needed for "access/rmgrlist.h" */
 /* IWYU pragma: begin_keep */
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 2cf3d4e92b7..8296b63c731 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1448,6 +1448,17 @@ RecordTransactionCommit(void)
 		 */
 		pg_write_barrier();
 
+		/*
+		 * Handle notification commit ordering: if this transaction has pending
+		 * notifications, we must write the queue entry just before the commit
+		 * record while holding NotifyQueueLock to ensure proper ordering.
+		 */
+		if (!XLogRecPtrIsInvalid(MyProc->notifyDataLsn))
+		{
+			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+			asyncQueueAddCompactEntry(MyDatabaseId, xid, MyProc->notifyDataLsn);
+		}
+
 		/*
 		 * Insert the commit XLOG record.
 		 */
@@ -5841,7 +5852,9 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_notify xl_notify;
 	uint8		info;
+	XLogRecPtr	result;
 
 	Assert(CritSectionCount > 0);
 
@@ -5926,6 +5939,21 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	/* include notification information if present */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyDataLsn))
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_NOTIFY;
+		xl_notify.notify_lsn = MyProc->notifyDataLsn;
+
+		/* Ensure dbId is present for NOTIFY delivery on standby */
+		if ((xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) == 0)
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+			xl_dbinfo.dbId = MyDatabaseId;
+			xl_dbinfo.tsId = MyDatabaseTableSpace;
+		}
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
@@ -5982,10 +6010,25 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData(&xl_origin, sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_NOTIFY)
+		XLogRegisterData(&xl_notify, sizeof(xl_xact_notify));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	/* Insert the commit record */
+	result = XLogInsert(RM_XACT_ID, info);
+
+	/*
+	 * Release NotifyQueueLock if we held it. The queue entry is now
+	 * associated with a committed transaction, so readers can process it.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyDataLsn))
+	{
+		LWLockRelease(NotifyQueueLock);
+	}
+
+	return result;
 }
 
 /*
@@ -6227,6 +6270,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 						   false /* backward */ , false /* WAL */ );
 	}
 
+	/* Add notification queue entry and wake listeners if commit has notifications */
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		asyncQueueAddCompactEntry(parsed->dbId, xid, parsed->notify_lsn);
+		SignalBackendsForDatabase(parsed->dbId);
+	}
+
 	/* Make sure files supposed to be dropped are dropped */
 	if (parsed->nrels > 0)
 	{
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fd91bcd68ec..dafdb585157 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -54,6 +54,7 @@
 #include "access/subtrans.h"
 #include "access/timeline.h"
 #include "access/transam.h"
+#include "commands/async.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
@@ -3881,7 +3882,7 @@ RemoveTempXlogFiles(void)
  */
 static void
 RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecPtr endptr,
-				   TimeLineID insertTLI)
+		TimeLineID insertTLI)
 {
 	DIR		   *xldir;
 	struct dirent *xlde;
@@ -3903,6 +3904,47 @@ RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecPtr endptr,
 	elog(DEBUG2, "attempting to remove WAL segments older than log file %s",
 		 lastoff);
 
+	/*
+	 * Pin WAL needed by NOTIFY delivery: adjust segno so that we do not
+	 * remove segments older than the one containing the oldest NOTIFY entry
+	 * still present in the queue. This prevents recycling WAL that listeners
+	 * may still need to read NOTIFY payloads from.
+	 */
+	{
+		XLogRecPtr notify_oldest;
+		if (AsyncNotifyOldestRequiredLSN(&notify_oldest))
+		{
+			XLogSegNo notifySegNo;
+			/* Segment containing the oldest required LSN */
+			XLByteToSeg(notify_oldest, notifySegNo, wal_segment_size);
+			if (Trace_notify)
+				elog(DEBUG1, "async notify: checking WAL pin; oldest notify LSN %X/%X (seg %lu)",
+					 LSN_FORMAT_ARGS(notify_oldest), (unsigned long) notifySegNo);
+			/*
+			 * Last removable must be strictly before notifySegNo. If
+			 * notifySegNo == 0, there is no valid "previous" segment, so do
+			 * not reduce segno at all in that case.
+			 */
+			if (notifySegNo > 0)
+			{
+				XLogSegNo cutoff = notifySegNo - 1;
+				if (cutoff < segno)
+				{
+					segno = cutoff;
+					if (Trace_notify)
+					{
+						XLogFileName(lastoff, 0, segno, wal_segment_size);
+						elog(DEBUG1, "async notify: WAL recycle cutoff adjusted to segno %lu (lastoff %s)",
+							 (unsigned long) segno, lastoff);
+					}
+				}
+			}
+		}
+	}
+
+	/* Recompute cutoff filename after any segno adjustment above */
+	XLogFileName(lastoff, 0, segno, wal_segment_size);
+
 	xldir = AllocateDir(XLOGDIR);
 
 	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..375984c95d4 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -133,6 +133,14 @@
 #include "access/slru.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "access/xlogrecovery.h"
+#include "access/xlog_internal.h"
+#include "access/xlogdefs.h"
+#include "access/xact.h"
 #include "catalog/pg_database.h"
 #include "commands/async.h"
 #include "common/hashfn.h"
@@ -143,6 +151,8 @@
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/procsignal.h"
+#include "storage/procarray.h"
+#include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc_hooks.h"
@@ -150,6 +160,31 @@
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
+#include "storage/fd.h"
+#include <unistd.h>
+
+/* Missing definitions for WAL-based notification system */
+#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE
+#define SLRU_PAGE_SIZE BLCKSZ
+#define AsyncCtl NotifyCtl
+
+/* WAL record types */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
 
 
 /*
@@ -163,66 +198,51 @@
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
 /*
- * Struct representing an entry in the global notify queue
- *
- * This struct declaration has the maximal length, but in a real queue entry
- * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
- * entry size, if both channel and payload strings are empty (but note it
- * doesn't include alignment padding).
- *
- * The "length" field should always be rounded up to the next QUEUEALIGN
- * multiple so that all fields are properly aligned.
+ * AsyncQueueEntry is defined in commands/async.h as a compact metadata-only
+ * structure; notification content is stored in WAL.
  */
-typedef struct AsyncQueueEntry
-{
-	int			length;			/* total allocated length of entry */
-	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
-	int32		srcPid;			/* sender's PID */
-	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
-} AsyncQueueEntry;
-
-/* Currently, no field of AsyncQueueEntry requires more than int alignment */
+
+/* Queue alignment is still needed for SLRU page management */
 #define QUEUEALIGN(len)		INTALIGN(len)
 
-#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)
+/*
+ * QueuePosition is a scalar entry index. Derive page and byte offset from the
+ * index using the fixed AsyncQueueEntry size.
+ */
+typedef int64 QueuePosition;
 
+
+#define ASYNC_ENTRY_SIZE		((int) sizeof(AsyncQueueEntry))
+#define ASYNC_ENTRIES_PER_PAGE	(BLCKSZ / ASYNC_ENTRY_SIZE)
 /*
- * Struct describing a queue position, and assorted macros for working with it
+ * One SLRU page must contain an integral number of compact entries. That is
+ * required for the index→page/offset mapping below (division/modulo by the
+ * per-page entry count) and for unambiguous page-boundary detection.
+ *
+ * AsyncQueueEntry is currently 16 bytes (Oid 4 + TransactionId 4 + XLogRecPtr
+ * 8) with natural alignment and no padding. BLCKSZ (QUEUE_PAGESIZE) is always
+ * a multiple of 1024, so this assertion holds for standard builds. If the
+ * entry layout changes in the future, this compile-time check ensures we fail
+ * early rather than producing incorrect indexing math at runtime.
  */
-typedef struct QueuePosition
-{
-	int64		page;			/* SLRU page number */
-	int			offset;			/* byte offset within page */
-} QueuePosition;
+StaticAssertDecl(BLCKSZ % sizeof(AsyncQueueEntry) == 0,
+				 "AsyncQueueEntry size must divide QUEUE_PAGESIZE");
 
-#define QUEUE_POS_PAGE(x)		((x).page)
-#define QUEUE_POS_OFFSET(x)		((x).offset)
+#define QUEUE_POS_PAGE(x)		((x) / ASYNC_ENTRIES_PER_PAGE)
+#define QUEUE_POS_OFFSET(x)		((int)(((x) % ASYNC_ENTRIES_PER_PAGE) * ASYNC_ENTRY_SIZE))
 
 #define SET_QUEUE_POS(x,y,z) \
 	do { \
-		(x).page = (y); \
-		(x).offset = (z); \
+		(x) = ((int64) (y)) * ASYNC_ENTRIES_PER_PAGE + ((z) / ASYNC_ENTRY_SIZE); \
 	} while (0)
 
-#define QUEUE_POS_EQUAL(x,y) \
-	((x).page == (y).page && (x).offset == (y).offset)
+#define QUEUE_POS_EQUAL(x,y)		((x) == (y))
 
-#define QUEUE_POS_IS_ZERO(x) \
-	((x).page == 0 && (x).offset == 0)
+#define QUEUE_POS_IS_ZERO(x)		((x) == 0)
 
-/* choose logically smaller QueuePosition */
-#define QUEUE_POS_MIN(x,y) \
-	(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
-	 (x).page != (y).page ? (y) : \
-	 (x).offset < (y).offset ? (x) : (y))
-
-/* choose logically larger QueuePosition */
-#define QUEUE_POS_MAX(x,y) \
-	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
-	 (x).page != (y).page ? (x) : \
-	 (x).offset > (y).offset ? (x) : (y))
+/* choose logically smaller/larger positions */
+#define QUEUE_POS_MIN(x,y)		((x) <= (y) ? (x) : (y))
+#define QUEUE_POS_MAX(x,y)		((x) >= (y) ? (x) : (y))
 
 /*
  * Parameter determining how often we try to advance the tail pointer:
@@ -285,6 +305,7 @@ typedef struct AsyncQueueControl
 								 * listening backend */
 	int64		stopPage;		/* oldest unrecycled page; must be <=
 								 * tail.page */
+	int64		reservedEntries;	/* number of entries reserved pre-commit */
 	ProcNumber	firstListener;	/* id of first listener, or
 								 * INVALID_PROC_NUMBER */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
@@ -420,6 +441,8 @@ static bool amRegisteredListener = false;
 
 /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
 static bool tryAdvanceTail = false;
+/* true if this backend reserved one compact entry pre-commit */
+static bool notifyEntryReserved = false;
 
 /* GUC parameters */
 bool		Trace_notify = false;
@@ -438,18 +461,13 @@ static void Exec_UnlistenCommit(const char *channel);
 static void Exec_UnlistenAllCommit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
-static bool asyncQueueIsFull(void);
 static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
-static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
-static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
-static void asyncQueueFillWarning(void);
 static void SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
-										 char *page_buffer,
-										 Snapshot snapshot);
+										 char *page_buffer);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -457,6 +475,66 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static void processNotificationFromWAL(XLogRecPtr notify_lsn);
+/* prototype provided in commands/async.h */
+
+/*
+ * Per-page committed minimum notify LSNs. Indexed by page_no % max_notify_queue_pages
+ * and tagged with the exact page_no to avoid modulo aliasing.
+ */
+typedef struct NotifyPageMinEntry
+{
+	int64	   page_no;	/* absolute queue page number or -1 if invalid */
+	XLogRecPtr  min_lsn;	/* minimum notify_data_lsn for committed entries on page */
+} NotifyPageMinEntry;
+
+static NotifyPageMinEntry *NotifyPageMins = NULL; /* shmem array of length max_notify_queue_pages */
+
+
+/* Helpers to update per-page mins; caller must hold NotifyQueueLock. */
+static inline void
+NotifyPageMinUpdateForPage(int64 page_no, XLogRecPtr lsn)
+{
+	int idx;
+	NotifyPageMinEntry *e;
+
+	if (NotifyPageMins == NULL || page_no < 0)
+		return;
+
+	idx = (int) (page_no % max_notify_queue_pages);
+	e = &NotifyPageMins[idx];
+	if (e->page_no != page_no)
+	{
+		e->page_no = page_no;
+		e->min_lsn = lsn;
+	}
+	else
+	{
+		if (XLogRecPtrIsInvalid(e->min_lsn) || (lsn < e->min_lsn))
+			e->min_lsn = lsn;
+	}
+}
+
+/* Invalidate [from_page, to_page) entries; caller must hold NotifyQueueLock. */
+static inline void
+NotifyPageMinInvalidateRange(int64 from_page, int64 to_page)
+{
+	int64 p;
+	int idx;
+
+	if (NotifyPageMins == NULL)
+		return;
+	for (p = from_page; p < to_page; p++)
+	{
+		idx = (int) (p % max_notify_queue_pages);
+		if (NotifyPageMins[idx].page_no == p)
+		{
+			NotifyPageMins[idx].page_no = -1;
+			NotifyPageMins[idx].min_lsn = InvalidXLogRecPtr;
+		}
+	}
+}
+
 
 /*
  * Compute the difference between two queue page numbers.
@@ -492,6 +570,9 @@ AsyncShmemSize(void)
 
 	size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
 
+	/* Per-page committed mins */
+	size = add_size(size, mul_size(max_notify_queue_pages, sizeof(NotifyPageMinEntry)));
+
 	return size;
 }
 
@@ -519,6 +600,7 @@ AsyncShmemInit(void)
 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
 		QUEUE_STOP_PAGE = 0;
+		asyncQueueControl->reservedEntries = 0;
 		QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
 		asyncQueueControl->lastQueueFillWarn = 0;
 		for (int i = 0; i < MaxBackends; i++)
@@ -546,6 +628,24 @@ AsyncShmemInit(void)
 		 */
 		(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
 	}
+
+	/* Allocate/attach per-page committed mins array */
+	{
+		bool found2 = false;
+		NotifyPageMins = (NotifyPageMinEntry *)
+			ShmemInitStruct("Notify Per-Page Min Array",
+							sizeof(NotifyPageMinEntry) * (Size) max_notify_queue_pages,
+							&found2);
+		if (!found2)
+		{
+			for (int i = 0; i < max_notify_queue_pages; i++)
+			{
+				NotifyPageMins[i].page_no = -1;
+				NotifyPageMins[i].min_lsn = InvalidXLogRecPtr;
+			}
+		}
+	}
+
 }
 
 
@@ -890,65 +990,115 @@ PreCommit_Notify(void)
 		}
 	}
 
-	/* Queue any pending notifies (must happen after the above) */
+	/* Write notification data to WAL if we have any */
 	if (pendingNotifies)
 	{
-		ListCell   *nextNotify;
+		TransactionId currentXid;
+		ListCell   *l;
+		size_t		total_size = 0;
+		uint32		nnotifications = 0;
+		char	   *notifications_data;
+		char	   *ptr;
+		XLogRecPtr	notify_lsn;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
 		 * GetCurrentTransactionId is cheap if we already have an XID, but not
-		 * so cheap if we don't, and we'd prefer not to do that work while
-		 * holding NotifyQueueLock.
+		 * so cheap if we don't.
 		 */
-		(void) GetCurrentTransactionId();
+		currentXid = GetCurrentTransactionId();
 
 		/*
-		 * Serialize writers by acquiring a special lock that we hold till
-		 * after commit.  This ensures that queue entries appear in commit
-		 * order, and in particular that there are never uncommitted queue
-		 * entries ahead of committed ones, so an uncommitted transaction
-		 * can't block delivery of deliverable notifications.
-		 *
-		 * We use a heavyweight lock so that it'll automatically be released
-		 * after either commit or abort.  This also allows deadlocks to be
-		 * detected, though really a deadlock shouldn't be possible here.
-		 *
-		 * The lock is on "database 0", which is pretty ugly but it doesn't
-		 * seem worth inventing a special locktag category just for this.
-		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
-		 * used by the flatfiles mechanism.)
+		 * Step 1: Reserve space in the in-memory queue for the compact entry.
 		 */
-		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		{
+			QueuePosition reserved_head = QUEUE_HEAD + asyncQueueControl->reservedEntries;
+			int64 headPage = QUEUE_POS_PAGE(reserved_head);
+			int   headSlot = (int) (reserved_head % ASYNC_ENTRIES_PER_PAGE);
+			int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
+			LWLock *nextbank;
+
+			/* If at last slot, ensure advancing to next page is allowed */
+			if (headSlot == ASYNC_ENTRIES_PER_PAGE - 1)
+			{
+				if (asyncQueuePageDiff(headPage + 1, tailPage) >= max_notify_queue_pages)
+				{
+					LWLockRelease(NotifyQueueLock);
+					ereport(ERROR,
+							(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+							 errmsg("could not queue notification before commit"),
+							 errdetail("asynchronous notification queue is full")));
+				}
 
-		/* Now push the notifications into the queue */
-		nextNotify = list_head(pendingNotifies->events);
-		while (nextNotify != NULL)
+				/* Pre-initialize the next page so commit path doesn't fault it in */
+				nextbank = SimpleLruGetBankLock(NotifyCtl, headPage + 1);
+				LWLockAcquire(nextbank, LW_EXCLUSIVE);
+				(void) SimpleLruZeroPage(NotifyCtl, headPage + 1);
+				LWLockRelease(nextbank);
+			}
+
+			/* Reserve one entry */
+			asyncQueueControl->reservedEntries++;
+			notifyEntryReserved = true;
+		}
+		LWLockRelease(NotifyQueueLock);
+
+		/*
+		 * Step 2: Write notification data to WAL.
+		 */
+		/* First pass: calculate total size needed for serialization */
+		foreach(l, pendingNotifies->events)
 		{
-			/*
-			 * Add the pending notifications to the queue.  We acquire and
-			 * release NotifyQueueLock once per page, which might be overkill
-			 * but it does allow readers to get in while we're doing this.
-			 *
-			 * A full queue is very uncommon and should really not happen,
-			 * given that we have so much space available in the SLRU pages.
-			 * Nevertheless we need to deal with this possibility. Note that
-			 * when we get here we are in the process of committing our
-			 * transaction, but we have not yet committed to clog, so at this
-			 * point in time we can still roll the transaction back.
-			 */
-			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-			asyncQueueFillWarning();
-			if (asyncQueueIsFull())
-				ereport(ERROR,
-						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-						 errmsg("too many notifications in the NOTIFY queue")));
-			nextNotify = asyncQueueAddEntries(nextNotify);
-			LWLockRelease(NotifyQueueLock);
+			Notification *n = (Notification *) lfirst(l);
+
+			/* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */
+			total_size += 4 + n->channel_len + 1 + n->payload_len + 1;
+			nnotifications++;
 		}
 
-		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
+		/* Allocate buffer for notification data */
+		notifications_data = palloc(total_size);
+		ptr = notifications_data;
+
+		/* Second pass: serialize all notifications */
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *n = (Notification *) lfirst(l);
+			char	   *channel = n->data;
+			char	   *payload = n->data + n->channel_len + 1;
+
+			/* Write channel length, payload length, channel, and payload */
+			memcpy(ptr, &n->channel_len, 2);
+			ptr += 2;
+			memcpy(ptr, &n->payload_len, 2);
+			ptr += 2;
+			memcpy(ptr, channel, n->channel_len + 1);
+			ptr += n->channel_len + 1;
+			memcpy(ptr, payload, n->payload_len + 1);
+			ptr += n->payload_len + 1;
+		}
+
+		/*
+		 * Conservatively pre-pin before WAL insert. There is a small window between
+		 * the notify wal data being written, and the actual notify commit lsn being assigned
+		 * to MyProc, where the recycler could remove the notify wal record, since it wouldn't
+		 * be considered while trying to calculate the min. Assign the current LSN, which
+		 * we know is <= notify_lsn.
+		 */
+		MyProc->notifyDataLsn = GetXLogInsertRecPtr();
+
+		/* Write notification data to WAL */
+		notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid,
+										nnotifications, total_size,
+										notifications_data);
+
+		pfree(notifications_data);
+
+		/* Publish the uncommitted notify lsn for the proc */
+		MyProc->notifyDataLsn = notify_lsn;
+
+		/* Notification payloads are now read directly from WAL at delivery time. */
 	}
 }
 
@@ -1006,13 +1156,20 @@ AtCommit_Notify(void)
 		asyncQueueUnregister();
 
 	/*
-	 * Send signals to listening backends.  We need do this only if there are
-	 * pending notifies, which were previously added to the shared queue by
-	 * PreCommit_Notify().
+	 * If we had notifications, they were already written to the queue in
+	 * PreCommit_Notify. After commit, signal listening backends to check the
+	 * queue. The transaction visibility logic will see our
+	 * XID as committed and process the notifications.
 	 */
-	if (pendingNotifies != NULL)
+	if (!XLogRecPtrIsInvalid(MyProc->notifyDataLsn))
+	{
+		/* Signal listening backends to check the queue */
 		SignalBackends();
 
+		/* Clear the flag after signaling */
+		MyProc->notifyDataLsn = InvalidXLogRecPtr;
+	}
+
 	/*
 	 * If it's time to try to advance the global tail pointer, do that.
 	 *
@@ -1030,6 +1187,9 @@ AtCommit_Notify(void)
 
 	/* And clean up */
 	ClearPendingActionsAndNotifies();
+
+	/* Reset local reservation flag if set (reservation consumed at commit). */
+	notifyEntryReserved = false;
 }
 
 /*
@@ -1263,21 +1423,6 @@ asyncQueueUnregister(void)
 	amRegisteredListener = false;
 }
 
-/*
- * Test whether there is room to insert more notification messages.
- *
- * Caller must hold at least shared NotifyQueueLock.
- */
-static bool
-asyncQueueIsFull(void)
-{
-	int64		headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
-	int64		tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
-	int64		occupied = headPage - tailPage;
-
-	return occupied >= max_notify_queue_pages;
-}
-
 /*
  * Advance the QueuePosition to the next entry, assuming that the current
  * entry is of length entryLength.  If we jump to a new page the function
@@ -1286,193 +1431,17 @@ asyncQueueIsFull(void)
 static bool
 asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 {
-	int64		pageno = QUEUE_POS_PAGE(*position);
-	int			offset = QUEUE_POS_OFFSET(*position);
-	bool		pageJump = false;
-
-	/*
-	 * Move to the next writing position: First jump over what we have just
-	 * written or read.
-	 */
-	offset += entryLength;
-	Assert(offset <= QUEUE_PAGESIZE);
-
-	/*
-	 * In a second step check if another entry can possibly be written to the
-	 * page. If so, stay here, we have reached the next position. If not, then
-	 * we need to move on to the next page.
-	 */
-	if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
-	{
-		pageno++;
-		offset = 0;
-		pageJump = true;
-	}
-
-	SET_QUEUE_POS(*position, pageno, offset);
+	int64 idx;
+	bool pageJump;
+
+	/* With fixed-size entries, advancing is just +1 entry. */
+	Assert(entryLength == (int) sizeof(AsyncQueueEntry));
+	idx = *position;
+	pageJump = ((idx % ASYNC_ENTRIES_PER_PAGE) == (ASYNC_ENTRIES_PER_PAGE - 1));
+	*position = idx + 1;
 	return pageJump;
 }
 
-/*
- * Fill the AsyncQueueEntry at *qe with an outbound notification message.
- */
-static void
-asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
-{
-	size_t		channellen = n->channel_len;
-	size_t		payloadlen = n->payload_len;
-	int			entryLength;
-
-	Assert(channellen < NAMEDATALEN);
-	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
-
-	/* The terminators are already included in AsyncQueueEntryEmptySize */
-	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
-	entryLength = QUEUEALIGN(entryLength);
-	qe->length = entryLength;
-	qe->dboid = MyDatabaseId;
-	qe->xid = GetCurrentTransactionId();
-	qe->srcPid = MyProcPid;
-	memcpy(qe->data, n->data, channellen + payloadlen + 2);
-}
-
-/*
- * Add pending notifications to the queue.
- *
- * We go page by page here, i.e. we stop once we have to go to a new page but
- * we will be called again and then fill that next page. If an entry does not
- * fit into the current page, we write a dummy entry with an InvalidOid as the
- * database OID in order to fill the page. So every page is always used up to
- * the last byte which simplifies reading the page later.
- *
- * We are passed the list cell (in pendingNotifies->events) containing the next
- * notification to write and return the first still-unwritten cell back.
- * Eventually we will return NULL indicating all is done.
- *
- * We are holding NotifyQueueLock already from the caller and grab
- * page specific SLRU bank lock locally in this function.
- */
-static ListCell *
-asyncQueueAddEntries(ListCell *nextNotify)
-{
-	AsyncQueueEntry qe;
-	QueuePosition queue_head;
-	int64		pageno;
-	int			offset;
-	int			slotno;
-	LWLock	   *prevlock;
-
-	/*
-	 * We work with a local copy of QUEUE_HEAD, which we write back to shared
-	 * memory upon exiting.  The reason for this is that if we have to advance
-	 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
-	 * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
-	 * subsequent insertions would try to put entries into a page that slru.c
-	 * thinks doesn't exist yet.)  So, use a local position variable.  Note
-	 * that if we do fail, any already-inserted queue entries are forgotten;
-	 * this is okay, since they'd be useless anyway after our transaction
-	 * rolls back.
-	 */
-	queue_head = QUEUE_HEAD;
-
-	/*
-	 * If this is the first write since the postmaster started, we need to
-	 * initialize the first page of the async SLRU.  Otherwise, the current
-	 * page should be initialized already, so just fetch it.
-	 */
-	pageno = QUEUE_POS_PAGE(queue_head);
-	prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
-
-	/* We hold both NotifyQueueLock and SLRU bank lock during this operation */
-	LWLockAcquire(prevlock, LW_EXCLUSIVE);
-
-	if (QUEUE_POS_IS_ZERO(queue_head))
-		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
-	else
-		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
-								   InvalidTransactionId);
-
-	/* Note we mark the page dirty before writing in it */
-	NotifyCtl->shared->page_dirty[slotno] = true;
-
-	while (nextNotify != NULL)
-	{
-		Notification *n = (Notification *) lfirst(nextNotify);
-
-		/* Construct a valid queue entry in local variable qe */
-		asyncQueueNotificationToEntry(n, &qe);
-
-		offset = QUEUE_POS_OFFSET(queue_head);
-
-		/* Check whether the entry really fits on the current page */
-		if (offset + qe.length <= QUEUE_PAGESIZE)
-		{
-			/* OK, so advance nextNotify past this item */
-			nextNotify = lnext(pendingNotifies->events, nextNotify);
-		}
-		else
-		{
-			/*
-			 * Write a dummy entry to fill up the page. Actually readers will
-			 * only check dboid and since it won't match any reader's database
-			 * OID, they will ignore this entry and move on.
-			 */
-			qe.length = QUEUE_PAGESIZE - offset;
-			qe.dboid = InvalidOid;
-			qe.data[0] = '\0';	/* empty channel */
-			qe.data[1] = '\0';	/* empty payload */
-		}
-
-		/* Now copy qe into the shared buffer page */
-		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
-			   &qe,
-			   qe.length);
-
-		/* Advance queue_head appropriately, and detect if page is full */
-		if (asyncQueueAdvance(&(queue_head), qe.length))
-		{
-			LWLock	   *lock;
-
-			pageno = QUEUE_POS_PAGE(queue_head);
-			lock = SimpleLruGetBankLock(NotifyCtl, pageno);
-			if (lock != prevlock)
-			{
-				LWLockRelease(prevlock);
-				LWLockAcquire(lock, LW_EXCLUSIVE);
-				prevlock = lock;
-			}
-
-			/*
-			 * Page is full, so we're done here, but first fill the next page
-			 * with zeroes.  The reason to do this is to ensure that slru.c's
-			 * idea of the head page is always the same as ours, which avoids
-			 * boundary problems in SimpleLruTruncate.  The test in
-			 * asyncQueueIsFull() ensured that there is room to create this
-			 * page without overrunning the queue.
-			 */
-			slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
-
-			/*
-			 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
-			 * set flag to remember that we should try to advance the tail
-			 * pointer (we don't want to actually do that right here).
-			 */
-			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
-				tryAdvanceTail = true;
-
-			/* And exit the loop */
-			break;
-		}
-	}
-
-	/* Success, so update the global QUEUE_HEAD */
-	QUEUE_HEAD = queue_head;
-
-	LWLockRelease(prevlock);
-
-	return nextNotify;
-}
-
 /*
  * SQL function to return the fraction of the notification queue currently
  * occupied.
@@ -1515,52 +1484,6 @@ asyncQueueUsage(void)
 	return (double) occupied / (double) max_notify_queue_pages;
 }
 
-/*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
- *
- * Caller must hold exclusive NotifyQueueLock.
- */
-static void
-asyncQueueFillWarning(void)
-{
-	double		fillDegree;
-	TimestampTz t;
-
-	fillDegree = asyncQueueUsage();
-	if (fillDegree < 0.5)
-		return;
-
-	t = GetCurrentTimestamp();
-
-	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
-								   t, QUEUE_FULL_WARN_INTERVAL))
-	{
-		QueuePosition min = QUEUE_HEAD;
-		int32		minPid = InvalidPid;
-
-		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
-		{
-			Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
-			if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
-				minPid = QUEUE_BACKEND_PID(i);
-		}
-
-		ereport(WARNING,
-				(errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
-				 (minPid != InvalidPid ?
-				  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
-				  : 0),
-				 (minPid != InvalidPid ?
-				  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
-				  : 0)));
-
-		asyncQueueControl->lastQueueFillWarn = t;
-	}
-}
 
 /*
  * Send signals to listening backends.
@@ -1659,6 +1582,65 @@ SignalBackends(void)
 	pfree(procnos);
 }
 
+/*
+ * SignalBackendsForDatabase
+ *
+ * Wake listeners that are registered for the specified database OID.
+ * Intended for use by the startup/redo process when replaying a commit
+ * that enqueued NOTIFY entries for that database.
+ */
+void
+SignalBackendsForDatabase(Oid dboid)
+{
+	int32		*pids;
+	ProcNumber	*procnos;
+	int			count;
+
+	/* Build a list of target PIDs for listeners in dboid */
+	pids = (int32 *) palloc(MaxBackends * sizeof(int32));
+	procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+	count = 0;
+
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+	{
+		int32		pid = QUEUE_BACKEND_PID(i);
+		QueuePosition pos;
+
+		Assert(pid != InvalidPid);
+		if (QUEUE_BACKEND_DBOID(i) != dboid)
+			continue; /* only same DB */
+
+		pos = QUEUE_BACKEND_POS(i);
+		/* Skip if already caught up */
+		if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+			continue;
+
+		pids[count] = pid;
+		procnos[count] = i;
+		count++;
+	}
+	LWLockRelease(NotifyQueueLock);
+
+	/* Now send signals */
+	for (int i = 0; i < count; i++)
+	{
+		int32		pid = pids[i];
+
+		if (pid == MyProcPid)
+		{
+			notifyInterruptPending = true;
+			continue;
+		}
+
+		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
+	}
+
+	pfree(pids);
+	pfree(procnos);
+}
+
 /*
  * AtAbort_Notify
  *
@@ -1678,6 +1660,20 @@ AtAbort_Notify(void)
 	if (amRegisteredListener && listenChannels == NIL)
 		asyncQueueUnregister();
 
+	/* Release any reserved queue entry */
+	if (notifyEntryReserved)
+	{
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		if (asyncQueueControl->reservedEntries > 0)
+			asyncQueueControl->reservedEntries--;
+		LWLockRelease(NotifyQueueLock);
+		notifyEntryReserved = false;
+	}
+
+	/* Clear per-backend NOTIFY pin (no lock needed) */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyDataLsn))
+		MyProc->notifyDataLsn = InvalidXLogRecPtr;
+
 	/* And clean up */
 	ClearPendingActionsAndNotifies();
 }
@@ -1844,15 +1840,13 @@ ProcessNotifyInterrupt(bool flush)
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
- * ones to my frontend.  Stop when we reach queue head or an uncommitted
- * notification.
+ * ones to my frontend.  Stop when we reach queue head.
  */
 static void
 asyncQueueReadAllNotifications(void)
 {
 	volatile QueuePosition pos;
 	QueuePosition head;
-	Snapshot	snapshot;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -1875,46 +1869,6 @@ asyncQueueReadAllNotifications(void)
 		return;
 	}
 
-	/*----------
-	 * Get snapshot we'll use to decide which xacts are still in progress.
-	 * This is trickier than it might seem, because of race conditions.
-	 * Consider the following example:
-	 *
-	 * Backend 1:					 Backend 2:
-	 *
-	 * transaction starts
-	 * UPDATE foo SET ...;
-	 * NOTIFY foo;
-	 * commit starts
-	 * queue the notify message
-	 *								 transaction starts
-	 *								 LISTEN foo;  -- first LISTEN in session
-	 *								 SELECT * FROM foo WHERE ...;
-	 * commit to clog
-	 *								 commit starts
-	 *								 add backend 2 to array of listeners
-	 *								 advance to queue head (this code)
-	 *								 commit to clog
-	 *
-	 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
-	 * wasn't committed yet.  Ideally we'd ensure that client 2 would
-	 * eventually get transaction 1's notify message, but there's no way
-	 * to do that; until we're in the listener array, there's no guarantee
-	 * that the notify message doesn't get removed from the queue.
-	 *
-	 * Therefore the coding technique transaction 2 is using is unsafe:
-	 * applications must commit a LISTEN before inspecting database state,
-	 * if they want to ensure they will see notifications about subsequent
-	 * changes to that state.
-	 *
-	 * What we do guarantee is that we'll see all notifications from
-	 * transactions committing after the snapshot we take here.
-	 * Exec_ListenPreCommit has already added us to the listener array,
-	 * so no not-yet-committed messages can be removed from the queue
-	 * before we see them.
-	 *----------
-	 */
-	snapshot = RegisterSnapshot(GetLatestSnapshot());
 
 	/*
 	 * It is possible that we fail while trying to send a message to our
@@ -1979,8 +1933,7 @@ asyncQueueReadAllNotifications(void)
 			 * while sending the notifications to the frontend.
 			 */
 			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
+													   page_buffer.buf);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -1992,8 +1945,6 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
-	/* Done with snapshot */
-	UnregisterSnapshot(snapshot);
 }
 
 /*
@@ -2004,23 +1955,22 @@ asyncQueueReadAllNotifications(void)
  * memory.  (We could access the page right in shared memory, but that
  * would imply holding the SLRU bank lock throughout this routine.)
  *
- * We stop if we reach the "stop" position, or reach a notification from an
- * uncommitted transaction, or reach the end of the page.
+ * We stop if we reach the "stop" position or reach the end of the page.
  *
- * The function returns true once we have reached the stop position or an
- * uncommitted notification, and false if we have finished with the page.
+ * The function returns true once we have reached the stop position, and false
+ * if we have finished with the page.
  * In other words: once it returns true there is no need to look further.
  * The QueuePosition *current is advanced past all processed messages.
  */
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
-							 char *page_buffer,
-							 Snapshot snapshot)
+							 char *page_buffer)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
 	AsyncQueueEntry *qe;
+	Snapshot	snap = ActiveSnapshotSet() ? GetActiveSnapshot() : NULL;
 
 	do
 	{
@@ -2032,69 +1982,378 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
 
 		/*
-		 * Advance *current over this message, possibly to the next page. As
-		 * noted in the comments for asyncQueueReadAllNotifications, we must
-		 * do this before possibly failing while processing the message.
+		 * If the producing XID is present in our MVCC snapshot (i.e., not yet
+		 * visible), stop processing further notifications for now. Leave
+		 * *current at this entry so we will retry it later when visible.
 		 */
-		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
+		if (qe->dboid == MyDatabaseId && snap != NULL &&
+			XidInMVCCSnapshot(qe->xid, snap))
+		{
+			reachedStop = true;
+			break;
+		}
+
+		/* Advance *current by one fixed-size compact entry. */
+		reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry));
 
 		/* Ignore messages destined for other databases */
 		if (qe->dboid == MyDatabaseId)
 		{
-			if (XidInMVCCSnapshot(qe->xid, snapshot))
+			/*
+			 * Since queue entries are written atomically with commit records
+			 * while holding NotifyQueueLock exclusively, all entries in the queue
+			 * are guaranteed to be from committed transactions.
+			 *
+			 * Step 5: Read notification data using stored LSN from WAL.
+			 * The compact entry only contains metadata.
+			 */
+			processNotificationFromWAL(qe->notify_lsn);
+		}
+
+		/* Loop back if we're not at end of page */
+	} while (!reachedEndOfPage);
+
+	if (QUEUE_POS_EQUAL(*current, stop))
+		reachedStop = true;
+
+	return reachedStop;
+}
+
+/*
+ * processNotificationFromWAL
+ *
+ * Fetch notification data from WAL using the stored LSN and process
+ * the individual notifications for delivery to listening frontend.
+ * This implements Step 5 of the new WAL-based notification system.
+ */
+static void
+processNotificationFromWAL(XLogRecPtr notify_lsn)
+{
+	XLogReaderState *xlogreader;
+	DecodedXLogRecord *record;
+	xl_async_notify_data *xlrec;
+	char	   *data;
+	char	   *ptr;
+	uint32_t	remaining;
+	int			srcPid;
+	char	   *errormsg;
+	Oid			dboid;
+	uint32		nnotifications;
+
+	/*
+	 * Create XLog reader to fetch the notification data record.
+	 * We use a temporary reader since this is called during normal
+	 * notification processing, not during recovery.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
+	if (!xlogreader)
+		elog(ERROR, "failed to allocate XLog reader for notification data");
+
+	/* Start reading exactly at the NOTIFY_DATA record begin LSN */
+	XLogBeginRead(xlogreader, notify_lsn);
+
+	/* Read the NOTIFY_DATA record */
+	record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg);
+	if (record == NULL)
+	{
+		XLogReaderFree(xlogreader);
+		elog(ERROR, "failed to read notification data from WAL at %X/%X: %s",
+			 LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message");
+	}
+
+	/* Verify this is the expected record type */
+	if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID ||
+		(XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA)
+		elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u",
+			 LSN_FORMAT_ARGS(notify_lsn),
+			 XLogRecGetRmid(xlogreader),
+			 (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK));
+
+	/* Extract the notification data from the WAL record */
+	xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader);
+	srcPid = xlrec->srcPid;
+	dboid = xlrec->dbid;
+	data = (char *) xlrec + SizeOfAsyncNotifyData;
+	ptr = data;
+	remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData;
+	nnotifications = xlrec->nnotifications;
+
+	/*
+	 * Process each notification in the serialized data.
+	 * The format is: 2-byte channel_len, 2-byte payload_len,
+	 * null-terminated channel, null-terminated payload.
+	 */
+	for (uint32_t i = 0; i < nnotifications && remaining >= 4; i++)
+	{
+		uint16		channel_len;
+		uint16		payload_len;
+		char	   *channel;
+		char	   *payload;
+
+		/* Read lengths */
+		memcpy(&channel_len, ptr, 2);
+		ptr += 2;
+		memcpy(&payload_len, ptr, 2);
+		ptr += 2;
+		remaining -= 4;
+
+		/* Verify we have enough data */
+		if (remaining < channel_len + 1 + payload_len + 1)
+			break;
+
+		/* Extract channel and payload strings */
+		channel = ptr;
+		ptr += channel_len + 1;
+		payload = ptr;
+		ptr += payload_len + 1;
+		remaining -= (channel_len + 1 + payload_len + 1);
+
+		/* Deliver notification if we're listening on this channel */
+		if (dboid == MyDatabaseId && IsListeningOn(channel))
+			NotifyMyFrontEnd(channel, payload, srcPid);
+	}
+
+	/* Clean up */
+	XLogReaderFree(xlogreader);
+}
+
+
+/*
+ * AsyncNotifyOldestRequiredLSN
+ *
+ * Compute the oldest WAL LSN required to satisfy NOTIFY delivery for any
+ * still-present queue entry. Returns true and sets *oldest_lsn when the
+ * queue is non-empty (QUEUE_TAIL != QUEUE_HEAD). Otherwise returns false.
+ *
+ * We look at the queue entry at QUEUE_TAIL; since that is the oldest entry
+ * still needed by some listener, its notify_lsn is the minimum WAL position
+ * that must be retained.
+ */
+bool
+AsyncNotifyOldestRequiredLSN(XLogRecPtr *oldest_lsn)
+{
+	XLogRecPtr committed_min = InvalidXLogRecPtr;
+	XLogRecPtr pin_min = InvalidXLogRecPtr;
+	bool have_any = false;
+
+	/* First, scan per-backend pins under ProcArrayLock */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+	{
+		int i;
+		for (i = 0; i < MaxBackends; i++)
+		{
+			PGPROC *proc = GetPGProcByNumber(i);
+			XLogRecPtr lsn;
+
+			/* Skip unused PGPROC slots */
+			if (proc->pid == 0)
+				continue;
+
+			lsn = proc->notifyDataLsn;
+			if (!XLogRecPtrIsInvalid(lsn))
 			{
-				/*
-				 * The source transaction is still in progress, so we can't
-				 * process this message yet.  Break out of the loop, but first
-				 * back up *current so we will reprocess the message next
-				 * time.  (Note: it is unlikely but not impossible for
-				 * TransactionIdDidCommit to fail, so we can't really avoid
-				 * this advance-then-back-up behavior when dealing with an
-				 * uncommitted message.)
-				 *
-				 * Note that we must test XidInMVCCSnapshot before we test
-				 * TransactionIdDidCommit, else we might return a message from
-				 * a transaction that is not yet visible to snapshots; compare
-				 * the comments at the head of heapam_visibility.c.
-				 *
-				 * Also, while our own xact won't be listed in the snapshot,
-				 * we need not check for TransactionIdIsCurrentTransactionId
-				 * because our transaction cannot (yet) have queued any
-				 * messages.
-				 */
-				*current = thisentry;
-				reachedStop = true;
-				break;
+				if (XLogRecPtrIsInvalid(pin_min) || lsn < pin_min)
+					pin_min = lsn;
 			}
-			else if (TransactionIdDidCommit(qe->xid))
+		}
+	}
+	LWLockRelease(ProcArrayLock);
+
+	/* Then, scan per-page committed mins under shared lock */
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+	if (NotifyPageMins != NULL)
+	{
+		int i;
+		for (i = 0; i < max_notify_queue_pages; i++)
+		{
+			if (NotifyPageMins[i].page_no >= 0 && !XLogRecPtrIsInvalid(NotifyPageMins[i].min_lsn))
 			{
-				/* qe->data is the null-terminated channel name */
-				char	   *channel = qe->data;
+				if (XLogRecPtrIsInvalid(committed_min) || (NotifyPageMins[i].min_lsn < committed_min))
+					committed_min = NotifyPageMins[i].min_lsn;
+			}
+		}
+	}
+	LWLockRelease(NotifyQueueLock);
 
-				if (IsListeningOn(channel))
-				{
-					/* payload follows channel name */
-					char	   *payload = qe->data + strlen(channel) + 1;
+	if (!XLogRecPtrIsInvalid(pin_min))
+	{
+		*oldest_lsn = pin_min;
+		have_any = true;
+	}
+	if (!XLogRecPtrIsInvalid(committed_min))
+	{
+		if (!have_any || (committed_min < *oldest_lsn))
+			*oldest_lsn = committed_min;
+		have_any = true;
+	}
 
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
-				}
+	return have_any;
+}
+
+
+/*
+ * asyncQueueAddCompactEntry
+ *
+ * Add a compact entry to the notification SLRU queue containing only
+ * metadata (dbid, xid, notify_lsn) that points to the full notification 
+ * data in WAL. This is much more efficient than the old approach of
+ * storing complete notification content in the SLRU queue.
+ */
+void
+asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	AsyncQueueEntry entry;
+	QueuePosition queue_head;
+	int64		pageno;
+	int64		entry_pageno = -1; /* page where the entry is written */
+	int			offset;
+	int			slotno;
+	LWLock	   *banklock;
+
+	/*
+	 * Fill in the compact entry with just the metadata.
+	 * No payload data is stored here - it's all in WAL.
+	 */
+	entry.dboid = dbid;
+	entry.xid = xid;
+	entry.notify_lsn = notify_lsn;
+
+	/* Caller should already hold NotifyQueueLock in exclusive mode */
+	queue_head = QUEUE_HEAD;
+
+	/* Capacity was reserved in PreCommit_Notify. Just write the entry. */
+
+	/*
+	 * Get the current page. If this is the first write since postmaster
+	 * started, initialize the first page.
+	 */
+	pageno = QUEUE_POS_PAGE(queue_head);
+	banklock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+	if (QUEUE_POS_IS_ZERO(queue_head))
+		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
+	else
+		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+								   InvalidTransactionId);
+
+	/* Mark the page dirty before writing */
+	NotifyCtl->shared->page_dirty[slotno] = true;
+
+	offset = QUEUE_POS_OFFSET(queue_head);
+
+	/* Check if the compact entry fits on the current page */
+	if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE)
+	{
+		/* Copy the compact entry to the shared buffer */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &entry,
+			   sizeof(AsyncQueueEntry));
+
+		entry_pageno = pageno;
+
+		/* Advance queue head by the size of our compact entry */
+		if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		{
+			/*
+			 * Page became full. Initialize the next page to ensure SLRU
+			 * consistency (similar to what asyncQueueAddEntries does).
+			 */
+			LWLock	   *nextlock;
+
+			pageno = QUEUE_POS_PAGE(queue_head);
+			nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			if (nextlock != banklock)
+			{
+				LWLockRelease(banklock);
+				LWLockAcquire(nextlock, LW_EXCLUSIVE);
 			}
-			else
+			SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+			if (nextlock != banklock)
 			{
-				/*
-				 * The source transaction aborted or crashed, so we just
-				 * ignore its notifications.
-				 */
+				LWLockRelease(nextlock);
+				LWLockAcquire(banklock, LW_EXCLUSIVE);
 			}
+
+			/* Set cleanup flag if appropriate */
+			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+				tryAdvanceTail = true;
 		}
 
-		/* Loop back if we're not at end of page */
-	} while (!reachedEndOfPage);
+		/* Update the global queue head and consume reservation (not in recovery) */
+		QUEUE_HEAD = queue_head;
+		if (!RecoveryInProgress())
+		{
+			Assert(asyncQueueControl->reservedEntries > 0);
+			asyncQueueControl->reservedEntries--;
+		}
+	}
+	else
+	{
+		/*
+		 * No room on current page. Move to the next page and write entry at
+		 * offset 0; padding is unnecessary with fixed-size entries and bounded
+		 * scans that stop at QUEUE_HEAD.
+		 */
+		LWLockRelease(banklock);
 
-	if (QUEUE_POS_EQUAL(*current, stop))
-		reachedStop = true;
+		/* Move head to the start of the next page */
+		SET_QUEUE_POS(queue_head, QUEUE_POS_PAGE(queue_head) + 1, 0);
 
-	return reachedStop;
+		/* Ensure next page is present */
+		banklock = SimpleLruGetBankLock(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+		LWLockAcquire(banklock, LW_EXCLUSIVE);
+		slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+		NotifyCtl->shared->page_dirty[slotno] = true;
+
+		/* Write entry at beginning of the new page */
+		memcpy(NotifyCtl->shared->page_buffer[slotno], &entry, sizeof(AsyncQueueEntry));
+
+		entry_pageno = QUEUE_POS_PAGE(queue_head);
+
+		/* Advance queue head and initialize subsequent page if needed */
+		if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		{
+			LWLock *nextlock;
+			pageno = QUEUE_POS_PAGE(queue_head);
+			nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			if (nextlock != banklock)
+			{
+				LWLockRelease(banklock);
+				LWLockAcquire(nextlock, LW_EXCLUSIVE);
+			}
+			SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+			if (nextlock != banklock)
+			{
+				LWLockRelease(nextlock);
+				LWLockAcquire(banklock, LW_EXCLUSIVE);
+			}
+			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+				tryAdvanceTail = true;
+		}
+
+		/* Update the global queue head and consume reservation (not in recovery) */
+		QUEUE_HEAD = queue_head;
+		if (!RecoveryInProgress())
+		{
+			Assert(asyncQueueControl->reservedEntries > 0);
+			asyncQueueControl->reservedEntries--;
+		}
+	}
+
+	/* Update per-page minimum under locks. */
+	if (entry_pageno >= 0)
+	{
+		/* Caller holds NotifyQueueLock EXCLUSIVE (see xact.c commit path). */
+		NotifyPageMinUpdateForPage(entry_pageno, notify_lsn);
+	}
+
+	LWLockRelease(banklock);
 }
 
 /*
@@ -2161,6 +2420,8 @@ asyncQueueAdvanceTail(void)
 		SimpleLruTruncate(NotifyCtl, newtailpage);
 
 		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		/* Invalidate per-page mins for pages we are truncating */
+		NotifyPageMinInvalidateRange(oldtailpage, newtailpage);
 		QUEUE_STOP_PAGE = newtailpage;
 		LWLockRelease(NotifyQueueLock);
 	}
@@ -2395,3 +2656,59 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Write a WAL record containing async notification data
+ *
+ * This logs notification data to WAL, allowing us to release locks earlier
+ * and maintain commit ordering through WAL's natural ordering guarantees.
+ */
+XLogRecPtr
+LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+			   uint32 nnotifications, Size data_len, char *data)
+{
+	xl_async_notify_data xlrec;
+
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.srcPid = srcPid;
+	xlrec.nnotifications = nnotifications;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData);
+	XLogRegisterData(data, data_len);
+
+	(void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA);
+
+	/* Return the begin LSN of the record we just inserted. */
+	return ProcLastRecPtr;
+}
+
+/*
+ * Redo function for async notification WAL records
+ *
+ * During recovery, we need to replay notification records. For now,
+ * we'll add them to the traditional notification queue. In a complete
+ * implementation, replaying backends would read directly from WAL.
+ */
+void
+async_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			/* 
+			 * For notification data records, we don't need to do anything
+			 * during recovery since listeners will read directly from WAL.
+			 * The data is already durably stored in the WAL record itself.
+			 */
+			break;
+
+
+		default:
+			elog(PANIC, "async_redo: unknown op code %u", info);
+	}
+}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 918db53dd5e..8969a3bef09 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -323,18 +323,17 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 			}
 
 		case T_ListenStmt:
-		case T_NotifyStmt:
 			{
 				/*
-				 * NOTIFY requires an XID assignment, so it can't be permitted
-				 * on a standby. Perhaps LISTEN could, since without NOTIFY it
-				 * would be OK to just do nothing, at least until promotion,
-				 * but we currently prohibit it lest the user get the wrong
-				 * idea.
-				 *
-				 * (We do allow T_UnlistenStmt on a standby, though, because
-				 * it's a no-op.)
+				 * Allow LISTEN during recovery. With WAL-based notifications,
+				 * standbys can subscribe and receive NOTIFYs from WAL replay.
 				 */
+				return COMMAND_OK_IN_RECOVERY | COMMAND_OK_IN_READ_ONLY_TXN;
+			}
+
+		case T_NotifyStmt:
+			{
+				/* NOTIFY still requires an XID; disallow during recovery. */
 				return COMMAND_OK_IN_READ_ONLY_TXN;
 			}
 
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 8f4b282c6b1..b35b007e51c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -19,6 +19,7 @@
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "fe_utils/archive.h"
 #include "filemap.h"
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index fac509ed134..03e73ae33c9 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
@@ -23,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
diff --git a/src/bin/pg_waldump/t/001_basic.pl b/src/bin/pg_waldump/t/001_basic.pl
index f26d75e01cf..45cfe40e5a8 100644
--- a/src/bin/pg_waldump/t/001_basic.pl
+++ b/src/bin/pg_waldump/t/001_basic.pl
@@ -73,7 +73,8 @@ BRIN
 CommitTs
 ReplicationOrigin
 Generic
-LogicalMessage$/,
+LogicalMessage
+Async$/,
 	'rmgr list');
 
 
diff --git a/src/include/access/async_xlog.h b/src/include/access/async_xlog.h
new file mode 100644
index 00000000000..d4c0c828e84
--- /dev/null
+++ b/src/include/access/async_xlog.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * async_xlog.h
+ *	  Async notification WAL definitions
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/async_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ASYNC_XLOG_H
+#define ASYNC_XLOG_H
+
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+
+/*
+ * WAL record types for async notifications
+ */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00	/* notification data */
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+#endif							/* ASYNC_XLOG_H */
\ No newline at end of file
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 8e7fc9db877..58293e05165 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 4528e51829e..2d8709ba51e 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -195,6 +195,7 @@ typedef struct SavedTransactionCharacteristics
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
 #define XACT_XINFO_HAS_DROPPED_STATS	(1U << 8)
+#define XACT_XINFO_HAS_NOTIFY			(1U << 9)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -318,6 +319,11 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_notify
+{
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} xl_xact_notify;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -331,6 +337,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_notify follows if XINFO_HAS_NOTIFY, stored unaligned! */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -404,6 +411,8 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	XLogRecPtr	notify_lsn;		/* LSN of notification data */
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..90870e6ae82 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -14,11 +14,25 @@
 #define ASYNC_H
 
 #include <signal.h>
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
+/*
+ * Compact SLRU queue entry - stores metadata pointing to WAL data
+ */
+typedef struct AsyncQueueEntry
+{
+	Oid			dboid;			/* database ID for quick filtering */
+	TransactionId	xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} AsyncQueueEntry;
+
+#define ASYNC_QUEUE_ENTRY_SIZE	sizeof(AsyncQueueEntry)
+
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
@@ -46,4 +60,20 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* WAL-based notification functions */
+extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+									 uint32 nnotifications, Size data_len, char *data);
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+/* notification queue functions */
+extern void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn);
+
+/* wake listeners for a specific database (used during standby replay) */
+extern void SignalBackendsForDatabase(Oid dboid);
+
+/* Spill helper to be called before WAL recycle */
+extern bool AsyncNotifyOldestRequiredLSN(XLogRecPtr *oldest_lsn);
+
 #endif							/* ASYNC_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..8f5c1ed24ff 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -301,6 +301,9 @@ struct PGPROC
 
 	uint32		wait_event_info;	/* proc's wait information */
 
+	/* Support for async notifications */
+	XLogRecPtr	notifyDataLsn;		/* LSN of NOTIFY data record for current xact */
+
 	/* Support for group transaction status update. */
 	bool		clogGroupMember;	/* true, if member of clog group */
 	pg_atomic_uint32 clogGroupNext; /* next clog group member */
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..01423582d64 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -107,10 +107,10 @@ step l2stop: UNLISTEN *;
 starting permutation: llisten lbegin usage bignotify usage
 step llisten: LISTEN c1; LISTEN c2;
 step lbegin: BEGIN;
-step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
+step usage: SELECT pg_notification_queue_usage() = 0 AS nonzero;
 nonzero
 -------
-f      
+t      
 (1 row)
 
 step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s;
@@ -119,7 +119,7 @@ count
  1000
 (1 row)
 
-step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
+step usage: SELECT pg_notification_queue_usage() = 0 AS nonzero;
 nonzero
 -------
 t      
diff --git a/src/test/isolation/expected/stats.out b/src/test/isolation/expected/stats.out
index cfad309ccf3..f2494b24af7 100644
--- a/src/test/isolation/expected/stats.out
+++ b/src/test/isolation/expected/stats.out
@@ -3100,7 +3100,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3140,7 +3140,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3236,7 +3236,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_commit: COMMIT;
@@ -3249,7 +3249,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3316,7 +3316,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3383,7 +3383,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3437,7 +3437,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_clear_snapshot: SELECT pg_stat_clear_snapshot();
@@ -3455,7 +3455,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_commit: COMMIT;
@@ -3528,7 +3528,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_commit: COMMIT;
@@ -3601,7 +3601,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_commit: COMMIT;
@@ -3701,7 +3701,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s2_func_call: SELECT test_stat_func()
diff --git a/src/test/isolation/expected/stats_1.out b/src/test/isolation/expected/stats_1.out
index e1d937784cb..5e2c1fe5d9f 100644
--- a/src/test/isolation/expected/stats_1.out
+++ b/src/test/isolation/expected/stats_1.out
@@ -3124,7 +3124,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3164,7 +3164,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3260,7 +3260,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_commit: COMMIT;
@@ -3273,7 +3273,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3340,7 +3340,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3407,7 +3407,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 
@@ -3461,7 +3461,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_clear_snapshot: SELECT pg_stat_clear_snapshot();
@@ -3479,7 +3479,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_commit: COMMIT;
@@ -3552,7 +3552,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_commit: COMMIT;
@@ -3625,7 +3625,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s1_commit: COMMIT;
@@ -3725,7 +3725,7 @@ step s1_slru_check_stats:
 
 ?column?
 --------
-t       
+f       
 (1 row)
 
 step s2_func_call: SELECT test_stat_func()
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..2e2e3e186be 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -31,7 +31,7 @@ step notifys1	{
 	ROLLBACK TO SAVEPOINT s2;
 	COMMIT;
 }
-step usage		{ SELECT pg_notification_queue_usage() > 0 AS nonzero; }
+step usage		{ SELECT pg_notification_queue_usage() = 0 AS nonzero; }
 step bignotify	{ SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
 teardown		{ UNLISTEN *; }
 
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..da1bf5bb1b7
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,17 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support"
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..8119e6c761f
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,13 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/002_queue_full.pl b/src/test/modules/test_listen_notify/t/002_queue_full.pl
new file mode 100644
index 00000000000..0923ae3f3e3
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/002_queue_full.pl
@@ -0,0 +1,78 @@
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::BackgroundPsql;
+use Test::More;
+
+# Verify that NOTIFY errors when the notification queue reaches the configured
+# maximum page distance. Use a small max_notify_queue_pages so we can reach
+# the limit quickly. With fixed-size compact entries, the last slot on the page
+# cannot be used because advancing would require preparing the next page, which
+# exceeds the allowed window.
+
+my $node = PostgreSQL::Test::Cluster->new('t_queue_full');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+max_notify_queue_pages = 64
+fsync = off
+synchronous_commit = off
+full_page_writes = off
+autovacuum = off
+});
+$node->start;
+
+# Create a listener that registers and then stays in a transaction so it does
+# not process incoming notifications, preventing the queue tail from advancing.
+my $listener = $node->background_psql('postgres');
+$listener->query_safe('LISTEN tap_queue_full');
+$listener->query_safe('BEGIN');
+
+my $sender = $node->background_psql('postgres', on_error_stop => 0);
+my $full_seen = 0;
+my $stderr_msg = '';
+
+# Fill using moderate chunks to reach the boundary quickly.
+my $chunk_sql = ("NOTIFY tap_queue_full, 'x';\n" x 500);
+for my $iter (1..200) {
+    last if $full_seen;
+    my ($out, $errflag) = $sender->query($chunk_sql);
+    my $errtxt = $sender->{stderr};
+    if ($errtxt =~ /(asynchronous notification queue is full[^\n]*)/i) {
+        $stderr_msg = $1;
+        $full_seen = 1;
+        last;
+    }
+}
+
+ok($full_seen, 'NOTIFY fails once queue reaches configured maximum');
+like($stderr_msg, qr/asynchronous notification queue is full/i,
+     'error message mentions full NOTIFY queue');
+
+# Now verify concurrent attempts also fail: create several senders and have
+# each issue exactly one NOTIFY; all must be rejected.
+my $n_concurrent = 4;
+my @senders;
+for (1..$n_concurrent) {
+    push @senders, $node->background_psql('postgres', on_error_stop => 0);
+}
+
+my $all_failed = 1;
+for my $s (@senders, $sender) {
+    my ($out, $errflag) = $s->query("NOTIFY tap_queue_full, 'x'");
+    my $errtxt = $s->{stderr};
+    $all_failed &&= ($errflag && $errtxt =~ /asynchronous notification queue is full/i);
+}
+
+ok($all_failed, 'all concurrent NOTIFY attempts are rejected at boundary');
+
+# Cleanup sessions and node
+for my $s (@senders) {
+    $s->quit();
+}
+$sender->quit();
+$listener->query('ROLLBACK');
+$listener->quit();
+$node->stop('fast');
+
+done_testing();
diff --git a/src/test/modules/test_listen_notify/t/003_wal_pin_test.pl b/src/test/modules/test_listen_notify/t/003_wal_pin_test.pl
new file mode 100644
index 00000000000..cb36ef26832
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/003_wal_pin_test.pl
@@ -0,0 +1,102 @@
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use IPC::Run qw(start pump finish timeout);
+use Test::More;
+
+# Goal: Verify that NOTIFY pins WAL segments via notify LSN so WAL recycling
+# does not remove needed segments. With a listener idle-in-transaction, queued
+# NOTIFY entries remain unconsumed, so RemoveOldXlogFiles should keep older
+# segments. After releasing the listener, notifications are delivered and WAL
+# recycling can proceed.
+
+my $node = PostgreSQL::Test::Cluster->new('wal_pin');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+fsync = off
+synchronous_commit = off
+full_page_writes = off
+autovacuum = off
+wal_level = replica
+max_wal_size = '64MB'
+min_wal_size = '32MB'
+checkpoint_timeout = '30s'
+trace_notify = on
+log_min_messages = debug1
+});
+$node->start;
+
+# Helper to count WAL segment files in pg_wal (24-hex-digit filenames)
+sub count_wal_files {
+    my $wal_dir = $node->basedir . '/pgdata/pg_wal';
+    opendir(my $dh, $wal_dir) or die "cannot open pg_wal: $!";
+    my @segs = grep { /^[0-9A-F]{24}$/ } readdir($dh);
+    closedir($dh);
+    return scalar(@segs);
+}
+
+# Start a psql session that LISTENs and then pins by going idle-in-xact.
+my $psql = [ $node->installed_command('psql'), '--no-psqlrc', '--quiet', '--dbname' => $node->connstr('postgres') ];
+my ($in, $out, $err) = ('','','');
+my $tmo = timeout($PostgreSQL::Test::Utils::timeout_default);
+my $h = start $psql, '<' => \$in, '>' => \$out, '2>' => \$err, $tmo;
+
+$in .= "\\set ON_ERROR_STOP 1\nLISTEN wal_pin_t;\n";
+$h->pump();
+
+# Commit the LISTEN (autocommit), then begin a transaction to be idle-in-xact.
+$in .= "BEGIN;\n";
+$h->pump();
+
+# Produce a bunch of NOTIFYs in autocommit to populate the queue and WAL.
+my $notify_count = 2000;
+my $sql = ("NOTIFY wal_pin_t, 'p';\n" x $notify_count);
+my ($ret, $stdout, $stderr) = $node->psql('postgres', "\\set ON_ERROR_STOP 1\n$sql");
+is($ret, 0, 'NOTIFY batch succeeded');
+
+# Force WAL generation and recycling attempts.
+for my $i (1..150) {
+    $node->safe_psql('postgres', 'SELECT pg_switch_wal()');
+}
+for my $i (1..20) {
+    $node->safe_psql('postgres', 'CHECKPOINT');
+}
+
+# Assert that NOTIFY WAL pinning path was invoked.
+my $log = $node->logfile;
+open my $lfh, '<', $log or die "cannot open postmaster log $log: $!";
+my $logtxt = do { local $/; <$lfh> };
+close $lfh;
+like($logtxt, qr/async notify: (?:WAL recycle pinned by oldest notify LSN|checking WAL pin; oldest notify LSN)/i,
+     'saw WAL pinning path invoked');
+
+note('completed WAL switch/checkpoint churn with listener pinned');
+
+# Release listener so it can process notifications, and force a round trip.
+$in .= "ROLLBACK;\nSELECT 1;\n";
+$h->pump();
+
+# Pump to capture async outputs printed by psql
+my $saw = 0;
+for my $i (1..1000) {
+  eval { $h->pump(); 1 } or do { };
+  if ($out =~ /Asynchronous notification/i) { $saw = 1; last; }
+  select(undef, undef, undef, 0.02);
+}
+ok($saw, 'notifications printed after releasing listener');
+
+# Encourage tail advancement and WAL cleanup.
+for my $i (1..20) {
+    $node->safe_psql('postgres', 'SELECT pg_notification_queue_usage()');
+    $node->safe_psql('postgres', 'CHECKPOINT');
+}
+
+note('post-release: queue advanced and checkpoints executed');
+
+$in .= "\\q\n";
+eval { $h->finish; 1 } or do {};
+
+$node->stop('fast');
+
+done_testing();
-- 
2.51.0

