From 32f2b6818169381f2795e7c3264bb3710e9f6eae Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Sun, 15 Jun 2025 00:09:43 +0200
Subject: [PATCH] Improve NOTIFY scalability with multicast signaling

Previously, NOTIFY would signal all listening backends in a database for
any channel with more than one listener. This broadcast approach scales
poorly for workloads that rely on targeted notifications to small groups
of backends, as every NOTIFY could wake up many unrelated processes.

This commit introduces a multicast signaling optimization to improve
scalability for such use-cases. A new GUC, `notify_multicast_threshold`,
is added to control the maximum number of listeners to track per
channel. When a NOTIFY is issued, if the number of listeners is at or
below this threshold, only those specific backends are signaled. If the
limit is exceeded, the system falls back to the original broadcast
behavior.

The default for this threshold is set to 16. Benchmarks show this
provides a good balance, with significant performance gains for small to
medium-sized listener groups and diminishing returns for higher values.
Setting the threshold to 0 disables multicast signaling, forcing a
fallback to the broadcast path for all notifications.

To implement this, a new partitioned hash table is introduced in shared
memory to track listeners. Locking is managed with an optimistic
read-then-upgrade pattern. This allows concurrent LISTEN/UNLISTEN
operations on *different* channels to proceed in parallel, as they will
only acquire locks on their respective partitions.

For correctness and to prevent deadlocks, a strict lock ordering
hierarchy (NotifyQueueLock before any partition lock) is observed. The
signaling path in NOTIFY must acquire the global NotifyQueueLock first
before consulting the partitioned hash table, which serializes
concurrent NOTIFYs. The primary concurrency win is for LISTEN/UNLISTEN
operations, which are now much more scalable.

The "wake only tail" optimization, which signals the backend positioned on
the queue tail page (the one holding up truncation), is also included to
ensure the global queue tail can always advance.

Thanks to Rishu Bagga for the multicast idea.
---
 src/backend/commands/async.c        | 825 ++++++++++++++++++++++++++--
 src/backend/utils/init/globals.c    |   1 +
 src/backend/utils/misc/guc_tables.c |  13 +
 src/include/miscadmin.h             |   1 +
 src/include/utils/guc_hooks.h       |   1 +
 5 files changed, 808 insertions(+), 33 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..56a74b707fc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,13 @@
  *	  All notification messages are placed in the queue and later read out
  *	  by listening backends.
  *
- *	  There is no central knowledge of which backend listens on which channel;
- *	  every backend has its own list of interesting channels.
+ *	  In addition to each backend maintaining its own list of channels, we also
+ *	  maintain a central hash table that tracks listeners for each channel, up
+ *	  to a configurable threshold ('notify_multicast_threshold'). When the
+ *	  number of listeners is within this threshold, we can perform a targeted
+ *	  "multicast" by signaling only those specific backends. If the number of
+ *	  listeners exceeds the threshold, we fall back to the original broadcast
+ *	  behavior of signaling all listening backends in the database.
  *
  *	  Although there is only one queue, notifications are treated as being
  *	  database-local; this is done by including the sender's database OID
@@ -71,13 +76,17 @@
  *	  make any actual updates to the effective listen state (listenChannels).
  *	  Then we signal any backends that may be interested in our messages
  *	  (including our own backend, if listening).  This is done by
- *	  SignalBackends(), which scans the list of listening backends and sends a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- *	  know which backend is listening on which channel so we must signal them
- *	  all).  We can exclude backends that are already up to date, though, and
- *	  we can also exclude backends that are in other databases (unless they
- *	  are way behind and should be kicked to make them advance their
- *	  pointers).
+ *	  SignalBackends(), which has two modes of operation:
+ *	  a) Multicast mode: For channels with a number of listeners not exceeding
+ *	  'notify_multicast_threshold', signals are sent only to those specific
+ *	  backends.
+ *	  b) Broadcast mode: If any channel being notified has more listeners than
+ *	  the threshold, we revert to the original behavior and send a
+ *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend in the database.
+ *	  Additionally, we use a "wake only tail" optimization: we always signal
+ *	  the backend that is furthest behind in the queue, so that it advances
+ *	  its queue position and the global queue tail can keep moving forward.
+ *	  We can exclude backends that are already up to date, though.
  *
  *	  Finally, after we are out of the transaction altogether and about to go
  *	  idle, we scan the queue for messages that need to be sent to our
