From d37772e3337a083e719be2fc9d93474792bc8c8d Mon Sep 17 00:00:00 2001
From: rbagga <bangalorian@gmail.com>
Date: Sun, 7 Sep 2025 16:55:57 -0700
Subject: [PATCH v4] Implement WAL-based async notifications for improved
 throughput

- Added WAL logging for async notifications to improve scalability
- Implemented async resource manager for WAL-based notification handling
- Added new async descriptor files for pg_waldump support
- Updated makefiles and build configuration for new components
---
 src/backend/access/rmgrdesc/Makefile    |   1 +
 src/backend/access/rmgrdesc/asyncdesc.c |  47 ++
 src/backend/access/rmgrdesc/meson.build |   1 +
 src/backend/access/rmgrdesc/xactdesc.c  |  13 +
 src/backend/access/transam/rmgr.c       |   1 +
 src/backend/access/transam/xact.c       |  48 +-
 src/backend/commands/async.c            | 800 ++++++++++++------------
 src/bin/pg_rewind/parsexlog.c           |   1 +
 src/bin/pg_waldump/rmgrdesc.c           |   2 +
 src/include/access/async_xlog.h         |  43 ++
 src/include/access/rmgrlist.h           |   1 +
 src/include/access/xact.h               |   9 +
 src/include/commands/async.h            |  25 +
 src/include/storage/proc.h              |   3 +
 14 files changed, 591 insertions(+), 404 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c
 create mode 100644 src/include/access/async_xlog.h

diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f1..6e6e75b12bd 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	asyncdesc.o \
 	brindesc.o \
 	clogdesc.o \
 	committsdesc.o \
diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c
new file mode 100644
index 00000000000..7f322849ff1
--- /dev/null
+++ b/src/backend/access/rmgrdesc/asyncdesc.c
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * asyncdesc.c
+ *	  rmgr descriptor routines for access/async.c
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/asyncdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/async_xlog.h"
+
+void
+async_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_ASYNC_NOTIFY_DATA)
+	{
+		xl_async_notify_data *xlrec = (xl_async_notify_data *) rec;
+
+		appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u",
+						 xlrec->dbid, xlrec->xid, xlrec->srcPid, xlrec->nnotifications);
+	}
+}
+
+const char *
+async_identify(uint8 info)
+{
+	const char *id = NULL;
+
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			id = "NOTIFY_DATA";
+			break;
+	}
+
+	return id;
+}
\ No newline at end of file
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index 96c98e800c2..38bef2e87f6 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -2,6 +2,7 @@
 
 # used by frontend programs like pg_waldump
 rmgr_desc_sources = files(
+  'asyncdesc.c',
   'brindesc.c',
   'clogdesc.c',
   'committsdesc.c',
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index f0f696855b9..4f32f7fc591 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -135,6 +135,19 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 
 		data += sizeof(xl_xact_origin);
 	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		xl_xact_notify xl_notify;
+
+		/* no alignment is guaranteed, so copy onto stack */
+		memcpy(&xl_notify, data, sizeof(xl_notify));
+
+		parsed->notify_lsn = xl_notify.notify_lsn;
+
+		data += sizeof(xl_xact_notify);
+	}
+
 }
 
 void
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 1b7499726eb..f8c25e6597a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -19,6 +19,7 @@
 
 /* includes needed for "access/rmgrlist.h" */
 /* IWYU pragma: begin_keep */
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b46e7e9c2a6..33b16ff4746 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5841,10 +5841,24 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
+	xl_xact_notify xl_notify;
 	uint8		info;
+	XLogRecPtr	result;
 
 	Assert(CritSectionCount > 0);
 
+	/*
+	 * Handle notification commit ordering: if this transaction has pending
+	 * notifications, we must write the queue entry just before the commit
+	 * record while holding NotifyQueueLock to ensure proper ordering.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		TransactionId xid = GetCurrentTransactionId();
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		asyncQueueAddCompactEntry(MyDatabaseId, xid, MyProc->notifyCommitLsn);
+	}
+
 	xl_xinfo.xinfo = 0;
 
 	/* decide between a plain and 2pc commit */
@@ -5926,9 +5940,17 @@ XactLogCommitRecord(TimestampTz commit_time,
 		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
+	/* include notification information if present */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_NOTIFY;
+		xl_notify.notify_lsn = MyProc->notifyCommitLsn;
+	}
+
 	if (xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
+
 	/* Then include all the collected data into the commit record. */
 
 	XLogBeginInsert();
@@ -5982,10 +6004,28 @@ XactLogCommitRecord(TimestampTz commit_time,
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData(&xl_origin, sizeof(xl_xact_origin));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_NOTIFY)
+		XLogRegisterData(&xl_notify, sizeof(xl_xact_notify));
+
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	/* Insert the commit record */
+	result = XLogInsert(RM_XACT_ID, info);
+
+	/*
+	 * Release NotifyQueueLock if we held it. The queue entry is now
+	 * associated with a committed transaction, so readers can process it.
+	 */
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		LWLockRelease(NotifyQueueLock);
+		
+		/* Signal listening backends to check for new notifications */
+		SignalBackends();
+	}
+
+	return result;
 }
 
 /*
@@ -6227,6 +6267,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 						   false /* backward */ , false /* WAL */ );
 	}
 
