From aba0ffb2a9e1c5d77393a92c0ce43a968c23cbb5 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Sun, 15 Jun 2025 00:09:43 +0200
Subject: [PATCH] Optimize LISTEN/NOTIFY signaling for single-listener channels
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, the implementation would signal every backend process that was
listening on any channel in the same database. This signaling is performed via
SendProcSignal(), which ultimately issues a kill(pid, SIGUSR1) syscall for each
listening backend.

This broadcast approach is well-suited for use cases like cache invalidation but
limits the scalability of application patterns where backends listen on distinct
channels. For example, a system of worker processes might use unique channel
names to direct work to a specific worker. In these scenarios, a NOTIFY intended
for a single listener unnecessarily triggers a syscall for every other listening
backend.

This commit improves scalability for such workloads by optimizing for
the single-listener case. By making this pattern more performant, we enable it
to be used more effectively in high-throughput systems, pushing PostgreSQL's
scalability limits for this class of applications. A new shared memory hash
table is introduced to track which backend process is listening on each channel.
When a NOTIFY is issued, if a channel has exactly one registered listener, we
can signal that specific backend directly.

The system gracefully falls back to broadcast behavior under two conditions:

1. When a channel has multiple backends listening to it.
2. If the shared hash table runs out of memory and cannot create a new entry.

To support this, the LISTEN and UNLISTEN commands, as well as the backend exit
cleanup logic in asyncQueueUnregister, are updated to manage entries in the new
channel hash table. The main signaling logic in SignalBackends has been reworked
to implement the targeted-vs-broadcast decision.

To ensure the global queue tail can always advance, this change also includes a
"wake only tail" optimization, contributed by Marko Tikkaja (johto). Instead
of waking all backends that are lagging far behind, this logic specifically
signals only the backend that is currently at the queue tail. This targeted
wake-up prevents a "thundering herd" of signals and relies on a chain
reaction—where each backend wakes the next—to process the queue efficiently.
This mechanism works in conjunction with both the new targeted signaling and
the broadcast fallback.

CAVEAT: This patch should be considered a first-step, proof-of-concept
optimization. It uses a simple boolean flag to distinguish single-listener
channels from multi-listener ones and does not track the full list of backends
for a multi-listener channel. As a result, it cannot remove a hash entry for
a channel once it has been marked as having multiple listeners, causing such
entries to persist even after all listeners have departed. A more complete
solution would likely involve reference counting to track all listening backends
for each channel. This would not only prevent stuck hash entries but could also
enable targeted signaling to all listeners of a specific multi-user channel,
further refining the optimization and avoiding the fallback to a full
database-wide broadcast.
---
 src/backend/commands/async.c | 572 ++++++++++++++++++++++++++++++++---
 1 file changed, 537 insertions(+), 35 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..a0b7daaef7d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,11 @@
  *	  All notification messages are placed in the queue and later read out
  *	  by listening backends.
  *
- *	  There is no central knowledge of which backend listens on which channel;
- *	  every backend has its own list of interesting channels.
+ *	  In addition to each backend maintaining its own list of channels, we also
+ *	  maintain a central hash table that tracks channels with single listeners.
+ *	  When a channel has exactly one listening backend, we can signal just that
+ *	  backend. For channels with multiple listeners, we signal all listening
+ *	  backends.
  *
  *	  Although there is only one queue, notifications are treated as being
  *	  database-local; this is done by including the sender's database OID
@@ -71,13 +74,16 @@
  *	  make any actual updates to the effective listen state (listenChannels).
  *	  Then we signal any backends that may be interested in our messages
  *	  (including our own backend, if listening).  This is done by
- *	  SignalBackends(), which scans the list of listening backends and sends a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- *	  know which backend is listening on which channel so we must signal them
- *	  all).  We can exclude backends that are already up to date, though, and
- *	  we can also exclude backends that are in other databases (unless they
- *	  are way behind and should be kicked to make them advance their
- *	  pointers).
+ *	  SignalBackends(), which has two modes of operation, depending on
+ *	  if any of our channels have multiple listening backends or not:
+ *	  a) If there are multiple listening backends, a PROCSIG_NOTIFY_INTERRUPT
+ *	  signal is sent to every listening backend.
+ *	  b) Otherwise, such signals are only sent to each single listening backend
+ *	  per channel.
+ *	  Additionally, we use a "wake only tail" optimization: we always signal
+ *	  the backend furthest behind in the queue to help prevent backends from
+ *	  getting far behind and create a chain reaction of wake-ups.
+ *	  We can exclude backends that are already up to date, though.
  *
  *	  Finally, after we are out of the transaction altogether and about to go
  *	  idle, we scan the queue for messages that need to be sent to our
