On 2024-12-24 13:09, Trey Boudreau wrote:
Based on [1], please find attached an implementation for listening
for notifications on all channels. The first two chunks lay some
groundwork and do not change the current LISTEN behavior, except
to improve performance of managing large numbers of channels.
Trying another email client to see if the patches show up.
From de7ed45ebf5026a4a43857b38fbe3437db09db65 Mon Sep 17 00:00:00 2001
From: "Boudreau, Trey" <t...@treysoft.com>
Date: Mon, 23 Dec 2024 14:18:44 -0600
Subject: [PATCH v1 1/3] Make simplehash more flexible.
Allow users of simplehash.h to have more control over the key when
inserting and removing items from the table. In particular one can now
say:
#define SH_KEY_TYPE const char *
#define SH_MAKE_KEY(tb, k) MemoryContextStrdup(tb->ctx, k)
#define SH_UNMAKE_KEY(tb, k) pfree(k)
and usefully use char pointers for keys. If the user provides
SH_UNMAKE_KEY then SH_DELETE, SH_DELETE_ITEM, SH_RESET, and SH_DESTROY
will use it to process the stored keys prior to doing their work.
---
src/include/lib/simplehash.h | 40 ++++++++++++++++++++++++++++++++++--
1 file changed, 38 insertions(+), 2 deletions(-)
diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h
index 3e1b1f9461..d349b25717 100644
--- a/src/include/lib/simplehash.h
+++ b/src/include/lib/simplehash.h
@@ -46,6 +46,11 @@
* are defined, so you can supply your own
* The following parameters are only relevant when SH_DEFINE is defined:
* - SH_KEY - name of the element in SH_ELEMENT_TYPE containing the hash key
+ * - SH_MAKE_KEY(table, key) - if defined, given a key passed to SH_INSERT
+ * or SH_INSERT_HASH, return the value to store in the new entry's SH_KEY
+ * (for example, a private copy of the key); defaults to the key itself
+ * - SH_UNMAKE_KEY(table, stored_key) - if defined, process a stored key
+ * before SH_DELETE, SH_DELETE_ITEM, SH_RESET, or SH_DESTROY removes it
* - SH_EQUAL(table, a, b) - compare two table keys
* - SH_HASH_KEY(table, key) - generate hash for the key
* - SH_STORE_HASH - if defined the hash is stored in the elements
@@ -277,6 +282,28 @@ SH_SCOPE void SH_STAT(SH_TYPE * tb);
#define SH_GROW_MIN_FILLFACTOR 0.1
#endif
+#ifndef SH_MAKE_KEY
+#define SH_MAKE_KEY(table, key) key
+#endif
+
+#ifdef SH_UNMAKE_KEY
+/* Apply SH_UNMAKE_KEY to every in-use entry; expands to a single statement. */
+#define SH_UNMAKE_ALL_KEYS(tb) \
+ do { \
+ if ((tb)->members) \
+ { \
+ SH_ELEMENT_TYPE *elem = (tb)->data; \
+ SH_ELEMENT_TYPE *end = elem + (tb)->size; \
+ for (; elem < end; elem++) \
+ if (elem->status == SH_STATUS_IN_USE) \
+ SH_UNMAKE_KEY(tb, elem->SH_KEY); \
+ } \
+ } while (false)
+#else
+#define SH_UNMAKE_KEY(table, key) do {} while (false)
+#define SH_UNMAKE_ALL_KEYS(table) do {} while (false)
+#endif
+
#ifdef SH_STORE_HASH
#define SH_COMPARE_KEYS(tb, ahash, akey, b) (ahash == SH_GET_HASH(tb, b) && SH_EQUAL(tb, b->SH_KEY, akey))
#else
@@ -471,6 +498,7 @@ SH_CREATE(MemoryContext ctx, uint32 nelements, void *private_data)
SH_SCOPE void
SH_DESTROY(SH_TYPE * tb)
{
+ SH_UNMAKE_ALL_KEYS(tb);
SH_FREE(tb, tb->data);
pfree(tb);
}
@@ -479,6 +507,7 @@ SH_DESTROY(SH_TYPE * tb)
SH_SCOPE void
SH_RESET(SH_TYPE * tb)
{
+ SH_UNMAKE_ALL_KEYS(tb);
memset(tb->data, 0, sizeof(SH_ELEMENT_TYPE) * tb->size);
tb->members = 0;
}
@@ -652,7 +681,7 @@ restart:
if (entry->status == SH_STATUS_EMPTY)
{
tb->members++;
- entry->SH_KEY = key;
+ entry->SH_KEY = SH_MAKE_KEY(tb, key);
#ifdef SH_STORE_HASH
SH_GET_HASH(tb, entry) = hash;
#endif
@@ -740,7 +769,7 @@ restart:
/* and fill the now empty spot */
tb->members++;
- entry->SH_KEY = key;
+ entry->SH_KEY = SH_MAKE_KEY(tb, key);
#ifdef SH_STORE_HASH
SH_GET_HASH(tb, entry) = hash;
#endif
@@ -872,6 +901,8 @@ SH_DELETE(SH_TYPE * tb, SH_KEY_TYPE key)
{
SH_ELEMENT_TYPE *lastentry = entry;
+ SH_UNMAKE_KEY(tb, entry->SH_KEY);
+
tb->members--;
/*
@@ -935,6 +966,8 @@ SH_DELETE_ITEM(SH_TYPE * tb, SH_ELEMENT_TYPE * entry)
/* Calculate the index of 'entry' */
curelem = entry - &tb->data[0];
+ SH_UNMAKE_KEY(tb, entry->SH_KEY);
+
tb->members--;
/*
@@ -1148,6 +1181,9 @@ SH_STAT(SH_TYPE * tb)
#undef SH_KEY_TYPE
#undef SH_KEY
#undef SH_ELEMENT_TYPE
+#undef SH_MAKE_KEY
+#undef SH_UNMAKE_KEY
+#undef SH_UNMAKE_ALL_KEYS
#undef SH_HASH_KEY
#undef SH_SCOPE
#undef SH_DECLARE
--
2.43.0
From 258d75348952c28d8ff3950363471c48e8a8c362 Mon Sep 17 00:00:00 2001
From: "Boudreau, Trey" <t...@treysoft.com>
Date: Tue, 24 Dec 2024 12:15:36 -0600
Subject: [PATCH v1 3/3] WIP Allow LISTENing for all channels.
Extend the notion of LISTENing to include all channels via
'LISTEN *', as per:
https://www.postgresql.org/message-id/737106.1734732462%40sss.pgh.pa.us
This commit does not change the results of pg_listening_channels(),
but some future version of it should.
---
doc/src/sgml/ref/listen.sgml | 37 +++++--
src/backend/commands/async.c | 149 ++++++++++++++++++++++++----
src/backend/parser/gram.y | 10 +-
src/backend/tcop/utility.c | 5 +-
src/include/commands/async.h | 1 +
src/include/nodes/parsenodes.h | 3 +-
src/test/regress/expected/async.out | 6 ++
src/test/regress/sql/async.sql | 7 ++
8 files changed, 189 insertions(+), 29 deletions(-)
diff --git a/doc/src/sgml/ref/listen.sgml b/doc/src/sgml/ref/listen.sgml
index 6c1f09bd45..67bab0b8f1 100644
--- a/doc/src/sgml/ref/listen.sgml
+++ b/doc/src/sgml/ref/listen.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
-LISTEN <replaceable class="parameter">channel</replaceable>
+LISTEN { <replaceable class="parameter">channel</replaceable> | * }
</synopsis>
</refsynopsisdiv>
@@ -36,6 +36,25 @@ LISTEN <replaceable class="parameter">channel</replaceable>
this notification channel, nothing is done.
</para>
+ <para>
+ A session can be unregistered for a given notification channel with the
+ <command>UNLISTEN</command> command. A session's listen
+ registrations are automatically cleared when the session ends.
+ </para>
+
+ <para>
+ The special wildcard <literal>*</literal> cancels all listener
+ registrations for the current session and replaces them with a
+ virtual registration that matches all channels. After that, each
+ <command>UNLISTEN <replaceable class="parameter">channel</replaceable></command>
+ command adds the named channel to a list of exceptions, and
+ notifications on excepted channels are not delivered. A later
+ <command>LISTEN <replaceable class="parameter">channel</replaceable></command>
+ command removes the matching exception, if one is present, and
+ otherwise has no effect. This remains the case until the session
+ executes the <command>UNLISTEN *</command> command.
+ </para>
+
<para>
Whenever the command <command>NOTIFY <replaceable
class="parameter">channel</replaceable></command> is invoked, either
@@ -45,12 +64,6 @@ LISTEN <replaceable class="parameter">channel</replaceable>
application.
</para>
- <para>
- A session can be unregistered for a given notification channel with the
- <command>UNLISTEN</command> command. A session's listen
- registrations are automatically cleared when the session ends.
- </para>
-
<para>
The method a client application must use to detect notification events depends on
which <productname>PostgreSQL</productname> application programming interface it
@@ -77,6 +90,16 @@ LISTEN <replaceable class="parameter">channel</replaceable>
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><literal>*</literal></term>
+ <listitem>
+ <para>
+ All current listen registrations for this session are cleared and
+ replaced by a virtual registration that matches all channels.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect1>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 54f47f024f..607ad81c5d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -336,11 +336,18 @@ typedef struct
#include "lib/simplehash.h"
/*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on). It is a simplehash.h of channel names,
+ * listenChannels identifies the channels we are actually listening to.
+ * We represent it as a boolean flag indicating whether or not we want
+ * to see all notifications, and a simplehash.h of the exceptions,
* allocated in TopMemoryContext.
*/
-static listen_hash * listenChannels = NULL; /* hash table of ListenHashEntry */
+typedef struct
+{
+ listen_hash *exceptions;
+ bool wantAll;
+} ListenChannels;
+
+static ListenChannels * listenChannels = NULL;
/* Initialize the hash table with this many elements when we start listening. */
#define LISTEN_HASH_START_SIZE 16
@@ -358,6 +365,7 @@ static listen_hash * listenChannels = NULL; /* hash table of ListenHashEntry */
typedef enum
{
LISTEN_LISTEN,
+ LISTEN_LISTEN_ALL,
LISTEN_UNLISTEN,
LISTEN_UNLISTEN_ALL,
} ListenActionKind;
@@ -460,8 +468,11 @@ static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
static void Exec_ListenCommit(const char *channel);
+static void Exec_ListenAllCommit(void);
static void Exec_UnlistenCommit(const char *channel);
static void Exec_UnlistenAllCommit(void);
+static bool NoActiveListens(void);
+static void ResetListens(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
@@ -709,8 +720,8 @@ Async_Notify(const char *channel, const char *payload)
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
- * Actual update of the listenChannels list happens during transaction
- * commit.
+ * Actual update of the listenChannels data structure happens during
+ * transaction commit.
*/
static void
queue_listen(ListenActionKind action, const char *channel)
@@ -769,6 +780,20 @@ Async_Listen(const char *channel)
queue_listen(LISTEN_LISTEN, channel);
}
+/*
+ * Async_ListenAll
+ *
+ * This is executed by the SQL LISTEN * command.
+ */
+void
+Async_ListenAll(void)
+{
+ if (Trace_notify)
+ elog(DEBUG1, "Async_ListenAll(%d)", MyProcPid);
+
+ queue_listen(LISTEN_LISTEN_ALL, "");
+}
+
/*
* Async_Unlisten
*
@@ -826,7 +851,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
if (listenChannels)
{
funcctx->user_fctx = MemoryContextAlloc(funcctx->multi_call_memory_ctx, sizeof(listen_iterator));
- listen_start_iterate(listenChannels, (listen_iterator *) funcctx->user_fctx);
+ listen_start_iterate(listenChannels->exceptions, (listen_iterator *) funcctx->user_fctx);
}
}
@@ -836,7 +861,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
if (listenChannels)
{
/* next item from the hash table iterator */
- item = listen_iterate(listenChannels, (listen_iterator *) funcctx->user_fctx);
+ item = listen_iterate(listenChannels->exceptions, (listen_iterator *) funcctx->user_fctx);
/* iterate has skipped over all empty slots for us */
if (item)
SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(item->channel));
@@ -911,6 +936,7 @@ PreCommit_Notify(void)
switch (actrec->action)
{
case LISTEN_LISTEN:
+ case LISTEN_LISTEN_ALL:
Exec_ListenPreCommit();
break;
case LISTEN_UNLISTEN:
@@ -1024,6 +1050,9 @@ AtCommit_Notify(void)
case LISTEN_LISTEN:
Exec_ListenCommit(actrec->channel);
break;
+ case LISTEN_LISTEN_ALL:
+ Exec_ListenAllCommit();
+ break;
case LISTEN_UNLISTEN:
Exec_UnlistenCommit(actrec->channel);
break;
@@ -1035,7 +1064,7 @@ AtCommit_Notify(void)
}
/* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && (listenChannels->members == 0))
+ if (amRegisteredListener && NoActiveListens())
asyncQueueUnregister();
/*
@@ -1146,7 +1175,8 @@ Exec_ListenPreCommit(void)
LWLockRelease(NotifyQueueLock);
/* Create the structures to manage listens */
- listenChannels = listen_create(TopMemoryContext, LISTEN_HASH_START_SIZE, NULL);
+ listenChannels = MemoryContextAllocZero(TopMemoryContext, sizeof(ListenChannels));
+ listenChannels->exceptions = listen_create(TopMemoryContext, LISTEN_HASH_START_SIZE, NULL);
/* Now we are listed in the global array, so remember we're listening */
amRegisteredListener = true;
@@ -1175,6 +1205,9 @@ Exec_ListenCommit(const char *channel)
Assert(listenChannels != NULL);
+ if (Trace_notify)
+ elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid);
+
/*
* Add the new channel name to listenChannels.
*
@@ -1182,10 +1215,33 @@ Exec_ListenCommit(const char *channel)
* which would be bad because we already committed. For the moment it
* doesn't seem worth trying to guard against that, but maybe improve this
* later.
- *
- * Does nothing if we are already listening on this channel.
*/
- listen_insert(listenChannels, channel, &found);
+ if (listenChannels->wantAll)
+ {
+ /* remove any existing exception */
+ listen_delete(listenChannels->exceptions, channel);
+ }
+ else
+ {
+ /* add an explicit exception */
+ (void) listen_insert(listenChannels->exceptions, channel, &found);
+ }
+}
+
+/*
+ * Exec_ListenAllCommit --- subroutine for AtCommit_Notify
+ *
+ * Listen to ALL of the channels, discarding any per-channel entries.
+ */
+static void
+Exec_ListenAllCommit(void)
+{
+ if (Trace_notify)
+ elog(DEBUG1, "Exec_ListenAllCommit(%d)", MyProcPid);
+
+ /* By definition, 'LISTEN *' resets the world, then wants everything. */
+ ResetListens();
+ listenChannels->wantAll = true;
}
/*
@@ -1196,6 +1252,8 @@ Exec_ListenCommit(const char *channel)
static void
Exec_UnlistenCommit(const char *channel)
{
+ bool found;
+
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
@@ -1203,7 +1261,16 @@ Exec_UnlistenCommit(const char *channel)
if (!listenChannels)
return;
- listen_delete(listenChannels, channel);
+ if (!listenChannels->wantAll)
+ {
+ /* remove any existing exception */
+ listen_delete(listenChannels->exceptions, channel);
+ }
+ else
+ {
+ /* add an explicit exception */
+ (void) listen_insert(listenChannels->exceptions, channel, &found);
+ }
/*
* We do not complain about unlistening something not being listened;
@@ -1226,7 +1293,23 @@ Exec_UnlistenAllCommit(void)
if (!listenChannels)
return;
- listen_reset(listenChannels);
+ ResetListens();
+ listenChannels->wantAll = false;
+}
+
+/*
+ * Report whether this backend currently has no active listens at all.
+ */
+static bool
+NoActiveListens(void)
+{
+ /* Determine if we have any active listens, then invert that. */
+ return !(
+ /* Missing data structure? No listens possible. */
+ (listenChannels != NULL) &&
+ /* Listening to everything, or holding at least one table entry? */
+ ((listenChannels->wantAll) || (listenChannels->exceptions->members != 0))
+ );
}
/*
@@ -1237,9 +1320,36 @@ Exec_UnlistenAllCommit(void)
static bool
IsListeningOn(const char *channel)
{
+ bool found;
+
if (listenChannels == NULL)
return false;
- return listen_lookup(listenChannels, channel) != NULL;
+
+ /* determine if we have an entry for it */
+ found = listen_lookup(listenChannels->exceptions, channel) != NULL;
+ /*---------------------
+ * wantAll found result
+ * f f f
+ * f t t
+ * t f t
+ * t t f
+ *---------------------
+ */
+ return listenChannels->wantAll ^ found;
+}
+
+/* Reset the listen exceptions hash, discarding every entry.
+ *
+ * The channel keys are allocated separately from the table, but
+ * listen_reset() frees them for us via SH_UNMAKE_KEY.
+ */
+static void
+ResetListens(void)
+{
+ if ((listenChannels == NULL) || (listenChannels->exceptions == NULL))
+ return;
+
+ listen_reset(listenChannels->exceptions);
}
/*
@@ -1249,13 +1359,14 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- Assert((listenChannels == NULL) || (listenChannels->members == 0)); /* else caller error */
+ Assert(NoActiveListens()); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
/* Release our listen_hash data structures */
- listen_destroy(listenChannels);
+ listen_destroy(listenChannels->exceptions);
+ pfree(listenChannels);
listenChannels = NULL;
/*
@@ -1698,7 +1809,7 @@ AtAbort_Notify(void)
* we have registered as a listener but have not made any entry in
* listenChannels. In that case, deregister again.
*/
- if (amRegisteredListener && (listenChannels->members == 0))
+ if (amRegisteredListener && NoActiveListens())
asyncQueueUnregister();
/* And clean up */
@@ -2209,7 +2320,7 @@ ProcessIncomingNotify(bool flush)
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
- if (listenChannels == NULL)
+ if (NoActiveListens())
return;
if (Trace_notify)
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 67eb96396a..c190b29efc 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10992,13 +10992,21 @@ notify_payload:
| /*EMPTY*/ { $$ = NULL; }
;
-ListenStmt: LISTEN ColId
+ListenStmt:
+ LISTEN ColId
{
ListenStmt *n = makeNode(ListenStmt);
n->conditionname = $2;
$$ = (Node *) n;
}
+ | LISTEN '*'
+ {
+ ListenStmt *n = makeNode(ListenStmt);
+
+ n->conditionname = NULL;
+ $$ = (Node *) n;
+ }
;
UnlistenStmt:
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index c2ed8214ef..f630251265 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -824,7 +824,10 @@ standard_ProcessUtility(PlannedStmt *pstmt,
errmsg("cannot execute %s within a background process",
"LISTEN")));
- Async_Listen(stmt->conditionname);
+ if (stmt->conditionname)
+ Async_Listen(stmt->conditionname);
+ else
+ Async_ListenAll();
}
break;
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index 78daa25fa0..ac6cb6348a 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -29,6 +29,7 @@ extern void NotifyMyFrontEnd(const char *channel,
/* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload);
extern void Async_Listen(const char *channel);
+extern void Async_ListenAll(void);
extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 0f9462493e..58696297f1 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3661,7 +3661,8 @@ typedef struct NotifyStmt
typedef struct ListenStmt
{
NodeTag type;
- char *conditionname; /* condition name to listen on */
+ char *conditionname; /* condition name to listen on, or NULL for
+ * all */
} ListenStmt;
/* ----------------------
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index e01c953ae9..0b91857343 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -41,6 +41,12 @@ UNLISTEN notify_async2;
UNLISTEN notify_async2; -- no-op
--Should work. Ignore unlisten all with no channels
UNLISTEN *;
+--Should work. Allow listening on everything
+LISTEN *;
+LISTEN *;
+UNLISTEN notify_async3;
+LISTEN notify_async3;
+UNLISTEN *;
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 382c80ac2a..4e90750f52 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -22,6 +22,13 @@ UNLISTEN notify_async2; -- no-op
--Should work. Ignore unlisten all with no channels
UNLISTEN *;
+--Should work. Allow listening on everything
+LISTEN *;
+LISTEN *;
+UNLISTEN notify_async3;
+LISTEN notify_async3;
+UNLISTEN *;
+
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();
--
2.43.0
From 151b803346562e0553dd7985d1cb42e6c309d0ab Mon Sep 17 00:00:00 2001
From: "Boudreau, Trey" <t...@treysoft.com>
Date: Tue, 24 Dec 2024 11:18:03 -0600
Subject: [PATCH v1 2/3] Improve LISTEN-NOTIFY-UNLISTEN performance.
Track LISTEN channels with a simplehash.h table instead of a List.
You won't notice a difference in performance unless you have more
than a handful of channels.
Extend the test case to cover pg_listening_channels() and verify
that unlistening an already unlistened channel has no side effects.
---
src/backend/commands/async.c | 113 +++++++++++++++++-----------
src/test/regress/expected/async.out | 9 +++
src/test/regress/sql/async.sql | 4 +
3 files changed, 81 insertions(+), 45 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8ed503e1c1..54f47f024f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -312,12 +312,38 @@ static SlruCtlData NotifyCtlData;
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
+typedef struct
+{
+ /* needed for simplehash */
+ char *channel;
+ uint32 hash;
+ char status;
+} ListenHashEntry;
+
+#define SH_PREFIX listen
+#define SH_ELEMENT_TYPE ListenHashEntry
+#define SH_KEY channel
+#define SH_KEY_TYPE const char *
+#define SH_EQUAL(tb, a, b) (strcmp(a, b) == 0)
+#define SH_STORE_HASH
+#define SH_MAKE_KEY(tb, k) MemoryContextStrdup((tb)->ctx, k)	/* table stores its own copy */
+#define SH_UNMAKE_KEY(tb, k) pfree(k)	/* release that copy on removal */
+#define SH_HASH_KEY(tb, k) hash_bytes((const unsigned char *)k, strlen(k))
+#define SH_GET_HASH(tb, a) a->hash
+#define SH_SCOPE static inline
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
/*
* listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on). It is a simple list of channel names,
+ * (ie, have committed a LISTEN on). It is a simplehash.h of channel names,
* allocated in TopMemoryContext.
*/
-static List *listenChannels = NIL; /* list of C strings */
+static listen_hash * listenChannels = NULL; /* hash table of ListenHashEntry */
+
+/* Initialize the hash table with this many elements when we start listening. */
+#define LISTEN_HASH_START_SIZE 16
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
@@ -790,23 +816,30 @@ Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
+ ListenHashEntry *item;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
+ if (listenChannels)
+ {
+ funcctx->user_fctx = MemoryContextAlloc(funcctx->multi_call_memory_ctx, sizeof(listen_iterator));
+ listen_start_iterate(listenChannels, (listen_iterator *) funcctx->user_fctx);
+ }
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
- if (funcctx->call_cntr < list_length(listenChannels))
+ if (listenChannels)
{
- char *channel = (char *) list_nth(listenChannels,
- funcctx->call_cntr);
-
- SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+ /* next item from the hash table iterator */
+ item = listen_iterate(listenChannels, (listen_iterator *) funcctx->user_fctx);
+ /* iterate has skipped over all empty slots for us */
+ if (item)
+ SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(item->channel));
}
SRF_RETURN_DONE(funcctx);
@@ -1002,7 +1035,7 @@ AtCommit_Notify(void)
}
/* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener && (listenChannels->members == 0))
asyncQueueUnregister();
/*
@@ -1112,6 +1145,9 @@ Exec_ListenPreCommit(void)
}
LWLockRelease(NotifyQueueLock);
+ /* Create the structures to manage listens */
+ listenChannels = listen_create(TopMemoryContext, LISTEN_HASH_START_SIZE, NULL);
+
/* Now we are listed in the global array, so remember we're listening */
amRegisteredListener = true;
@@ -1135,11 +1171,9 @@ Exec_ListenPreCommit(void)
static void
Exec_ListenCommit(const char *channel)
{
- MemoryContext oldcontext;
+ bool found;
- /* Do nothing if we are already listening on this channel */
- if (IsListeningOn(channel))
- return;
+ Assert(listenChannels != NULL);
/*
* Add the new channel name to listenChannels.
@@ -1148,10 +1182,10 @@ Exec_ListenCommit(const char *channel)
* which would be bad because we already committed. For the moment it
* doesn't seem worth trying to guard against that, but maybe improve this
* later.
+ *
+ * Does nothing if we are already listening on this channel.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- listenChannels = lappend(listenChannels, pstrdup(channel));
- MemoryContextSwitchTo(oldcontext);
+ listen_insert(listenChannels, channel, &found);
}
/*
@@ -1162,22 +1196,14 @@ Exec_ListenCommit(const char *channel)
static void
Exec_UnlistenCommit(const char *channel)
{
- ListCell *q;
-
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
- foreach(q, listenChannels)
- {
- char *lchan = (char *) lfirst(q);
+ /* Ignore UNLISTEN when not listening */
+ if (!listenChannels)
+ return;
- if (strcmp(lchan, channel) == 0)
- {
- listenChannels = foreach_delete_current(listenChannels, q);
- pfree(lchan);
- break;
- }
- }
+ listen_delete(listenChannels, channel);
/*
* We do not complain about unlistening something not being listened;
@@ -1196,31 +1222,24 @@ Exec_UnlistenAllCommit(void)
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
- list_free_deep(listenChannels);
- listenChannels = NIL;
+ /* Ignore UNLISTEN * when not listening */
+ if (!listenChannels)
+ return;
+
+ listen_reset(listenChannels);
}
/*
* Test whether we are actively listening on the given channel name.
*
* Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it. In practice the list is likely to be
- * fairly short, though.
*/
static bool
IsListeningOn(const char *channel)
{
- ListCell *p;
-
- foreach(p, listenChannels)
- {
- char *lchan = (char *) lfirst(p);
-
- if (strcmp(lchan, channel) == 0)
- return true;
- }
- return false;
+ if (listenChannels == NULL)
+ return false;
+ return listen_lookup(listenChannels, channel) != NULL;
}
/*
@@ -1230,11 +1249,15 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- Assert(listenChannels == NIL); /* else caller error */
+ Assert((listenChannels == NULL) || (listenChannels->members == 0)); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
+ /* Release our listen_hash data structures */
+ listen_destroy(listenChannels);
+ listenChannels = NULL;
+
/*
* Need exclusive lock here to manipulate list links.
*/
@@ -1675,7 +1698,7 @@ AtAbort_Notify(void)
* we have registered as a listener but have not made any entry in
* listenChannels. In that case, deregister again.
*/
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener && (listenChannels->members == 0))
asyncQueueUnregister();
/* And clean up */
@@ -2186,7 +2209,7 @@ ProcessIncomingNotify(bool flush)
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
- if (listenChannels == NIL)
+ if (listenChannels == NULL)
return;
if (Trace_notify)
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38e63..e01c953ae9 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -30,7 +30,16 @@ ERROR: channel name too long
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
LISTEN notify_async2;
+SELECT pg_listening_channels(); -- expect one entry
+ pg_listening_channels
+-----------------------
+ notify_async2
+(1 row)
+
UNLISTEN notify_async2;
+--Should work. Ignore unlistened channels
+UNLISTEN notify_async2; -- no-op
+--Should work. Ignore unlisten all with no channels
UNLISTEN *;
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01538..382c80ac2a 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -15,7 +15,11 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
--Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
NOTIFY notify_async2;
LISTEN notify_async2;
+SELECT pg_listening_channels(); -- expect one entry
UNLISTEN notify_async2;
+--Should work. Ignore unlistened channels
+UNLISTEN notify_async2; -- no-op
+--Should work. Ignore unlisten all with no channels
UNLISTEN *;
-- Should return zero while there are no pending notifications.
--
2.43.0