+	/* Add notification queue entry if this commit has notifications */
+	if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY)
+	{
+		asyncQueueAddCompactEntry(parsed->dbId, xid, parsed->notify_lsn);
+	}
+
 	/* Make sure files supposed to be dropped are dropped */
 	if (parsed->nrels > 0)
 	{
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..57fa732e9b8 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -133,6 +133,12 @@
 #include "access/slru.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "access/xlogrecovery.h"
+#include "access/xlog_internal.h"
 #include "catalog/pg_database.h"
 #include "commands/async.h"
 #include "common/hashfn.h"
@@ -151,6 +157,29 @@
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
 
+/* Missing definitions for WAL-based notification system */
+#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE
+#define SLRU_PAGE_SIZE BLCKSZ
+#define AsyncCtl NotifyCtl
+
+/* WAL record types */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+
 
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
@@ -163,30 +192,13 @@
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
 /*
- * Struct representing an entry in the global notify queue
- *
- * This struct declaration has the maximal length, but in a real queue entry
- * the data area is only big enough for the actual channel and payload strings
- * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
- * entry size, if both channel and payload strings are empty (but note it
- * doesn't include alignment padding).
- *
- * The "length" field should always be rounded up to the next QUEUEALIGN
- * multiple so that all fields are properly aligned.
+ * NOTE: The AsyncQueueEntry structure is now defined in commands/async.h
+ * as a compact metadata-only structure for the new WAL-based notification system.
+ * The old variable-length structure with full notification content is no longer used.
  */
-typedef struct AsyncQueueEntry
-{
-	int			length;			/* total allocated length of entry */
-	Oid			dboid;			/* sender's database OID */
-	TransactionId xid;			/* sender's XID */
-	int32		srcPid;			/* sender's PID */
-	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
-} AsyncQueueEntry;
-
-/* Currently, no field of AsyncQueueEntry requires more than int alignment */
-#define QUEUEALIGN(len)		INTALIGN(len)
 
-#define AsyncQueueEntryEmptySize	(offsetof(AsyncQueueEntry, data) + 2)
+/* Queue alignment is still needed for SLRU page management */
+#define QUEUEALIGN(len)		INTALIGN(len)
 
 /*
  * Struct describing a queue position, and assorted macros for working with it
@@ -438,18 +450,13 @@ static void Exec_UnlistenCommit(const char *channel);
 static void Exec_UnlistenAllCommit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
-static bool asyncQueueIsFull(void);
 static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
-static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
-static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
-static void asyncQueueFillWarning(void);
-static void SignalBackends(void);
+void SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
-										 char *page_buffer,
-										 Snapshot snapshot);
+										 char *page_buffer);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -457,6 +464,7 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static void processNotificationFromWAL(XLogRecPtr notify_lsn);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -890,65 +898,75 @@ PreCommit_Notify(void)
 		}
 	}
 
-	/* Queue any pending notifies (must happen after the above) */
+	/* Write notification data to WAL if we have any */
 	if (pendingNotifies)
 	{
-		ListCell   *nextNotify;
+		TransactionId currentXid;
+		ListCell   *l;
+		size_t		total_size = 0;
+		uint32		nnotifications = 0;
+		char	   *notifications_data;
+		char	   *ptr;
+		XLogRecPtr	notify_lsn;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
 		 * GetCurrentTransactionId is cheap if we already have an XID, but not
-		 * so cheap if we don't, and we'd prefer not to do that work while
-		 * holding NotifyQueueLock.
+		 * so cheap if we don't.
 		 */
-		(void) GetCurrentTransactionId();
+		currentXid = GetCurrentTransactionId();
 
 		/*
-		 * Serialize writers by acquiring a special lock that we hold till
-		 * after commit.  This ensures that queue entries appear in commit
-		 * order, and in particular that there are never uncommitted queue
-		 * entries ahead of committed ones, so an uncommitted transaction
-		 * can't block delivery of deliverable notifications.
-		 *
-		 * We use a heavyweight lock so that it'll automatically be released
-		 * after either commit or abort.  This also allows deadlocks to be
-		 * detected, though really a deadlock shouldn't be possible here.
-		 *
-		 * The lock is on "database 0", which is pretty ugly but it doesn't
-		 * seem worth inventing a special locktag category just for this.
-		 * (Historical note: before PG 9.0, a similar lock on "database 0" was
-		 * used by the flatfiles mechanism.)
+		 * Step 1: Write notification data to WAL.
+		 * This can be done in parallel with other transactions since we're
+		 * not holding any global locks yet.
 		 */
-		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
-						 AccessExclusiveLock);
+		
+		/* First pass: calculate total size needed for serialization */
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *n = (Notification *) lfirst(l);
+			
+			/* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */
+			total_size += 4 + n->channel_len + 1 + n->payload_len + 1;
+			nnotifications++;
+		}
 