@@ -146,6 +152,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc_hooks.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
@@ -162,6 +169,58 @@
  */
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
+/*
+ * Channel hash table definitions
+ *
+ * This hash table provides an optimization by tracking which backend is
+ * listening on each channel. Channels are identified by database OID and
+ * channel name, making them database-specific.
+ *
+ * When exactly one backend listens on a channel, we signal that specific
+ * backend, avoiding unnecessary signals to all listening backends.
+ *
+ * We fall back to broadcast mode and signal all listening backends when:
+ * 1) Multiple backends listen on the same channel, OR
+ * 2) The hash table runs out of shared memory for new entries
+ *
+ * Note that CHANNEL_HASH_MAX_SIZE is not a hard limit - the hash table can
+ * store more entries than this, but performance will degrade due to bucket
+ * overflow. The actual fallback to broadcast mode occurs only when shared
+ * memory is exhausted and we cannot allocate new hash entries.
+ *
+ * The maximum size (CHANNEL_HASH_MAX_SIZE) is based on the typical OS port
+ * range. This provides a reasonable upper bound for systems that use
+ * per-connection channels.
+ *
+ */
+#define CHANNEL_HASH_INIT_SIZE		256
+#define CHANNEL_HASH_MAX_SIZE		65535
+
+/*
+ * Key structure for the channel hash table.
+ * Channels are database-specific, so we need both the database OID
+ * and the channel name to uniquely identify a channel.
+ */
+typedef struct ChannelHashKey
+{
+	Oid			dboid;
+	char		channel[NAMEDATALEN];
+}			ChannelHashKey;
+
+/*
+ * Each entry contains a channel key (database OID + channel name) and a
+ * single backend ProcNumber that is listening on that channel. If multiple
+ * backends try to listen on the same channel, we mark it as having multiple
+ * listeners and fall back to broadcast behavior.
+ */
+typedef struct ChannelEntry
+{
+	ChannelHashKey key;
+	ProcNumber	listener;		/* single backend ID, or INVALID_PROC_NUMBER
+								 * if multiple */
+	bool		has_multiple_listeners;
+}			ChannelEntry;
+
 /*
  * Struct representing an entry in the global notify queue
  *
@@ -293,6 +352,39 @@ typedef struct AsyncQueueControl
 
 static AsyncQueueControl *asyncQueueControl;
 
+/* Channel hash table for single listening backend signalling */
+static HTAB *channelHash = NULL;
+
+/*
+ * GetChannelHash
+ *		Get the channel hash table, initializing our backend's pointer if needed.
+ *
+ * This must be called before any access to the channel hash table.
+ * The hash table itself is created in shared memory during AsyncShmemInit,
+ * but each backend needs to get its own pointer to it.
+ */
+static HTAB *
+GetChannelHash(void)
+{
+	if (channelHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		/* Set up to attach to the existing shared hash table */
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(ChannelHashKey);
+		hash_ctl.entrysize = sizeof(ChannelEntry);
+
+		channelHash = ShmemInitHash("Channel Hash",
+									CHANNEL_HASH_INIT_SIZE,
+									CHANNEL_HASH_MAX_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_BLOBS);
+	}
+
+	return channelHash;
+}
+
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
@@ -458,6 +550,14 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+/* Channel hash table management functions */
+static inline void ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel);
+static void ChannelHashAddListener(const char *channel, ProcNumber procno);
+static void ChannelHashRemoveListener(const char *channel, ProcNumber procno);
+static void ChannelHashRemoveBackendFromAll(ProcNumber procno);
+static ChannelEntry * ChannelHashLookup(const char *channel);
+static List *GetPendingNotifyChannels(void);
+
 /*
  * Compute the difference between two queue page numbers.
  * Previously this function accounted for a wraparound.
@@ -492,6 +592,9 @@ AsyncShmemSize(void)
 
 	size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
 
+	size = add_size(size, hash_estimate_size(CHANNEL_HASH_MAX_SIZE,
+											 sizeof(ChannelEntry)));
+
 	return size;
 }
 
@@ -546,6 +649,23 @@ AsyncShmemInit(void)
 		 */
 		(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
 	}
+
+	/*
+	 * Create or attach to the channel hash table.
+	 */
+	{
+		HASHCTL		hash_ctl;
+
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(ChannelHashKey);
+		hash_ctl.entrysize = sizeof(ChannelEntry);
+
+		channelHash = ShmemInitHash("Channel Hash",
+									CHANNEL_HASH_INIT_SIZE,
+									CHANNEL_HASH_MAX_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_BLOBS);
+	}
 }
 
 
