 src/backend/commands/async.c                   | 291 +++++++++++++++++++++----
 src/backend/nodes/copyfuncs.c                  |   5 +-
 src/backend/nodes/equalfuncs.c                 |   5 +-
 src/backend/parser/gram.y                      |  20 +-
 src/backend/tcop/utility.c                     |   6 +-
 src/include/commands/async.h                   |   4 +-
 src/include/nodes/parsenodes.h                 |   5 +-
 src/test/isolation/expected/async-notify-2.out | 118 ++++++++++
 src/test/isolation/isolation_schedule          |   1 +
 src/test/isolation/isolationtester.c           |  36 ++-
 src/test/isolation/specs/async-notify-2.spec   |  31 +++
 src/test/regress/expected/async.out            |  45 +++-
 src/test/regress/sql/async.sql                 |  26 ++-
 13 files changed, 530 insertions(+), 63 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bacc08e..51acf44 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -138,7 +138,10 @@
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/timestamp.h"
-
+#include "utils/elog.h"
+#include "regex/regex.h"
+#include "catalog/pg_collation.h"
+#include "miscadmin.h"
 
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
@@ -286,9 +289,19 @@ static SlruCtlData AsyncCtlData;
  */
 #define QUEUE_MAX_PAGE			(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
 
+ /*
+ * Currently listened channel consisting of compiled RE
+ * used for pattern listeners and the pattern send by the user. 
+ */
+typedef struct ListenChannel
+{
+	regex_t		*compiledRegex;
+	char		userPattern[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
+} ListenChannel;
+
 /*
  * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on).  It is a simple list of channel names,
+ * (ie, have committed a LISTEN on).  It is a list of ListenChannel,
  * allocated in TopMemoryContext.
  */
 static List *listenChannels = NIL;	/* list of C strings */
@@ -313,7 +326,9 @@ typedef enum
 typedef struct
 {
 	ListenActionKind action;
-	char		channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
+	bool			 actionApplied;
+	regex_t			*compiledRegex;
+	char			 userPattern[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
 } ListenAction;
 
 static List *pendingActions = NIL;	/* list of ListenAction */
@@ -369,12 +384,12 @@ bool		Trace_notify = false;
 
 /* local function prototypes */
 static bool asyncQueuePagePrecedes(int p, int q);
-static void queue_listen(ListenActionKind action, const char *channel);
+static void queue_listen(ListenActionKind action, const char *pattern, bool isSimilarToPattern);
 static void Async_UnlistenOnExit(int code, Datum arg);
 static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
-static void Exec_UnlistenAllCommit(void);
+static bool Exec_ListenCommit(const char *pattern, regex_t *compiledRegex);
+static bool Exec_UnlistenCommit(const char *pattern);
+static bool Exec_UnlistenAllCommit(void);
 static bool IsListeningOn(const char *channel);
 static void asyncQueueUnregister(void);
 static bool asyncQueueIsFull(void);
@@ -594,6 +609,36 @@ Async_Notify(const char *channel, const char *payload)
 }
 
 /*
+* compile_regex
+*		Compiles RE pattern into a compiled RE.
+*
+*		Returns result code from pg_regcomp.
+*/
+static int
+compile_regex(const char *pattern, regex_t *compiled_regex)
+{
+	pg_wchar	*wcharpattern;
+	int			resregcomp;
+	int			lenwchar;
+	int			lenpattern = strlen(pattern);
+
+	wcharpattern = (pg_wchar *)palloc((lenpattern + 1) * sizeof(pg_wchar));
+	lenwchar = pg_mb2wchar_with_len(pattern,
+									wcharpattern,
+									lenpattern);
+
+	resregcomp = pg_regcomp(compiled_regex,
+							wcharpattern,
+							lenwchar,
+							REG_ADVANCED,
+							DEFAULT_COLLATION_OID);
+
+	pfree(wcharpattern);
+
+	return resregcomp;
+}
+
+/*
  * queue_listen
  *		Common code for listen, unlisten, unlisten all commands.
  *
@@ -602,10 +647,15 @@ Async_Notify(const char *channel, const char *payload)
  *		commit.
  */
 static void
-queue_listen(ListenActionKind action, const char *channel)
+queue_listen(ListenActionKind action, const char *pattern, bool isSimilarToPattern)
 {
 	MemoryContext oldcontext;
 	ListenAction *actrec;
+	regex_t		 compreg;
+	regex_t		 *pcompreg;
+	int			 resregcomp;
+	char		 errormsg[100];
+	Datum		 datum;
 
 	/*
 	 * Unlike Async_Notify, we don't try to collapse out duplicates. It would
@@ -615,11 +665,44 @@ queue_listen(ListenActionKind action, const char *channel)
 	 */
 	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
 
+	if (isSimilarToPattern)
+	{
+		/* convert to regex pattern */
+		datum = DirectFunctionCall1(similar_escape, CStringGetTextDatum(pattern));
+
+		/*
+		* Regex pattern is now compiled to ensure any errors are captured at this point.
+		* Compiled regex is copied to top memory context when we reach transaction commit.
+		* If compiled RE was not applied as a listener then it is freed at transaction commit.
+		*/
+		resregcomp = compile_regex(TextDatumGetCString(datum), &compreg);
+
+		if (resregcomp != REG_OKAY)
+		{
+			MemoryContextSwitchTo(oldcontext);
+
+			CHECK_FOR_INTERRUPTS();
+			pg_regerror(resregcomp, &compreg, errormsg, sizeof(errormsg));
+			ereport(ERROR,
+				(errcode(ERRCODE_INVALID_REGULAR_EXPRESSION),
+					errmsg("invalid regular expression: %s", errormsg)));
+		}
+
+		pcompreg = palloc(sizeof(regex_t));
+		memcpy(pcompreg, &compreg, sizeof(regex_t));
+	}
+	else
+	{
+		pcompreg = NULL;
+	}
+
 	/* space for terminating null is included in sizeof(ListenAction) */
-	actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
-									 strlen(channel) + 1);
+	actrec = (ListenAction *) palloc(offsetof(ListenAction, userPattern) +
+									 strlen(pattern) + 1);
 	actrec->action = action;
-	strcpy(actrec->channel, channel);
+	actrec->actionApplied = false;
+	actrec->compiledRegex = pcompreg;
+	strcpy(actrec->userPattern, pattern);
 
 	pendingActions = lappend(pendingActions, actrec);
 
@@ -632,12 +715,12 @@ queue_listen(ListenActionKind action, const char *channel)
  *		This is executed by the SQL listen command.
  */
 void
-Async_Listen(const char *channel)
+Async_Listen(const char *pattern, bool isSimilarToPattern)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
+		elog(DEBUG1, "Async_Listen(%s,%d)", pattern, MyProcPid);
 
-	queue_listen(LISTEN_LISTEN, channel);
+	queue_listen(LISTEN_LISTEN, pattern, isSimilarToPattern);
 }
 
 /*
@@ -646,16 +729,16 @@ Async_Listen(const char *channel)
  *		This is executed by the SQL unlisten command.
  */
 void