-		/* Now push the notifications into the queue */
-		nextNotify = list_head(pendingNotifies->events);
-		while (nextNotify != NULL)
+		/* Allocate buffer for notification data */
+		notifications_data = palloc(total_size);
+		ptr = notifications_data;
+
+		/* Second pass: serialize all notifications */
+		foreach(l, pendingNotifies->events)
 		{
-			/*
-			 * Add the pending notifications to the queue.  We acquire and
-			 * release NotifyQueueLock once per page, which might be overkill
-			 * but it does allow readers to get in while we're doing this.
-			 *
-			 * A full queue is very uncommon and should really not happen,
-			 * given that we have so much space available in the SLRU pages.
-			 * Nevertheless we need to deal with this possibility. Note that
-			 * when we get here we are in the process of committing our
-			 * transaction, but we have not yet committed to clog, so at this
-			 * point in time we can still roll the transaction back.
-			 */
-			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-			asyncQueueFillWarning();
-			if (asyncQueueIsFull())
-				ereport(ERROR,
-						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-						 errmsg("too many notifications in the NOTIFY queue")));
-			nextNotify = asyncQueueAddEntries(nextNotify);
-			LWLockRelease(NotifyQueueLock);
+			Notification *n = (Notification *) lfirst(l);
+			char	   *channel = n->data;
+			char	   *payload = n->data + n->channel_len + 1;
+
+			/* Write channel length, payload length, channel, and payload */
+			memcpy(ptr, &n->channel_len, 2);
+			ptr += 2;
+			memcpy(ptr, &n->payload_len, 2);
+			ptr += 2;
+			memcpy(ptr, channel, n->channel_len + 1);
+			ptr += n->channel_len + 1;
+			memcpy(ptr, payload, n->payload_len + 1);
+			ptr += n->payload_len + 1;
 		}
 
-		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
+		/* Write notification data to WAL */
+		notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid,
+										nnotifications, total_size,
+										notifications_data);
+
+		pfree(notifications_data);
+
+		/*
+		 * Step 2: Store the notification LSN in PROC for use during commit.
+		 * The queue entry will be written just before the commit record
+		 * while holding the global notification commit lock to ensure proper ordering.
+		 */
+		MyProc->notifyCommitLsn = notify_lsn;
 	}
 }
 
@@ -1006,12 +1024,19 @@ AtCommit_Notify(void)
 		asyncQueueUnregister();
 
 	/*
-	 * Send signals to listening backends.  We need do this only if there are
-	 * pending notifies, which were previously added to the shared queue by
-	 * PreCommit_Notify().
+	 * If we had notifications, they were already written to the queue in
+	 * PreCommit_Notify. Now that we've committed, signal listening backends
+	 * to check the queue. The transaction visibility logic will now see our
+	 * XID as committed and process the notifications.
 	 */