@@ -1043,6 +1163,7 @@ Exec_ListenPreCommit(void)
 	QueuePosition head;
 	QueuePosition max;
 	ProcNumber	prevListener;
+	ListCell   *p;
 
 	/*
 	 * Nothing to do if we are already listening to something, nor if we
@@ -1110,6 +1231,18 @@ Exec_ListenPreCommit(void)
 		QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
 		QUEUE_FIRST_LISTENER = MyProcNumber;
 	}
+
+	/*
+	 * Add all our channels to the channel hash table while we still hold
+	 * exclusive lock on NotifyQueueLock.
+	 */
+	foreach(p, listenChannels)
+	{
+		char	   *channel = (char *) lfirst(p);
+
+		ChannelHashAddListener(channel, MyProcNumber);
+	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now we are listed in the global array, so remember we're listening */
@@ -1152,6 +1285,10 @@ Exec_ListenCommit(const char *channel)
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 	listenChannels = lappend(listenChannels, pstrdup(channel));
 	MemoryContextSwitchTo(oldcontext);
+
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+	ChannelHashAddListener(channel, MyProcNumber);
+	LWLockRelease(NotifyQueueLock);
 }
 
 /*
@@ -1175,6 +1312,10 @@ Exec_UnlistenCommit(const char *channel)
 		{
 			listenChannels = foreach_delete_current(listenChannels, q);
 			pfree(lchan);
+
+			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+			ChannelHashRemoveListener(channel, MyProcNumber);
+			LWLockRelease(NotifyQueueLock);
 			break;
 		}
 	}
@@ -1239,6 +1380,9 @@ asyncQueueUnregister(void)
 	 * Need exclusive lock here to manipulate list links.
 	 */
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+	ChannelHashRemoveBackendFromAll(MyProcNumber);
+
 	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
@@ -1565,12 +1709,18 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send.  However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind.  Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * This function operates in two modes:
+ * 1. Selective mode: When all pending notification channels have exactly one
+ *    listener each, we signal only those specific backends that are listening
+ *    on the channels with pending notifications.
+ * 2. Broadcast mode: When any channel has multiple listeners (or we ran out
+ *    of shared memory for the channel hash table), we signal all listening
+ *    backends in our database.
+ *
+ * In addition to the channel-specific signaling, we also implement a "wake
+ * only tail" optimization: we signal the backend that is furthest behind
+ * in the queue to help prevent backends from getting far behind and create
+ * a chain reaction of wake-ups. This avoids thundering herd problems.
  *
  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
  *
@@ -1583,6 +1733,11 @@ SignalBackends(void)
 	int32	   *pids;
 	ProcNumber *procnos;
 	int			count;
+	List	   *channels;
+	ListCell   *p;
+	bool	   *signaled;
+	bool		broadcast_mode = false;
+	bool		tail_woken = false;
 
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
@@ -1594,40 +1749,159 @@ SignalBackends(void)
 	 */
 	pids = (int32 *) palloc(MaxBackends * sizeof(int32));
 	procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+	signaled = (bool *) palloc0(MaxBackends * sizeof(bool));
 	count = 0;
 