@@ -128,6 +137,7 @@
 #include <limits.h>
 #include <unistd.h>
 #include <signal.h>
+#include <string.h>
 
 #include "access/parallel.h"
 #include "access/slru.h"
@@ -146,6 +156,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc_hooks.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
@@ -162,6 +173,71 @@
  */
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
+/*
+ * Number of partitions for the channel hash table's locks.
+ * This must be a power of two.
+ */
+#define NUM_NOTIFY_PARTITIONS 128
+
+/*
+ * Channel hash table definitions
+ *
+ * This hash table provides an optimization by tracking which backends are
+ * listening on each channel, up to a certain threshold. Channels are
+ * identified by database OID and channel name, making them
+ * database-specific.
+ *
+ * To improve scalability of concurrent LISTEN/UNLISTEN operations, the hash
+ * table is partitioned, with each partition protected by its own LWLock.
+ * This avoids serializing all operations on a single global lock.
+ *
+ * When the number of backends listening on a channel is at or below
+ * 'notify_multicast_threshold', we store their ProcNumbers and signal them
+ * directly (multicast).
+ *
+ * We fall back to broadcast mode and signal all listening backends when:
+ * 1) More backends listen on the same channel than the threshold allows, OR
+ * 2) The hash table runs out of shared memory for new entries
+ *
+ * Note that CHANNEL_HASH_MAX_SIZE is not a hard limit - the hash table can
+ * store more entries than this, but performance will degrade due to bucket
+ * overflow. The actual fallback to broadcast mode occurs only when shared
+ * memory is exhausted and we cannot allocate new hash entries.
+ *
+ * The maximum size (CHANNEL_HASH_MAX_SIZE) is based on the typical OS port
+ * range. This provides a reasonable upper bound for systems that use
+ * per-connection channels.
+ *
+ */
+#define CHANNEL_HASH_INIT_SIZE		256
+#define CHANNEL_HASH_MAX_SIZE		65535
+
+/*
+ * Key structure for the channel hash table.
+ * Channels are database-specific, so we need both the database OID
+ * and the channel name to uniquely identify a channel.
+ */
+typedef struct ChannelHashKey
+{
+	Oid			dboid;
+	char		channel[NAMEDATALEN];
+}			ChannelHashKey;
+
+/*
+ * Each entry contains a channel key (database OID + channel name) and an array
+ * of listening backend ProcNumbers, up to notify_multicast_threshold. If the
+ * number of listeners exceeds the threshold, we mark the channel for
+ * broadcast and stop tracking individual listeners.
+ */
+typedef struct ChannelEntry
+{
+	ChannelHashKey key;
+	bool		is_broadcast;	/* True if num_listeners > threshold */
+	uint8		num_listeners;	/* Number of listeners currently stored */
+	/* Listeners array follows, of size notify_multicast_threshold */
+	ProcNumber	listeners[FLEXIBLE_ARRAY_MEMBER];
+}			ChannelEntry;
+
 /*
  * Struct representing an entry in the global notify queue
  *
@@ -269,6 +345,11 @@ typedef struct QueueBackendStatus
  * In order to avoid deadlocks, whenever we need multiple locks, we first get
  * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock.
  *
+ * The channel hash table is protected by a separate set of partitioned
+ * locks. To prevent deadlocks between these and NotifyQueueLock, the global
+ * lock-ordering rule is: always acquire NotifyQueueLock *before* acquiring
+ * any channel hash partition lock.
+ *
  * Each backend uses the backend[] array entry with index equal to its
  * ProcNumber.  We rely on this to make SendProcSignal fast.
  *
@@ -293,6 +374,69 @@ typedef struct AsyncQueueControl
 
 static AsyncQueueControl *asyncQueueControl;
 
+/* Locks for partitioned channel hash table */
+static LWLock *channelHashLocks;
+static int	channelHashTrancheId = 0;
+
+/* Structure to hold channel hash locks and tranche ID in shared memory */
+typedef struct ChannelHashLockData
+{
+	int			trancheId;
+	LWLock		locks[FLEXIBLE_ARRAY_MEMBER];
+}			ChannelHashLockData;
+
+static ChannelHashLockData * channelHashLockData;
+
+/* Channel hash table for multicast signaling */
+static HTAB *channelHash = NULL;
+
+/* Forward declaration needed by GetChannelHash */
+static uint32 channel_hash_func(const void *key, Size keysize);
+
+/*
+ * GetChannelHash
+ *		Get the channel hash table, initializing our backend's pointer if needed.
+ *
+ * This must be called before any access to the channel hash table.
+ * The hash table itself is created in shared memory during AsyncShmemInit,
+ * but each backend needs to get its own pointer to it.
+ */
+static HTAB *
+GetChannelHash(void)
+{
+	if (channelHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+		Size		entrysize;
+
+		/*
+		 * Set up to attach to the existing shared hash table. The hash
+		 * control parameters must match those used in AsyncShmemInit.
+		 */
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(ChannelHashKey);
+
+		/*
+		 * The size of a channel entry is flexible. We must have enough space
+		 * for the maximum number of listeners specified by the threshold.
+		 */
+		entrysize = add_size(offsetof(ChannelEntry, listeners),
+							 mul_size(notify_multicast_threshold, sizeof(ProcNumber)));
+		hash_ctl.entrysize = entrysize;
+
+		hash_ctl.hash = channel_hash_func;
+		hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS;
+
+		channelHash = ShmemInitHash("Channel Hash",
+									CHANNEL_HASH_INIT_SIZE,
+									CHANNEL_HASH_MAX_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+	}
+
+	return channelHash;
+}
+
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
@@ -458,6 +602,14 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+/* Channel hash table management functions */
+static LWLock *GetChannelHashLock(const char *channel);
+static inline void ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel);
+static void ChannelHashAddListener(const char *channel, ProcNumber procno);
+static void ChannelHashRemoveListener(const char *channel, ProcNumber procno);
+static ChannelEntry * ChannelHashLookup(const char *channel);
+static List *GetPendingNotifyChannels(void);
+
 /*
  * Compute the difference between two queue page numbers.
  * Previously this function accounted for a wraparound.
@@ -485,6 +637,7 @@ Size
 AsyncShmemSize(void)
 {
 	Size		size;
+	Size		entrysize;
 
 	/* This had better match AsyncShmemInit */
 	size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
