From e5bd0959b756dd7e52ffcc1e0a7005ce27f9cabb Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Sun, 28 Sep 2025 14:53:57 +0200
Subject: [PATCH] Fix LISTEN/NOTIFY so it scales with idle listening backends

Currently, idle listening backends cause a dramatic slowdown due to
context switching when they are signaled and wake up. This is wasteful
when they are not listening to the channel being notified.

Just 10 extra idle listening connections cause a slowdown from 8700 TPS
to 6100 TPS, 100 extra cause it to drop to 2000 TPS, and at 1000 extra
it falls to 250 TPS.

To improve scalability with the number of idle listening backends, this
patch introduces a shared hash table to keep track of channels per
listening backend. This hash table is partitioned to reduce contention
on concurrent LISTEN/UNLISTEN operations.

We keep track of up to NOTIFY_MULTICAST_THRESHOLD (16) listeners per
channel. Benchmarks indicated diminishing gains above this level.
Setting it lower seems unnecessary, so a constant seemed fine; a GUC did
not seem motivated.

This patch also adds a wakeup_pending flag to each backend's queue
status to avoid redundant signaling when a wakeup is already pending as
the backend is signaled again. The flag is set when a backend is
signaled and cleared before processing the queue. This order is
important to ensure correctness.

It was also necessary to add a new bgworker, notify_bgworker, whose sole
responsibility is to wake up lagging listening backends, ensuring they
are kicked when they are about to fall too far behind. This bgworker is
always started at postmaster startup, but is only activated upon NOTIFY
by signaling it, unless it is already active. The notify_bgworker
staggers the signaling of lagging listening backends by sleeping 100 ms
between each signal, to prevent the thundering herd problem we would
otherwise get if all listening backends woke up at the same time. It
loops until there are no more lagging listening backends, and then
becomes inactive.
---
 src/backend/commands/async.c                  | 882 +++++++++++++++++-
 src/backend/postmaster/Makefile               |   1 +
 src/backend/postmaster/bgworker.c             |   4 +
 src/backend/postmaster/meson.build            |   1 +
 src/backend/postmaster/notify_bgworker.c      | 225 +++++
 src/backend/postmaster/postmaster.c           |   6 +
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/postmaster/notify_bgworker.h      |  40 +
 src/include/storage/lwlocklist.h              |   1 +
 9 files changed, 1122 insertions(+), 39 deletions(-)
 create mode 100644 src/backend/postmaster/notify_bgworker.c
 create mode 100644 src/include/postmaster/notify_bgworker.h

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..fd32e207408 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,12 @@
  *	  All notification messages are placed in the queue and later read out
  *	  by listening backends.
  *
- *	  There is no central knowledge of which backend listens on which channel;
- *	  every backend has its own list of interesting channels.
+ *	  In addition to each backend maintaining its own list of channels, we also
+ *	  maintain a central hash table that tracks listeners for each channel, up
+ *	  to NOTIFY_MULTICAST_THRESHOLD. When the number of listeners is below
+ *	  this threshold, we can perform a targeted "multicast" by signaling only
+ *	  those specific backends. If the number of listeners reaches or exceeds the
+ *	  threshold, we fall back to signaling all listening backends in the database.
  *
  *	  Although there is only one queue, notifications are treated as being
  *	  database-local; this is done by including the sender's database OID
@@ -71,13 +75,19 @@
  *	  make any actual updates to the effective listen state (listenChannels).
  *	  Then we signal any backends that may be interested in our messages
  *	  (including our own backend, if listening).  This is done by
- *	  SignalBackends(), which scans the list of listening backends and sends a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- *	  know which backend is listening on which channel so we must signal them
- *	  all).  We can exclude backends that are already up to date, though, and
- *	  we can also exclude backends that are in other databases (unless they
- *	  are way behind and should be kicked to make them advance their
- *	  pointers).
+ *	  SignalBackends(), which sends PROCSIG_NOTIFY_INTERRUPT signals to
+ *	  listening backends, and has two modes of operation:
+ *	  a) Multicast mode: For channels with a number of listeners not exceeding
+ *	  NOTIFY_MULTICAST_THRESHOLD, signals are sent only to those specific
+ *	  backends.
+ *	  b) Broadcast mode: If any channel being notified has more listeners than
+ *	  the threshold (or if the hash table runs out of shared memory for
+ *	  new entries), we signal every listening backend in the database.
+ *
+ *	  After sending immediate signals, SignalBackends() also triggers a deferred
+ *	  wakeup background worker (if not already active) that handles waking up
+ *	  backends that have fallen behind by QUEUE_CLEANUP_DELAY or more pages,
+ *	  using staggered delays to prevent thundering herd effects.
  *
  *	  Finally, after we are out of the transaction altogether and about to go
  *	  idle, we scan the queue for messages that need to be sent to our
@@ -128,6 +138,7 @@
 #include <limits.h>
 #include <unistd.h>
 #include <signal.h>
+#include <string.h>
 
 #include "access/parallel.h"
 #include "access/slru.h"
@@ -137,6 +148,7 @@
 #include "commands/async.h"
 #include "common/hashfn.h"
 #include "funcapi.h"