-Async_Unlisten(const char *channel)
+Async_Unlisten(const char *pattern)
 {
 	if (Trace_notify)
-		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
+		elog(DEBUG1, "Async_Unlisten(%s,%d)", pattern, MyProcPid);
 
 	/* If we couldn't possibly be listening, no need to queue anything */
 	if (pendingActions == NIL && !unlistenExitRegistered)
 		return;
 
-	queue_listen(LISTEN_UNLISTEN, channel);
+	queue_listen(LISTEN_UNLISTEN, pattern, false);
 }
 
 /*
@@ -673,7 +756,7 @@ Async_UnlistenAll(void)
 	if (pendingActions == NIL && !unlistenExitRegistered)
 		return;
 
-	queue_listen(LISTEN_UNLISTEN_ALL, "");
+	queue_listen(LISTEN_UNLISTEN_ALL, "", false);
 }
 
 /*
@@ -714,10 +797,10 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 
 	while (*lcp != NULL)
 	{
-		char	   *channel = (char *) lfirst(*lcp);
+		ListenChannel	   *channel = (ListenChannel *) lfirst(*lcp);
 
 		*lcp = lnext(*lcp);
-		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel->userPattern));
 	}
 
 	SRF_RETURN_DONE(funcctx);
@@ -890,13 +973,13 @@ AtCommit_Notify(void)
 		switch (actrec->action)
 		{
 			case LISTEN_LISTEN:
-				Exec_ListenCommit(actrec->channel);
+				actrec->actionApplied = Exec_ListenCommit(actrec->userPattern, actrec->compiledRegex);
 				break;
 			case LISTEN_UNLISTEN:
-				Exec_UnlistenCommit(actrec->channel);
+				actrec->actionApplied = Exec_UnlistenCommit(actrec->userPattern);
 				break;
 			case LISTEN_UNLISTEN_ALL:
-				Exec_UnlistenAllCommit();
+				actrec->actionApplied = Exec_UnlistenAllCommit();
 				break;
 		}
 	}
@@ -1000,14 +1083,23 @@ Exec_ListenPreCommit(void)
  *
  * Add the channel to the list of channels we are listening on.
  */
