rebased for cfbot -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
From 2466001a3ae6f94aac8eff45b488765e619bea1b Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Thu, 1 Dec 2022 20:50:21 -0800 Subject: [PATCH v2 1/1] suppress unnecessary wakeups in logical/worker.c
--- src/backend/replication/logical/tablesync.c | 20 +++ src/backend/replication/logical/worker.c | 156 +++++++++++++++----- src/include/replication/worker_internal.h | 3 + 3 files changed, 142 insertions(+), 37 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 38dfce7129..88218e1fed 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); + /* + * If we've made it past our previously-stored special wakeup time, reset + * it so that it can be recalculated as needed. + */ + if (next_sync_start <= GetCurrentTimestamp()) + next_sync_start = PG_INT64_MAX; + /* We need up-to-date sync state info for subscription tables here. */ FetchTableStates(&started_tx); @@ -592,6 +599,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) DSM_HANDLE_INVALID); hentry->last_start_time = now; } + else if (found) + { + TimestampTz retry_time = hentry->last_start_time + + (wal_retrieve_retry_interval * + INT64CONST(1000)); + + /* + * Store when we can start the sync worker so that we + * know how long to sleep. + */ + if (retry_time < next_sync_start) + next_sync_start = retry_time; + } } } } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 79cda39445..284f11428c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -207,8 +207,6 @@ #include "utils/syscache.h" #include "utils/timeout.h" -#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */ - typedef struct FlushPosition { dlist_node node; @@ -348,6 +346,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * Reasons to wake up and perform periodic tasks. 
+ */ +typedef enum LogRepWorkerWakeupReason +{ + LRW_WAKEUP_TERMINATE, + LRW_WAKEUP_PING, + LRW_WAKEUP_STATUS, + NUM_LRW_WAKEUPS +} LogRepWorkerWakeupReason; + +/* + * Wake up times for periodic tasks. + */ +static TimestampTz wakeup[NUM_LRW_WAKEUPS]; +TimestampTz next_sync_start; + +static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, + TimestampTz now); + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -3446,10 +3464,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) static void LogicalRepApplyLoop(XLogRecPtr last_received) { - TimestampTz last_recv_timestamp = GetCurrentTimestamp(); - bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + TimestampTz now; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3479,6 +3496,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) error_context_stack = &errcallback; apply_error_context_stack = error_context_stack; + /* Initialize nap wakeup times. */ + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + LogRepWorkerComputeNextWakeup(i, now); + next_sync_start = PG_INT64_MAX; + /* This outer loop iterates once per wait. */ for (;;) { @@ -3495,6 +3518,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + now = GetCurrentTimestamp(); if (len != 0) { /* Loop to process all available data (without blocking). */ @@ -3518,9 +3542,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received) int c; StringInfoData s; - /* Reset timeout. */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + /* Adjust the ping and terminate wakeup times. */ + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now); + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now); /* Ensure we are reading the data into our memory context. 
*/ MemoryContextSwitchTo(ApplyMessageContext); @@ -3574,6 +3598,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + now = GetCurrentTimestamp(); } } @@ -3612,14 +3637,43 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (!dlist_is_empty(&lsn_mapping)) wait_time = WalWriterDelay; else - wait_time = NAPTIME_PER_CYCLE; + { + TimestampTz nextWakeup = PG_INT64_MAX; + + /* Find soonest wakeup time, to limit our nap. */ + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + nextWakeup = Min(wakeup[i], nextWakeup); + + /* Also consider special wakeup time for starting sync workers. */ + if (next_sync_start < now) + { + /* + * Instead of spinning while we wait for the sync worker to + * start, wait a bit before retrying (unless there's an earlier + * wakeup time). + */ + nextWakeup = Min(now + INT64CONST(100000), nextWakeup); + } + else + nextWakeup = Min(next_sync_start, nextWakeup); + + /* + * Calculate the nap time. WaitLatchOrSocket() doesn't accept + * timeouts longer than INT_MAX milliseconds, so we limit the + * result accordingly. Also, we round up to the next millisecond + * to avoid waking up too early and spinning until one of the + * wakeup times. + */ + wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000)); + } rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, fd, wait_time, WAIT_EVENT_LOGICAL_APPLY_MAIN); - + now = GetCurrentTimestamp(); if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); @@ -3630,6 +3684,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + LogRepWorkerComputeNextWakeup(i, now); + + /* + * If a wakeup time for starting sync workers was set, just set it + * to right now. It will be recalculated as needed. 
+ */ + if (next_sync_start != PG_INT64_MAX) + next_sync_start = now; } if (rc & WL_TIMEOUT) @@ -3648,31 +3712,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Check if time since last receive from primary has reached the * configured limit. */ - if (wal_receiver_timeout > 0) - { - TimestampTz now = GetCurrentTimestamp(); - TimestampTz timeout; + if (now >= wakeup[LRW_WAKEUP_TERMINATE]) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating logical replication worker due to timeout"))); - timeout = - TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); - - if (now >= timeout) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating logical replication worker due to timeout"))); - - /* Check to see if it's time for a ping. */ - if (!ping_sent) - { - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); - if (now >= timeout) - { - requestReply = true; - ping_sent = true; - } - } + /* Check to see if it's time for a ping. */ + if (now >= wakeup[LRW_WAKEUP_PING]) + { + requestReply = true; + wakeup[LRW_WAKEUP_PING] = PG_INT64_MAX; } send_feedback(last_received, requestReply, requestReply); @@ -3708,7 +3757,6 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) { static StringInfo reply_message = NULL; - static TimestampTz send_time = 0; static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; @@ -3751,10 +3799,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) if (!force && writepos == last_writepos && flushpos == last_flushpos && - !TimestampDifferenceExceeds(send_time, now, - wal_receiver_status_interval * 1000)) + now < wakeup[LRW_WAKEUP_STATUS]) return; - send_time = now; + + /* Make sure we wake up when it's time to send another status update. 
*/ + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now); if (!reply_message) { @@ -5034,3 +5083,36 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) return TRANS_LEADER_SEND_TO_PARALLEL; } } + +/* + * Compute the next wakeup time for a given wakeup reason. Can be called to + * initialize a wakeup time, to adjust it for the next wakeup, or to + * reinitialize it when GUCs have changed. + */ +static void +LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now) +{ + switch (reason) + { + case LRW_WAKEUP_TERMINATE: + if (wal_receiver_timeout <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000); + break; + case LRW_WAKEUP_PING: + if (wal_receiver_timeout <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000); + break; + case LRW_WAKEUP_STATUS: + if (wal_receiver_status_interval <= 0) + wakeup[reason] = PG_INT64_MAX; + else + wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000); + break; + default: + break; + } +} diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index db891eea8a..ec39711ed1 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -225,6 +225,9 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; extern PGDLLIMPORT bool in_remote_transaction; +/* Next time to attempt starting sync workers. */ +extern PGDLLIMPORT TimestampTz next_sync_start; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); -- 2.25.1