+#include "postmaster/notify_bgworker.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
@@ -146,6 +158,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc_hooks.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
@@ -162,6 +175,79 @@
  */
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
+/*
+ * Maximum number of listeners to track per channel for multicast signaling.
+ * When the number of listeners on a channel exceeds this threshold, NOTIFY
+ * will signal all listening backends rather than just those listening on the
+ * specific channel. Setting to 0 disables multicast signaling entirely.
+ */
+#define NOTIFY_MULTICAST_THRESHOLD	16
+
+/*
+ * Number of partitions for the channel hash table's locks.
+ * This must be a power of two.
+ */
+#define NUM_NOTIFY_PARTITIONS 128
+
+/*
+ * Channel hash table definitions
+ *
+ * This hash table provides an optimization by tracking which backends are
+ * listening on each channel, up to a certain threshold. Channels are
+ * identified by database OID and channel name, making them
+ * database-specific.
+ *
+ * To improve scalability of concurrent LISTEN/UNLISTEN operations, the hash
+ * table is partitioned, with each partition protected by its own LWLock.
+ * This avoids serializing all operations on a single global lock.
+ *
+ * When the number of backends listening on a channel is at or below
+ * NOTIFY_MULTICAST_THRESHOLD, we store their ProcNumbers and signal them
+ * directly (multicast).
+ *
+ * We fall back to broadcast mode and signal all listening backends when:
+ * 1) More backends listen on the same channel than the threshold allows, OR
+ * 2) The hash table runs out of shared memory for new entries
+ *
+ * Note that CHANNEL_HASH_MAX_SIZE is not a hard limit - the hash table can
+ * store more entries than this, but performance will degrade due to bucket
+ * overflow. The actual fallback to broadcast mode occurs only when shared
+ * memory is exhausted and we cannot allocate new hash entries.
+ *
+ * The maximum size (CHANNEL_HASH_MAX_SIZE) is based on the typical OS port
+ * range. This provides a reasonable upper bound for systems that use
+ * per-connection channels.
+ *
+ */
+#define CHANNEL_HASH_INIT_SIZE		256
+#define CHANNEL_HASH_MAX_SIZE		65535
+
+/*
+ * Key structure for the channel hash table.
+ * Channels are database-specific, so we need both the database OID
+ * and the channel name to uniquely identify a channel.
+ */
+typedef struct ChannelHashKey
+{
+	Oid			dboid;
+	char		channel[NAMEDATALEN];
+}			ChannelHashKey;
+
+/*
+ * Each entry contains a channel key (database OID + channel name) and an array
+ * of listening backend ProcNumbers, up to NOTIFY_MULTICAST_THRESHOLD. If the
+ * number of listeners exceeds the threshold, we mark the channel for
+ * broadcast and stop tracking individual listeners.
+ */
+typedef struct ChannelEntry
+{
+	ChannelHashKey key;
+	bool		is_broadcast;	/* True if num_listeners >= threshold */
+	uint8		num_listeners;	/* Number of listeners currently stored */
+	/* Listeners array follows, of size NOTIFY_MULTICAST_THRESHOLD */
+	ProcNumber	listeners[FLEXIBLE_ARRAY_MEMBER];
+}			ChannelEntry;
+
 /*
  * Struct representing an entry in the global notify queue
  *
@@ -227,8 +313,8 @@ typedef struct QueuePosition
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * also the distance by which a backend needs to be behind before we'll
+ * decide we need to wake it up to advance its pointer.
  *
  * Resist the temptation to make this really large.  While that would save
  * work in some places, it would add cost in others.  In particular, this
@@ -246,6 +332,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	bool		wakeup_pending;
 } QueueBackendStatus;
 
 /*
@@ -269,6 +356,11 @@ typedef struct QueueBackendStatus
  * In order to avoid deadlocks, whenever we need multiple locks, we first get
  * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock.
  *
+ * The channel hash table is protected by a separate set of partitioned
+ * locks. To prevent deadlocks between these and NotifyQueueLock, the global
+ * lock-ordering rule is: always acquire NotifyQueueLock *before* acquiring
+ * any channel hash partition lock.
+ *
  * Each backend uses the backend[] array entry with index equal to its
  * ProcNumber.  We rely on this to make SendProcSignal fast.
  *
@@ -288,11 +380,67 @@ typedef struct AsyncQueueControl
 	ProcNumber	firstListener;	/* id of first listener, or
 								 * INVALID_PROC_NUMBER */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
+	/* Deferred wakeup worker state */
+	bool		deferredWakeupWorkerActive; /* is worker processing? */
+	pid_t		deferredWakeupWorkerPid;	/* PID of worker for signaling */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
 static AsyncQueueControl *asyncQueueControl;
 
+/* Locks for partitioned channel hash table */
+static LWLock *channelHashLocks;
+
+/* Channel hash table for multicast signalling */
+static HTAB *channelHash = NULL;
+
+/* Forward declaration needed by GetChannelHash */
+static uint32 channel_hash_func(const void *key, Size keysize);
+
+/*
+ * GetChannelHash
+ *		Get the channel hash table, initializing our backend's pointer if needed.
+ *
+ * This must be called before any access to the channel hash table.
+ * The hash table itself is created in shared memory during AsyncShmemInit,
+ * but each backend needs to get its own pointer to it.
+ */
+static HTAB *
+GetChannelHash(void)
+{
+	if (channelHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+		Size		entrysize;
+
+		/*
+		 * Set up to attach to the existing shared hash table. The hash
+		 * control parameters must match those used in AsyncShmemInit.
+		 */
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(ChannelHashKey);
+
+		/*
+		 * The size of a channel entry is flexible. We must have enough space
+		 * for the maximum number of listeners specified by the threshold.
+		 */
+		entrysize = add_size(offsetof(ChannelEntry, listeners),
+							 mul_size(NOTIFY_MULTICAST_THRESHOLD, sizeof(ProcNumber)));
+		hash_ctl.entrysize = entrysize;
+
+		hash_ctl.hash = channel_hash_func;
+		hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS;
+
+		channelHash = ShmemInitHash("Channel Hash",
+									CHANNEL_HASH_INIT_SIZE,
+									CHANNEL_HASH_MAX_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+	}
+
+	return channelHash;
+}
+
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
@@ -301,6 +449,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeup_pending)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -458,6 +607,14 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+/* Channel hash table management functions */
+static LWLock *GetChannelHashLock(const char *channel);
+static inline void ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel);
+static void ChannelHashAddListener(const char *channel, ProcNumber procno);
+static void ChannelHashRemoveListener(const char *channel, ProcNumber procno);
+static ChannelEntry * ChannelHashLookup(const char *channel);
+static List *GetPendingNotifyChannels(void);
+
 /*
  * Compute the difference between two queue page numbers.
  * Previously this function accounted for a wraparound.
@@ -485,6 +642,7 @@ Size
 AsyncShmemSize(void)
 {
 	Size		size;
+	Size		entrysize;
 
 	/* This had better match AsyncShmemInit */
 	size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
