Hoi Tom,

On Wed, 11 Sep 2019 at 00:18, Tom Lane <t...@sss.pgh.pa.us> wrote:

>
> I pushed 0001 after doing some hacking on it --- it was sloppy about
> datatypes, and about whether the invalid-entry value is 0 or -1,
> and it was just wrong about keeping the list in backendid order.
> (You can't conditionally skip looking for where to put the new
> entry, if you want to maintain the order.  I thought about just
> defining the list as unordered, which would simplify joining the
> list initially, but that could get pretty cache-unfriendly when
> there are lots of entries.)
>
> 0002 is now going to need a rebase, so please do that.
>
>
Thanks for this, and good catch. Looks like I didn't test the first patch
by itself very well.

Here is the rebased second patch.

Thanks in advance,
-- 
Martijn van Oosterhout <klep...@gmail.com> http://svana.org/kleptog/
From bc4b1b458564f758b7fa1c1f7b0397aade71db06 Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <oosterh...@fox-it.com>
Date: Mon, 3 Jun 2019 17:13:31 +0200
Subject: [PATCH 1/2] Improve performance of async notifications

Advancing the tail pointer requires an exclusive lock which can block
backends from other databases, so it's worth keeping these attempts to a
minimum.

Instead of tracking the slowest backend exactly we update the queue more
lazily, only checking when we switch to a new SLRU page.  Additionally,
instead of waking up every slow backend at once, we do them one at a time.
---
 src/backend/commands/async.c | 142 +++++++++++++++++++++++++----------
 1 file changed, 101 insertions(+), 41 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f26269b5ea..b9dd0ca139 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -73,10 +73,11 @@
  *	  Finally, after we are out of the transaction altogether, we check if
  *	  we need to signal listening backends.  In SignalBackends() we scan the
  *	  list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- *	  to every listening backend (we don't know which backend is listening on
- *	  which channel so we must signal them all). We can exclude backends that
- *	  are already up to date, though.  We don't bother with a self-signal
- *	  either, but just process the queue directly.
+ *	  to every listening backend for the relevant database (we don't know
+ *	  which backend is listening on which channel so we must signal them
+ *	  all).  We can exclude backends that are already up to date, though.
+ *	  We don't bother with a self-signal either, but just process the queue
+ *	  directly.
  *
  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
  *	  sets the process's latch, which triggers the event to be processed
@@ -89,13 +90,25 @@
  *	  Inbound-notify processing consists of reading all of the notifications
  *	  that have arrived since scanning last time. We read every notification
  *	  until we reach either a notification from an uncommitted transaction or
- *	  the head pointer's position. Then we check if we were the laziest
- *	  backend: if our pointer is set to the same position as the global tail
- *	  pointer is set, then we move the global tail pointer ahead to where the
- *	  second-laziest backend is (in general, we take the MIN of the current
- *	  head position and all active backends' new tail pointers). Whenever we
- *	  move the global tail pointer we also truncate now-unused pages (i.e.,
- *	  delete files in pg_notify/ that are no longer used).
+ *	  the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and minimize disk space the tail pointer
+ *	  needs to be advanced so that old pages can be truncated.  This
+ *	  however requires an exclusive lock and as such should be done
+ *	  infrequently.
+ *
+ *	  When a new notification is added, the writer checks to see if the
+ *	  tail pointer is more than QUEUE_CLEANUP_DELAY pages behind.  If
+ *	  so, it attempts to advance the tail, and if there are slow
+ *	  backends (perhaps because all the notifications were for other
+ *	  databases), wake one of them up by sending a signal.
+ *
+ *	  When the slow backend processes the queue it notes it was behind
+ *	  and so also tries to advance the tail, possibly waking up another
+ *	  slow backend.  Eventually all backends will have processed the
+ *	  queue and the global tail pointer is moved to a new page and we
+ *	  also truncate now-unused pages (i.e., delete files in pg_notify/
+ *	  that are no longer used).
  *
  * An application that listens on the same channel it notifies will get
  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
@@ -211,6 +224,12 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* how many pages does a backend need to be behind before it needs to be signalled */
+#define QUEUE_CLEANUP_DELAY 4
+
+/* is a backend so far behind it needs to be signalled? */
+#define QUEUE_SLOW_BACKEND(i) \
+	(asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(QUEUE_BACKEND_POS(i))) > QUEUE_CLEANUP_DELAY)
 /*
  * Struct describing a listening backend's status
  */
