On Tue, Jan 24, 2023 at 06:45:08PM -0500, Tom Lane wrote:
> I took a look through this, and have a number of mostly-cosmetic
> issues:

Thanks for the detailed review.

> * It seems wrong that next_sync_start isn't handled as one of the
> wakeup[NUM_LRW_WAKEUPS] entries. I see that it needs to be accessed from
> another module; but you could handle that without exposing either enum
> LogRepWorkerWakeupReason or the array, by providing getter/setter
> functions for process_syncing_tables_for_apply() to call.
>
> * This code is far too intimately familiar with the fact that TimestampTz
> is an int64 count of microseconds. (I'm picky about that because I
> remember that they were once something else, so I wonder if someday
> they will be different again.) You could get rid of the PG_INT64_MAX
> usages by replacing those with the timestamp infinity macro DT_NOEND;
> and I'd even be on board with adding a less-opaque alternate name for
> that to datatype/timestamp.h. The various magic-constant multipliers
> could perhaps be made less magic by using TimestampTzPlusMilliseconds().
>
> * I think it might be better to construct the enum like this:
>
> +typedef enum LogRepWorkerWakeupReason
> +{
> +	LRW_WAKEUP_TERMINATE,
> +	LRW_WAKEUP_PING,
> +	LRW_WAKEUP_STATUS
> +#define NUM_LRW_WAKEUPS (LRW_WAKEUP_STATUS + 1)
> +} LogRepWorkerWakeupReason;
>
> so that you don't have to have a default: case in switches on the
> enum value. I'm more worried about somebody adding an enum value
> and missing updating a switch statement elsewhere than I am about
> somebody adding an enum value and neglecting to update the
> immediately-adjacent macro.

I did all of this in v3.

> * The updates of "now" in LogicalRepApplyLoop seem rather
> randomly placed, and I'm not entirely convinced that we'll
> always be using a reasonably up-to-date value. Can't we
> just update it right before each usage?

This came up for walreceiver.c, too. The concern is that
GetCurrentTimestamp() might be rather expensive on systems without
something like the vDSO. I don't know how common that is. If we can rule
that out, then I agree that we should just update it right before each use.

> * This special handling of next_sync_start seems mighty ugly:
>
> +	/* Also consider special wakeup time for starting sync workers. */
> +	if (next_sync_start < now)
> +	{
> +		/*
> +		 * Instead of spinning while we wait for the sync worker to
> +		 * start, wait a bit before retrying (unless there's an earlier
> +		 * wakeup time).
> +		 */
> +		nextWakeup = Min(now + INT64CONST(100000), nextWakeup);
> +	}
> +	else
> +		nextWakeup = Min(next_sync_start, nextWakeup);
>
> Do we really need the slop? If so, is there a reason why it
> shouldn't apply to all possible sources of nextWakeup? (It's
> going to be hard to fold next_sync_start into the wakeup[]
> array unless you can make this not a special case.)

I'm not positive it is absolutely necessary. AFAICT the function that
updates this particular wakeup time is conditionally called, so it at least
seems theoretically possible that we could end up spinning in a tight loop
until we attempt to start a new tablesync worker. But perhaps this is
unlikely enough that we needn't worry about it.

I noticed that this wakeup time wasn't being updated when the number of
active tablesync workers is >= max_sync_workers_per_subscription. In v3, I
tried to handle this by setting the wakeup time to a second later for this
case. I think you could ordinarily depend on the tablesync worker's
notify_pid to wake up the apply worker, but that wouldn't work if the apply
worker has restarted.

Ultimately, this particular wakeup time seems to be a special case, and I
probably need to think about it some more. If you have ideas, I'm all
ears.

> * It'd probably be worth enlarging the comment for
> LogRepWorkerComputeNextWakeup to explain why its API is like that,
> perhaps "We ask the caller to pass in the value of "now" because
> this frequently avoids multiple calls of GetCurrentTimestamp().
> It had better be a reasonably-up-to-date value, though."

I did this in v3. I noticed that many of your comments also applied to the
similar patch that was recently applied to walreceiver.c, so I created
another patch to fix that up.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

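
For readers following the review points above, here is a minimal standalone sketch (not part of either patch) of two of the ideas discussed: putting the count macro next to the last enum value so that switches need no default: case, and keeping the wakeup array private behind getter/setter functions. The type and function names (WakeupReason, get_sync_start_wakeup(), and so on) are hypothetical stand-ins, and TimestampTz and DT_NOEND are redefined locally on the assumption that they are an int64 and its maximum value.

```c
#include <assert.h>		/* only needed by callers that check the behavior */
#include <stdint.h>

/* Local stand-ins for PostgreSQL definitions; assumptions for this sketch. */
typedef int64_t TimestampTz;
#define DT_NOEND INT64_MAX

typedef enum WakeupReason
{
	WAKEUP_TERMINATE,
	WAKEUP_PING,
	WAKEUP_STATUS,
	WAKEUP_SYNC_START
#define NUM_WAKEUPS (WAKEUP_SYNC_START + 1)
} WakeupReason;

/* Private to this file; other modules only see the accessors below. */
static TimestampTz wakeup[NUM_WAKEUPS];

/*
 * No default: case here, so a compiler warning such as -Wswitch can point
 * out any enum value that is added later but not handled.
 */
const char *
wakeup_reason_name(WakeupReason reason)
{
	switch (reason)
	{
		case WAKEUP_TERMINATE:
			return "terminate";
		case WAKEUP_PING:
			return "ping";
		case WAKEUP_STATUS:
			return "status";
		case WAKEUP_SYNC_START:
			return "sync start";
	}
	return "unknown";			/* not reached for valid enum values */
}

/* Set every wakeup time to "never". */
void
init_wakeups(void)
{
	for (int i = 0; i < NUM_WAKEUPS; i++)
		wakeup[i] = DT_NOEND;
}

TimestampTz
get_sync_start_wakeup(void)
{
	return wakeup[WAKEUP_SYNC_START];
}

/* Only ever moves the wakeup time earlier, never later. */
void
update_sync_start_wakeup(TimestampTz next_sync_start)
{
	if (next_sync_start < wakeup[WAKEUP_SYNC_START])
		wakeup[WAKEUP_SYNC_START] = next_sync_start;
}

void
clear_sync_start_wakeup(void)
{
	wakeup[WAKEUP_SYNC_START] = DT_NOEND;
}
```

With this shape, a caller in another file can update the sync-start wakeup time without ever seeing the enum or the array, which is the property the review asked for.
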
From 3b464bf0ccb22e36ab627a5e19981eaf3734d4dd Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Tue, 24 Jan 2023 20:52:21 -0800
Subject: [PATCH v3 1/2] code review for 05a7be9

---
 src/backend/replication/walreceiver.c | 31 ++++++++++++++-------------
 src/include/utils/timestamp.h         |  1 +
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 3876c0188d..0563bad0f6 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -122,8 +122,8 @@ typedef enum WalRcvWakeupReason
 	WALRCV_WAKEUP_TERMINATE,
 	WALRCV_WAKEUP_PING,
 	WALRCV_WAKEUP_REPLY,
-	WALRCV_WAKEUP_HSFEEDBACK,
-	NUM_WALRCV_WAKEUPS
+	WALRCV_WAKEUP_HSFEEDBACK
+#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
 } WalRcvWakeupReason;
 
 /*
@@ -525,7 +525,7 @@ WalReceiverMain(void)
 					break;
 
 				/* Find the soonest wakeup time, to limit our nap. */
-				nextWakeup = PG_INT64_MAX;
+				nextWakeup = DT_NOEND;
 				for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
 					nextWakeup = Min(wakeup[i], nextWakeup);
 
@@ -604,7 +604,7 @@ WalReceiverMain(void)
 					if (now >= wakeup[WALRCV_WAKEUP_PING])
 					{
 						requestReply = true;
-						wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
+						wakeup[WALRCV_WAKEUP_PING] = DT_NOEND;
 					}
 
 					XLogWalRcvSendReply(requestReply, requestReply);
@@ -1310,7 +1310,10 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 /*
  * Compute the next wakeup time for a given wakeup reason. Can be called to
  * initialize a wakeup time, to adjust it for the next wakeup, or to
- * reinitialize it when GUCs have changed.
+ * reinitialize it when GUCs have changed. We ask the caller to pass in the
+ * value of "now" because this frequently avoids multiple calls of
+ * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
+ * though.
  */
 static void
 WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
@@ -1319,29 +1322,27 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
 	{
 		case WALRCV_WAKEUP_TERMINATE:
 			if (wal_receiver_timeout <= 0)
-				wakeup[reason] = PG_INT64_MAX;
+				wakeup[reason] = DT_NOEND;
 			else
-				wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
 			break;
 		case WALRCV_WAKEUP_PING:
 			if (wal_receiver_timeout <= 0)
-				wakeup[reason] = PG_INT64_MAX;
+				wakeup[reason] = DT_NOEND;
 			else
-				wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
 			break;
 		case WALRCV_WAKEUP_HSFEEDBACK:
 			if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
-				wakeup[reason] = PG_INT64_MAX;
+				wakeup[reason] = DT_NOEND;
 			else
-				wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+				wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
 			break;
 		case WALRCV_WAKEUP_REPLY:
 			if (wal_receiver_status_interval <= 0)
-				wakeup[reason] = PG_INT64_MAX;
+				wakeup[reason] = DT_NOEND;
 			else
-				wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
-			break;
-		default:
+				wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
 			break;
 	}
 }
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 42f802bb9d..1a63bc7c2d 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -82,6 +82,7 @@ IntervalPGetDatum(const Interval *X)
 #define INTERVAL_RANGE(t) (((t) >> 16) & INTERVAL_RANGE_MASK)
 
 #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000))
+#define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000))
 
 
 /* Set at postmaster start */
-- 
2.25.1

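
As a small illustration of what the two timestamp macros in the patch above compute, the following standalone sketch copies their definitions and wraps one of them in a helper. The helper status_wakeup() is hypothetical, and int64, TimestampTz, and DT_NOEND are local stand-ins defined on the assumption that timestamps are an int64 count of microseconds.

```c
#include <assert.h>		/* only needed by callers that check the behavior */
#include <stdint.h>

/* Local stand-ins for PostgreSQL definitions; assumptions for this sketch. */
typedef int64_t int64;
typedef int64 TimestampTz;
#define DT_NOEND INT64_MAX

/* Copied from the timestamp.h hunk in the patch. */
#define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000))
#define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000))

/*
 * Hypothetical helper mirroring the status-interval cases: a non-positive
 * interval disables the wakeup, otherwise it is "now" plus the interval.
 * The (int64) cast inside the macro widens the int argument before the
 * multiplication, so an interval of a few thousand seconds does not
 * overflow 32-bit arithmetic.
 */
TimestampTz
status_wakeup(TimestampTz now, int status_interval_secs)
{
	if (status_interval_secs <= 0)
		return DT_NOEND;
	return TimestampTzPlusSeconds(now, status_interval_secs);
}
```

The point of going through the macros is that callers no longer spell out the microsecond multipliers themselves, which is what the review comment about "magic-constant multipliers" was asking for.
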
From 0a4d3a6c62bacd2b5592043ca4ba2408b127f1f5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Tue, 24 Jan 2023 21:12:28 -0800
Subject: [PATCH v3 2/2] suppress useless wakeups in logical/worker.c

---
 src/backend/replication/logical/tablesync.c |  28 +++
 src/backend/replication/logical/worker.c    | 192 ++++++++++++++++----
 src/include/replication/worker_internal.h   |   4 +
 src/tools/pgindent/typedefs.list            |   1 +
 4 files changed, 189 insertions(+), 36 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..573b46b5a2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * If we've made it past our previously-stored special wakeup time, reset
+	 * it so that it can be recalculated as needed.
+	 */
+	if (LogRepWorkerGetSyncStartWakeup() <= GetCurrentTimestamp())
+		LogRepWorkerClearSyncStartWakeup();
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	FetchTableStates(&started_tx);
 
@@ -592,6 +599,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 DSM_HANDLE_INVALID);
 						hentry->last_start_time = now;
 					}
+					else
+					{
+						TimestampTz retry_time;
+
+						/*
+						 * Store when we can start the sync worker so that we
+						 * know how long to sleep.
+						 */
+						retry_time = TimestampTzPlusMilliseconds(hentry->last_start_time,
+																 wal_retrieve_retry_interval);
+						LogRepWorkerUpdateSyncStartWakeup(retry_time);
+					}
+				}
+				else
+				{
+					TimestampTz now = GetCurrentTimestamp();
+					TimestampTz retry_time;
+
+					/* Maybe there will be a free slot in a second... */
+					retry_time = TimestampTzPlusSeconds(now, 1);
+					LogRepWorkerUpdateSyncStartWakeup(retry_time);
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..83fb8c3110 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -208,8 +208,6 @@
 #include "utils/syscache.h"
 #include "utils/timeout.h"
 
-#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
-
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -351,6 +349,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,
+	LRW_WAKEUP_PING,
+	LRW_WAKEUP_STATUS,
+	LRW_WAKEUP_SYNC_START
+#define NUM_LRW_WAKEUPS (LRW_WAKEUP_SYNC_START + 1)
+} LogRepWorkerWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+										  TimestampTz now);
+
 typedef struct SubXactInfo
 {
 	TransactionId xid;			/* XID of the subxact */
@@ -3449,10 +3467,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
-	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	TimestampTz now;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3482,6 +3499,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = &errcallback;
 	apply_error_context_stack = error_context_stack;
 
+	/* Initialize nap wakeup times. */
+	now = GetCurrentTimestamp();
+	for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+		LogRepWorkerComputeNextWakeup(i, now);
+
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
@@ -3498,6 +3520,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
+		now = GetCurrentTimestamp();
 		if (len != 0)
 		{
 			/* Loop to process all available data (without blocking). */
@@ -3521,9 +3544,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					int			c;
 					StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+					/* Adjust the ping and terminate wakeup times. */
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
 
 					/* Ensure we are reading the data into our memory context. */
 					MemoryContextSwitchTo(ApplyMessageContext);
@@ -3577,6 +3600,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				now = GetCurrentTimestamp();
 			}
 		}
 
@@ -3615,7 +3639,33 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (!dlist_is_empty(&lsn_mapping))
 			wait_time = WalWriterDelay;
 		else
-			wait_time = NAPTIME_PER_CYCLE;
+		{
+			TimestampTz nextWakeup = DT_NOEND;
+
+			/*
+			 * Since process_syncing_tables() is called conditionally, the
+			 * tablesync worker start wakeup time might be in the past, and we
+			 * can't know for sure when it will be updated again. Rather than
+			 * spinning in a tight loop in this case, bump this wakeup time by
+			 * a second.
+			 */
+			now = GetCurrentTimestamp();
+			if (wakeup[LRW_WAKEUP_SYNC_START] < now)
+				wakeup[LRW_WAKEUP_SYNC_START] = TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1);
+
+			/* Find soonest wakeup time, to limit our nap. */
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				nextWakeup = Min(wakeup[i], nextWakeup);
+
+			/*
+			 * Calculate the nap time. WaitLatchOrSocket() doesn't accept
+			 * timeouts longer than INT_MAX milliseconds, so we limit the
+			 * result accordingly. Also, we round up to the next millisecond
+			 * to avoid waking up too early and spinning until one of the
+			 * wakeup times.
+			 */
+			wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+		}
 
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
@@ -3623,6 +3673,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 							   fd, wait_time,
 							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
 
+		now = GetCurrentTimestamp();
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
@@ -3633,6 +3684,20 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * LogRepWorkerComputeNextWakeup() will have cleared the tablesync
+			 * worker start wakeup time, so we might not wake up to start a new
+			 * worker at the appropriate time. To deal with this, we set the
+			 * wakeup time to right now so that
+			 * process_syncing_tables_for_apply() recalculates it as soon as
+			 * possible.
+			 */
+			if (!am_tablesync_worker())
+				LogRepWorkerUpdateSyncStartWakeup(now);
 		}
 
 		if (rc & WL_TIMEOUT)
@@ -3651,31 +3716,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from primary has reached the
 			 * configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
-			{
-				TimestampTz now = GetCurrentTimestamp();
-				TimestampTz timeout;
-
-				timeout =
-					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
+			if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating logical replication worker due to timeout")));
 
-				if (now >= timeout)
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("terminating logical replication worker due to timeout")));
-
-				/* Check to see if it's time for a ping. */
-				if (!ping_sent)
-				{
-					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
-					if (now >= timeout)
-					{
-						requestReply = true;
-						ping_sent = true;
-					}
-				}
+			/* Check to see if it's time for a ping. */
+			if (now >= wakeup[LRW_WAKEUP_PING])
+			{
+				requestReply = true;
+				wakeup[LRW_WAKEUP_PING] = DT_NOEND;
 			}
 
 			send_feedback(last_received, requestReply, requestReply);
@@ -3711,7 +3761,6 @@ static void
 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 {
 	static StringInfo reply_message = NULL;
-	static TimestampTz send_time = 0;
 
 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3754,10 +3803,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force &&
 		writepos == last_writepos &&
 		flushpos == last_flushpos &&
-		!TimestampDifferenceExceeds(send_time, now,
-									wal_receiver_status_interval * 1000))
+		now < wakeup[LRW_WAKEUP_STATUS])
 		return;
-	send_time = now;
+
+	/* Make sure we wake up when it's time to send another status update. */
+	LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
 
 	if (!reply_message)
 	{
@@ -5056,3 +5106,73 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_APPLY;
 	}
 }
+
+/*
+ * Compute the next wakeup time for a given wakeup reason. Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed. We ask the caller to pass in the
+ * value of "now" because this frequently avoids multiple calls of
+ * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
+ * though.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = DT_NOEND;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
+			break;
+		case LRW_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = DT_NOEND;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
+			break;
+		case LRW_WAKEUP_STATUS:
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = DT_NOEND;
+			else
+				wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
+			break;
+		case LRW_WAKEUP_SYNC_START:
+			/*
+			 * This wakeup time is manually set as needed. This function can
+			 * only be used to initialize its value.
+			 */
+			wakeup[reason] = DT_NOEND;
+			break;
+	}
+}
+
+/*
+ * Retrieve the current wakeup time for starting tablesync workers.
+ */
+TimestampTz
+LogRepWorkerGetSyncStartWakeup(void)
+{
+	return wakeup[LRW_WAKEUP_SYNC_START];
+}
+
+/*
+ * Update the current wakeup time for starting tablesync workers. If the
+ * current wakeup time is <= next_sync_start, no action is taken.
+ */
+void
+LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start)
+{
+	if (next_sync_start < wakeup[LRW_WAKEUP_SYNC_START])
+		wakeup[LRW_WAKEUP_SYNC_START] = next_sync_start;
+}
+
+/*
+ * Clear the current wakeup time for starting tablesync workers.
+ */
+void
+LogRepWorkerClearSyncStartWakeup(void)
+{
+	wakeup[LRW_WAKEUP_SYNC_START] = DT_NOEND;
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dc87a4edd1..ae44717588 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+extern TimestampTz LogRepWorkerGetSyncStartWakeup(void);
+extern void LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start);
+extern void LogRepWorkerClearSyncStartWakeup(void);
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 51484ca7e2..0c8b6ebc4b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1435,6 +1435,7 @@ LockViewRecurse_context
 LockWaitPolicy
 LockingClause
 LogOpts
+LogRepWorkerWakeupReason
 LogStmtLevel
 LogicalDecodeBeginCB
 LogicalDecodeBeginPrepareCB
-- 
2.25.1
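
To make the nap-time arithmetic in the worker.c hunk easier to check by hand, here is a standalone sketch of the same two steps: find the soonest wakeup time, then convert the remaining microseconds into a millisecond timeout that is rounded up and clamped. The function names soonest_wakeup() and nap_time_ms() are hypothetical, and TimestampTz, DT_NOEND, Min, and Max are local stand-ins assumed to match the meanings used in the patch.

```c
#include <assert.h>		/* only needed by callers that check the behavior */
#include <limits.h>
#include <stdint.h>

/* Local stand-ins for PostgreSQL definitions; assumptions for this sketch. */
typedef int64_t TimestampTz;	/* microseconds */
#define DT_NOEND INT64_MAX
#define Min(x, y) ((x) < (y) ? (x) : (y))
#define Max(x, y) ((x) > (y) ? (x) : (y))

/* Return the earliest entry, or DT_NOEND if every wakeup is disabled. */
TimestampTz
soonest_wakeup(const TimestampTz *wakeups, int n)
{
	TimestampTz next = DT_NOEND;

	for (int i = 0; i < n; i++)
		next = Min(wakeups[i], next);
	return next;
}

/*
 * Convert "time until next_wakeup" to whole milliseconds.  Adding 999
 * before dividing rounds up, so the caller does not wake slightly early
 * and spin.  A wakeup time in the past yields 0, and very distant times
 * are clamped to INT_MAX.  This assumes now >= 999, as is the case for
 * real timestamps, so that DT_NOEND - now + 999 cannot overflow.
 */
int
nap_time_ms(TimestampTz next_wakeup, TimestampTz now)
{
	return (int) Min(INT_MAX, Max(0, (next_wakeup - now + 999) / 1000));
}
```

For example, with now = 1000000, a wakeup that is 1 microsecond away gives a 1 ms nap rather than 0, one that is 1001 microseconds away gives 2 ms, and DT_NOEND gives INT_MAX.
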