-static void
-Exec_ListenCommit(const char *channel)
+static bool
+Exec_ListenCommit(const char *pattern, regex_t *compiledRegex)
 {
+	ListCell	  *p;
 	MemoryContext oldcontext;
+	ListenChannel *lchan;
+	regex_t		  *copiedcompreg;
 
-	/* Do nothing if we are already listening on this channel */
-	if (IsListeningOn(channel))
-		return;
+	/* Do nothing if we are already using this pattern for listening */
+
+	foreach(p, listenChannels)
+	{
+		ListenChannel	   *lchan = (ListenChannel *)lfirst(p);
+
+		if (strcmp(lchan->userPattern, pattern) == 0)
+			return false;
+	}
 
 	/*
 	 * Add the new channel name to listenChannels.
@@ -1017,9 +1109,31 @@ Exec_ListenCommit(const char *channel)
 	 * doesn't seem worth trying to guard against that, but maybe improve this
 	 * later.
 	 */
+
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
-	listenChannels = lappend(listenChannels, pstrdup(channel));
+
+	if (compiledRegex != NULL)
+	{
+		/* copy the compiled RE to top memory context */
+
+		copiedcompreg = (regex_t *)palloc(sizeof(regex_t));
+		memcpy(copiedcompreg, compiledRegex, sizeof(regex_t));
+	}
+	else
+	{
+		copiedcompreg = NULL;
+	}
+
+	lchan = (ListenChannel *)palloc(offsetof(ListenChannel, userPattern) + 
+									strlen(pattern) + 1);
+	lchan->compiledRegex = copiedcompreg;
+	strcpy(lchan->userPattern, pattern);
+
+	listenChannels = lappend(listenChannels, lchan);
+
 	MemoryContextSwitchTo(oldcontext);
+
+	return true;
 }
 
 /*
@@ -1027,24 +1141,35 @@ Exec_ListenCommit(const char *channel)
  *
  * Remove the specified channel name from listenChannels.
  */
