On Thu, Oct 13, 2022 at 12:09:54PM -0700, Nathan Bossart wrote: > On Thu, Oct 13, 2022 at 12:37:39PM +0200, Alvaro Herrera wrote: >> The main reason is that it seems odd to have startpointTLI in the struct >> used in some places together with a file-global recvFileTLI which isn't. >> The way one is passed as argument and the other as part of a struct >> seemed too distracting. This should reduce the number of moving parts, >> ISTM. > > Makes sense. Do you think the struct should be file-global so that it > doesn't need to be provided as an argument to most of the static functions > in this file?
Here's a different take. Instead of creating structs and altering function signatures, we can just make the wake-up times file-global, and we can skip the changes related to reducing the number of calls to GetCurrentTimestamp() for now. This results in a far less invasive patch. (I still think reducing the number of calls to GetCurrentTimestamp() is worthwhile, but I'm beginning to agree with Bharath that it should be handled separately.) Thoughts? -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
From 389a398412c51ee18cb37e04548350d661c78d4f Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Thu, 27 Jan 2022 21:43:17 +1300 Subject: [PATCH v7 1/1] Suppress useless wakeups in walreceiver. Instead of waking up 10 times per second to check for various timeout conditions, keep track of when we next have periodic work to do. Reviewed-by: Kyotaro Horiguchi, Bharath Rupireddy, Alvaro Herrera Discussion: https://postgr.es/m/CA%2BhUKGJGhX4r2LPUE3Oy9BX71Eum6PBcS8L3sJpScR9oKaTVaA%40mail.gmail.com --- src/backend/replication/walreceiver.c | 163 +++++++++++++++++--------- 1 file changed, 106 insertions(+), 57 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 6cbb67c92a..a8740c84d2 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -95,8 +95,6 @@ bool hot_standby_feedback; static WalReceiverConn *wrconn = NULL; WalReceiverFunctionsType *WalReceiverFunctions = NULL; -#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ - /* * These variables are used similarly to openLogFile/SegNo, * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID @@ -116,6 +114,23 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +/* + * Reasons to wake up and perform periodic tasks. + */ +typedef enum WalRcvWakeupReason +{ + WALRCV_WAKEUP_TERMINATE, + WALRCV_WAKEUP_PING, + WALRCV_WAKEUP_REPLY, + WALRCV_WAKEUP_HSFEEDBACK, + NUM_WALRCV_WAKEUPS +} WalRcvWakeupReason; + +/* + * Wake up times for periodic tasks. 
+ */ +TimestampTz wakeup[NUM_WALRCV_WAKEUPS]; + static StringInfoData reply_message; static StringInfoData incoming_message; @@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); +static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now); /* * Process any interrupts the walreceiver process may have received. @@ -179,9 +195,7 @@ WalReceiverMain(void) TimeLineID primaryTLI; bool first_stream; WalRcvData *walrcv = WalRcv; - TimestampTz last_recv_timestamp; - TimestampTz starttime; - bool ping_sent; + TimestampTz now; char *err; char *sender_host = NULL; int sender_port = 0; @@ -192,7 +206,7 @@ WalReceiverMain(void) */ Assert(walrcv != NULL); - starttime = GetCurrentTimestamp(); + now = GetCurrentTimestamp(); /* * Mark walreceiver as running in shared memory. @@ -248,7 +262,7 @@ WalReceiverMain(void) /* Initialise to a sanish value */ walrcv->lastMsgSendTime = - walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime; + walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now; /* Report the latch to use to awaken this process */ walrcv->latch = &MyProc->procLatch; @@ -414,9 +428,10 @@ WalReceiverMain(void) initStringInfo(&reply_message); initStringInfo(&incoming_message); - /* Initialize the last recv timestamp */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + /* Initialize nap wakeup times. */ + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + WalRcvComputeNextWakeup(i, now); /* Loop until end-of-streaming or error */ for (;;) @@ -426,6 +441,8 @@ WalReceiverMain(void) bool endofwal = false; pgsocket wait_fd = PGINVALID_SOCKET; int rc; + TimestampTz nextWakeup; + int64 nap; /* * Exit walreceiver if we're not in recovery. 
This should not @@ -443,11 +460,15 @@ WalReceiverMain(void) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + WalRcvComputeNextWakeup(i, now); XLogWalRcvSendHSFeedback(true); } /* See if we can read data immediately */ len = walrcv_receive(wrconn, &buf, &wait_fd); + now = GetCurrentTimestamp(); if (len != 0) { /* @@ -459,11 +480,12 @@ WalReceiverMain(void) if (len > 0) { /* - * Something was received from primary, so reset - * timeout + * Something was received from primary, so adjust + * the ping and terminate wakeup times. */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE, + now); + WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now); XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1, startpointTLI); } @@ -480,6 +502,7 @@ WalReceiverMain(void) break; } len = walrcv_receive(wrconn, &buf, &wait_fd); + now = GetCurrentTimestamp(); } /* Let the primary know that we received some data. */ @@ -497,6 +520,12 @@ WalReceiverMain(void) if (endofwal) break; + /* Find the soonest wakeup time, to limit our nap. */ + nextWakeup = PG_INT64_MAX; + for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) + nextWakeup = Min(wakeup[i], nextWakeup); + nap = Max(0, (nextWakeup - now + 999) / 1000); + /* * Ideally we would reuse a WaitEventSet object repeatedly * here to avoid the overheads of WaitLatchOrSocket on epoll @@ -513,8 +542,9 @@ WalReceiverMain(void) WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET, wait_fd, - NAPTIME_PER_CYCLE, + (int) Min(INT_MAX, nap), WAIT_EVENT_WAL_RECEIVER_MAIN); + now = GetCurrentTimestamp(); if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); @@ -550,34 +580,19 @@ WalReceiverMain(void) * Check if time since last receive from primary has * reached the configured limit. 
*/ - if (wal_receiver_timeout > 0) - { - TimestampTz now = GetCurrentTimestamp(); - TimestampTz timeout; - - timeout = - TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); + if (now >= wakeup[WALRCV_WAKEUP_TERMINATE]) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating walreceiver due to timeout"))); - if (now >= timeout) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating walreceiver due to timeout"))); - - /* - * We didn't receive anything new, for half of - * receiver replication timeout. Ping the server. - */ - if (!ping_sent) - { - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); - if (now >= timeout) - { - requestReply = true; - ping_sent = true; - } - } + /* + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server. + */ + if (now >= wakeup[WALRCV_WAKEUP_PING]) + { + requestReply = true; + wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX; } XLogWalRcvSendReply(requestReply, requestReply); @@ -1076,7 +1091,6 @@ XLogWalRcvSendReply(bool force, bool requestReply) static XLogRecPtr writePtr = 0; static XLogRecPtr flushPtr = 0; XLogRecPtr applyPtr; - static TimestampTz sendTime = 0; TimestampTz now; /* @@ -1101,10 +1115,11 @@ XLogWalRcvSendReply(bool force, bool requestReply) if (!force && writePtr == LogstreamResult.Write && flushPtr == LogstreamResult.Flush - && !TimestampDifferenceExceeds(sendTime, now, - wal_receiver_status_interval * 1000)) + && now < wakeup[WALRCV_WAKEUP_REPLY]) return; - sendTime = now; + + /* Make sure we wake up when it's time to send another reply. 
*/ + WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now); /* Construct a new message */ writePtr = LogstreamResult.Write; @@ -1149,7 +1164,6 @@ XLogWalRcvSendHSFeedback(bool immed) catalog_xmin_epoch; TransactionId xmin, catalog_xmin; - static TimestampTz sendTime = 0; /* initially true so we always send at least one feedback message */ static bool primary_has_standby_xmin = true; @@ -1165,16 +1179,12 @@ XLogWalRcvSendHSFeedback(bool immed) /* Get current timestamp. */ now = GetCurrentTimestamp(); - if (!immed) - { - /* - * Send feedback at most once per wal_receiver_status_interval. - */ - if (!TimestampDifferenceExceeds(sendTime, now, - wal_receiver_status_interval * 1000)) - return; - sendTime = now; - } + /* Send feedback at most once per wal_receiver_status_interval. */ + if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK]) + return; + + /* Make sure we wake up when it's time to send feedback again. */ + WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now); /* * If Hot Standby is not yet accepting connections there is nothing to @@ -1285,6 +1295,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) } } +/* + * Compute the next wakeup time for a given wakeup reason. Can be called to + * initialize a wakeup time, to adjust it for the next wakeup, or to + * reinitialize it when GUCs have changed. 
+ */ +static void +WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now) +{ + switch (reason) + { + case WALRCV_WAKEUP_TERMINATE: + if (wal_receiver_timeout <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000); + break; + case WALRCV_WAKEUP_PING: + if (wal_receiver_timeout <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000); + break; + case WALRCV_WAKEUP_HSFEEDBACK: + if (!hot_standby_feedback || wal_receiver_status_interval <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); + break; + case WALRCV_WAKEUP_REPLY: + if (wal_receiver_status_interval <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); + break; + default: + break; + } +} + /* * Wake up the walreceiver main loop. * -- 2.25.1