@@ -252,7 +271,7 @@ typedef struct QueueBackendStatus
 typedef struct AsyncQueueControl
 {
 	QueuePosition head;			/* head points to the next free location */
-	QueuePosition tail;			/* the global tail is equivalent to the pos of
+	QueuePosition tail;			/* the global tail is some place older than
 								 * the "slowest" backend */
 	BackendId	firstListener;	/* id of first listener, or InvalidBackendId */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
@@ -402,10 +421,15 @@ static bool amRegisteredListener = false;
 /* has this backend sent notifications in the current transaction? */
 static bool backendHasSentNotifications = false;
 
+/* has this backend switched to a new page, and so should attempt to advance
+ * the queue tail?  */
+static bool backendTryAdvanceTail = false;
+
 /* GUC parameter */
 bool		Trace_notify = false;
 
 /* local function prototypes */
+static int asyncQueuePageDiff(int p, int q);
 static bool asyncQueuePagePrecedes(int p, int q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +445,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static bool SignalMyDBBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
@@ -438,8 +462,8 @@ static void ClearPendingActionsAndNotifies(void);
 /*
  * We will work on the page range of 0..QUEUE_MAX_PAGE.
  */
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
 {
 	int			diff;
 
@@ -455,7 +479,13 @@ asyncQueuePagePrecedes(int p, int q)
 		diff -= QUEUE_MAX_PAGE + 1;
 	else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
 		diff += QUEUE_MAX_PAGE + 1;
-	return diff < 0;
+	return diff;
+}
+
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+	return asyncQueuePageDiff(p, q) < 0;
 }
 
 /*
@@ -905,6 +935,12 @@ PreCommit_Notify(void)
 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 						 errmsg("too many notifications in the NOTIFY queue")));
 			nextNotify = asyncQueueAddEntries(nextNotify);
+
+			/* If we are advancing to a new page, remember this so after the
+			 * transaction commits we can attempt to advance the tail
+			 * pointer, see ProcessCompletedNotifies() */
+			if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+				backendTryAdvanceTail = true;
 			LWLockRelease(AsyncQueueLock);
 		}
 	}
@@ -1051,8 +1087,6 @@ Exec_ListenPreCommit(void)
 	 * notification to the frontend.  Also, although our transaction might
 	 * have executed NOTIFY, those message(s) aren't queued yet so we can't
 	 * see them in the queue.
-	 *
-	 * This will also advance the global tail pointer if possible.
 	 */
 	if (!QUEUE_POS_EQUAL(max, head))
 		asyncQueueReadAllNotifications();
@@ -1185,7 +1219,7 @@ ProcessCompletedNotifies(void)
 	StartTransactionCommand();
 
 	/* Send signals to other backends */
-	signalled = SignalBackends();
+	signalled = SignalMyDBBackends();
 
 	if (listenChannels != NIL)
 	{
@@ -1203,6 +1237,16 @@ ProcessCompletedNotifies(void)
 		 * harmless.)
 		 */
 		asyncQueueAdvanceTail();
+		backendTryAdvanceTail = false;
+	}
+
+	if (backendTryAdvanceTail)
+	{
+		/* We switched to a new page while writing our notifies to the
+		 * queue, so we try to advance the tail ourselves, possibly waking
+		 * up another backend if it is running behind */
+		backendTryAdvanceTail = false;
+		asyncQueueAdvanceTail();
 	}
 
 	CommitTransactionCommand();
@@ -1253,10 +1297,7 @@ asyncQueueUnregister(void)
 	 * Need exclusive lock here to manipulate list links.
 	 */
 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
-	/* check if entry is valid and oldest ... */
-	advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
-		QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
-	/* ... then mark it invalid */
+	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
 	/* and remove it from the list */
@@ -1278,10 +1319,6 @@ asyncQueueUnregister(void)
 
 	/* mark ourselves as no longer listed in the global array */
 	amRegisteredListener = false;
-
-	/* If we were the laziest backend, try to advance the tail pointer */
-	if (advanceTail)
-		asyncQueueAdvanceTail();
 }
 
 /*
@@ -1570,7 +1607,7 @@ asyncQueueFillWarning(void)
 }
 
 /*
- * Send signals to all listening backends (except our own).
+ * Send signals to all listening backends (except our own) for our database.
  *
  * Returns true if we sent at least one signal.
  *
@@ -1583,7 +1620,7 @@ asyncQueueFillWarning(void)
  * Since we know the BackendId and the Pid the signalling is quite cheap.
  */
 static bool