-static void
-Exec_UnlistenCommit(const char *channel)
+static bool
+Exec_UnlistenCommit(const char *pattern)
 {
 	ListCell   *q;
 	ListCell   *prev;
+	bool	   found;
 
 	if (Trace_notify)
-		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
+		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", pattern, MyProcPid);
 
+	found = false;
 	prev = NULL;
 	foreach(q, listenChannels)
 	{
-		char	   *lchan = (char *) lfirst(q);
+		ListenChannel	   *lchan = (ListenChannel *) lfirst(q);
 
-		if (strcmp(lchan, channel) == 0)
+		if (strcmp(lchan->userPattern, pattern) == 0)
 		{
+			if (lchan->compiledRegex != NULL)
+			{
+				pg_regfree(lchan->compiledRegex);
+				pfree(lchan->compiledRegex);
+			}
+
 			listenChannels = list_delete_cell(listenChannels, q, prev);
+
 			pfree(lchan);
+
+			found = true;
 			break;
 		}
 		prev = q;
@@ -1054,6 +1179,8 @@ Exec_UnlistenCommit(const char *channel)
 	 * We do not complain about unlistening something not being listened;
 	 * should we?
 	 */
+
+	return found;
 }
 
 /*
@@ -1061,14 +1188,34 @@ Exec_UnlistenCommit(const char *channel)
  *
  *		Unlisten on all channels for this backend.
  */
-static void
+static bool
 Exec_UnlistenAllCommit(void)
 {
+	ListCell	*p;
+	bool		is_empty;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
+	is_empty = true;
+
+	foreach(p, listenChannels)
+	{
+		ListenChannel		*lchan = (ListenChannel *)lfirst(p);
+
+		if (lchan->compiledRegex != NULL)
+		{
+			pg_regfree(lchan->compiledRegex);
+			pfree(lchan->compiledRegex);
+		}
+
+		is_empty = false;
+	}
+
 	list_free_deep(listenChannels);
 	listenChannels = NIL;
+	
+	return is_empty;
 }
 
 /*
@@ -1163,16 +1310,66 @@ ProcessCompletedNotifies(void)
 static bool
 IsListeningOn(const char *channel)
 {
-	ListCell   *p;
+	ListCell	*p;
+	pg_wchar	*wcharchannel;
+	int			lenwchar;
+	int			resregexec;
+	char		errormsg[100];
+	bool		matches;
+
+	wcharchannel = NULL;
+	matches = false;
 
 	foreach(p, listenChannels)
 	{
-		char	   *lchan = (char *) lfirst(p);
+		ListenChannel		*lchan = (ListenChannel *) lfirst(p);
 
-		if (strcmp(lchan, channel) == 0)
-			return true;
+		if (lchan->compiledRegex == NULL)
+		{
+			if (strcmp(lchan->userPattern, channel) == 0)
+			{
+				matches = true;
+				break;
+			}
+		}
+		else
+		{
+			if (wcharchannel == NULL)
+			{
+				/* Convert channel string to wide characters */
+				wcharchannel = (pg_wchar *)palloc((strlen(channel) + 1) * sizeof(pg_wchar));
+				lenwchar = pg_mb2wchar_with_len(channel, wcharchannel, strlen(channel));
+			}
+
+			/* Check RE match */
+			resregexec = pg_regexec(lchan->compiledRegex, wcharchannel, lenwchar, 0, NULL, 0, NULL, 0);
+
+			if (resregexec != REG_OKAY && resregexec != REG_NOMATCH)
+			{
+				pfree(wcharchannel);
+				wcharchannel = NULL;
+
+				CHECK_FOR_INTERRUPTS();
+				pg_regerror(resregexec, lchan->compiledRegex, errormsg, sizeof(errormsg));
+				ereport(ERROR,
+					(errcode(ERRCODE_INVALID_REGULAR_EXPRESSION),
+						errmsg("regular expression failed: %s", errormsg)));
+			}
+
+			if (resregexec == REG_OKAY)
+			{
+				matches = true;
+				break;
+			}
+		}
 	}