-	if (pendingNotifies != NULL)
+	if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn))
+	{
+		/* Signal listening backends to check the queue */
 		SignalBackends();
+		
+		/* Clear the flag now that we're done */
+		MyProc->notifyCommitLsn = InvalidXLogRecPtr;
+	}
 
 	/*
 	 * If it's time to try to advance the global tail pointer, do that.
@@ -1263,21 +1288,6 @@ asyncQueueUnregister(void)
 	amRegisteredListener = false;
 }
 
-/*
- * Test whether there is room to insert more notification messages.
- *
- * Caller must hold at least shared NotifyQueueLock.
- */
-static bool
-asyncQueueIsFull(void)
-{
-	int64		headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
-	int64		tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
-	int64		occupied = headPage - tailPage;
-
-	return occupied >= max_notify_queue_pages;
-}
-
 /*
  * Advance the QueuePosition to the next entry, assuming that the current
  * entry is of length entryLength.  If we jump to a new page the function
@@ -1313,166 +1323,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
 	return pageJump;
 }
 
-/*
- * Fill the AsyncQueueEntry at *qe with an outbound notification message.
- */
-static void
-asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
-{
-	size_t		channellen = n->channel_len;
-	size_t		payloadlen = n->payload_len;
-	int			entryLength;
-
-	Assert(channellen < NAMEDATALEN);
-	Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
-
-	/* The terminators are already included in AsyncQueueEntryEmptySize */
-	entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
-	entryLength = QUEUEALIGN(entryLength);
-	qe->length = entryLength;
-	qe->dboid = MyDatabaseId;
-	qe->xid = GetCurrentTransactionId();
-	qe->srcPid = MyProcPid;
-	memcpy(qe->data, n->data, channellen + payloadlen + 2);
-}
-
-/*
- * Add pending notifications to the queue.
- *
- * We go page by page here, i.e. we stop once we have to go to a new page but
- * we will be called again and then fill that next page. If an entry does not
- * fit into the current page, we write a dummy entry with an InvalidOid as the
- * database OID in order to fill the page. So every page is always used up to
- * the last byte which simplifies reading the page later.
- *
- * We are passed the list cell (in pendingNotifies->events) containing the next
- * notification to write and return the first still-unwritten cell back.
- * Eventually we will return NULL indicating all is done.
- *
- * We are holding NotifyQueueLock already from the caller and grab
- * page specific SLRU bank lock locally in this function.
- */
-static ListCell *
-asyncQueueAddEntries(ListCell *nextNotify)
-{
-	AsyncQueueEntry qe;
-	QueuePosition queue_head;
-	int64		pageno;
-	int			offset;
-	int			slotno;
-	LWLock	   *prevlock;
-
-	/*
-	 * We work with a local copy of QUEUE_HEAD, which we write back to shared
-	 * memory upon exiting.  The reason for this is that if we have to advance
-	 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
-	 * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
-	 * subsequent insertions would try to put entries into a page that slru.c
-	 * thinks doesn't exist yet.)  So, use a local position variable.  Note
-	 * that if we do fail, any already-inserted queue entries are forgotten;
-	 * this is okay, since they'd be useless anyway after our transaction
-	 * rolls back.
-	 */
-	queue_head = QUEUE_HEAD;
-
-	/*
-	 * If this is the first write since the postmaster started, we need to
-	 * initialize the first page of the async SLRU.  Otherwise, the current
-	 * page should be initialized already, so just fetch it.
-	 */
-	pageno = QUEUE_POS_PAGE(queue_head);
-	prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
-
-	/* We hold both NotifyQueueLock and SLRU bank lock during this operation */
-	LWLockAcquire(prevlock, LW_EXCLUSIVE);
-
-	if (QUEUE_POS_IS_ZERO(queue_head))
-		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
-	else
-		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
-								   InvalidTransactionId);
-
-	/* Note we mark the page dirty before writing in it */
-	NotifyCtl->shared->page_dirty[slotno] = true;
-
-	while (nextNotify != NULL)
-	{
-		Notification *n = (Notification *) lfirst(nextNotify);
-
-		/* Construct a valid queue entry in local variable qe */
-		asyncQueueNotificationToEntry(n, &qe);
-
-		offset = QUEUE_POS_OFFSET(queue_head);
-
-		/* Check whether the entry really fits on the current page */
-		if (offset + qe.length <= QUEUE_PAGESIZE)
-		{
-			/* OK, so advance nextNotify past this item */
-			nextNotify = lnext(pendingNotifies->events, nextNotify);
-		}
-		else
-		{
-			/*
-			 * Write a dummy entry to fill up the page. Actually readers will
-			 * only check dboid and since it won't match any reader's database
-			 * OID, they will ignore this entry and move on.
-			 */
-			qe.length = QUEUE_PAGESIZE - offset;
-			qe.dboid = InvalidOid;
-			qe.data[0] = '\0';	/* empty channel */
-			qe.data[1] = '\0';	/* empty payload */
-		}
-
-		/* Now copy qe into the shared buffer page */
-		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
-			   &qe,
-			   qe.length);
-
-		/* Advance queue_head appropriately, and detect if page is full */
-		if (asyncQueueAdvance(&(queue_head), qe.length))
-		{
-			LWLock	   *lock;
-
-			pageno = QUEUE_POS_PAGE(queue_head);
-			lock = SimpleLruGetBankLock(NotifyCtl, pageno);
-			if (lock != prevlock)
-			{
-				LWLockRelease(prevlock);
-				LWLockAcquire(lock, LW_EXCLUSIVE);
-				prevlock = lock;
-			}
-
-			/*
-			 * Page is full, so we're done here, but first fill the next page
-			 * with zeroes.  The reason to do this is to ensure that slru.c's
-			 * idea of the head page is always the same as ours, which avoids
-			 * boundary problems in SimpleLruTruncate.  The test in
-			 * asyncQueueIsFull() ensured that there is room to create this
-			 * page without overrunning the queue.
-			 */
-			slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
-
-			/*
-			 * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
-			 * set flag to remember that we should try to advance the tail
-			 * pointer (we don't want to actually do that right here).
-			 */
-			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
-				tryAdvanceTail = true;
-
-			/* And exit the loop */
-			break;
-		}
-	}
-
-	/* Success, so update the global QUEUE_HEAD */
-	QUEUE_HEAD = queue_head;
-
-	LWLockRelease(prevlock);
-
-	return nextNotify;
-}
-
 /*
  * SQL function to return the fraction of the notification queue currently
  * occupied.
@@ -1515,52 +1365,6 @@ asyncQueueUsage(void)
 	return (double) occupied / (double) max_notify_queue_pages;
 }
 
-/*
- * Check whether the queue is at least half full, and emit a warning if so.
- *
- * This is unlikely given the size of the queue, but possible.
- * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
- *
- * Caller must hold exclusive NotifyQueueLock.
- */
-static void
-asyncQueueFillWarning(void)
-{
-	double		fillDegree;
-	TimestampTz t;
-
-	fillDegree = asyncQueueUsage();
-	if (fillDegree < 0.5)
-		return;
-
-	t = GetCurrentTimestamp();
-
-	if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
-								   t, QUEUE_FULL_WARN_INTERVAL))
-	{
-		QueuePosition min = QUEUE_HEAD;
-		int32		minPid = InvalidPid;
-
-		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
-		{
-			Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-			min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
-			if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
-				minPid = QUEUE_BACKEND_PID(i);
-		}
-
-		ereport(WARNING,
-				(errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
-				 (minPid != InvalidPid ?
-				  errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
-				  : 0),
-				 (minPid != InvalidPid ?
-				  errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
-				  : 0)));
-
-		asyncQueueControl->lastQueueFillWarn = t;
-	}
-}
 
 /*
  * Send signals to listening backends.
@@ -1577,7 +1381,7 @@ asyncQueueFillWarning(void)
  * This is called during CommitTransaction(), so it's important for it
  * to have very low probability of failure.
  */
