I've attached a minimally-updated patch that doesn't yet address the bigger topics under discussion.
On Thu, Mar 16, 2023 at 03:30:37PM +0530, Amit Kapila wrote:
> On Wed, Feb 1, 2023 at 5:35 AM Nathan Bossart <nathandboss...@gmail.com>
> wrote:
>> On Sat, Jan 28, 2023 at 10:26:25AM +0530, Amit Kapila wrote:
>> > BTW, do we need to do something about wakeups in
>> > wait_for_relation_state_change()?
>>
>> ... and wait_for_worker_state_change(), and copy_read_data(). From a quick
>> glance, it looks like fixing these would be a more invasive change.
>
> What kind of logic do you have in mind to avoid waking up once per
> second in those cases?

I haven't looked into this too much yet. I'd probably try out Tom's
suggestions from upthread [0] next and see if those can be applied here,
too.

>> TBH
>> I'm beginning to wonder whether all this is really worth it to prevent
>> waking up once per second.
>
> If we can't do it for all cases, do you see any harm in doing it for
> cases where we can achieve it without adding much complexity? We can
> probably add comments for others so that if someone else has better
> ideas in the future we can deal with those as well.

I don't think there's any harm, but I'm also not sure it does a whole lot
of good. At the very least, I think we should figure out something better
than the process_syncing_tables() hacks before taking this patch seriously.