@@ -492,6 +645,18 @@ AsyncShmemSize(void)
 
 	size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
 
+	/*
+	 * The size of a channel entry is flexible. We must allocate enough space
+	 * for the maximum number of listeners specified by the threshold.
+	 */
+	entrysize = add_size(offsetof(ChannelEntry, listeners),
+						 mul_size(notify_multicast_threshold, sizeof(ProcNumber)));
+	size = add_size(size, hash_estimate_size(CHANNEL_HASH_MAX_SIZE,
+											 entrysize));
+
+	size = add_size(size, offsetof(ChannelHashLockData, locks) +
+					mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock)));
+
 	return size;
 }
 
@@ -546,6 +711,58 @@ AsyncShmemInit(void)
 		 */
 		(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
 	}
+
+	/*
+	 * Create or attach to the channel hash table.
+	 */
+	{
+		HASHCTL		hash_ctl;
+		Size		entrysize;
+
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(ChannelHashKey);
+
+		/*
+		 * The size of a channel entry is flexible. We must have enough space
+		 * for the maximum number of listeners specified by the threshold.
+		 */
+		entrysize = add_size(offsetof(ChannelEntry, listeners),
+							 mul_size(notify_multicast_threshold, sizeof(ProcNumber)));
+		hash_ctl.entrysize = entrysize;
+
+		hash_ctl.hash = channel_hash_func;
+		hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS;
+
+		channelHash = ShmemInitHash("Channel Hash",
+									CHANNEL_HASH_INIT_SIZE,
+									CHANNEL_HASH_MAX_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+	}
+
+	/* Initialize locks for the partitioned hash table */
+	size = offsetof(ChannelHashLockData, locks) +
+		mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock));
+	channelHashLockData = (ChannelHashLockData *)
+		ShmemInitStruct("Channel Hash Lock Data", size, &found);
+	if (!found)
+	{
+		/* First time through: initialize the locks and tranche ID */
+		channelHashLockData->trancheId = LWLockNewTrancheId();
+		for (int i = 0; i < NUM_NOTIFY_PARTITIONS; i++)
+		{
+			LWLockInitialize(&channelHashLockData->locks[i],
+							 channelHashLockData->trancheId);
+		}
+	}
+
+	/*
+	 * Set up local pointers for convenience. We must also register the
+	 * tranche ID in every backend that will use these locks.
+	 */
+	channelHashLocks = channelHashLockData->locks;
+	channelHashTrancheId = channelHashLockData->trancheId;
+	LWLockRegisterTranche(channelHashTrancheId, "ChannelHashPartition");
 }
 
 