@@ -492,6 +650,18 @@ AsyncShmemSize(void)
 
 	size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
 
+	/*
+	 * The size of a channel entry is flexible. We must allocate enough space
+	 * for the maximum number of listeners specified by the threshold.
+	 */
+	entrysize = add_size(offsetof(ChannelEntry, listeners),
+						 mul_size(NOTIFY_MULTICAST_THRESHOLD, sizeof(ProcNumber)));
+	size = add_size(size, hash_estimate_size(CHANNEL_HASH_MAX_SIZE,
+											 entrysize));
+
+	/* Space for channel hash partition locks */
+	size = add_size(size, mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock)));
+
 	return size;
 }
 
@@ -521,12 +691,15 @@ AsyncShmemInit(void)
 		QUEUE_STOP_PAGE = 0;
 		QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
 		asyncQueueControl->lastQueueFillWarn = 0;
+		asyncQueueControl->deferredWakeupWorkerActive = false;
+		asyncQueueControl->deferredWakeupWorkerPid = 0;
 		for (int i = 0; i < MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
 
@@ -546,6 +719,48 @@ AsyncShmemInit(void)
 		 */
 		(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
 	}
+
+	/*
+	 * Create or attach to the channel hash table.
+	 */
+	{
+		HASHCTL		hash_ctl;
+		Size		entrysize;
+
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(ChannelHashKey);
+
+		/*
+		 * The size of a channel entry is flexible. We must have enough space
+		 * for the maximum number of listeners specified by the threshold.
+		 */
+		entrysize = add_size(offsetof(ChannelEntry, listeners),
+							 mul_size(NOTIFY_MULTICAST_THRESHOLD, sizeof(ProcNumber)));
+		hash_ctl.entrysize = entrysize;
+
+		hash_ctl.hash = channel_hash_func;
+		hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS;
+
+		channelHash = ShmemInitHash("Channel Hash",
+									CHANNEL_HASH_INIT_SIZE,
+									CHANNEL_HASH_MAX_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+	}
+
+	/* Initialize locks for the partitioned hash table */
+	size = mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock));
+	channelHashLocks = (LWLock *)
+		ShmemInitStruct("Channel Hash Locks", size, &found);
+	if (!found)
+	{
+		/* First time through: initialize the locks */
+		for (int i = 0; i < NUM_NOTIFY_PARTITIONS; i++)
+		{
+			LWLockInitialize(&channelHashLocks[i],
+							 LWTRANCHE_NOTIFY_CHANNEL_HASH);
+		}
+	}
 }
 
 
@@ -1152,6 +1367,8 @@ Exec_ListenCommit(const char *channel)
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 	listenChannels = lappend(listenChannels, pstrdup(channel));
 	MemoryContextSwitchTo(oldcontext);
+
+	ChannelHashAddListener(channel, MyProcNumber);
 }
 
 /*
@@ -1175,6 +1392,7 @@ Exec_UnlistenCommit(const char *channel)
 		{
 			listenChannels = foreach_delete_current(listenChannels, q);
 			pfree(lchan);
+			ChannelHashRemoveListener(channel, MyProcNumber);
 			break;
 		}
 	}
@@ -1193,9 +1411,22 @@ Exec_UnlistenCommit(const char *channel)
 static void
 Exec_UnlistenAllCommit(void)
 {
+	ListCell   *p;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
+	/*
+	 * Before freeing the local list, iterate through it and perform a
+	 * targeted removal for each of our channels from the shared hash table.
+	 */
+	foreach(p, listenChannels)
+	{
+		char	   *channel = (char *) lfirst(p);
+
+		ChannelHashRemoveListener(channel, MyProcNumber);
+	}
+
 	list_free_deep(listenChannels);
 	listenChannels = NIL;
 }
@@ -1565,12 +1796,12 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send.  However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind.  Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * This function operates in two modes:
+ * 1. Multicast mode: If all pending notification channels have listeners at or
+ *    below NOTIFY_MULTICAST_THRESHOLD, we signal only those specific backends.
+ * 2. Broadcast mode: If any channel's listener count exceeds the threshold OR
+ *    the hash table lacks memory for new entries, we signal all listening
+ *    backends in our database.
  *
  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
  *
@@ -1583,6 +1814,12 @@ SignalBackends(void)
 	int32	   *pids;
 	ProcNumber *procnos;
 	int			count;
+	List	   *channels;
+	ListCell   *p;
+	bool	   *signaled;
+	bool		broadcast_mode = false;
+	bool		trigger_deferred_wakeup = false;
+	pid_t		deferred_wakeup_pid = 0;
 
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
@@ -1594,40 +1831,149 @@ SignalBackends(void)
 	 */
 	pids = (int32 *) palloc(MaxBackends * sizeof(int32));
 	procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+	signaled = (bool *) palloc0(MaxBackends * sizeof(bool));
 	count = 0;
 