-	return false;
+
+	if (wcharchannel != NULL)
+	{
+		pfree(wcharchannel);
+	}
+
+	return matches;
 }
 
 /*
@@ -2149,6 +2346,20 @@ AsyncExistsPendingNotify(const char *channel, const char *payload)
 static void
 ClearPendingActionsAndNotifies(void)
 {
+	ListCell   *p;
+
+	/* free compiled REs that were not added as new listeners */
+	foreach(p, pendingActions)
+	{
+		ListenAction *actrec = (ListenAction *)lfirst(p);
+
+		if (!actrec->actionApplied && actrec->compiledRegex != NULL)
+		{
+			pg_regfree(actrec->compiledRegex);
+			pfree(actrec->compiledRegex);
+		}
+	}
+
 	/*
 	 * We used to have to explicitly deallocate the list members and nodes,
 	 * because they were malloc'd.  Now, since we know they are palloc'd in
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 45a04b0..dd1d1ef 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3545,7 +3545,8 @@ _copyListenStmt(const ListenStmt *from)
 {
 	ListenStmt *newnode = makeNode(ListenStmt);
 
-	COPY_STRING_FIELD(conditionname);
+	COPY_STRING_FIELD(pattern);
+	COPY_SCALAR_FIELD(isSimilarToPattern);
 
 	return newnode;
 }
@@ -3555,7 +3556,7 @@ _copyUnlistenStmt(const UnlistenStmt *from)
 {
 	UnlistenStmt *newnode = makeNode(UnlistenStmt);
 
-	COPY_STRING_FIELD(conditionname);
+	COPY_STRING_FIELD(pattern);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 8d92c03..a3ac36b 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1482,7 +1482,8 @@ _equalNotifyStmt(const NotifyStmt *a, const NotifyStmt *b)
 static bool
 _equalListenStmt(const ListenStmt *a, const ListenStmt *b)
 {
-	COMPARE_STRING_FIELD(conditionname);
+	COMPARE_STRING_FIELD(pattern);
+	COMPARE_SCALAR_FIELD(isSimilarToPattern);
 
 	return true;
 }
@@ -1490,7 +1491,7 @@ _equalListenStmt(const ListenStmt *a, const ListenStmt *b)
 static bool
 _equalUnlistenStmt(const UnlistenStmt *a, const UnlistenStmt *b)
 {
-	COMPARE_STRING_FIELD(conditionname);
+	COMPARE_STRING_FIELD(pattern);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4b1ce09..b7a4cf4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9451,7 +9451,15 @@ notify_payload:
 ListenStmt: LISTEN ColId
 				{
 					ListenStmt *n = makeNode(ListenStmt);
-					n->conditionname = $2;
+					n->pattern = $2;
+					n->isSimilarToPattern = false;
+					$$ = (Node *)n;
+				}
+			| LISTEN SIMILAR TO Sconst
+				{
+					ListenStmt *n = makeNode(ListenStmt);
+					n->pattern = $4;
+					n->isSimilarToPattern = true;
 					$$ = (Node *)n;
 				}
 		;
@@ -9460,13 +9468,19 @@ UnlistenStmt:
 			UNLISTEN ColId
 				{
 					UnlistenStmt *n = makeNode(UnlistenStmt);
-					n->conditionname = $2;
+					n->pattern = $2;
+					$$ = (Node *)n;
+				}
+			| UNLISTEN Sconst
+				{
+					UnlistenStmt *n = makeNode(UnlistenStmt);
+					n->pattern = $2;
 					$$ = (Node *)n;
 				}
 			| UNLISTEN '*'
 				{
 					UnlistenStmt *n = makeNode(UnlistenStmt);
-					n->conditionname = NULL;
+					n->pattern = NULL;
 					$$ = (Node *)n;
 				}
 		;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index ddacac8..9251ad2 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -630,7 +630,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 
 				PreventCommandDuringRecovery("LISTEN");
 				CheckRestrictedOperation("LISTEN");
-				Async_Listen(stmt->conditionname);
+				Async_Listen(stmt->pattern, stmt->isSimilarToPattern);
 			}
 			break;
 
@@ -640,8 +640,8 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 
 				PreventCommandDuringRecovery("UNLISTEN");
 				CheckRestrictedOperation("UNLISTEN");
-				if (stmt->conditionname)
-					Async_Unlisten(stmt->conditionname);
+				if (stmt->pattern)
+					Async_Unlisten(stmt->pattern);
 				else
 					Async_UnlistenAll();
 			}
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index 939711d..f86a482 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -34,8 +34,8 @@ extern void NotifyMyFrontEnd(const char *channel,
 
 /* notify-related SQL statements */
 extern void Async_Notify(const char *channel, const char *payload);
