Hello, Euler.

At Mon, 8 Jun 2020 07:51:18 -0300, Euler Taveira 
<euler.tave...@2ndquadrant.com> wrote in 
> On Mon, 8 Jun 2020 at 05:27, Kyotaro Horiguchi <horikyota....@gmail.com>
> wrote:
> 
> >
> > That can be fixed by calling ProcessCompletedNotifies() in
> > apply_handle_commit. The function has code to write out
> > notifications to connected clients, but that part does nothing in
> > logical replication workers.
> >
> >
> This bug was already reported some time ago (#15293) but it slipped through
> the cracks. I don't think you should simply call ProcessCompletedNotifies [1].

Yeah, thanks for pointing that out. I had vaguely thought of something
similar to the discussion there. Just calling ProcessCompletedNotifies
in apply_handle_commit is actually wrong.

We can move only SignalBackends() to AtCommit_Notify, since
asyncQueueAdvanceTail() no longer depends on the result of
SignalBackends. Even so, we need to call asyncQueueAdvanceTail in
AtCommit_Notify and AtAbort_Notify, since otherwise the queue cannot be
shortened while logical replication is running. This can slightly defer
tail-advancing, but I don't think that would be a significant problem.

> [1] https://www.postgresql.org/message-id/13844.1532468610%40sss.pgh.pa.us

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From c3aa3e584cf57632284dc9b282dd635c418f3084 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga....@gmail.com>
Date: Tue, 9 Jun 2020 14:01:34 +0900
Subject: [PATCH v2] Fix notification signaling

Notifications are signaled in the top-level command loop. That prevents
the logical replication apply loop from signaling properly.  To fix,
send signals in AtCommit_Notify instead of in the top-level command loop.

Discussion: https://www.postgresql.org/message-id/13844.1532468610%40sss.pgh.pa.us
Discussion: https://www.postgresql.org/message-id/20200608.172730.68580977059033.horikyota.ntt%40gmail.com
---
 src/backend/commands/async.c | 103 +++++++++++++++++++++--------------
 1 file changed, 63 insertions(+), 40 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 71b7577afc..590ad7fcc8 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -449,7 +449,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
-static void SignalBackends(void);
+static bool SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
@@ -976,7 +976,8 @@ PreCommit_Notify(void)
  *
  *		This is called at transaction commit, after committing to clog.
  *
- *		Update listenChannels and clear transaction-local state.
+ *		Update listenChannels and clear transaction-local state. Send signals
+ *		for notifications to other backends to process them.
  */
 void
 AtCommit_Notify(void)
@@ -1021,6 +1022,29 @@ AtCommit_Notify(void)
 
 	/* And clean up */
 	ClearPendingActionsAndNotifies();
+
+	/* signal our notifications to other backends */
+	if (backendHasSentNotifications)
+	{
+		/*
+		 * No use reading the queue at idle time later if this backend is not a
+		 * listener.
+		 */
+		if (!SignalBackends())
+			backendHasSentNotifications = false;
+
+		/*
+		 * If it's time to try to advance the global tail pointer, do that. We
+		 * need to do this here for the case where many transactions are
+		 * committed without returning to the top-level loop, as in the logical
+		 * replication apply loop.
+		 */
+		if (backendTryAdvanceTail)
+		{
+			backendTryAdvanceTail = false;
+			asyncQueueAdvanceTail();
+		}
+	}
 }
 
 /*
@@ -1196,10 +1220,8 @@ Exec_UnlistenAllCommit(void)
  *
  * This is called from postgres.c just before going idle at the completion
  * of a transaction.  If we issued any notifications in the just-completed
- * transaction, send signals to other backends to process them, and also
- * process the queue ourselves to send messages to our own frontend.
- * Also, if we filled enough queue pages with new notifies, try to advance
- * the queue tail pointer.
+ * transaction, process the queue ourselves to send messages to our own
+ * frontend.
  *
  * The reason that this is not done in AtCommit_Notify is that there is
  * a nonzero chance of errors here (for example, encoding conversion errors
@@ -1208,17 +1230,11 @@ Exec_UnlistenAllCommit(void)
  * to ensure that a transaction's self-notifies are delivered to the frontend
  * before it gets the terminating ReadyForQuery message.
  *
- * Note that we send signals and process the queue even if the transaction
- * eventually aborted.  This is because we need to clean out whatever got
- * added to the queue.
- *
  * NOTE: we are outside of any transaction here.
  */
 void
 ProcessCompletedNotifies(void)
 {
-	MemoryContext caller_context;
-
 	/* Nothing to do if we didn't send any notifications */
 	if (!backendHasSentNotifications)
 		return;
@@ -1230,43 +1246,32 @@ ProcessCompletedNotifies(void)
 	 */
 	backendHasSentNotifications = false;
 
-	/*
-	 * We must preserve the caller's memory context (probably MessageContext)
-	 * across the transaction we do here.
-	 */
-	caller_context = CurrentMemoryContext;
-
 	if (Trace_notify)
 		elog(DEBUG1, "ProcessCompletedNotifies");
 
-	/*
-	 * We must run asyncQueueReadAllNotifications inside a transaction, else
-	 * bad things happen if it gets an error.
-	 */
-	StartTransactionCommand();
-
-	/* Send signals to other backends */
-	SignalBackends();
-
 	if (listenChannels != NIL)
 	{
+		MemoryContext caller_context;
+
+		/*
+		 * We must preserve the caller's memory context (probably
+		 * MessageContext) across the transaction we do here.
+		 */
+		caller_context = CurrentMemoryContext;
+
+		/*
+		 * We must run asyncQueueReadAllNotifications inside a transaction,
+		 * else bad things happen if it gets an error.
+		 */
+		StartTransactionCommand();
+
 		/* Read the queue ourselves, and send relevant stuff to the frontend */
 		asyncQueueReadAllNotifications();
-	}
+		CommitTransactionCommand();
 
-	/*
-	 * If it's time to try to advance the global tail pointer, do that.
-	 */
-	if (backendTryAdvanceTail)
-	{
-		backendTryAdvanceTail = false;
-		asyncQueueAdvanceTail();
+		MemoryContextSwitchTo(caller_context);
 	}
 
-	CommitTransactionCommand();
-
-	MemoryContextSwitchTo(caller_context);
-
 	/* We don't need pq_flush() here since postgres.c will do one shortly */
 }
 