+	/* Get list of channels that have pending notifications */
+	channels = GetPendingNotifyChannels();
+
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+	/*
+	 * Check if any channel has multiple listeners, in which case we would
+	 * need to signal all backends anyway.
+	 */
+	foreach(p, channels)
+	{
+		char	   *channel = (char *) lfirst(p);
+		ChannelEntry *entry = ChannelHashLookup(channel);
+
+		/*
+		 * If there is no entry, it could mean we ran out of shared memory
+		 * when trying to add this channel to the hash table, so we need to
+		 * broadcast in that case as well.
+		 */
+		if (!entry || entry->has_multiple_listeners)
+		{
+			broadcast_mode = true;
+			break;
+		}
+	}
+
+	if (broadcast_mode)
+	{
+		/*
+		 * In broadcast mode, we iterate over all listening backends and
+		 * signal the ones in our database that are not already caught up.
+		 */
+		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+		{
+			int32		pid;
+			QueuePosition pos;
+
+			if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+
+			/*
+			 * Always signal listeners in our own database, unless they're
+			 * already caught up.
+			 */
+			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+				continue;
+
+			pid = QUEUE_BACKEND_PID(i);
+			Assert(pid != InvalidPid);
+
+			/* OK, need to signal this one */
+			pids[count] = pid;
+			procnos[count] = i;
+			signaled[i] = true;
+			count++;
+		}
+	}
+	else
+	{
+		/*
+		 * Signal specific listening backends
+		 */
+		foreach(p, channels)
+		{
+			char	   *channel = (char *) lfirst(p);
+			ChannelEntry *entry = ChannelHashLookup(channel);
+
+			ProcNumber	i = entry->listener;
+			int32		pid;
+			QueuePosition pos;
+
+			Assert(entry && !entry->has_multiple_listeners);
+
+			if (signaled[i])
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+
+			/*
+			 * Skip signaling listeners if they already caught up.
+			 */
+			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+				continue;
+
+			if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+				continue;
+
+			pid = QUEUE_BACKEND_PID(i);
+			Assert(pid != InvalidPid);
+
+			/* OK, need to signal this one */
+			pids[count] = pid;
+			procnos[count] = i;
+			signaled[i] = true;
+			count++;
+		}
+	}
+
+	/*
+	 * Also check for any backends that are far behind. This ensures the
+	 * global tail can advance even if they're not actively receiving
+	 * notifications on their channels.
+	 */
 	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
 	{
-		int32		pid = QUEUE_BACKEND_PID(i);
+		int32		pid;
 		QueuePosition pos;
 
-		Assert(pid != InvalidPid);
+		/*
+		 * Skip if we've already decided to signal this one.
+		 */
+		if (signaled[i])
+			continue;
+
 		pos = QUEUE_BACKEND_POS(i);
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
-		{
-			/*
-			 * Always signal listeners in our own database, unless they're
-			 * already caught up (unlikely, but possible).
-			 */
-			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
-				continue;
-		}
+
+		/*
+		 * Skip signaling listeners if they already caught up.
+		 */
+		if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+			continue;
+
+		/*
+		 * Wake only tail optimization: Signal the backend that is furthest
+		 * behind to help prevent backends from getting far behind in the
+		 * first place. This creates a chain reaction where each backend
+		 * eventually wakes up the next one as notifications are processed,
+		 * avoiding thundering herd.
+		 *
+		 * Otherwise, only skip signaling listeners if they are not far
+		 * behind.
+		 */
+		if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
+											  QUEUE_POS_PAGE(pos)) == 0)
+			tail_woken = true;
 		else
-		{
-			/*
-			 * Listeners in other databases should be signaled only if they
-			 * are far behind.
-			 */
-			if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
-				continue;
-		}
+			continue;
+
+		pid = QUEUE_BACKEND_PID(i);
+		Assert(pid != InvalidPid);
 		/* OK, need to signal this one */
 		pids[count] = pid;
 		procnos[count] = i;
 		count++;
+
+
 	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now send signals */
@@ -1657,6 +1931,7 @@ SignalBackends(void)
 
 	pfree(pids);
 	pfree(procnos);