-extern void Async_Listen(const char *channel);
-extern void Async_Unlisten(const char *channel);
+extern void Async_Listen(const char *pattern, bool isSimilarToPattern);
+extern void Async_Unlisten(const char *pattern);
 extern void Async_UnlistenAll(void);
 
 /* perform (or cancel) outbound notify processing at transaction commit */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 5f2a4a7..3f9fe9d 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2889,7 +2889,8 @@ typedef struct NotifyStmt
 typedef struct ListenStmt
 {
 	NodeTag		type;
-	char	   *conditionname;	/* condition name to listen on */
+	bool		isSimilarToPattern;
+	char	   *pattern;	/* condition pattern to listen on */
 } ListenStmt;
 
 /* ----------------------
@@ -2899,7 +2900,7 @@ typedef struct ListenStmt
 typedef struct UnlistenStmt
 {
 	NodeTag		type;
-	char	   *conditionname;	/* name to unlisten on, or NULL for all */
+	char	   *pattern;	/* condition pattern to unlisten on, or NULL for all */
 } UnlistenStmt;
 
 /* ----------------------
diff --git a/src/test/isolation/expected/async-notify-2.out b/src/test/isolation/expected/async-notify-2.out
new file mode 100644
index 0000000..081cae3
--- /dev/null
+++ b/src/test/isolation/expected/async-notify-2.out
@@ -0,0 +1,118 @@
+Parsed test spec with 2 sessions
+
+starting permutation: listen_normal notify consume
+step listen_normal: LISTEN test;
+step notify: 
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count          
+
+5              
+count          
+
+5              
+step consume: BEGIN; END;
+ASYNC NOTIFY of 'test' with payload '1' received
+ASYNC NOTIFY of 'test' with payload '2' received
+ASYNC NOTIFY of 'test' with payload '3' received
+ASYNC NOTIFY of 'test' with payload '4' received
+ASYNC NOTIFY of 'test' with payload '5' received
+
+starting permutation: listen_pattern_1 notify consume
+step listen_pattern_1: LISTEN SIMILAR TO 'te%';
+step notify: 
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count          
+
+5              
+count          
+
+5              
+step consume: BEGIN; END;
+ASYNC NOTIFY of 'test' with payload '1' received
+ASYNC NOTIFY of 'test' with payload '2' received
+ASYNC NOTIFY of 'test' with payload '3' received
+ASYNC NOTIFY of 'test' with payload '4' received
+ASYNC NOTIFY of 'test' with payload '5' received
+ASYNC NOTIFY of 'test_2' with payload '1' received
+ASYNC NOTIFY of 'test_2' with payload '2' received
+ASYNC NOTIFY of 'test_2' with payload '3' received
+ASYNC NOTIFY of 'test_2' with payload '4' received
+ASYNC NOTIFY of 'test_2' with payload '5' received
+
+starting permutation: listen_pattern_2 notify consume
+step listen_pattern_2: LISTEN SIMILAR TO 'test';
+step notify: 
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count          
+
+5              
+count          
+
+5              
+step consume: BEGIN; END;
+ASYNC NOTIFY of 'test' with payload '1' received
+ASYNC NOTIFY of 'test' with payload '2' received
+ASYNC NOTIFY of 'test' with payload '3' received
+ASYNC NOTIFY of 'test' with payload '4' received
+ASYNC NOTIFY of 'test' with payload '5' received
+
+starting permutation: listen_pattern_3 notify consume
+step listen_pattern_3: LISTEN SIMILAR TO 'te';
+step notify: 
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count          
+
+5              
+count          
+
+5              
+step consume: BEGIN; END;
+
+starting permutation: listen_pattern_invalid notify consume
+step listen_pattern_invalid: LISTEN SIMILAR TO '*';
+ERROR:  invalid regular expression: quantifier operand invalid
+step notify: 
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count          
+
+5              
+count          
+
+5              
+step consume: BEGIN; END;
+
+starting permutation: listen_normal listen_pattern_1 unlisten_1 notify consume
+step listen_normal: LISTEN test;
+step listen_pattern_1: LISTEN SIMILAR TO 'te%';
+step unlisten_1: UNLISTEN 't%';
+step notify: 
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+
+count          
+
+5              
+count          
+
+5              
+step consume: BEGIN; END;
+ASYNC NOTIFY of 'test' with payload '1' received
+ASYNC NOTIFY of 'test' with payload '2' received
+ASYNC NOTIFY of 'test' with payload '3' received
+ASYNC NOTIFY of 'test' with payload '4' received
+ASYNC NOTIFY of 'test' with payload '5' received
+ASYNC NOTIFY of 'test_2' with payload '1' received
+ASYNC NOTIFY of 'test_2' with payload '2' received
+ASYNC NOTIFY of 'test_2' with payload '3' received
+ASYNC NOTIFY of 'test_2' with payload '4' received
+ASYNC NOTIFY of 'test_2' with payload '5' received
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 32c965b..a25cdeb 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -60,5 +60,6 @@ test: alter-table-3
 test: create-trigger
 test: sequence-ddl
 test: async-notify
+test: async-notify-2
 test: vacuum-reltuples
 test: timeouts
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index ba8082c..b1eb17d 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -46,7 +46,8 @@ static bool try_complete_step(Step *step, int flags);
 static int	step_qsort_cmp(const void *a, const void *b);
 static int	step_bsearch_cmp(const void *a, const void *b);
 
-static void printResultSet(PGresult *res);
+static void printResultSet(PGresult *res, PGconn *conn);
+static void printAsyncNotify(PGconn *conn);
 
 /* close all connections and exit */
 static void
