I implemented a LISTEN command that supports matching names in the LIKE format.

Just like

LISTEN 'c%';
NOTIFY c1;NOTIFY c2;

Notifications are received for c1 and c2.

For grammatical reasons, LISTEN 'v_'; with LISTEN v_; It's weird.

Should it be defined in a way that makes it easier to distinguish?
And support for more matching patterns.

For example
LISTEN [LIKE] 'like_pattern';
LISTEN SIMILAR 'regex_pattern';

--
Zongliang Quan
diff --git a/doc/src/sgml/ref/listen.sgml b/doc/src/sgml/ref/listen.sgml
index 6c1f09bd455..fe0374a6006 100644
--- a/doc/src/sgml/ref/listen.sgml
+++ b/doc/src/sgml/ref/listen.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-LISTEN <replaceable class="parameter">channel</replaceable>
+LISTEN { <replaceable class="parameter">channel</replaceable> | 
<replaceable>pattern</replaceable> }
 </synopsis>
  </refsynopsisdiv>
 
@@ -30,17 +30,18 @@ LISTEN <replaceable class="parameter">channel</replaceable>
 
   <para>
    <command>LISTEN</command> registers the current session as a
-   listener on the notification channel named <replaceable
-   class="parameter">channel</replaceable>.
+   listener on the notification channels named <replaceable
+   class="parameter">channel</replaceable> or whose name match
+   the <replaceable class="parameter">pattern</replaceable>.
    If the current session is already registered as a listener for
-   this notification channel, nothing is done.
+   these notification channels, nothing is done.
   </para>
 
   <para>
    Whenever the command <command>NOTIFY <replaceable
    class="parameter">channel</replaceable></command> is invoked, either
    by this session or another one connected to the same database, all
-   the sessions currently listening on that notification channel are
+   the sessions currently listening on those notification channels are
    notified, and each will in turn notify its connected client
    application.
   </para>
@@ -77,6 +78,15 @@ LISTEN <replaceable class="parameter">channel</replaceable>
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><replaceable class="parameter">pattern</replaceable></term>
+    <listitem>
+     <para>
+      Pattern of notification channel names (<xref linkend="functions-like"/> 
expression).
+     </para>
+    </listitem>
+   </varlistentry>
   </variablelist>
  </refsect1>
 
@@ -130,6 +140,17 @@ LISTEN <replaceable class="parameter">channel</replaceable>
 LISTEN virtual;
 NOTIFY virtual;
 Asynchronous notification "virtual" received from server process with PID 8448.
+</programlisting></para>
+  <para>
+   Configure and execute a listen pattern from
+   <application>psql</application>:
+
+<programlisting>
+LISTEN 'virtual%';
+NOTIFY virtual0;
+Asynchronous notification "virtual0" received from server process with PID 
8448.
+NOTIFY virtual1;
+Asynchronous notification "virtual1" received from server process with PID 
8448.
 </programlisting></para>
  </refsect1>
 
diff --git a/doc/src/sgml/ref/unlisten.sgml b/doc/src/sgml/ref/unlisten.sgml
index 687bf485c94..332aba50fd2 100644
--- a/doc/src/sgml/ref/unlisten.sgml
+++ b/doc/src/sgml/ref/unlisten.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-UNLISTEN { <replaceable class="parameter">channel</replaceable> | * }
+UNLISTEN { <replaceable class="parameter">channel</replaceable> | <replaceable 
class="parameter">pattern</replaceable> | * }
 </synopsis>
  </refsynopsisdiv>
 
@@ -33,10 +33,11 @@ UNLISTEN { <replaceable 
class="parameter">channel</replaceable> | * }
    registration for <command>NOTIFY</command> events.
    <command>UNLISTEN</command> cancels any existing registration of
    the current <productname>PostgreSQL</productname> session as a