+	pfree(signaled);
 }
 
 /*
@@ -2395,3 +2670,230 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Channel hash table management functions
+ */
+
+/*
+ * ChannelHashPrepareKey
+ *		Prepare a channel key (database OID + channel name) for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel)
+{
+	memset(key, 0, sizeof(ChannelHashKey));
+	key->dboid = dboid;
+	strlcpy(key->channel, channel, NAMEDATALEN);
+}
+
+/*
+ * ChannelHashAddListener
+ *		Register the given backend as a listener for the specified channel
+ *		in the shared channel hash table.
+ *
+ * Caller must hold exclusive NotifyQueueLock.
+ */
+static void
+ChannelHashAddListener(const char *channel, ProcNumber procno)
+{
+	ChannelEntry *entry;
+	bool		found;
+	ChannelHashKey key;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/* Look up or create the channel entry */
+	entry = (ChannelEntry *) hash_search(GetChannelHash(),
+										 &key,
+										 HASH_ENTER_NULL,
+										 &found);
+
+	/*
+	 * If hash_search returned NULL, we've run out of shared memory to
+	 * allocate new hash entries. We gracefully degrade by not tracking this
+	 * channel in the hash. The channel will use the fallback broadcast
+	 * signalling.
+	 */
+	if (entry == NULL)
+	{
+		ereport(DEBUG1,
+				(errmsg("too many notification channels are already being tracked")));
+		return;
+	}
+
+	if (!found)
+	{
+		/* New channel, initialize the entry */
+		memcpy(&entry->key, &key, sizeof(ChannelHashKey));
+		entry->listener = procno;
+		entry->has_multiple_listeners = false;
+	}
+	else
+	{
+		/* Channel already exists */
+		if (!entry->has_multiple_listeners)
+		{
+			if (entry->listener == procno)
+				return;			/* Already listening */
+
+			/*
+			 * Another backend is already listening on this channel. Mark it
+			 * as having multiple listeners and fall back to broadcast
+			 * signalling.
+			 */
+			entry->has_multiple_listeners = true;
+			entry->listener = INVALID_PROC_NUMBER;
+		}
+		/* If already marked as having multiple listeners, nothing to do */
+	}
+}
+
+/*
+ * ChannelHashRemoveListener
+ *		Update the channel hash when a backend stops listening on a channel.
+ *
+ * If the channel entry currently tracks exactly one listener and that
+ * listener matches the supplied procno, remove the entry altogether.
+ *
+ * If the channel has already been flagged as having multiple listeners,
+ * we no longer track individual backends; therefore we cannot remove a
+ * single backend without additional bookkeeping.  In that situation we
+ * simply leave the entry in place (still marked as having multiple
+ * listeners) and return.
+ *
+ * Caller must hold exclusive NotifyQueueLock.
+ */
+static void
+ChannelHashRemoveListener(const char *channel, ProcNumber procno)
+{
+	ChannelEntry *entry;
+	ChannelHashKey key;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/* Look up the channel entry */
+	entry = (ChannelEntry *) hash_search(GetChannelHash(),
+										 &key,
+										 HASH_FIND,
+										 NULL);
+
+	if (!entry)
+		return;					/* Channel not found */
+
+	/*
+	 * If this channel has multiple listeners, we can't track individual
+	 * removals. Just leave it marked as having multiple listeners.
+	 */
+	if (entry->has_multiple_listeners)
+		return;
+
+	/* If this backend is the single listener, remove the channel entry */
+	if (entry->listener == procno)
+	{
+		hash_search(GetChannelHash(),
+					&key,
+					HASH_REMOVE,
+					NULL);
+	}
+}
+
+/*
+ * ChannelHashRemoveBackendFromAll
+ *		Sweep the channel hash and delete any channel entries for which
+ *		this backend is the only tracked listener in the current database.
+ *
+ * Caller must hold exclusive NotifyQueueLock.
+ */
+static void
+ChannelHashRemoveBackendFromAll(ProcNumber procno)
+{
+	HASH_SEQ_STATUS status;
+	ChannelEntry *entry;
+
+	hash_seq_init(&status, GetChannelHash());
+
+	while ((entry = (ChannelEntry *) hash_seq_search(&status)) != NULL)
+	{
+		if (entry->key.dboid != MyDatabaseId)
+			continue;
+
+		if (entry->has_multiple_listeners)
+			continue;
+
+		if (entry->listener == procno)
+		{
+			hash_search(GetChannelHash(),
+						&entry->key,
+						HASH_REMOVE,
+						NULL);
+		}
+	}
+}
+
+/*
+ * ChannelHashLookup
+ *		Look up the channel hash entry for the given channel name in the
+ *		current database.
+ *
+ * Returns NULL if the channel is not being tracked (no listeners, or channel
+ * fell back to broadcast mode because we ran out of shared memory when trying
+ * to add entries to the hash table).
+ *
+ * Caller must hold at least shared NotifyQueueLock.
+ */
+static ChannelEntry *
+ChannelHashLookup(const char *channel)
+{
+	ChannelHashKey key;
+
+	Assert(LWLockHeldByMe(NotifyQueueLock));
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	return (ChannelEntry *) hash_search(GetChannelHash(),
+										&key,
+										HASH_FIND,
+										NULL);
+}
+
+/*
+ * GetPendingNotifyChannels
+ *		Get list of unique channel names from pending notifications.
+ */
+static List *
+GetPendingNotifyChannels(void)
+{
+	List	   *channels = NIL;
+	ListCell   *p;
+	ListCell   *q;
+	bool		found;
+
+	if (!pendingNotifies)
+		return NIL;
+
+	/* Collect unique channel names from pending notifications */
+	foreach(p, pendingNotifies->events)
+	{
+		Notification *n = (Notification *) lfirst(p);
+		char	   *channel = n->data;
+
+		/* Check if we already have this channel in our list */
+		found = false;
+		foreach(q, channels)
+		{
+			char	   *existing = (char *) lfirst(q);
+
+			if (strcmp(existing, channel) == 0)
+			{
+				found = true;
+				break;
+			}
+		}
+
+		if (!found)
+			channels = lappend(channels, channel);
+	}
+
+	return channels;
+}
-- 
2.47.1