[0] https://postgr.es/m/3220831.1674772625%40sss.pgh.pa.us

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From d76710d4b79e173e2d1baf1af228fe7dd8927e72 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Tue, 24 Jan 2023 21:12:28 -0800 Subject: [PATCH v4 1/1] suppress useless wakeups in logical/worker.c --- src/backend/replication/logical/tablesync.c | 28 +++ src/backend/replication/logical/worker.c | 189 ++++++++++++++++---- src/include/replication/worker_internal.h | 4 + src/tools/pgindent/typedefs.list | 1 + 4 files changed, 186 insertions(+), 36 deletions(-) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 07eea504ba..573b46b5a2 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); + /* + * If we've made it past our previously-stored special wakeup time, reset + * it so that it can be recalculated as needed. + */ + if (LogRepWorkerGetSyncStartWakeup() <= GetCurrentTimestamp()) + LogRepWorkerClearSyncStartWakeup(); + /* We need up-to-date sync state info for subscription tables here. */ FetchTableStates(&started_tx); @@ -592,6 +599,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) DSM_HANDLE_INVALID); hentry->last_start_time = now; } + else + { + TimestampTz retry_time; + + /* + * Store when we can start the sync worker so that we + * know how long to sleep. + */ + retry_time = TimestampTzPlusMilliseconds(hentry->last_start_time, + wal_retrieve_retry_interval); + LogRepWorkerUpdateSyncStartWakeup(retry_time); + } + } + else + { + TimestampTz now = GetCurrentTimestamp(); + TimestampTz retry_time; + + /* Maybe there will be a free slot in a second... 
*/ + retry_time = TimestampTzPlusSeconds(now, 1); + LogRepWorkerUpdateSyncStartWakeup(retry_time); } } } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 10f9711972..8e68540b6f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -208,8 +208,6 @@ #include "utils/syscache.h" #include "utils/timeout.h" -#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */ - typedef struct FlushPosition { dlist_node node; @@ -351,6 +349,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * Reasons to wake up and perform periodic tasks. + */ +typedef enum LogRepWorkerWakeupReason +{ + LRW_WAKEUP_TERMINATE, + LRW_WAKEUP_PING, + LRW_WAKEUP_STATUS, + LRW_WAKEUP_SYNC_START +#define NUM_LRW_WAKEUPS (LRW_WAKEUP_SYNC_START + 1) +} LogRepWorkerWakeupReason; + +/* + * Wake up times for periodic tasks. + */ +static TimestampTz wakeup[NUM_LRW_WAKEUPS]; + +static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, + TimestampTz now); + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -3441,10 +3459,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) static void LogicalRepApplyLoop(XLogRecPtr last_received) { - TimestampTz last_recv_timestamp = GetCurrentTimestamp(); - bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + TimestampTz now; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3474,6 +3491,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) error_context_stack = &errcallback; apply_error_context_stack = error_context_stack; + /* Initialize nap wakeup times. */ + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + LogRepWorkerComputeNextWakeup(i, now); + /* This outer loop iterates once per wait. 
*/ for (;;) { @@ -3513,9 +3535,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) int c; StringInfoData s; - /* Reset timeout. */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + /* Adjust the ping and terminate wakeup times. */ + now = GetCurrentTimestamp(); + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now); + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now); /* Ensure we are reading the data into our memory context. */ MemoryContextSwitchTo(ApplyMessageContext); @@ -3607,7 +3630,29 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (!dlist_is_empty(&lsn_mapping)) wait_time = WalWriterDelay; else - wait_time = NAPTIME_PER_CYCLE; + { + TimestampTz nextWakeup = TIMESTAMP_INFINITY; + + /* + * Since process_syncing_tables() is called conditionally, the + * tablesync worker start wakeup time might be in the past, and we + * can't know for sure when it will be updated again. Rather than + * spinning in a tight loop in this case, bump this wakeup time by + * a second. + */ + now = GetCurrentTimestamp(); + if (wakeup[LRW_WAKEUP_SYNC_START] < now) + wakeup[LRW_WAKEUP_SYNC_START] = TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1); + + /* Find soonest wakeup time, to limit our nap. */ + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + nextWakeup = Min(wakeup[i], nextWakeup); + + /* + * Calculate the nap time, clamping as necessary. + */ + wait_time = TimestampDifferenceMilliseconds(now, nextWakeup); + } rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | @@ -3625,6 +3670,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + /* recompute wakeup times */ + now = GetCurrentTimestamp(); + for (int i = 0; i < NUM_LRW_WAKEUPS; i++) + LogRepWorkerComputeNextWakeup(i, now); + + /* + * LogRepWorkerComputeNextWakeup() will have cleared the tablesync + * worker start wakeup time, so we might not wake up to start a new + * worker at the appropriate time. 
To deal with this, we set the + * wakeup time to right now so that + * process_syncing_tables_for_apply() recalculates it as soon as + * possible. + */ + if (!am_tablesync_worker()) + LogRepWorkerUpdateSyncStartWakeup(now); } if (rc & WL_TIMEOUT) @@ -3643,31 +3703,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * Check if time since last receive from primary has reached the * configured limit. */ - if (wal_receiver_timeout > 0) + now = GetCurrentTimestamp(); + if (now >= wakeup[LRW_WAKEUP_TERMINATE]) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating logical replication worker due to timeout"))); + + /* Check to see if it's time for a ping. */ + if (now >= wakeup[LRW_WAKEUP_PING]) { - TimestampTz now = GetCurrentTimestamp(); - TimestampTz timeout; - - timeout = - TimestampTzPlusMilliseconds(last_recv_timestamp, - wal_receiver_timeout); - - if (now >= timeout) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("terminating logical replication worker due to timeout"))); - - /* Check to see if it's time for a ping. 
*/ - if (!ping_sent) - { - timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); - if (now >= timeout) - { - requestReply = true; - ping_sent = true; - } - } + requestReply = true; + wakeup[LRW_WAKEUP_PING] = TIMESTAMP_INFINITY; } send_feedback(last_received, requestReply, requestReply); @@ -3703,7 +3749,6 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) { static StringInfo reply_message = NULL; - static TimestampTz send_time = 0; static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; @@ -3746,10 +3791,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) if (!force && writepos == last_writepos && flushpos == last_flushpos && - !TimestampDifferenceExceeds(send_time, now, - wal_receiver_status_interval * 1000)) + now < wakeup[LRW_WAKEUP_STATUS]) return; - send_time = now; + + /* Make sure we wake up when it's time to send another status update. */ + LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now); if (!reply_message) { @@ -5048,3 +5094,74 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) return TRANS_LEADER_APPLY; } } + +/* + * Compute the next wakeup time for a given wakeup reason. Can be called to + * initialize a wakeup time, to adjust it for the next wakeup, or to + * reinitialize it when GUCs have changed. We ask the caller to pass in the + * value of "now" because this frequently avoids multiple calls of + * GetCurrentTimestamp(). It had better be a reasonably up-to-date value + * though. 
+ */ +static void +LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now) +{ + switch (reason) + { + case LRW_WAKEUP_TERMINATE: + if (wal_receiver_timeout <= 0) + wakeup[reason] = TIMESTAMP_INFINITY; + else + wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout); + break; + case LRW_WAKEUP_PING: + if (wal_receiver_timeout <= 0) + wakeup[reason] = TIMESTAMP_INFINITY; + else + wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2); + break; + case LRW_WAKEUP_STATUS: + if (wal_receiver_status_interval <= 0) + wakeup[reason] = TIMESTAMP_INFINITY; + else + wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval); + break; + case LRW_WAKEUP_SYNC_START: + /* + * This wakeup time is manually set as needed. This function can + * only be used to initialize its value. + */ + wakeup[reason] = TIMESTAMP_INFINITY; + break; + /* there's intentionally no default: here */ + } +} + +/* + * Retrieve the current wakeup time for starting tablesync workers. + */ +TimestampTz +LogRepWorkerGetSyncStartWakeup(void) +{ + return wakeup[LRW_WAKEUP_SYNC_START]; +} + +/* + * Update the current wakeup time for starting tablesync workers. If the + * current wakeup time is <= next_sync_start, no action is taken. + */ +void +LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start) +{ + if (next_sync_start < wakeup[LRW_WAKEUP_SYNC_START]) + wakeup[LRW_WAKEUP_SYNC_START] = next_sync_start; +} + +/* + * Clear the current wakeup time for starting tablesync workers. 
+ */ +void +LogRepWorkerClearSyncStartWakeup(void) +{ + wakeup[LRW_WAKEUP_SYNC_START] = TIMESTAMP_INFINITY; +} diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index dc87a4edd1..ae44717588 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -225,6 +225,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; extern PGDLLIMPORT bool in_remote_transaction; +extern TimestampTz LogRepWorkerGetSyncStartWakeup(void); +extern void LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start); +extern void LogRepWorkerClearSyncStartWakeup(void); + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 097f42e1b3..66b699852a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1443,6 +1443,7 @@ LockViewRecurse_context LockWaitPolicy LockingClause LogOpts +LogRepWorkerWakeupReason LogStmtLevel LogicalDecodeBeginCB LogicalDecodeBeginPrepareCB -- 2.25.1