-static void
+void
 SignalBackends(void)
 {
 	int32	   *pids;
@@ -1844,15 +1648,13 @@ ProcessNotifyInterrupt(bool flush)
 
 /*
  * Read all pending notifications from the queue, and deliver appropriate
- * ones to my frontend.  Stop when we reach queue head or an uncommitted
- * notification.
+ * ones to my frontend.  Stop when we reach queue head.
  */
 static void
 asyncQueueReadAllNotifications(void)
 {
 	volatile QueuePosition pos;
 	QueuePosition head;
-	Snapshot	snapshot;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -1875,46 +1677,6 @@ asyncQueueReadAllNotifications(void)
 		return;
 	}
 
-	/*----------
-	 * Get snapshot we'll use to decide which xacts are still in progress.
-	 * This is trickier than it might seem, because of race conditions.
-	 * Consider the following example:
-	 *
-	 * Backend 1:					 Backend 2:
-	 *
-	 * transaction starts
-	 * UPDATE foo SET ...;
-	 * NOTIFY foo;
-	 * commit starts
-	 * queue the notify message
-	 *								 transaction starts
-	 *								 LISTEN foo;  -- first LISTEN in session
-	 *								 SELECT * FROM foo WHERE ...;
-	 * commit to clog
-	 *								 commit starts
-	 *								 add backend 2 to array of listeners
-	 *								 advance to queue head (this code)
-	 *								 commit to clog
-	 *
-	 * Transaction 2's SELECT has not seen the UPDATE's effects, since that
-	 * wasn't committed yet.  Ideally we'd ensure that client 2 would
-	 * eventually get transaction 1's notify message, but there's no way
-	 * to do that; until we're in the listener array, there's no guarantee
-	 * that the notify message doesn't get removed from the queue.
-	 *
-	 * Therefore the coding technique transaction 2 is using is unsafe:
-	 * applications must commit a LISTEN before inspecting database state,
-	 * if they want to ensure they will see notifications about subsequent
-	 * changes to that state.
-	 *
-	 * What we do guarantee is that we'll see all notifications from
-	 * transactions committing after the snapshot we take here.
-	 * Exec_ListenPreCommit has already added us to the listener array,
-	 * so no not-yet-committed messages can be removed from the queue
-	 * before we see them.
-	 *----------
-	 */
-	snapshot = RegisterSnapshot(GetLatestSnapshot());
 
 	/*
 	 * It is possible that we fail while trying to send a message to our
@@ -1979,8 +1741,7 @@ asyncQueueReadAllNotifications(void)
 			 * while sending the notifications to the frontend.
 			 */
 			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
+													   page_buffer.buf);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -1992,8 +1753,6 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
-	/* Done with snapshot */
-	UnregisterSnapshot(snapshot);
 }
 
 /*
@@ -2004,19 +1763,17 @@ asyncQueueReadAllNotifications(void)
  * memory.  (We could access the page right in shared memory, but that
  * would imply holding the SLRU bank lock throughout this routine.)
  *
- * We stop if we reach the "stop" position, or reach a notification from an
- * uncommitted transaction, or reach the end of the page.
+ * We stop if we reach the "stop" position or reach the end of the page.
  *
- * The function returns true once we have reached the stop position or an
- * uncommitted notification, and false if we have finished with the page.
+ * The function returns true once we have reached the stop position, and false
+ * if we have finished with the page.
  * In other words: once it returns true there is no need to look further.
  * The QueuePosition *current is advanced past all processed messages.
  */
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
-							 char *page_buffer,
-							 Snapshot snapshot)
+							 char *page_buffer)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
@@ -2032,60 +1789,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
 
 		/*
-		 * Advance *current over this message, possibly to the next page. As
-		 * noted in the comments for asyncQueueReadAllNotifications, we must
-		 * do this before possibly failing while processing the message.
+		 * Advance *current over this compact entry. The new compact entries are
+		 * fixed-size, making this much simpler than the old variable-length entries.
 		 */