@@ -1110,6 +1327,7 @@ Exec_ListenPreCommit(void)
 		QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
 		QUEUE_FIRST_LISTENER = MyProcNumber;
 	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now we are listed in the global array, so remember we're listening */
@@ -1152,6 +1370,8 @@ Exec_ListenCommit(const char *channel)
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 	listenChannels = lappend(listenChannels, pstrdup(channel));
 	MemoryContextSwitchTo(oldcontext);
+
+	ChannelHashAddListener(channel, MyProcNumber);
 }
 
 /*
@@ -1175,6 +1395,7 @@ Exec_UnlistenCommit(const char *channel)
 		{
 			listenChannels = foreach_delete_current(listenChannels, q);
 			pfree(lchan);
+			ChannelHashRemoveListener(channel, MyProcNumber);
 			break;
 		}
 	}
@@ -1193,9 +1414,22 @@ Exec_UnlistenCommit(const char *channel)
 static void
 Exec_UnlistenAllCommit(void)
 {
+	ListCell   *p;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
+	/*
+	 * Before freeing the local list, iterate through it and perform a
+	 * targeted removal for each of our channels from the shared hash table.
+	 */
+	foreach(p, listenChannels)
+	{
+		char	   *channel = (char *) lfirst(p);
+
+		ChannelHashRemoveListener(channel, MyProcNumber);
+	}
+
 	list_free_deep(listenChannels);
 	listenChannels = NIL;
 }
@@ -1239,6 +1473,7 @@ asyncQueueUnregister(void)
 	 * Need exclusive lock here to manipulate list links.
 	 */
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
 	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
@@ -1565,12 +1800,18 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send.  However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind.  Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * This function operates in two modes:
+ * 1. Multicast mode: If all pending notification channels have a number of
+ *    listeners at or below the 'notify_multicast_threshold', we signal only
+ *    those specific backends.
+ * 2. Broadcast mode: If any channel has more listeners than the threshold (or
+ *    we ran out of shared memory for the channel hash table), we signal all
+ *    listening backends in our database.
+ *
+ * In addition to the channel-specific signaling, we also implement a "wake
+ * only tail" optimization: we signal the backend that is furthest behind
+ * in the queue so that it advances its pointer and lets the global tail
+ * move forward, waking laggards one at a time instead of all at once.
  *
  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
  *
@@ -1583,6 +1824,11 @@ SignalBackends(void)
 	int32	   *pids;
 	ProcNumber *procnos;
 	int			count;
+	List	   *channels;
+	ListCell   *p;
+	bool	   *signaled;
+	bool		broadcast_mode = false;
+	bool		tail_woken = false;
 
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
@@ -1594,40 +1840,173 @@ SignalBackends(void)
 	 */
 	pids = (int32 *) palloc(MaxBackends * sizeof(int32));
 	procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+	signaled = (bool *) palloc0(MaxBackends * sizeof(bool));
 	count = 0;
 
+	/* Get list of channels that have pending notifications */
+	channels = GetPendingNotifyChannels();
+
+	/*
+	 * To prevent deadlocks, we must always acquire locks in the same order:
+	 * global NotifyQueueLock first, then individual partition locks.
+	 */
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+
+	/*
+	 * Determine if we can use targeted signaling or must broadcast. This
+	 * check must be done while holding NotifyQueueLock to prevent deadlocks
+	 * against other backends that might be modifying the listener list and
+	 * hash table simultaneously (e.g., asyncQueueUnregister).
+	 */
+	foreach(p, channels)
 	{
-		int32		pid = QUEUE_BACKEND_PID(i);
-		QueuePosition pos;
+		char	   *channel = (char *) lfirst(p);
+		ChannelEntry *entry;
+		LWLock	   *lock = GetChannelHashLock(channel);
+
+		LWLockAcquire(lock, LW_SHARED);
+		entry = ChannelHashLookup(channel);
+
+		/*
+		 * If there is no entry, it could mean we ran out of shared memory
+		 * when trying to add this channel to the hash table. If the entry is
+		 * marked for broadcast, we must use broadcast mode.
+		 */
+		if (!entry || entry->is_broadcast)
+		{
+			broadcast_mode = true;
+			LWLockRelease(lock);
+			break;
+		}
+		LWLockRelease(lock);
+	}
 