+	/* Get list of channels that have pending notifications */
+	channels = GetPendingNotifyChannels();
+
+	/*
+	 * To prevent deadlocks, we must always acquire locks in the same order:
+	 * global NotifyQueueLock first, then individual partition locks.
+	 */
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+
+	/*
+	 * Determine if we can use targeted signaling or must broadcast. This
+	 * check must be done while holding NotifyQueueLock to prevent deadlocks
+	 * against other backends that might be modifying the listener list and
+	 * hash table simultaneously (e.g., asyncQueueUnregister).
+	 */
+	foreach(p, channels)
 	{
-		int32		pid = QUEUE_BACKEND_PID(i);
-		QueuePosition pos;
+		char	   *channel = (char *) lfirst(p);
+		ChannelEntry *entry;
+		LWLock	   *lock = GetChannelHashLock(channel);
+
+		LWLockAcquire(lock, LW_SHARED);
+		entry = ChannelHashLookup(channel);
 
-		Assert(pid != InvalidPid);
-		pos = QUEUE_BACKEND_POS(i);
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+		/*
+		 * If there is no entry, it could mean we ran out of shared memory
+		 * when trying to add this channel to the hash table. If the entry is
+		 * marked for broadcast, we must use broadcast mode.
+		 */
+		if (!entry || entry->is_broadcast)
+		{
+			broadcast_mode = true;
+			LWLockRelease(lock);
+			break;
+		}
+		LWLockRelease(lock);
+	}
+
+	if (broadcast_mode)
+	{
+		/*
+		 * In broadcast mode, we iterate over all listening backends and
+		 * signal the ones in our database that are not already caught up.
+		 */
+		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
 		{
+			int32		pid;
+			QueuePosition pos;
+
+			if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+				continue;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+
 			/*
 			 * Always signal listeners in our own database, unless they're
-			 * already caught up (unlikely, but possible).
+			 * already caught up.
 			 */
 			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
 				continue;
+
+			pid = QUEUE_BACKEND_PID(i);
+			Assert(pid != InvalidPid);
+
+			/* OK, need to signal this one */
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+			pids[count] = pid;
+			procnos[count] = i;
+			signaled[i] = true;
+			count++;
 		}
-		else
+	}
+	else
+	{
+		/*
+		 * In multicast mode, signal specific listening backends. We must
+		 * re-check the hash entries here inside the lock to avoid races.
+		 */
+		foreach(p, channels)
 		{
-			/*
-			 * Listeners in other databases should be signaled only if they
-			 * are far behind.
-			 */
-			if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
-				continue;
+			char	   *channel = (char *) lfirst(p);
+			ChannelEntry *entry;
+			LWLock	   *lock = GetChannelHashLock(channel);
+
+			LWLockAcquire(lock, LW_SHARED);
+			entry = ChannelHashLookup(channel);
+
+			if (entry && !entry->is_broadcast)
+			{
+				for (int j = 0; j < entry->num_listeners; j++)
+				{
+					ProcNumber	i = entry->listeners[j];
+					int32		pid;
+					QueuePosition pos;
+
+					if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+						continue;
+
+					if (signaled[i])
+						continue;
+
+					pos = QUEUE_BACKEND_POS(i);
+
+					if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+						continue;
+
+					if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+						continue;
+
+					pid = QUEUE_BACKEND_PID(i);
+					Assert(pid != InvalidPid);
+
+					/* OK, need to signal this one */
+					QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+					pids[count] = pid;
+					procnos[count] = i;
+					signaled[i] = true;
+					count++;
+				}
+			}
+			LWLockRelease(lock);
 		}
-		/* OK, need to signal this one */
-		pids[count] = pid;
-		procnos[count] = i;
-		count++;
 	}