-		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
+		reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry));
 
 		/* Ignore messages destined for other databases */
-		if (qe->dboid == MyDatabaseId)
+		if (qe->dbid == MyDatabaseId)
 		{
-			if (XidInMVCCSnapshot(qe->xid, snapshot))
-			{
-				/*
-				 * The source transaction is still in progress, so we can't
-				 * process this message yet.  Break out of the loop, but first
-				 * back up *current so we will reprocess the message next
-				 * time.  (Note: it is unlikely but not impossible for
-				 * TransactionIdDidCommit to fail, so we can't really avoid
-				 * this advance-then-back-up behavior when dealing with an
-				 * uncommitted message.)
-				 *
-				 * Note that we must test XidInMVCCSnapshot before we test
-				 * TransactionIdDidCommit, else we might return a message from
-				 * a transaction that is not yet visible to snapshots; compare
-				 * the comments at the head of heapam_visibility.c.
-				 *
-				 * Also, while our own xact won't be listed in the snapshot,
-				 * we need not check for TransactionIdIsCurrentTransactionId
-				 * because our transaction cannot (yet) have queued any
-				 * messages.
-				 */
-				*current = thisentry;
-				reachedStop = true;
-				break;
-			}
-			else if (TransactionIdDidCommit(qe->xid))
-			{
-				/* qe->data is the null-terminated channel name */
-				char	   *channel = qe->data;
-
-				if (IsListeningOn(channel))
-				{
-					/* payload follows channel name */
-					char	   *payload = qe->data + strlen(channel) + 1;
-
-					NotifyMyFrontEnd(channel, payload, qe->srcPid);
-				}
-			}
-			else
-			{
-				/*
-				 * The source transaction aborted or crashed, so we just
-				 * ignore its notifications.
-				 */
-			}
+			/*
+			 * Since queue entries are written atomically with commit records
+			 * while holding NotifyQueueLock exclusively, all entries in the queue
+			 * are guaranteed to be from committed transactions.
+			 *
+			 * Step 5: Read notification data from WAL using stored LSN.
+			 * The compact entry only contains metadata; actual notification
+			 * content is retrieved from WAL on demand.
+			 */
+			processNotificationFromWAL(qe->notify_lsn);
 		}
 
 		/* Loop back if we're not at end of page */
@@ -2097,6 +1818,220 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 	return reachedStop;
 }
 