-		Assert(pid != InvalidPid);
-		pos = QUEUE_BACKEND_POS(i);
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+	if (broadcast_mode)
+	{
+		/*
+		 * In broadcast mode, we iterate over all listening backends and
+		 * signal the ones in our database that are not already caught up.
+		 */
+		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
 		{
+			int32		pid;
+			QueuePosition pos;
+
+			if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+
 			/*
 			 * Always signal listeners in our own database, unless they're
-			 * already caught up (unlikely, but possible).
+			 * already caught up.
 			 */
 			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
 				continue;
+
+			pid = QUEUE_BACKEND_PID(i);
+			Assert(pid != InvalidPid);
+
+			/* OK, need to signal this one */
+			pids[count] = pid;
+			procnos[count] = i;
+			signaled[i] = true;
+			count++;
 		}
-		else
+	}
+	else
+	{
+		/*
+		 * In multicast mode, signal specific listening backends. We must
+		 * re-check the hash entries here inside the lock to avoid races.
+		 */
+		foreach(p, channels)
 		{
-			/*
-			 * Listeners in other databases should be signaled only if they
-			 * are far behind.
-			 */
-			if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
-				continue;
+			char	   *channel = (char *) lfirst(p);
+			ChannelEntry *entry;
+			LWLock	   *lock = GetChannelHashLock(channel);
+
+			LWLockAcquire(lock, LW_SHARED);
+			entry = ChannelHashLookup(channel);
+
+			if (entry && !entry->is_broadcast)
+			{
+				for (int j = 0; j < entry->num_listeners; j++)
+				{
+					ProcNumber	i = entry->listeners[j];
+					int32		pid;
+					QueuePosition pos;
+
+					if (signaled[i])
+						continue;
+
+					pos = QUEUE_BACKEND_POS(i);
+
+					if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+						continue;
+
+					if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+						continue;
+
+					pid = QUEUE_BACKEND_PID(i);
+					Assert(pid != InvalidPid);
+
+					pids[count] = pid;
+					procnos[count] = i;
+					signaled[i] = true;
+					count++;
+				}
+			}
+			LWLockRelease(lock);
 		}
+	}
+
+	/*
+	 * Also check for any backends that are far behind. This ensures the
+	 * global tail can advance even if they're not actively receiving
+	 * notifications on their channels.
+	 */
+	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+	{
+		int32		pid;
+		QueuePosition pos;
+
+		/*
+		 * Skip if we've already decided to signal this one.
+		 */
+		if (signaled[i])
+			continue;
+
+		pos = QUEUE_BACKEND_POS(i);
+
+		/*
+		 * Skip signaling listeners if they already caught up.
+		 */
+		if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+			continue;
+
+		/*
+		 * Wake only tail optimization: signal the first not-yet-signaled
+		 * backend whose position is on the same page as the global tail,
+		 * i.e. one of the backends holding up queue truncation. Once it
+		 * catches up, the tail can advance and a later NOTIFY will wake
+		 * the next laggard, producing a chain reaction of wake-ups
+		 * instead of a thundering herd.
+		 */
+		if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
+											  QUEUE_POS_PAGE(pos)) == 0)
+			tail_woken = true;
+		else
+			continue;
+
+		pid = QUEUE_BACKEND_PID(i);
+		Assert(pid != InvalidPid);
 		/* OK, need to signal this one */
 		pids[count] = pid;
 		procnos[count] = i;
 		count++;
 	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now send signals */
@@ -1647,9 +2026,9 @@ SignalBackends(void)
 
 		/*
 		 * Note: assuming things aren't broken, a signal failure here could
-		 * only occur if the target backend exited since we released
-		 * NotifyQueueLock; which is unlikely but certainly possible. So we
-		 * just log a low-level debug message if it happens.
+		 * only occur if the target backend exited since we released the lock;
+		 * which is unlikely but certainly possible. So we just log a
+		 * low-level debug message if it happens.
 		 */
 		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
 			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
@@ -1657,6 +2036,7 @@ SignalBackends(void)
 
 	pfree(pids);
 	pfree(procnos);