+
+	/*
+	 * Check if we should trigger the deferred wakeup worker after we're done
+	 * sending immediate signals. We do this check while still holding the
+	 * lock to avoid needing to reacquire it later.
+	 */
+	if (!asyncQueueControl->deferredWakeupWorkerActive &&
+		asyncQueueControl->deferredWakeupWorkerPid != 0)
+	{
+		asyncQueueControl->deferredWakeupWorkerActive = true;
+		trigger_deferred_wakeup = true;
+		deferred_wakeup_pid = asyncQueueControl->deferredWakeupWorkerPid;
+	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now send signals */
@@ -1647,9 +1993,9 @@ SignalBackends(void)
 
 		/*
 		 * Note: assuming things aren't broken, a signal failure here could
-		 * only occur if the target backend exited since we released
-		 * NotifyQueueLock; which is unlikely but certainly possible. So we
-		 * just log a low-level debug message if it happens.
+		 * only occur if the target backend exited since we released the lock;
+		 * which is unlikely but certainly possible. So we just log a
+		 * low-level debug message if it happens.
 		 */
 		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
 			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
@@ -1657,6 +2003,25 @@ SignalBackends(void)
 
 	pfree(pids);
 	pfree(procnos);
+	pfree(signaled);
+
+	/*
+	 * Trigger the deferred wakeup worker if needed. The worker will check for
+	 * lagging backends and wake them up with staggered delays.
+	 */
+	if (trigger_deferred_wakeup)
+	{
+		if (kill(deferred_wakeup_pid, SIGUSR1) < 0)
+		{
+			/* Worker might have died, clear the flags */
+			elog(WARNING, "could not signal deferred wakeup worker: %m");
+
+			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+			asyncQueueControl->deferredWakeupWorkerActive = false;
+			asyncQueueControl->deferredWakeupWorkerPid = 0;
+			LWLockRelease(NotifyQueueLock);
+		}
+	}
 }
 
 /*
@@ -1865,6 +2230,7 @@ asyncQueueReadAllNotifications(void)
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
 	LWLockRelease(NotifyQueueLock);
@@ -2395,3 +2761,441 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+
+/*
+ * Channel hash table management functions
+ */
+
+/*
+ * channel_hash_func
+ *     Custom hash function for the channel hash table. This function ensures
+ *     that the low-order bits of the hash are well-distributed, which is
+ *     critical for partitioned hash tables.
+ */
+static uint32
+channel_hash_func(const void *key, Size keysize)
+{
+	const		ChannelHashKey *k = (const ChannelHashKey *) key;
+	uint32		h;
+
+	/*
+	 * Mix the dboid and the channel name to produce a good hash. hash_any()
+	 * is a high-quality portable hash function. This prevents channels with
+	 * the same name in different databases from always mapping to the same
+	 * partition.
+	 */
+	h = DatumGetUInt32(hash_uint32(k->dboid));
+	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+								 strnlen(k->channel, NAMEDATALEN)));
+
+	return h;
+}
+
+/*
+ * GetChannelHashLock
+ *     Return the LWLock that protects the partition for the given channel name.
+ */
+static LWLock *
+GetChannelHashLock(const char *channel)
+{
+	ChannelHashKey key;
+	uint32		hash;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+	hash = get_hash_value(GetChannelHash(), &key);
+
+	return &channelHashLocks[hash % NUM_NOTIFY_PARTITIONS];
+}
+
+/*
+ * ChannelHashPrepareKey
+ *		Prepare a channel key (database OID + channel name) for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel)
+{
+	memset(key, 0, sizeof(ChannelHashKey));
+	key->dboid = dboid;
+	strlcpy(key->channel, channel, NAMEDATALEN);
+}
+
+/*
+ * ChannelHashAddListener
+ *     Register the given backend as a listener for the specified channel.
+ *
+ * This function uses an optimistic read-locking strategy to maximize
+ * concurrency. An exclusive lock is only taken when mutating the listener
+ * list.
+ *
+ * 1. It first takes a shared lock. If the channel is already in broadcast
+ *    mode, or if the current backend is already in the listener list, no write
+ *    is needed and we can return immediately.
+ *
+ * 2. If a write is needed, it releases the shared lock and acquires an
+ *    exclusive lock.
+ *
+ * 3. CRUCIALLY, after acquiring the exclusive lock, it must re-check the
+ *    state, as another backend may have modified the entry in the interim.
+ *
+ * 4. If the number of listeners is below NOTIFY_MULTICAST_THRESHOLD, the
+ *    new listener is added. If the threshold is reached, the channel is
+ *    converted to broadcast mode.
+ */
+static void
+ChannelHashAddListener(const char *channel, ProcNumber procno)
+{
+	ChannelEntry *entry;
+	bool		found;
+	ChannelHashKey key;
+	LWLock	   *lock = GetChannelHashLock(channel);
+
+	/*
+	 * If the threshold is zero, this optimization is disabled. All channels
+	 * immediately use broadcast, so we don't need to track them.
+	 */
+	if (NOTIFY_MULTICAST_THRESHOLD <= 0)
+		return;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/*
+	 * FAST PATH: Optimistically take a shared lock. If the channel is already
+	 * in broadcast mode, or if we are already listed, we are done.
+	 */
+	LWLockAcquire(lock, LW_SHARED);
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (entry)
+	{
+		if (entry->is_broadcast)
+		{
+			LWLockRelease(lock);
+			return;
+		}
+		/* Check if we are already in the list */
+		for (int i = 0; i < entry->num_listeners; i++)
+		{
+			if (entry->listeners[i] == procno)
+			{
+				LWLockRelease(lock);
+				return;
+			}
+		}
+	}
+	LWLockRelease(lock);
+
+	/*
+	 * SLOW PATH: We need to write. Acquire exclusive lock.
+	 */
+	LWLockAcquire(lock, LW_EXCLUSIVE);
+
+	/*
+	 * Re-check state after acquiring exclusive lock, as it may have changed.
+	 */
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_ENTER_NULL, &found);
+
+	if (entry == NULL)
+	{
+		/* Out of memory in the hash partition. */
+		ereport(DEBUG1, (errmsg("too many notification channels are already being tracked")));
+		LWLockRelease(lock);
+		return;
+	}
+
+	if (!found)
+	{
+		/* First listener for this channel. */
+		entry->is_broadcast = false;
+		entry->num_listeners = 1;
+		entry->listeners[0] = procno;
+	}
+	else
+	{
+		/* Entry already exists, re-check everything. */
+		bool		already_present = false;
+
+		if (entry->is_broadcast)
+		{
+			/* Another backend set it to broadcast mode. We're done. */
+			LWLockRelease(lock);
+			return;
+		}
+
+		for (int i = 0; i < entry->num_listeners; i++)
+		{
+			if (entry->listeners[i] == procno)
+			{
+				already_present = true;
+				break;
+			}
+		}
+
+		if (!already_present)
+		{
+			if (entry->num_listeners < NOTIFY_MULTICAST_THRESHOLD)
+			{
+				/* Add ourselves to the list of listeners. */
+				entry->listeners[entry->num_listeners] = procno;
+				entry->num_listeners++;
+			}
+			else
+			{
+				/* We are the listener that exceeds the threshold. */
+				entry->is_broadcast = true;
+				entry->num_listeners = 0;	/* Clear the list */
+			}
+		}
+	}
+	LWLockRelease(lock);
+}
+
+/*
+ * ChannelHashRemoveListener
+ *		Update the channel hash when a backend stops listening on a channel.
+ *
+ * This function uses an optimistic read-lock strategy. An exclusive lock is
+ * only taken if we are in the listener list for a channel and need to remove
+ * ourselves. If a channel is in broadcast mode, we cannot safely modify it,
+ * as we can't know which backends are listening.
+ */
+static void
+ChannelHashRemoveListener(const char *channel, ProcNumber procno)
+{
+	ChannelEntry *entry;
+	ChannelHashKey key;
+	LWLock	   *lock = GetChannelHashLock(channel);
+	bool		present = false;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/*
+	 * Take a shared lock first to see if a removal is even possible. If the
+	 * entry doesn't exist, is in broadcast mode, or we're not in its list, we
+	 * have nothing to do. This is the fast path.
+	 */
+	LWLockAcquire(lock, LW_SHARED);
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (!entry || entry->is_broadcast)
+	{
+		LWLockRelease(lock);
+		return;
+	}
+
+	/* Check if we are in the list */
+	for (int i = 0; i < entry->num_listeners; i++)
+	{
+		if (entry->listeners[i] == procno)
+		{
+			present = true;
+			break;
+		}
+	}
+	if (!present)
+	{
+		LWLockRelease(lock);
+		return;
+	}
+	LWLockRelease(lock);
+
+	/* A removal is likely needed. Acquire an exclusive lock. */
+	LWLockAcquire(lock, LW_EXCLUSIVE);
+
+	/*
+	 * Re-check the state. Another backend might have changed it (e.g., to
+	 * broadcast mode).
+	 */
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (entry && !entry->is_broadcast)
+	{
+		int			i;
+
+		for (i = 0; i < entry->num_listeners; i++)
+		{
+			if (entry->listeners[i] == procno)
+			{
+				/*
+				 * Found our procno. Remove it from the listener array.
+				 *
+				 * If this is the last listener, we remove the entire hash
+				 * entry for the channel.
+				 */
+				if (entry->num_listeners == 1)
+				{
+					(void) hash_search(GetChannelHash(), &key, HASH_REMOVE, NULL);
+				}
+				else
+				{
+					/*
+					 * To remove an element from the array while keeping it
+					 * contiguous, we first decrement the listener count.
+					 * Then, we shift all subsequent elements one position to
+					 * the left, overwriting the element we want to remove.
+					 *
+					 * The `if (i < entry->num_listeners)` condition
+					 * explicitly handles the case where the last element in
+					 * the array is being removed. In that scenario, `i`
+					 * equals the new `num_listeners`, so no memory movement
+					 * is necessary, and the `memmove` is correctly skipped.
+					 */
+					entry->num_listeners--;
+					if (i < entry->num_listeners)
+					{
+						Size		size_to_move;
+
+						size_to_move = mul_size(entry->num_listeners - i,
+												sizeof(ProcNumber));
+						memmove(&entry->listeners[i],
+								&entry->listeners[i + 1],
+								size_to_move);
+					}
+				}
+				break;			/* Found and removed, exit loop. */
+			}
+		}
+	}
+	LWLockRelease(lock);
+}
+
+/*
+ * ChannelHashLookup
+ *		Look up the channel hash entry for the given channel name in the
+ *		current database.
+ *
+ * Returns NULL if no hash entry exists for the channel. When an entry exists,
+ * the caller should check the is_broadcast field to determine if individual
+ * listeners are being tracked or if the channel uses broadcast mode.
+ *
+ * Caller must hold the appropriate partition lock (shared is sufficient).
+ */
+static ChannelEntry *
+ChannelHashLookup(const char *channel)
+{
+	ChannelHashKey key;
+
+	Assert(LWLockHeldByMe(GetChannelHashLock(channel)));
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	return (ChannelEntry *) hash_search(GetChannelHash(),
+										&key,
+										HASH_FIND,
+										NULL);
+}
+
+/*
+ * GetPendingNotifyChannels
+ *		Get list of unique channel names from pending notifications.
+ */
+static List *
+GetPendingNotifyChannels(void)
+{
+	List	   *channels = NIL;
+	ListCell   *p;
+	ListCell   *q;
+	bool		found;
+
+	if (!pendingNotifies)
+		return NIL;
+
+	/* Collect unique channel names from pending notifications */
+	foreach(p, pendingNotifies->events)
+	{
+		Notification *n = (Notification *) lfirst(p);
+		char	   *channel = n->data;
+
+		/* Check if we already have this channel in our list */
+		found = false;
+		foreach(q, channels)
+		{
+			char	   *existing = (char *) lfirst(q);
+
+			if (strcmp(existing, channel) == 0)
+			{
+				found = true;
+				break;
+			}
+		}
+
+		if (!found)
+			channels = lappend(channels, channel);
+	}
+
+	return channels;
+}
+
+/*
+ * AsyncDeferredWakeupSetWorkerPid
+ *		Store the PID of the deferred wakeup worker in shared memory
+ */
+void
+AsyncDeferredWakeupSetWorkerPid(pid_t pid)
+{
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+	asyncQueueControl->deferredWakeupWorkerPid = pid;
+	LWLockRelease(NotifyQueueLock);
+}
+
+/*
+ * AsyncDeferredWakeupClearActive
+ *		Clear the active flag for the deferred wakeup worker
+ */
+void
+AsyncDeferredWakeupClearActive(void)
+{
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+	asyncQueueControl->deferredWakeupWorkerActive = false;
+	LWLockRelease(NotifyQueueLock);
+}
+
+/*
+ * AsyncGetLaggingBackends
+ *		Get list of lagging listening backends that need to be woken up
+ *
+ * Returns a list of BackendWakeupInfo structs. The caller is responsible
+ * for freeing the list and its contents.
+ */
+List *
+AsyncGetLaggingBackends(void)
+{
+	List	   *lagging_backends = NIL;
+	QueuePosition head;
+
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+	head = QUEUE_HEAD;
+
+	/* Iterate through all listening backends */
+	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+	{
+		QueuePosition pos;
+		int64		pageDiff;
+
+		/* Skip if wakeup is already pending */
+		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+			continue;
+
+		pos = QUEUE_BACKEND_POS(i);
+
+		/* Calculate how far behind this backend is */
+		pageDiff = asyncQueuePageDiff(QUEUE_POS_PAGE(head), QUEUE_POS_PAGE(pos));
+
+		/* If backend is lagging by QUEUE_CLEANUP_DELAY or more pages */
+		if (pageDiff >= QUEUE_CLEANUP_DELAY)
+		{
+			BackendWakeupInfo *info;
+
+			info = (BackendWakeupInfo *) palloc(sizeof(BackendWakeupInfo));
+			info->pid = QUEUE_BACKEND_PID(i);
+			info->procno = i;
+
+			/* Mark as having wakeup pending */
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+
+			lagging_backends = lappend(lagging_backends, info);
+		}
+	}
+
+	LWLockRelease(NotifyQueueLock);
+
+	return lagging_backends;
+}
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 0f4435d2d97..2ac4f3fd524 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -21,6 +21,7 @@ OBJS = \
 	fork_process.o \
 	interrupt.o \
 	launch_backend.o \