+/*
+ * processNotificationFromWAL
+ *
+ * Fetch notification data from WAL using the stored LSN and process
+ * the individual notifications for delivery to listening frontend.
+ * This implements Step 5 of the new WAL-based notification system.
+ */
+static void
+processNotificationFromWAL(XLogRecPtr notify_lsn)
+{
+	XLogReaderState *xlogreader;
+	DecodedXLogRecord *record;
+	xl_async_notify_data *xlrec;
+	char	   *data;
+	char	   *ptr;
+	uint32_t	remaining;
+	int			srcPid;
+	char	   *errormsg;
+
+	/*
+	 * Create XLog reader to fetch the notification data record.
+	 * We use a temporary reader since this is called during normal
+	 * notification processing, not during recovery.
+	 */
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
+	if (!xlogreader)
+		elog(ERROR, "failed to allocate XLog reader for notification data");
+
+    /* Start reading exactly at the NOTIFY_DATA record begin LSN */
+    XLogBeginRead(xlogreader, notify_lsn);
+
+    /* Read the NOTIFY_DATA record */
+    record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg);
+    if (record == NULL)
+        elog(ERROR, "failed to read notification data from WAL at %X/%X: %s",
+             LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message");
+
+    /* Verify this is the expected record type */
+    if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID ||
+        (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA)
+        elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u",
+             LSN_FORMAT_ARGS(notify_lsn),
+             XLogRecGetRmid(xlogreader),
+             (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK));
+
+	/* Extract the notification data from the WAL record */
+	xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader);
+	srcPid = xlrec->srcPid;
+	data = (char *) xlrec + SizeOfAsyncNotifyData;
+	ptr = data;
+	remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData;
+
+	/*
+	 * Process each notification in the serialized data.
+	 * The format is: 2-byte channel_len, 2-byte payload_len,
+	 * null-terminated channel, null-terminated payload.
+	 */
+	for (uint32_t i = 0; i < xlrec->nnotifications && remaining >= 4; i++)
+	{
+		uint16		channel_len;
+		uint16		payload_len;
+		char	   *channel;
+		char	   *payload;
+
+		/* Read lengths */
+		memcpy(&channel_len, ptr, 2);
+		ptr += 2;
+		memcpy(&payload_len, ptr, 2);
+		ptr += 2;
+		remaining -= 4;
+
+		/* Verify we have enough data */
+		if (remaining < channel_len + 1 + payload_len + 1)
+			break;
+
+		/* Extract channel and payload strings */
+		channel = ptr;
+		ptr += channel_len + 1;
+		payload = ptr;
+		ptr += payload_len + 1;
+		remaining -= (channel_len + 1 + payload_len + 1);
+
+		/* Deliver notification if we're listening on this channel */
+		if (IsListeningOn(channel))
+			NotifyMyFrontEnd(channel, payload, srcPid);
+	}
+
+	/* Clean up */
+	XLogReaderFree(xlogreader);
+}
+
+
+/*
+ * asyncQueueAddCompactEntry
+ *
+ * Add a compact entry to the notification SLRU queue containing only
+ * metadata (dbid, xid, notify_lsn) that points to the full notification 
+ * data in WAL. This is much more efficient than the old approach of
+ * storing complete notification content in the SLRU queue.
+ */
+void
+asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn)
+{
+	AsyncQueueEntry entry;
+	QueuePosition queue_head;
+	int64		pageno;
+	int			offset;
+	int			slotno;
+	LWLock	   *banklock;
+
+	/*
+	 * Fill in the compact entry with just the metadata.
+	 * No payload data is stored here - it's all in WAL.
+	 */
+	entry.dbid = dbid;
+	entry.xid = xid;
+	entry.notify_lsn = notify_lsn;
+
+	/* Caller should already hold NotifyQueueLock in exclusive mode */
+	queue_head = QUEUE_HEAD;
+
+	/*
+	 * Get the current page. If this is the first write since postmaster
+	 * started, initialize the first page.
+	 */
+	pageno = QUEUE_POS_PAGE(queue_head);
+	banklock = SimpleLruGetBankLock(NotifyCtl, pageno);
+
+	LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+	if (QUEUE_POS_IS_ZERO(queue_head))
+		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
+	else
+		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
+								   InvalidTransactionId);
+
+	/* Mark the page dirty before writing */
+	NotifyCtl->shared->page_dirty[slotno] = true;
+
+	offset = QUEUE_POS_OFFSET(queue_head);
+
+	/* Check if the compact entry fits on the current page */
+	if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE)
+	{
+		/* Copy the compact entry to the shared buffer */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &entry,
+			   sizeof(AsyncQueueEntry));
+
+		/* Advance queue head by the size of our compact entry */
+		if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry)))
+		{
+			/*
+			 * Page became full. Initialize the next page to ensure SLRU
+			 * consistency (similar to what asyncQueueAddEntries does).
+			 */
+			LWLock	   *nextlock;
+
+			pageno = QUEUE_POS_PAGE(queue_head);
+			nextlock = SimpleLruGetBankLock(NotifyCtl, pageno);
+			if (nextlock != banklock)
+			{
+				LWLockRelease(banklock);
+				LWLockAcquire(nextlock, LW_EXCLUSIVE);
+			}
+			SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
+			if (nextlock != banklock)
+			{
+				LWLockRelease(nextlock);
+				LWLockAcquire(banklock, LW_EXCLUSIVE);
+			}
+
+			/* Set cleanup flag if appropriate */
+			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
+				tryAdvanceTail = true;
+		}
+
+		/* Update the global queue head */
+		QUEUE_HEAD = queue_head;
+	}
+	else
+	{
+		/*
+		 * Entry doesn't fit on current page. This should be very rare with
+		 * our small compact entries, but handle it by padding the page and
+		 * writing to the next page.
+		 */
+		AsyncQueueEntry padding;
+
+		memset(&padding, 0, sizeof(padding));
+		padding.dbid = InvalidOid;  /* Mark as padding */
+
+		/* Fill the rest of the page with padding */
+		memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
+			   &padding,
+			   QUEUE_PAGESIZE - offset);
+
+		/* Advance to next page */
+		asyncQueueAdvance(&queue_head, QUEUE_PAGESIZE - offset);
+
+		/* Recursively add the entry on the new page */
+		QUEUE_HEAD = queue_head;
+		LWLockRelease(banklock);
+		asyncQueueAddCompactEntry(dbid, xid, notify_lsn);
+		return;
+	}
+
+	LWLockRelease(banklock);
+}
+
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers.  Truncate pg_notify space if possible.
@@ -2395,3 +2330,62 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Write a WAL record containing async notification data
+ *
+ * This logs notification data to WAL, allowing us to release locks earlier
+ * and maintain commit ordering through WAL's natural ordering guarantees.
+ */
+XLogRecPtr
+LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+			   uint32 nnotifications, Size data_len, char *data)
+{
+	xl_async_notify_data xlrec;
+
+
+	xlrec.dbid = dboid;
+	xlrec.xid = xid;
+	xlrec.srcPid = srcPid;
+	xlrec.nnotifications = nnotifications;
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData);
+    XLogRegisterData(data, data_len);
+
+    (void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA);
+
+    /* Return the begin LSN of the record we just inserted. */
+    return ProcLastRecPtr;
+}
+
+
+
+
+/*
+ * Redo function for async notification WAL records
+ *
+ * During recovery, we need to replay notification records. For now,
+ * we'll add them to the traditional notification queue. In a complete
+ * implementation, replaying backends would read directly from WAL.
+ */
+void
+async_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_ASYNC_NOTIFY_DATA:
+			/* 
+			 * For notification data records, we don't need to do anything
+			 * during recovery since listeners will read directly from WAL.
+			 * The data is already durably stored in the WAL record itself.
+			 */
+			break;
+
+
+		default:
+			elog(PANIC, "async_redo: unknown op code %u", info);
+	}
+}
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 8f4b282c6b1..b35b007e51c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -19,6 +19,7 @@
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "fe_utils/archive.h"
 #include "filemap.h"
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index fac509ed134..03e73ae33c9 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -8,6 +8,7 @@
 #define FRONTEND 1
 #include "postgres.h"
 