+	pfree(signaled);
 }
 
 /*
@@ -2395,3 +2775,382 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * GUC check_hook for notify_multicast_threshold
+ */
+bool
+check_notify_multicast_threshold(int *newval, void **extra, GucSource source)
+{
+	/*
+	 * We don't allow values less than 0.  A value of 0 is special and means
+	 * the multicast optimization is disabled entirely.
+	 */
+	if (*newval < 0)
+	{
+		GUC_check_errdetail("notify_multicast_threshold must be non-negative.");
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Channel hash table management functions
+ */
+
+/*
+ * channel_hash_func
+ *     Custom hash function for the channel hash table. This function ensures
+ *     that the low-order bits of the hash are well-distributed, which is
+ *     critical for partitioned hash tables.
+ */
+static uint32
+channel_hash_func(const void *key, Size keysize)
+{
+	const		ChannelHashKey *k = (const ChannelHashKey *) key;
+	uint32		h;
+
+	/*
+	 * Mix the dboid and the channel name to produce a good hash. hash_any()
+	 * is a high-quality portable hash function. This prevents channels with
+	 * the same name in different databases from always mapping to the same
+	 * partition.
+	 */
+	h = DatumGetUInt32(hash_uint32(k->dboid));
+	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+								 strnlen(k->channel, NAMEDATALEN)));
+
+	return h;
+}
+
+/*
+ * GetChannelHashLock
+ *     Return the LWLock that protects the partition for the given channel name.
+ */
+static LWLock *
+GetChannelHashLock(const char *channel)
+{
+	ChannelHashKey key;
+	uint32		hash;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+	hash = get_hash_value(GetChannelHash(), &key);
+
+	return &channelHashLocks[hash % NUM_NOTIFY_PARTITIONS];
+}
+
+/*
+ * ChannelHashPrepareKey
+ *		Prepare a channel key (database OID + channel name) for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel)
+{
+	memset(key, 0, sizeof(ChannelHashKey));
+	key->dboid = dboid;
+	strlcpy(key->channel, channel, NAMEDATALEN);
+}
+
+/*
+ * ChannelHashAddListener
+ *     Register the given backend as a listener for the specified channel.
+ *
+ * This function uses an optimistic read-locking strategy to maximize
+ * concurrency. An exclusive lock is only taken when mutating the listener
+ * list.
+ *
+ * 1. It first takes a shared lock. If the channel is already in broadcast
+ *    mode, or if the current backend is already in the listener list, no write
+ *    is needed and we can return immediately.
+ *
+ * 2. If a write is needed, it releases the shared lock and acquires an
+ *    exclusive lock.
+ *
+ * 3. CRUCIALLY, after acquiring the exclusive lock, it must re-check the
+ *    state, as another backend may have modified the entry in the interim.
+ *
+ * 4. If the number of listeners is below 'notify_multicast_threshold', the
+ *    new listener is added. If the threshold is reached, the channel is
+ *    converted to broadcast mode.
+ */
+static void
+ChannelHashAddListener(const char *channel, ProcNumber procno)
+{
+	ChannelEntry *entry;
+	bool		found;
+	ChannelHashKey key;
+	LWLock	   *lock = GetChannelHashLock(channel);
+
+	/*
+	 * If the threshold is zero, this optimization is disabled. All channels
+	 * immediately use broadcast, so we don't need to track them.
+	 */
+	if (notify_multicast_threshold <= 0)
+		return;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/*
+	 * FAST PATH: Optimistically take a shared lock. If the channel is already
+	 * in broadcast mode, or if we are already listed, we are done.
+	 */
+	LWLockAcquire(lock, LW_SHARED);
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (entry)
+	{
+		if (entry->is_broadcast)
+		{
+			LWLockRelease(lock);
+			return;
+		}
+		/* Check if we are already in the list */
+		for (int i = 0; i < entry->num_listeners; i++)
+		{
+			if (entry->listeners[i] == procno)
+			{
+				LWLockRelease(lock);
+				return;
+			}
+		}
+	}
+	LWLockRelease(lock);
+
+	/*
+	 * SLOW PATH: We need to write. Acquire exclusive lock.
+	 */
+	LWLockAcquire(lock, LW_EXCLUSIVE);
+
+	/*
+	 * Re-check state after acquiring exclusive lock, as it may have changed.
+	 */
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_ENTER_NULL, &found);
+
+	if (entry == NULL)
+	{
+		/* Out of memory in the hash partition. */
+		ereport(DEBUG1, (errmsg("too many notification channels are already being tracked")));
+		LWLockRelease(lock);
+		return;
+	}
+
+	if (!found)
+	{
+		/* First listener for this channel. */
+		entry->is_broadcast = false;
+		entry->num_listeners = 1;
+		entry->listeners[0] = procno;
+	}
+	else
+	{
+		/* Entry already exists, re-check everything. */
+		bool		already_present = false;
+
+		if (entry->is_broadcast)
+		{
+			/* Another backend set it to broadcast mode. We're done. */
+			LWLockRelease(lock);
+			return;
+		}
+
+		for (int i = 0; i < entry->num_listeners; i++)
+		{
+			if (entry->listeners[i] == procno)
+			{
+				already_present = true;
+				break;
+			}
+		}
+
+		if (!already_present)
+		{
+			if (entry->num_listeners < notify_multicast_threshold)
+			{
+				/* Add ourselves to the list of listeners. */
+				entry->listeners[entry->num_listeners] = procno;
+				entry->num_listeners++;
+			}
+			else
+			{
+				/* We are the listener that exceeds the threshold. */
+				entry->is_broadcast = true;
+				entry->num_listeners = 0;	/* Clear the list */
+			}
+		}
+	}
+	LWLockRelease(lock);
+}
+
+/*
+ * ChannelHashRemoveListener
+ *		Update the channel hash when a backend stops listening on a channel.
+ *
+ * This function uses an optimistic read-lock strategy. An exclusive lock is
+ * only taken if we are in the listener list for a channel and need to remove
+ * ourselves. If a channel is in broadcast mode, we cannot safely modify it,
+ * as we can't know which backends are listening.
+ */
+static void
+ChannelHashRemoveListener(const char *channel, ProcNumber procno)
+{
+	ChannelEntry *entry;
+	ChannelHashKey key;
+	LWLock	   *lock = GetChannelHashLock(channel);
+	bool		present = false;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/*
+	 * Take a shared lock first to see if a removal is even possible. If the
+	 * entry doesn't exist, is in broadcast mode, or we're not in its list, we
+	 * have nothing to do. This is the fast path.
+	 */
+	LWLockAcquire(lock, LW_SHARED);
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (!entry || entry->is_broadcast)
+	{
+		LWLockRelease(lock);
+		return;
+	}
+
+	/* Check if we are in the list */
+	for (int i = 0; i < entry->num_listeners; i++)
+	{
+		if (entry->listeners[i] == procno)
+		{
+			present = true;
+			break;
+		}
+	}
+	if (!present)
+	{
+		LWLockRelease(lock);
+		return;
+	}
+	LWLockRelease(lock);
+
+	/* A removal is likely needed. Acquire an exclusive lock. */
+	LWLockAcquire(lock, LW_EXCLUSIVE);
+
+	/*
+	 * Re-check the state. Another backend might have changed it (e.g., to
+	 * broadcast mode).
+	 */
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (entry && !entry->is_broadcast)
+	{
+		int			i;
+
+		for (i = 0; i < entry->num_listeners; i++)
+		{
+			if (entry->listeners[i] == procno)
+			{
+				/*
+				 * Found our procno. Remove it from the listener array.
+				 *
+				 * If this is the last listener, we remove the entire hash
+				 * entry for the channel.
+				 */
+				if (entry->num_listeners == 1)
+				{
+					(void) hash_search(GetChannelHash(), &key, HASH_REMOVE, NULL);
+				}
+				else
+				{
+					/*
+					 * To remove an element from the array while keeping it
+					 * contiguous, we first decrement the listener count.
+					 * Then, we shift all subsequent elements one position to
+					 * the left, overwriting the element we want to remove.
+					 *
+					 * The `if (i < entry->num_listeners)` condition
+					 * explicitly handles the case where the last element in
+					 * the array is being removed. In that scenario, `i`
+					 * equals the new `num_listeners`, so no memory movement
+					 * is necessary, and the `memmove` is correctly skipped.
+					 */
+					entry->num_listeners--;
+					if (i < entry->num_listeners)
+					{
+						Size		size_to_move;
+
+						size_to_move = mul_size(entry->num_listeners - i,
+												sizeof(ProcNumber));
+						memmove(&entry->listeners[i],
+								&entry->listeners[i + 1],
+								size_to_move);
+					}
+				}
+				break;			/* Found and removed, exit loop. */
+			}
+		}
+	}
+	LWLockRelease(lock);
+}
+
+/*
+ * ChannelHashLookup
+ *		Look up the channel hash entry for the given channel name in the
+ *		current database.
+ *
+ * Returns NULL if the channel is not being tracked (no listeners, or channel
+ * fell back to broadcast mode because we ran out of shared memory when trying
+ * to add entries to the hash table).
+ *
+ * Caller must hold the appropriate partition lock (shared is sufficient).
+ */
+static ChannelEntry *
+ChannelHashLookup(const char *channel)
+{
+	ChannelHashKey key;
+
+	Assert(LWLockHeldByMe(GetChannelHashLock(channel)));
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	return (ChannelEntry *) hash_search(GetChannelHash(),
+										&key,
+										HASH_FIND,
+										NULL);
+}
+
+/*
+ * GetPendingNotifyChannels
+ *		Get list of unique channel names from pending notifications.
+ */
+static List *
+GetPendingNotifyChannels(void)
+{
+	List	   *channels = NIL;
+	ListCell   *p;
+	ListCell   *q;
+	bool		found;
+
+	if (!pendingNotifies)
+		return NIL;
+
+	/* Collect unique channel names from pending notifications */
+	foreach(p, pendingNotifies->events)
+	{
+		Notification *n = (Notification *) lfirst(p);
+		char	   *channel = n->data;
+
+		/* Check if we already have this channel in our list */
+		found = false;
+		foreach(q, channels)
+		{
+			char	   *existing = (char *) lfirst(q);
+
+			if (strcmp(existing, channel) == 0)
+			{
+				found = true;
+				break;
+			}
+		}
+
+		if (!found)
+			channels = lappend(channels, channel);
+	}
+
+	return channels;
+}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index d31cb45a058..25196e3246b 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -162,6 +162,7 @@ int			commit_timestamp_buffers = 0;
 int			multixact_member_buffers = 32;
 int			multixact_offset_buffers = 16;
 int			notify_buffers = 16;