+	notify_bgworker.o \
 	pgarch.o \
 	pmchild.o \
 	postmaster.o \
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 1ad65c237c3..0946065895a 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -18,6 +18,7 @@
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "postmaster/bgworker_internals.h"
+#include "postmaster/notify_bgworker.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
@@ -132,6 +133,9 @@ static const struct
 	},
 	{
 		"TablesyncWorkerMain", TablesyncWorkerMain
+	},
+	{
+		"NotifyDeferredWakeupMain", NotifyDeferredWakeupMain
 	}
 };
 
diff --git a/src/backend/postmaster/meson.build b/src/backend/postmaster/meson.build
index 0008603cfee..c9d285570ae 100644
--- a/src/backend/postmaster/meson.build
+++ b/src/backend/postmaster/meson.build
@@ -9,6 +9,7 @@ backend_sources += files(
   'fork_process.c',
   'interrupt.c',
   'launch_backend.c',
+  'notify_bgworker.c',
   'pgarch.c',
   'pmchild.c',
   'postmaster.c',
diff --git a/src/backend/postmaster/notify_bgworker.c b/src/backend/postmaster/notify_bgworker.c
new file mode 100644
index 00000000000..f0c5514cff7
--- /dev/null
+++ b/src/backend/postmaster/notify_bgworker.c
@@ -0,0 +1,225 @@
+/*-------------------------------------------------------------------------
+ *
+ * notify_bgworker.c
+ *	  Background worker for deferred wakeup of lagging LISTEN/NOTIFY backends
+ *
+ * This background worker is responsible for performing staggered wakeup of
+ * listening backends that have fallen behind in processing the notification
+ * queue. It runs continuously but only performs work when signaled by the
+ * main NOTIFY mechanism.
+ *
+ * The worker is triggered when SignalBackends() in async.c determines that
+ * there are lagging backends that need to be woken up. The worker then
+ * performs a staggered wakeup with delays between signals to avoid
+ * thundering herd effects.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/postmaster/notify_bgworker.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <signal.h>
+#include <unistd.h>
+
+#include "access/parallel.h"
+#include "commands/async.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/interrupt.h"
+#include "postmaster/notify_bgworker.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procsignal.h"
+#include "storage/shm_toc.h"
+#include "storage/shmem.h"
+#include "tcop/tcopprot.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+
+/* Configuration constants */
+#define NOTIFY_DEFERRED_WAKEUP_DELAY_MS 100 /* milliseconds between signals */
+
+/* Flag to indicate SIGUSR1 was received */
+static volatile sig_atomic_t got_sigusr1 = false;
+
+/* Forward declaration */
+static void ProcessDeferredWakeups(void);
+
+/* Signal handler for SIGUSR1 */
+static void
+notify_bgworker_sigusr1(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	got_sigusr1 = true;
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+}
+
+/*
+ * NotifyDeferredWakeupMain
+ *		Main entry point for the notify deferred wakeup background worker
+ */
+void
+NotifyDeferredWakeupMain(Datum main_arg)
+{
+	/* Establish signal handlers */
+	pqsignal(SIGUSR1, notify_bgworker_sigusr1);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/* Store our PID in shared memory for signaling */
+	AsyncDeferredWakeupSetWorkerPid(MyProcPid);
+
+	ereport(LOG,
+			(errmsg("notify deferred wakeup worker started")));
+
+	/* Main loop */
+	for (;;)
+	{
+		int			rc;
+
+		/* Check for interrupts */
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Wait for signal to wake up. We use WL_LATCH_SET to wake on our
+		 * latch being set, and WL_EXIT_ON_PM_DEATH to ensure we exit if the
+		 * postmaster dies.
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+					   -1,
+					   WAIT_EVENT_NOTIFY_DEFERRED_WAKEUP);
+
+		ResetLatch(MyLatch);
+
+		/* Emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+
+		/* Process deferred wakeups if we were signaled */
+		if (got_sigusr1)
+		{
+			got_sigusr1 = false;
+			ProcessDeferredWakeups();
+		}
+	}
+}
+
+/*
+ * ProcessDeferredWakeups
+ *		Wake up lagging listening backends with staggered delays
+ *
+ * This function continues processing until there are no more lagging
+ * backends, ensuring all backends eventually get woken up.
+ */
+static void
+ProcessDeferredWakeups(void)
+{
+	int			total_wakeup_count = 0;
+
+	/*
+	 * Continue processing until there are no more lagging backends. This
+	 * ensures we handle all backends that need waking up, even if new ones
+	 * become lagging while we're processing.
+	 */
+	for (;;)
+	{
+		List	   *lagging_backends;
+		ListCell   *lc;
+		int			wakeup_count = 0;
+
+		/*
+		 * Build list of lagging backends while holding the lock. We need to
+		 * be quick here to avoid holding the lock for too long.
+		 */
+		lagging_backends = AsyncGetLaggingBackends();
+
+		if (lagging_backends == NIL)
+		{
+			/* No more lagging backends, we're done */
+			break;
+		}
+
+		/* Now perform the staggered wakeup without holding the lock */
+		foreach(lc, lagging_backends)
+		{
+			BackendWakeupInfo *info = (BackendWakeupInfo *) lfirst(lc);
+
+			/* Send signal to the backend */
+			if (SendProcSignal(info->pid, PROCSIG_NOTIFY_INTERRUPT, info->procno) < 0)
+			{
+				/* Backend might have exited, just log and continue */
+				elog(WARNING, "could not signal backend with PID %d: %m", info->pid);
+			}
+			else
+			{
+				wakeup_count++;
+				total_wakeup_count++;
+			}
+
+			pfree(info);
+
+			/* Sleep between signals to avoid thundering herd */
+			if (lnext(lagging_backends, lc) != NULL)
+			{
+				pg_usleep(NOTIFY_DEFERRED_WAKEUP_DELAY_MS * 1000L);
+
+				/* Check for interrupts between wakeups */
+				CHECK_FOR_INTERRUPTS();
+			}
+		}
+
+		list_free(lagging_backends);
+
+		if (wakeup_count > 0)
+		{
+			ereport(DEBUG1,
+					(errmsg("notify deferred wakeup worker signaled %d lagging backends in this round",
+							wakeup_count)));
+		}
+	}
+
+	if (total_wakeup_count > 0)
+	{
+		ereport(DEBUG1,
+				(errmsg("notify deferred wakeup worker signaled %d lagging backends total",
+						total_wakeup_count)));
+	}
+
+	/* Clear the active flag to indicate we're done */
+	AsyncDeferredWakeupClearActive();
+}
+
+/*
+ * NotifyDeferredWakeupWorkerRegister
+ *		Register the notify deferred wakeup background worker
+ */
+void
+NotifyDeferredWakeupWorkerRegister(void)
+{
+	BackgroundWorker worker;
+
+	memset(&worker, 0, sizeof(BackgroundWorker));
+	snprintf(worker.bgw_name, BGW_MAXLEN, "notify deferred wakeup");
+	snprintf(worker.bgw_type, BGW_MAXLEN, "notify deferred wakeup");
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+	worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	worker.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL;
+	snprintf(worker.bgw_library_name, MAXPGPATH, "postgres");
+	snprintf(worker.bgw_function_name, BGW_MAXLEN, "NotifyDeferredWakeupMain");
+	worker.bgw_main_arg = (Datum) 0;
+	worker.bgw_notify_pid = 0;
+
+	RegisterBackgroundWorker(&worker);
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index e1d643b013d..954c3b371c2 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -102,6 +102,7 @@
 #include "port/pg_bswap.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
+#include "postmaster/notify_bgworker.h"
 #include "postmaster/pgarch.h"
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
@@ -929,6 +930,11 @@ PostmasterMain(int argc, char *argv[])
 	 */
 	ApplyLauncherRegister();
 
+	/*
+	 * Register the notify deferred wakeup worker.
+	 */
+	NotifyDeferredWakeupWorkerRegister();
+
 	/*
 	 * process any libraries that should be preloaded at postmaster start
 	 */
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..a4fadbd0767 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -366,6 +366,7 @@ SubtransBuffer	"Waiting for I/O on a sub-transaction SLRU buffer."
 MultiXactOffsetBuffer	"Waiting for I/O on a multixact offset SLRU buffer."
 MultiXactMemberBuffer	"Waiting for I/O on a multixact member SLRU buffer."
 NotifyBuffer	"Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
+NotifyChannelHash	"Waiting to access the <command>NOTIFY</command> channel hash table."
 SerialBuffer	"Waiting for I/O on a serializable transaction conflict SLRU buffer."
 WALInsert	"Waiting to insert WAL data into a memory buffer."
 BufferContent	"Waiting to access a data page in memory."
diff --git a/src/include/postmaster/notify_bgworker.h b/src/include/postmaster/notify_bgworker.h
new file mode 100644
index 00000000000..5d8b98b82a6
--- /dev/null
+++ b/src/include/postmaster/notify_bgworker.h
@@ -0,0 +1,40 @@
+/*-------------------------------------------------------------------------
+ *
+ * notify_bgworker.h
+ *	  Deferred wakeup background worker for LISTEN/NOTIFY
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/postmaster/notify_bgworker.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NOTIFY_BGWORKER_H
+#define NOTIFY_BGWORKER_H
+
+#include "storage/proc.h"
+
+/* Structure to hold information about a backend that needs to be woken up */
+typedef struct BackendWakeupInfo
+{
+	int32		pid;
+	ProcNumber	procno;
+}			BackendWakeupInfo;
+
+/* Wait event for the notify deferred wakeup worker */
+#define WAIT_EVENT_NOTIFY_DEFERRED_WAKEUP	PG_WAIT_EXTENSION
+
+
+/* Main entry point for the background worker */
+extern void NotifyDeferredWakeupMain(Datum main_arg);
+
+/* Registration function */
+extern void NotifyDeferredWakeupWorkerRegister(void);
+
+/* Functions to be implemented in async.c for worker interaction */
+extern void AsyncDeferredWakeupSetWorkerPid(pid_t pid);
+extern void AsyncDeferredWakeupClearActive(void);
+extern List *AsyncGetLaggingBackends(void);
+
+#endif							/* NOTIFY_BGWORKER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..2768ddf4414 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -100,6 +100,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
 PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
 PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
 PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
 PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
-- 
2.50.1