+#include "access/async_xlog.h"
 #include "access/brin_xlog.h"
 #include "access/clog.h"
 #include "access/commit_ts.h"
@@ -23,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
+#include "commands/async.h"
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
diff --git a/src/include/access/async_xlog.h b/src/include/access/async_xlog.h
new file mode 100644
index 00000000000..d4c0c828e84
--- /dev/null
+++ b/src/include/access/async_xlog.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * async_xlog.h
+ *	  Async notification WAL definitions
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/async_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ASYNC_XLOG_H
+#define ASYNC_XLOG_H
+
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+
+/*
+ * WAL record types for async notifications
+ */
+#define XLOG_ASYNC_NOTIFY_DATA	0x00	/* notification data */
+
+/*
+ * WAL record for notification data (written in PreCommit_Notify)
+ */
+typedef struct xl_async_notify_data
+{
+	Oid			dbid;			/* database ID */
+	TransactionId xid;			/* transaction ID */
+	int32		srcPid;			/* source backend PID */
+	uint32		nnotifications;	/* number of notifications */
+	/* followed by serialized notification data */
+} xl_async_notify_data;
+
+#define SizeOfAsyncNotifyData	(offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32))
+
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+#endif							/* ASYNC_XLOG_H */
\ No newline at end of file
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 8e7fc9db877..58293e05165 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b2bc10ee041..aa1e2733976 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -194,6 +194,7 @@ typedef struct SavedTransactionCharacteristics
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
 #define XACT_XINFO_HAS_DROPPED_STATS	(1U << 8)
+#define XACT_XINFO_HAS_NOTIFY			(1U << 9)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -317,6 +318,11 @@ typedef struct xl_xact_origin
 	TimestampTz origin_timestamp;
 } xl_xact_origin;
 
+typedef struct xl_xact_notify
+{
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} xl_xact_notify;
+
 typedef struct xl_xact_commit
 {
 	TimestampTz xact_time;		/* time of commit */
@@ -330,6 +336,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
 	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
+	/* xl_xact_notify follows if XINFO_HAS_NOTIFY, stored unaligned! */
 } xl_xact_commit;
 #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
@@ -403,6 +410,8 @@ typedef struct xl_xact_parsed_commit
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
+
+	XLogRecPtr	notify_lsn;		/* LSN of notification data */
 } xl_xact_parsed_commit;
 
 typedef xl_xact_parsed_commit xl_xact_parsed_prepare;
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..7e9f10cb84b 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -14,11 +14,25 @@
 #define ASYNC_H
 
 #include <signal.h>
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
+/*
+ * Compact SLRU queue entry - stores metadata pointing to WAL data
+ */
+typedef struct AsyncQueueEntry
+{
+	Oid			dbid;			/* database ID for quick filtering */
+	TransactionId	xid;			/* transaction ID */
+	XLogRecPtr	notify_lsn;		/* LSN of notification data in WAL */
+} AsyncQueueEntry;
+
+#define ASYNC_QUEUE_ENTRY_SIZE	sizeof(AsyncQueueEntry)
+
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
 
@@ -46,4 +60,15 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* WAL-based notification functions */
+extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid,
+									 uint32 nnotifications, Size data_len, char *data);
+extern void async_redo(XLogReaderState *record);
+extern void async_desc(StringInfo buf, XLogReaderState *record);
+extern const char *async_identify(uint8 info);
+
+/* notification queue functions */
+extern void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn);
+extern void SignalBackends(void);
+
 #endif							/* ASYNC_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c6f5ebceefd..71459fe5529 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -301,6 +301,9 @@ struct PGPROC
 
 	uint32		wait_event_info;	/* proc's wait information */
 
+	/* Support for async notifications */
+	XLogRecPtr	notifyCommitLsn;	/* LSN of notification data for current xact */
+
 	/* Support for group transaction status update. */
 	bool		clogGroupMember;	/* true, if member of clog group */
 	pg_atomic_uint32 clogGroupNext; /* next clog group member */
-- 
2.39.3 (Apple Git-145)