-   listener on the notification channel named <replaceable
-   class="parameter">channel</replaceable>.  The special wildcard
-   <literal>*</literal> cancels all listener registrations for the
-   current session.
+   listener on the notification channels named <replaceable
+   class="parameter">channel</replaceable> or whose name match
+   the <replaceable class="parameter">pattern</replaceable>.
+   The special wildcard <literal>*</literal> cancels all listener
+   registrations for the current session.
   </para>
 
   <para>
@@ -60,6 +61,15 @@ UNLISTEN { <replaceable 
class="parameter">channel</replaceable> | * }
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><replaceable class="parameter">pattern</replaceable></term>
+    <listitem>
+     <para>
+      Pattern of notification channel names (<xref linkend="functions-like"/> 
expression).
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>*</literal></term>
     <listitem>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..5488e50ddf2 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -133,6 +133,7 @@
 #include "access/slru.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_database.h"
 #include "commands/async.h"
 #include "common/hashfn.h"
@@ -312,6 +313,12 @@ static SlruCtlData NotifyCtlData;
 
 #define QUEUE_FULL_WARN_INTERVAL       5000    /* warn at most once every 5s */
 
+typedef struct
+{
+       bool    ispatt;
+       char    channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
+} ListenChannel;
+
 /*
  * listenChannels identifies the channels we are actually listening to
  * (ie, have committed a LISTEN on).  It is a simple list of channel names,
@@ -339,6 +346,7 @@ typedef enum
 typedef struct
 {
        ListenActionKind action;
+       bool                     ispatt;
        char            channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated 
string */
 } ListenAction;
 
@@ -430,13 +438,13 @@ int                       max_notify_queue_pages = 
1048576;
 /* local function prototypes */
 static inline int64 asyncQueuePageDiff(int64 p, int64 q);
 static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