+int			notify_multicast_threshold = 16;
 int			serializable_buffers = 32;
 int			subtransaction_buffers = 0;
 int			transaction_buffers = 0;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..1e642f9f69e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2464,6 +2464,19 @@ struct config_int ConfigureNamesInt[] =
 		check_notify_buffers, NULL, NULL
 	},
 
+	{
+		{"notify_multicast_threshold", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Sets the maximum number of listeners to track per channel for multicast signaling."),
+			gettext_noop("When the number of listeners on a channel exceeds this threshold, "
+						 "NOTIFY will signal all listening backends rather than just those "
+						 "listening on the specific channel. Setting to 0 disables multicast "
+						 "signaling entirely."),
+		},
+		&notify_multicast_threshold,
+		16, 0, MAX_BACKENDS,
+		check_notify_multicast_threshold, NULL, NULL
+	},
+
 	{
 		{"serializable_buffers", PGC_POSTMASTER, RESOURCES_MEM,
 			gettext_noop("Sets the size of the dedicated buffer pool used for the serializable transaction cache."),
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 1bef98471c3..b23492653f3 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -182,6 +182,7 @@ extern PGDLLIMPORT int commit_timestamp_buffers;
 extern PGDLLIMPORT int multixact_member_buffers;
 extern PGDLLIMPORT int multixact_offset_buffers;
 extern PGDLLIMPORT int notify_buffers;
+extern PGDLLIMPORT int notify_multicast_threshold;
 extern PGDLLIMPORT int serializable_buffers;
 extern PGDLLIMPORT int subtransaction_buffers;
 extern PGDLLIMPORT int transaction_buffers;
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 82ac8646a8d..ed3a00bb7e4 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -92,6 +92,7 @@ extern bool check_multixact_member_buffers(int *newval, void **extra,
 extern bool check_multixact_offset_buffers(int *newval, void **extra,
 										   GucSource source);
 extern bool check_notify_buffers(int *newval, void **extra, GucSource source);
+extern bool check_notify_multicast_threshold(int *newval, void **extra, GucSource source);
 extern bool check_primary_slot_name(char **newval, void **extra,
 									GucSource source);
 extern bool check_random_seed(double *newval, void **extra, GucSource source);
-- 
2.47.1