-SignalBackends(void)
+SignalMyDBBackends(void)
 {
 	bool		signalled = false;
 	int32	   *pids;
@@ -1592,9 +1629,9 @@ SignalBackends(void)
 	int32		pid;
 
 	/*
-	 * Identify all backends that are listening and not already up-to-date. We
-	 * don't want to send signals while holding the AsyncQueueLock, so we just
-	 * build a list of target PIDs.
+	 * Identify all backends with MyDatabaseId that are listening and not
+	 * already up-to-date.  We don't want to send signals while holding the
+	 * AsyncQueueLock, so we just build a list of target PIDs.
 	 *
 	 * XXX in principle these pallocs could fail, which would be bad. Maybe
 	 * preallocate the arrays?	But in practice this is only run in trivial
@@ -1609,7 +1646,7 @@ SignalBackends(void)
 	{
 		pid = QUEUE_BACKEND_PID(i);
 		Assert(pid != InvalidPid);
-		if (pid != MyProcPid)
+		if (pid != MyProcPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
 		{
 			QueuePosition pos = QUEUE_BACKEND_POS(i);
 
@@ -1859,6 +1896,9 @@ asyncQueueReadAllNotifications(void)
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
 	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
 	head = QUEUE_HEAD;
+	/* If we're behind, we possibly got signalled to catch up.  Remember
+	 * this so we attempt to advance the tail later */
+	advanceTail = QUEUE_SLOW_BACKEND(MyBackendId);
 	LWLockRelease(AsyncQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
@@ -1966,12 +2006,9 @@ asyncQueueReadAllNotifications(void)
 		/* Update shared state */
 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
 		QUEUE_BACKEND_POS(MyBackendId) = pos;
-		advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
 		LWLockRelease(AsyncQueueLock);
 
-		/* If we were the laziest backend, try to advance the tail pointer */
-		if (advanceTail)
-			asyncQueueAdvanceTail();
+		/* We don't try to advance the tail here. */
 
 		PG_RE_THROW();
 	}
@@ -1980,10 +2017,10 @@ asyncQueueReadAllNotifications(void)
 	/* Update shared state */
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
 	QUEUE_BACKEND_POS(MyBackendId) = pos;
-	advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
 	LWLockRelease(AsyncQueueLock);
 
-	/* If we were the laziest backend, try to advance the tail pointer */
+	/* We were behind, so try to advance the tail pointer, possibly
+	 * signalling another backend if necessary */
 	if (advanceTail)
 		asyncQueueAdvanceTail();
 
@@ -2093,8 +2130,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 }
 
 /*
- * Advance the shared queue tail variable to the minimum of all the
- * per-backend tail pointers.  Truncate pg_notify space if possible.
+ * Advance the shared queue tail variable if possible.  If a slow backend is
+ * holding everything up, signal it.  Truncate pg_notify space if possible.
  */
 static void
 asyncQueueAdvanceTail(void)
@@ -2103,18 +2140,41 @@ asyncQueueAdvanceTail(void)
 	int			oldtailpage;
 	int			newtailpage;
 	int			boundary;
+	int			slowbackendid = InvalidBackendId;
+	int			slowbackendpid;
 
+	/* Advance the tail as far as possible, noting if there is a slow
+	 * backend we could kick */
 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
 	min = QUEUE_HEAD;
 	for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
 	{
 		Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
+		if (QUEUE_BACKEND_PID(i) != MyProcPid && QUEUE_SLOW_BACKEND(i))
+		{
+			slowbackendid = i;
+			slowbackendpid = QUEUE_BACKEND_PID(i);
+		}
 		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
 	}
 	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
 	QUEUE_TAIL = min;
 	LWLockRelease(AsyncQueueLock);
 
+	/* At least one backend was slow, so signal an arbitrary one to wake it
+	 * up.  It should in turn call this function to signal the next,
+	 * see asyncQueueReadAllNotifications() */
+	if (slowbackendid != InvalidBackendId) {
+
+		/* Note: assuming things aren't broken, a signal failure here could
+		 * only occur if the target backend exited since we released
+		 * AsyncQueueLock; which is unlikely but certainly possible. So we
+		 * just log a low-level debug message if it happens.
+		 */
+		if (SendProcSignal(slowbackendpid, PROCSIG_NOTIFY_INTERRUPT, slowbackendid) < 0)
+			elog(DEBUG3, "could not signal backend with PID %d: %m", slowbackendpid);
+	}
+
 	/*
 	 * We can truncate something if the global tail advanced across an SLRU
 	 * segment boundary.
-- 
2.20.1

Reply via email to