Hi Tom,

On Wed, 11 Sep 2019 at 00:18, Tom Lane <t...@sss.pgh.pa.us> wrote:
>
> I pushed 0001 after doing some hacking on it --- it was sloppy about
> datatypes, and about whether the invalid-entry value is 0 or -1,
> and it was just wrong about keeping the list in backendid order.
> (You can't conditionally skip looking for where to put the new
> entry, if you want to maintain the order. I thought about just
> defining the list as unordered, which would simplify joining the
> list initially, but that could get pretty cache-unfriendly when
> there are lots of entries.)
>
> 0002 is now going to need a rebase, so please do that.
>

Thanks for this, and good catch. Looks like I didn't test the first patch
by itself very well.

Here is the rebased second patch.

Thanks in advance,
-- 
Martijn van Oosterhout <klep...@gmail.com> http://svana.org/kleptog/

From bc4b1b458564f758b7fa1c1f7b0397aade71db06 Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <oosterh...@fox-it.com>
Date: Mon, 3 Jun 2019 17:13:31 +0200
Subject: [PATCH 1/2] Improve performance of async notifications

Advancing the tail pointer requires an exclusive lock which can block
backends from other databases, so it's worth keeping these attempts to a
minimum.

Instead of tracking the slowest backend exactly we update the queue more
lazily, only checking when we switch to a new SLRU page. Additionally,
instead of waking up every slow backend at once, we do them one at a time.
---
 src/backend/commands/async.c | 142 +++++++++++++++++++++++++----------
 1 file changed, 101 insertions(+), 41 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f26269b5ea..b9dd0ca139 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -73,10 +73,11 @@
  * Finally, after we are out of the transaction altogether, we check if
  * we need to signal listening backends. In SignalBackends() we scan the
  * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- * to every listening backend (we don't know which backend is listening on
- * which channel so we must signal them all). We can exclude backends that
- * are already up to date, though. We don't bother with a self-signal
- * either, but just process the queue directly.
+ * to every listening backend for the relevant database (we don't know
+ * which backend is listening on which channel so we must signal them
+ * all). We can exclude backends that are already up to date, though.
+ * We don't bother with a self-signal either, but just process the queue
+ * directly.
  *
  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
  *    sets the process's latch, which triggers the event to be processed
@@ -89,13 +90,25 @@
  *    Inbound-notify processing consists of reading all of the notifications
  *    that have arrived since scanning last time. We read every notification
  *    until we reach either a notification from an uncommitted transaction or
- *    the head pointer's position. Then we check if we were the laziest
- *    backend: if our pointer is set to the same position as the global tail
- *    pointer is set, then we move the global tail pointer ahead to where the
- *    second-laziest backend is (in general, we take the MIN of the current
- *    head position and all active backends' new tail pointers). Whenever we
- *    move the global tail pointer we also truncate now-unused pages (i.e.,
- *    delete files in pg_notify/ that are no longer used).
+ *    the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and minimize disk space the tail pointer
+ *    needs to be advanced so that old pages can be truncated. This
+ *    however requires an exclusive lock and as such should be done
+ *    infrequently.
+ *
+ *    When a new notification is added, the writer checks to see if the
+ *    tail pointer is more than QUEUE_CLEANUP_DELAY pages behind. If
+ *    so, it attempts to advance the tail, and if there are slow
+ *    backends (perhaps because all the notifications were for other
+ *    databases), wake one of them up by sending a signal.
+ *
+ *    When the slow backend processes the queue it notes it was behind
+ *    and so also tries to advance the tail, possibly waking up another
+ *    slow backend. Eventually all backends will have processed the
+ *    queue and the global tail pointer is moved to a new page and we
+ *    also truncate now-unused pages (i.e., delete files in pg_notify/
+ *    that are no longer used).
  *
  * An application that listens on the same channel it notifies will get
  * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -211,6 +224,12 @@ typedef struct QueuePosition
      (x).page != (y).page ? (x) : \
      (x).offset > (y).offset ? (x) : (y))
 
+/* how many pages does a backend need to be behind before it needs to be signalled */
+#define QUEUE_CLEANUP_DELAY 4
+
+/* is a backend so far behind it needs to be signalled? */
+#define QUEUE_SLOW_BACKEND(i) \
+    (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(QUEUE_BACKEND_POS(i))) > QUEUE_CLEANUP_DELAY)
 /*
  * Struct describing a listening backend's status
  */
@@ -252,7 +271,7 @@ typedef struct QueueBackendStatus
 typedef struct AsyncQueueControl
 {
     QueuePosition head;         /* head points to the next free location */
-    QueuePosition tail;         /* the global tail is equivalent to the pos of
+    QueuePosition tail;         /* the global tail is some place older than
                                  * the "slowest" backend */
     BackendId   firstListener;  /* id of first listener, or InvalidBackendId */
     TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
@@ -402,10 +421,15 @@ static bool amRegisteredListener = false;
 /* has this backend sent notifications in the current transaction? */
 static bool backendHasSentNotifications = false;
 
+/* has this backend switched to new page, and so should attempt to advance
+ * the queue tail? */
+static bool backendTryAdvanceTail = false;
+
 /* GUC parameter */
 bool        Trace_notify = false;
 
 /* local function prototypes */
+static int asyncQueuePageDiff(int p, int q);
 static bool asyncQueuePagePrecedes(int p, int q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +445,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static bool SignalMyDBBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                          QueuePosition stop,
@@ -438,8 +462,8 @@ static void ClearPendingActionsAndNotifies(void);
 /*
  * We will work on the page range of 0..QUEUE_MAX_PAGE.
  */
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
 {
     int         diff;
 
@@ -455,7 +479,13 @@ asyncQueuePagePrecedes(int p, int q)
         diff -= QUEUE_MAX_PAGE + 1;
     else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
         diff += QUEUE_MAX_PAGE + 1;
-    return diff < 0;
+    return diff;
+}
+
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+    return asyncQueuePageDiff(p, q) < 0;
 }
 
 /*
@@ -905,6 +935,12 @@ PreCommit_Notify(void)
                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                          errmsg("too many notifications in the NOTIFY queue")));
             nextNotify = asyncQueueAddEntries(nextNotify);
+
+            /* If we are advancing to a new page, remember this so after the
+             * transaction commits we can attempt to advance the tail
+             * pointer, see ProcessCompletedNotifies() */
+            if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+                backendTryAdvanceTail = true;
             LWLockRelease(AsyncQueueLock);
         }
     }
@@ -1051,8 +1087,6 @@ Exec_ListenPreCommit(void)
      * notification to the frontend. Also, although our transaction might
      * have executed NOTIFY, those message(s) aren't queued yet so we can't
      * see them in the queue.
-     *
-     * This will also advance the global tail pointer if possible.
      */
     if (!QUEUE_POS_EQUAL(max, head))
         asyncQueueReadAllNotifications();
@@ -1185,7 +1219,7 @@ ProcessCompletedNotifies(void)
         StartTransactionCommand();
 
     /* Send signals to other backends */
-    signalled = SignalBackends();
+    signalled = SignalMyDBBackends();
 
     if (listenChannels != NIL)
     {
@@ -1203,6 +1237,16 @@ ProcessCompletedNotifies(void)
          * harmless.)
          */
         asyncQueueAdvanceTail();
+        backendTryAdvanceTail = false;
+    }
+
+    if (backendTryAdvanceTail)
+    {
+        /* We switched to a new page while writing our notifies to the
+         * queue, so we try to advance the tail ourselves, possibly waking
+         * up another backend if it is running behind */
+        backendTryAdvanceTail = false;
+        asyncQueueAdvanceTail();
     }
 
     CommitTransactionCommand();
@@ -1253,10 +1297,7 @@ asyncQueueUnregister(void)
      * Need exclusive lock here to manipulate list links.
      */
     LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
-    /* check if entry is valid and oldest ... */
-    advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
-        QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
-    /* ... then mark it invalid */
+    /* Mark our entry as invalid */
     QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
     QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
     /* and remove it from the list */
@@ -1278,10 +1319,6 @@ asyncQueueUnregister(void)
 
     /* mark ourselves as no longer listed in the global array */
     amRegisteredListener = false;
-
-    /* If we were the laziest backend, try to advance the tail pointer */
-    if (advanceTail)
-        asyncQueueAdvanceTail();
 }
 
 /*
@@ -1570,7 +1607,7 @@ asyncQueueFillWarning(void)
 }
 
 /*
- * Send signals to all listening backends (except our own).
+ * Send signals to all listening backends (except our own) for our database.
  *
  * Returns true if we sent at least one signal.
  *
@@ -1583,7 +1620,7 @@ asyncQueueFillWarning(void)
  * Since we know the BackendId and the Pid the signalling is quite cheap.
  */
 static bool
-SignalBackends(void)
+SignalMyDBBackends(void)
 {
     bool        signalled = false;
     int32      *pids;
@@ -1592,9 +1629,9 @@ SignalBackends(void)
     int32       pid;
 
     /*
-     * Identify all backends that are listening and not already up-to-date. We
-     * don't want to send signals while holding the AsyncQueueLock, so we just
-     * build a list of target PIDs.
+     * Identify all backends with MyDatabaseId that are listening and not
+     * already up-to-date. We don't want to send signals while holding the
+     * AsyncQueueLock, so we just build a list of target PIDs.
      *
      * XXX in principle these pallocs could fail, which would be bad. Maybe
      * preallocate the arrays? But in practice this is only run in trivial
@@ -1609,7 +1646,7 @@ SignalBackends(void)
     {
         pid = QUEUE_BACKEND_PID(i);
         Assert(pid != InvalidPid);
-        if (pid != MyProcPid)
+        if (pid != MyProcPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
         {
             QueuePosition pos = QUEUE_BACKEND_POS(i);
 
@@ -1859,6 +1896,9 @@ asyncQueueReadAllNotifications(void)
     Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
     pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
     head = QUEUE_HEAD;
+    /* If we're behind, we possibly got signalled to catch up. Remember
+     * this so we attempt to advance the tail later */
+    advanceTail = QUEUE_SLOW_BACKEND(MyBackendId);
     LWLockRelease(AsyncQueueLock);
 
     if (QUEUE_POS_EQUAL(pos, head))
@@ -1966,12 +2006,9 @@ asyncQueueReadAllNotifications(void)
         /* Update shared state */
         LWLockAcquire(AsyncQueueLock, LW_SHARED);
         QUEUE_BACKEND_POS(MyBackendId) = pos;
-        advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
         LWLockRelease(AsyncQueueLock);
 
-        /* If we were the laziest backend, try to advance the tail pointer */
-        if (advanceTail)
-            asyncQueueAdvanceTail();
+        /* We don't try to advance the tail here. */
 
         PG_RE_THROW();
     }
@@ -1980,10 +2017,10 @@ asyncQueueReadAllNotifications(void)
     /* Update shared state */
     LWLockAcquire(AsyncQueueLock, LW_SHARED);
     QUEUE_BACKEND_POS(MyBackendId) = pos;
-    advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
     LWLockRelease(AsyncQueueLock);
 
-    /* If we were the laziest backend, try to advance the tail pointer */
+    /* We were behind, so try to advance the tail pointer, possibly
+     * signalling another backend if necessary */
     if (advanceTail)
         asyncQueueAdvanceTail();
 
@@ -2093,8 +2130,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 }
 
 /*
- * Advance the shared queue tail variable to the minimum of all the
- * per-backend tail pointers. Truncate pg_notify space if possible.
+ * Advance the shared queue tail variable if possible. If a slow backend is
+ * holding everything up, signal it. Truncate pg_notify space if possible.
  */
 static void
 asyncQueueAdvanceTail(void)
@@ -2103,18 +2140,41 @@ asyncQueueAdvanceTail(void)
     int         oldtailpage;
     int         newtailpage;
     int         boundary;
+    int         slowbackendid = InvalidBackendId;
+    int         slowbackendpid;
 
+    /* Advance the tail as far as possible, noting if there is a slow
+     * backend we could kick */
     LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
     min = QUEUE_HEAD;
     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
     {
         Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
+        if (QUEUE_BACKEND_PID(i) != MyProcPid && QUEUE_SLOW_BACKEND(i))
+        {
+            slowbackendid = i;
+            slowbackendpid = QUEUE_BACKEND_PID(i);
+        }
         min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
     }
     oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
     QUEUE_TAIL = min;
     LWLockRelease(AsyncQueueLock);
 
+    /* At least one backend was slow, so signal a random one to wake it
+     * up. It should in turn call this function to signal the next,
+     * see asyncQueueReadAllNotifications() */
+    if (slowbackendid != InvalidBackendId) {
+
+        /* Note: assuming things aren't broken, a signal failure here could
+         * only occur if the target backend exited since we released
+         * AsyncQueueLock; which is unlikely but certainly possible. So we
+         * just log a low-level debug message if it happens.
+         */
+        if (SendProcSignal(slowbackendpid, PROCSIG_NOTIFY_INTERRUPT, slowbackendid) < 0)
+            elog(DEBUG3, "could not signal backend with PID %d: %m", slowbackendpid);
+    }
+
     /*
      * We can truncate something if the global tail advanced across an SLRU
      * segment boundary.
-- 
2.20.1
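
For anyone reviewing the arithmetic, here is a minimal standalone sketch of the wraparound page difference and the slow-backend test that the patch builds on. It is not part of the patch and does not compile against the backend: the sketch_* names are hypothetical, SKETCH_MAX_PAGE is an assumed small value chosen for illustration (the real QUEUE_MAX_PAGE is defined in async.c), and the listener positions are reduced to a plain array of page numbers.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed values for illustration only, not the ones used by async.c. */
#define SKETCH_MAX_PAGE      8191
#define SKETCH_CLEANUP_DELAY 4      /* mirrors QUEUE_CLEANUP_DELAY in the patch */

/* Signed distance p - q on the circular page range 0..SKETCH_MAX_PAGE. */
static int
sketch_page_diff(int p, int q)
{
    int diff;

    assert(p >= 0 && p <= SKETCH_MAX_PAGE);
    assert(q >= 0 && q <= SKETCH_MAX_PAGE);

    diff = p - q;
    if (diff >= (SKETCH_MAX_PAGE + 1) / 2)
        diff -= SKETCH_MAX_PAGE + 1;
    else if (diff < -((SKETCH_MAX_PAGE + 1) / 2))
        diff += SKETCH_MAX_PAGE + 1;
    return diff;
}

/* Same relationship as in the patch: "precedes" is just a negative diff. */
static bool
sketch_page_precedes(int p, int q)
{
    return sketch_page_diff(p, q) < 0;
}

/* A listener is slow once the head is more than the delay ahead of it. */
static bool
sketch_is_slow(int head_page, int backend_page)
{
    return sketch_page_diff(head_page, backend_page) > SKETCH_CLEANUP_DELAY;
}

/*
 * Scan the listeners' page positions and return the index of one slow
 * listener other than `self`, or -1 if there is none.  As in the patch's
 * loop, the last slow listener seen is the one that gets picked.
 */
static int
sketch_pick_slow(const int *pages, int n, int head_page, int self)
{
    int slow = -1;

    for (int i = 0; i < n; i++)
    {
        if (i != self && sketch_is_slow(head_page, pages[i]))
            slow = i;
    }
    return slow;
}
```

With these assumed constants, a head on page 2 and a listener on page 8189 are five pages apart across the wrap, so that listener counts as slow, while a listener exactly four pages behind does not.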