-static void queue_listen(ListenActionKind action, const char *channel);
+static void queue_listen(ListenActionKind action, const bool ispatt, const 
char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
 static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
+static void Exec_ListenCommit(const bool ispatt, const char *channel);
+static void Exec_UnlistenCommit(const bool ispatt, const char *channel);
 static void Exec_UnlistenAllCommit(void);
-static bool IsListeningOn(const char *channel);
+static bool IsListeningOn(const bool trymatch, const bool ispatt, const char 
*channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
 static bool asyncQueueAdvance(volatile QueuePosition *position, int 
entryLength);
@@ -687,7 +695,7 @@ Async_Notify(const char *channel, const char *payload)
  *             commit.
  */
 static void
-queue_listen(ListenActionKind action, const char *channel)
+queue_listen(ListenActionKind action, const bool ispatt, const char *channel)
 {
        MemoryContext oldcontext;
        ListenAction *actrec;
@@ -705,6 +713,7 @@ queue_listen(ListenActionKind action, const char *channel)
        actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
                                                                         
strlen(channel) + 1);
        actrec->action = action;
+       actrec->ispatt = ispatt;
        strcpy(actrec->channel, channel);
 
        if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
@@ -735,12 +744,12 @@ queue_listen(ListenActionKind action, const char *channel)
  *             This is executed by the SQL listen command.
  */
 void
-Async_Listen(const char *channel)
+Async_Listen(const bool ispatt, const char *channel)
 {
        if (Trace_notify)
                elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
 
-       queue_listen(LISTEN_LISTEN, channel);
+       queue_listen(LISTEN_LISTEN, ispatt, channel);
 }
 
 /*
@@ -749,7 +758,7 @@ Async_Listen(const char *channel)
  *             This is executed by the SQL unlisten command.
  */
 void
-Async_Unlisten(const char *channel)
+Async_Unlisten(const bool ispatt, const char *channel)
 {
        if (Trace_notify)
                elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
@@ -758,7 +767,7 @@ Async_Unlisten(const char *channel)
        if (pendingActions == NULL && !unlistenExitRegistered)
                return;
 
-       queue_listen(LISTEN_UNLISTEN, channel);
+       queue_listen(LISTEN_UNLISTEN, ispatt, channel);
 }
 
 /*
@@ -776,7 +785,7 @@ Async_UnlistenAll(void)
        if (pendingActions == NULL && !unlistenExitRegistered)
                return;
 
-       queue_listen(LISTEN_UNLISTEN_ALL, "");
+       queue_listen(LISTEN_UNLISTEN_ALL, false, "");
 }
 
 /*
@@ -803,10 +812,31 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 
        if (funcctx->call_cntr < list_length(listenChannels))
        {
-               char       *channel = (char *) list_nth(listenChannels,
-                                                                               
                funcctx->call_cntr);
+               ListenChannel *chnl;
+
+               chnl = (ListenChannel *)list_nth(listenChannels, 
funcctx->call_cntr);
+
+               if (chnl->ispatt)
+               {
+                       Size plen;
+                       char *result;
+                       MemoryContext oldcontext;
+
+                       oldcontext = 
MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+                       plen = strlen(chnl->channel);
+                       result = (char *)palloc(plen + 3);
+                       result[0] = '\'';
+                       memcpy(result + 1, chnl->channel, plen);
+                       result[plen + 1] = '\'';
+                       result[plen + 2] = '\0';
 
-               SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+                       MemoryContextSwitchTo(oldcontext);
+
+                       SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(result));
+               }
+               else
+                       SRF_RETURN_NEXT(funcctx, 
CStringGetTextDatum(chnl->channel));
        }
 
        SRF_RETURN_DONE(funcctx);
@@ -989,10 +1019,10 @@ AtCommit_Notify(void)
                        switch (actrec->action)
                        {
                                case LISTEN_LISTEN:
-                                       Exec_ListenCommit(actrec->channel);
+                                       Exec_ListenCommit(actrec->ispatt, 
actrec->channel);
                                        break;
                                case LISTEN_UNLISTEN:
-                                       Exec_UnlistenCommit(actrec->channel);
+                                       Exec_UnlistenCommit(actrec->ispatt, 
actrec->channel);
                                        break;
                                case LISTEN_UNLISTEN_ALL:
                                        Exec_UnlistenAllCommit();
@@ -1133,12 +1163,13 @@ Exec_ListenPreCommit(void)
  * Add the channel to the list of channels we are listening on.
  */
 static void
-Exec_ListenCommit(const char *channel)
+Exec_ListenCommit(const bool ispatt, const char *channel)
 {
-       MemoryContext oldcontext;
+       MemoryContext   oldcontext;
+       ListenChannel  *chnl;
 
        /* Do nothing if we are already listening on this channel */
-       if (IsListeningOn(channel))
+       if (IsListeningOn(false, ispatt, channel))
                return;
 
        /*
@@ -1150,7 +1181,15 @@ Exec_ListenCommit(const char *channel)
         * later.
         */
        oldcontext = MemoryContextSwitchTo(TopMemoryContext);
-       listenChannels = lappend(listenChannels, pstrdup(channel));
+
+       chnl = (ListenChannel *) palloc(offsetof(ListenChannel, channel) +
+                                                               strlen(channel) 
+ 1);
+
+       chnl->ispatt = ispatt;
+       strcpy(chnl->channel, channel);
+
+       listenChannels = lappend(listenChannels, chnl);
+
        MemoryContextSwitchTo(oldcontext);
 }
 
@@ -1160,7 +1199,7 @@ Exec_ListenCommit(const char *channel)
  * Remove the specified channel name from listenChannels.
  */
 static void
-Exec_UnlistenCommit(const char *channel)
+Exec_UnlistenCommit(const bool ispatt, const char *channel)
 {
        ListCell   *q;
 
@@ -1169,9 +1208,12 @@ Exec_UnlistenCommit(const char *channel)
 
        foreach(q, listenChannels)
        {
-               char       *lchan = (char *) lfirst(q);
+               ListenChannel *lchan = (ListenChannel *) lfirst(q);
+
+               if (lchan->ispatt != ispatt)
+                       continue;
 
-               if (strcmp(lchan, channel) == 0)
+               if (strcmp(lchan->channel, channel) == 0)
                {
                        listenChannels = foreach_delete_current(listenChannels, 
q);
                        pfree(lchan);
@@ -1209,16 +1251,37 @@ Exec_UnlistenAllCommit(void)
  * fairly short, though.
  */
 static bool
-IsListeningOn(const char *channel)
+IsListeningOn(const bool trymatch, const bool ispatt, const char *channel)
 {
        ListCell   *p;
 
        foreach(p, listenChannels)
        {
-               char       *lchan = (char *) lfirst(p);
+               ListenChannel *lchan = (ListenChannel *) lfirst(p);
 
-               if (strcmp(lchan, channel) == 0)
-                       return true;
+               if (trymatch)
+               {
+                       Assert(!ispatt);
+
+                       if (lchan->ispatt)
+                       {
+                               Datum s = 
PointerGetDatum(cstring_to_text(channel));
+                               Datum p = 
PointerGetDatum(cstring_to_text(lchan->channel));
+
+                               if 
(DatumGetBool(DirectFunctionCall2Coll(textlike, DEFAULT_COLLATION_OID, s, p)))
+                                       return true;
+                       }
+                       else if (strcmp(lchan->channel, channel) == 0)
+                               return true;
+               }
+               else
+               {
+                       if (ispatt == lchan->ispatt)
+                       {
+                               if (strcmp(lchan->channel, channel) == 0)
+                                       return true;
+                       }
+               }
        }
        return false;
 }
@@ -2071,7 +2134,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition 
*current,
                                /* qe->data is the null-terminated channel name 
*/
                                char       *channel = qe->data;
 
-                               if (IsListeningOn(channel))
+                               if (IsListeningOn(true, false, channel))
                                {
                                        /* payload follows channel name */
                                        char       *payload = qe->data + 
strlen(channel) + 1;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d99c9355c6..e4031b4a038 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -11034,6 +11034,14 @@ ListenStmt: LISTEN ColId
                                {
                                        ListenStmt *n = makeNode(ListenStmt);
 
+                                       n->conditionname = $2;
+                                       $$ = (Node *) n;
+                               }
+                       | LISTEN Sconst
+                               {
+                                       ListenStmt *n = makeNode(ListenStmt);
+
+                                       n->ispatt = true;
                                        n->conditionname = $2;
                                        $$ = (Node *) n;
                                }
@@ -11054,6 +11062,14 @@ UnlistenStmt:
                                        n->conditionname = NULL;
                                        $$ = (Node *) n;
                                }
+                       | UNLISTEN Sconst
+                               {
+                                       UnlistenStmt *n = 
makeNode(UnlistenStmt);
+
+                                       n->ispatt = true;
+                                       n->conditionname = $2;
+                                       $$ = (Node *) n;
+                               }
                ;
 
 
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 25fe3d58016..993cc152909 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -824,7 +824,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                                                         errmsg("cannot execute 
%s within a background process",
                                                                        
"LISTEN")));
 
-                               Async_Listen(stmt->conditionname);
+                               Async_Listen(stmt->ispatt, stmt->conditionname);
                        }
                        break;
 
@@ -834,7 +834,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 
                                CheckRestrictedOperation("UNLISTEN");
                                if (stmt->conditionname)
-                                       Async_Unlisten(stmt->conditionname);
+                                       Async_Unlisten(stmt->ispatt, 
stmt->conditionname);
                                else
                                        Async_UnlistenAll();
                        }
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..4cb7ca38a5e 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -28,8 +28,8 @@ extern void NotifyMyFrontEnd(const char *channel,
 
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
-extern void Async_Listen(const char *channel);
-extern void Async_Unlisten(const char *channel);
+extern void Async_Listen(const bool ispatt, const char *channel);
+extern void Async_Unlisten(const bool ispatt, const char *channel);
 extern void Async_UnlistenAll(void);
 
 /* perform (or cancel) outbound notify processing at transaction commit */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 0b208f51bdd..878049ec3b5 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3720,6 +3720,7 @@ typedef struct NotifyStmt
 typedef struct ListenStmt
 {
        NodeTag         type;
+       bool            ispatt;                 /* condition name is a pattern 
*/
        char       *conditionname;      /* condition name to listen on */
 } ListenStmt;
 
@@ -3730,6 +3731,7 @@ typedef struct ListenStmt
 typedef struct UnlistenStmt
 {
        NodeTag         type;
+       bool            ispatt;                 /* condition name is a pattern 
*/
        char       *conditionname;      /* name to unlisten on, or NULL for all 
*/
 } UnlistenStmt;
 
diff --git a/src/test/isolation/expected/async-notify.out 
b/src/test/isolation/expected/async-notify.out
index 556e1805893..f7cbfa26128 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 4 sessions
 
 starting permutation: listenc notify1 notify2 notify3 notifyf
 step listenc: LISTEN c1; LISTEN c2;
@@ -104,6 +104,16 @@ step l2commit: COMMIT;
 listener2: NOTIFY "c1" with payload "" from notifier
 step l2stop: UNLISTEN *;
 
+starting permutation: l3listen l3begin notify1 notify2 l3commit l3stop
+step l3listen: LISTEN 'c_';
+step l3begin: BEGIN;
+step notify1: NOTIFY c1;
+step notify2: NOTIFY c2, 'payload';
+step l3commit: COMMIT;
+listener3: NOTIFY "c1" with payload "" from notifier
+listener3: NOTIFY "c2" with payload "payload" from notifier
+step l3stop: UNLISTEN *;
+
 starting permutation: llisten lbegin usage bignotify usage
 step llisten: LISTEN c1; LISTEN c2;
 step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec 
b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..26113b5fe6e 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -53,6 +53,11 @@ step l2begin { BEGIN; }
 step l2commit  { COMMIT; }
 step l2stop            { UNLISTEN *; }
 
+session listener3
+step l3listen  { LISTEN 'c_'; }
+step l3begin   { BEGIN; }
+step l3commit  { COMMIT; }
+step l3stop            { UNLISTEN *; }
 
 # Trivial cases.
 permutation listenc notify1 notify2 notify3 notifyf
@@ -72,6 +77,7 @@ permutation listenc llisten notify1 notify2 notify3 notifyf 
lcheck
 # Check for bug when initial listen is only action in a serializable xact,
 # and notify queue is not empty
 permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
+permutation l3listen l3begin notify1 notify2 l3commit l3stop
 
 # Verify that pg_notification_queue_usage correctly reports a non-zero result,
 # after submitting notifications while another connection is listening for
diff --git a/src/test/regress/expected/async.out 
b/src/test/regress/expected/async.out
index 19cbe38e636..40d51399c9b 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -31,6 +31,22 @@ ERROR:  channel name too long
 NOTIFY notify_async2;
 LISTEN notify_async2;
 UNLISTEN notify_async2;
+UNLISTEN *;
+NOTIFY notify_async100;
+NOTIFY notify_async200;
+LISTEN 'notify_async%';
+SELECT pg_listening_channels();
+ pg_listening_channels 
+-----------------------
+ 'notify_async%'
+(1 row)
+
+UNLISTEN 'notify_async%';
+SELECT pg_listening_channels();
+ pg_listening_channels 
+-----------------------
+(0 rows)
+
 UNLISTEN *;
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e015387..8f2bd716bda 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -18,6 +18,14 @@ LISTEN notify_async2;
 UNLISTEN notify_async2;
 UNLISTEN *;
 
+NOTIFY notify_async100;
+NOTIFY notify_async200;
+LISTEN 'notify_async%';
+SELECT pg_listening_channels();
+UNLISTEN 'notify_async%';
+SELECT pg_listening_channels();
+UNLISTEN *;
+
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
 SELECT pg_notification_queue_usage();

Reply via email to