@@ -487,7 +488,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
 		res = PQexec(conns[0], testspec->setupsqls[i]);
 		if (PQresultStatus(res) == PGRES_TUPLES_OK)
 		{
-			printResultSet(res);
+			printResultSet(res, conns[i + 1]);
 		}
 		else if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
@@ -505,7 +506,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
 			res = PQexec(conns[i + 1], testspec->sessions[i]->setupsql);
 			if (PQresultStatus(res) == PGRES_TUPLES_OK)
 			{
-				printResultSet(res);
+				printResultSet(res, conns[i + 1]);
 			}
 			else if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			{
@@ -640,7 +641,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
 			res = PQexec(conns[i + 1], testspec->sessions[i]->teardownsql);
 			if (PQresultStatus(res) == PGRES_TUPLES_OK)
 			{
-				printResultSet(res);
+				printResultSet(res, conns[i + 1]);
 			}
 			else if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			{
@@ -659,7 +660,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
 		res = PQexec(conns[0], testspec->teardownsql);
 		if (PQresultStatus(res) == PGRES_TUPLES_OK)
 		{
-			printResultSet(res);
+			printResultSet(res, conns[0]);
 		}
 		else if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
@@ -821,9 +822,10 @@ try_complete_step(Step *step, int flags)
 		switch (PQresultStatus(res))
 		{
 			case PGRES_COMMAND_OK:
+				printAsyncNotify(conn);
 				break;
 			case PGRES_TUPLES_OK:
-				printResultSet(res);
+				printResultSet(res, conn);
 				break;
 			case PGRES_FATAL_ERROR:
 				if (step->errormsg != NULL)
@@ -860,7 +862,7 @@ try_complete_step(Step *step, int flags)
 }
 
 static void
-printResultSet(PGresult *res)
+printResultSet(PGresult *res, PGconn *conn)
 {
 	int			nFields;
 	int			i,
@@ -879,4 +881,24 @@ printResultSet(PGresult *res)
 			printf("%-15s", PQgetvalue(res, i, j));
 		printf("\n");
 	}
+
+	printAsyncNotify(conn);
+}
+
+static void
+printAsyncNotify(PGconn *conn)
+{
+	PGnotify   *notify;
+
+	while ((notify = PQnotifies(conn)) != NULL)
+	{
+		if (notify->extra[0])
+			printf("ASYNC NOTIFY of '%s' with payload '%s' received\n",
+				notify->relname, notify->extra);
+		else
+			printf("ASYNC NOTIFY of '%s' received\n",
+				notify->relname);
+
+		PQfreemem(notify);
+	}
 }
diff --git a/src/test/isolation/specs/async-notify-2.spec b/src/test/isolation/specs/async-notify-2.spec
new file mode 100644
index 0000000..dd1d38f
--- /dev/null
+++ b/src/test/isolation/specs/async-notify-2.spec
@@ -0,0 +1,31 @@
+# Verify that messages are consumed from the notify queue.
+
+session "listener" 
+step "listen_normal"			{ LISTEN test; }
+step "listen_pattern_1"			{ LISTEN SIMILAR TO 'te%'; }
+step "listen_pattern_2"			{ LISTEN SIMILAR TO 'test'; }
+step "listen_pattern_3"			{ LISTEN SIMILAR TO 'te'; }
+step "listen_pattern_invalid"	{ LISTEN SIMILAR TO '*'; }
+step "unlisten_1"				{ UNLISTEN 't%'; }
+step "consume"					{ BEGIN; END; }
+teardown						{ UNLISTEN *; }
+
+session "notifier"
+step "notify"	
+{ 
+ SELECT count(pg_notify('test', s::text)) FROM generate_series(1, 5) s;
+ SELECT count(pg_notify('test_2', s::text)) FROM generate_series(1, 5) s;
+}
+
+# Should print first notify channel
+permutation "listen_normal" "notify" "consume"
+# Should print both notify channels
+permutation "listen_pattern_1" "notify" "consume"
+# Should print first notify channel
+permutation "listen_pattern_2" "notify" "consume"
+# Should not print either notify channels
+permutation "listen_pattern_3" "notify" "consume"
+# Should fail to invalid RE pattern
+permutation "listen_pattern_invalid" "notify" "consume"
+# Test that UNLISTEN with a pattern does not work as a RE matcher
+permutation "listen_normal" "listen_pattern_1" "unlisten_1" "notify" "consume"
\ No newline at end of file
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38..963b1d0 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -27,11 +27,54 @@ SELECT pg_notify(NULL,'sample message1');
 ERROR:  channel name cannot be empty
 SELECT pg_notify('notify_async_channel_name_too_long______________________________','sample_message1');
 ERROR:  channel name too long
---Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
+-- Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
+-- src/test/isolation/specs/async-notify-2.spec tests for actual usage.
 NOTIFY notify_async2;
 LISTEN notify_async2;
+LISTEN SIMILAR TO 'notify_%';
 UNLISTEN notify_async2;
+UNLISTEN 'notify_%';
+UNLISTEN 'notify_(%';
 UNLISTEN *;
+-- Should fail. Invalid LISTEN command
+LISTEN *;
+ERROR:  syntax error at or near "*"
+LINE 1: LISTEN *;
+               ^
+LISTEN notify_%;
+ERROR:  syntax error at or near "%"
+LINE 1: LISTEN notify_%;
+                      ^
+LISTEN SIMILAR TO 'notify_(%';
+ERROR:  invalid regular expression: parentheses () not balanced
+LISTEN SIMILAR TO '*';
+ERROR:  invalid regular expression: quantifier operand invalid
+-- Should contain two listeners
+LISTEN notify_async2;
+LISTEN SIMILAR TO 'notify_async2';
+LISTEN SIMILAR TO 'notify_%';
+SELECT pg_listening_channels();
+ pg_listening_channels 
+-----------------------
+ notify_async2
+ notify_%
+(2 rows)
+
+-- Should contain one listener
+UNLISTEN 'notify_%';
+SELECT pg_listening_channels();
+ pg_listening_channels 
+-----------------------
+ notify_async2
+(1 row)
+
+-- Should not contain listeners
+UNLISTEN *;
+SELECT pg_listening_channels();
+ pg_listening_channels 
+-----------------------
+(0 rows)
+
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
 SELECT pg_notification_queue_usage();
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01..69d8f98 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -12,12 +12,36 @@ SELECT pg_notify('','sample message1');
 SELECT pg_notify(NULL,'sample message1');
 SELECT pg_notify('notify_async_channel_name_too_long______________________________','sample_message1');
 
---Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
+-- Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
+-- src/test/isolation/specs/async-notify-2.spec tests for actual usage.
 NOTIFY notify_async2;
 LISTEN notify_async2;
+LISTEN SIMILAR TO 'notify_%';
 UNLISTEN notify_async2;
+UNLISTEN 'notify_%';
+UNLISTEN 'notify_(%';
 UNLISTEN *;
 
+-- Should fail. Invalid LISTEN command
+LISTEN *;
+LISTEN notify_%;
+LISTEN SIMILAR TO 'notify_(%';
+LISTEN SIMILAR TO '*';
+
+-- Should contain two listeners
+LISTEN notify_async2;
+LISTEN SIMILAR TO 'notify_async2';
+LISTEN SIMILAR TO 'notify_%';
+SELECT pg_listening_channels();
+
+-- Should contain one listener
+UNLISTEN 'notify_%';
+SELECT pg_listening_channels();
+
+-- Should not contain listeners
+UNLISTEN *;
+SELECT pg_listening_channels();
+
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
 SELECT pg_notification_queue_usage();