@@ -1655,13 +1660,16 @@ asyncQueueFillWarning(void)
  * advance their queue position pointers, allowing the global tail to advance.
  *
  * Since we know the BackendId and the Pid the signaling is quite cheap.
+ *
+ * Returns true if this backend is listening to notifications.
  */
-static void
+static bool
 SignalBackends(void)
 {
 	int32	   *pids;
 	BackendId  *ids;
 	int			count;
+	bool		am_listener = false;
 
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
@@ -1684,7 +1692,10 @@ SignalBackends(void)
 
 		Assert(pid != InvalidPid);
 		if (pid == MyProcPid)
+		{
+			am_listener = true;
 			continue;			/* never signal self */
+		}
 		pos = QUEUE_BACKEND_POS(i);
 		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
 		{
@@ -1729,6 +1740,8 @@ SignalBackends(void)
 
 	pfree(pids);
 	pfree(ids);
+
+	return am_listener;
 }
 
 /*
@@ -1752,6 +1765,16 @@ AtAbort_Notify(void)
 
 	/* And clean up */
 	ClearPendingActionsAndNotifies();
+
+	/*
+	 * We can reach here having some notifications queued in this
+	 * transaction. Advance tail pointer in that case.
+	 */
+	if (backendTryAdvanceTail)
+	{
+		backendTryAdvanceTail = false;
+		asyncQueueAdvanceTail();
+	}
 }
 
 /*
-- 
2.18.2

Reply via email to