I've attached a minimally-updated patch that doesn't yet address the bigger
topics under discussion.

On Thu, Mar 16, 2023 at 03:30:37PM +0530, Amit Kapila wrote:
> On Wed, Feb 1, 2023 at 5:35 AM Nathan Bossart <nathandboss...@gmail.com> wrote:
>> On Sat, Jan 28, 2023 at 10:26:25AM +0530, Amit Kapila wrote:
>> > BTW, do we need to do something about wakeups in
>> > wait_for_relation_state_change()?
>>
>> ... and wait_for_worker_state_change(), and copy_read_data().  From a quick
>> glance, it looks like fixing these would be a more invasive change.
> 
> What kind of logic do you have in mind to avoid waking up once per
> second in those cases?

I haven't looked into this too much yet.  I'd probably try out Tom's
suggestions from upthread [0] next and see if those can be applied here,
too.

>> TBH I'm beginning to wonder whether all this is really worth it to prevent
>> waking up once per second.
> 
> If we can't do it for all cases, do you see any harm in doing it for
> cases where we can achieve it without adding much complexity? We can
> probably add comments for others so that if someone else has better
> ideas in the future we can deal with those as well.

I don't think there's any harm, but I'm also not sure it does a whole lot
of good.  At the very least, I think we should figure out something better
than the process_syncing_tables() hacks before taking this patch seriously.

[0] https://postgr.es/m/3220831.1674772625%40sss.pgh.pa.us

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From d76710d4b79e173e2d1baf1af228fe7dd8927e72 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Tue, 24 Jan 2023 21:12:28 -0800
Subject: [PATCH v4 1/1] suppress useless wakeups in logical/worker.c

---
 src/backend/replication/logical/tablesync.c |  28 +++
 src/backend/replication/logical/worker.c    | 189 ++++++++++++++++----
 src/include/replication/worker_internal.h   |   4 +
 src/tools/pgindent/typedefs.list            |   1 +
 4 files changed, 186 insertions(+), 36 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..573b46b5a2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * If we've made it past our previously-stored special wakeup time, reset
+	 * it so that it can be recalculated as needed.
+	 */
+	if (LogRepWorkerGetSyncStartWakeup() <= GetCurrentTimestamp())
+		LogRepWorkerClearSyncStartWakeup();
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	FetchTableStates(&started_tx);
 
@@ -592,6 +599,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 DSM_HANDLE_INVALID);
 						hentry->last_start_time = now;
 					}
+					else
+					{
+						TimestampTz retry_time;
+
+						/*
+						 * Store when we can start the sync worker so that we
+						 * know how long to sleep.
+						 */
+						retry_time = TimestampTzPlusMilliseconds(hentry->last_start_time,
+																 wal_retrieve_retry_interval);
+						LogRepWorkerUpdateSyncStartWakeup(retry_time);
+					}
+				}
+				else
+				{
+					TimestampTz now = GetCurrentTimestamp();
+					TimestampTz retry_time;
+
+					/* Maybe there will be a free slot in a second... */
+					retry_time = TimestampTzPlusSeconds(now, 1);
+					LogRepWorkerUpdateSyncStartWakeup(retry_time);
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 10f9711972..8e68540b6f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -208,8 +208,6 @@
 #include "utils/syscache.h"
 #include "utils/timeout.h"
 
-#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
-
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -351,6 +349,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,
+	LRW_WAKEUP_PING,
+	LRW_WAKEUP_STATUS,
+	LRW_WAKEUP_SYNC_START
+#define NUM_LRW_WAKEUPS (LRW_WAKEUP_SYNC_START + 1)
+} LogRepWorkerWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+										  TimestampTz now);
+
 typedef struct SubXactInfo
 {
 	TransactionId xid;			/* XID of the subxact */
@@ -3441,10 +3459,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
-	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	TimestampTz now;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3474,6 +3491,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = &errcallback;
 	apply_error_context_stack = error_context_stack;
 
+	/* Initialize nap wakeup times. */
+	now = GetCurrentTimestamp();
+	for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+		LogRepWorkerComputeNextWakeup(i, now);
+
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
@@ -3513,9 +3535,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					int			c;
 					StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+					/* Adjust the ping and terminate wakeup times. */
+					now = GetCurrentTimestamp();
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
 
 					/* Ensure we are reading the data into our memory context. */
 					MemoryContextSwitchTo(ApplyMessageContext);
@@ -3607,7 +3630,29 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (!dlist_is_empty(&lsn_mapping))
 			wait_time = WalWriterDelay;
 		else
-			wait_time = NAPTIME_PER_CYCLE;
+		{
+			TimestampTz nextWakeup = TIMESTAMP_INFINITY;
+
+			/*
+			 * Since process_syncing_tables() is called conditionally, the
+			 * tablesync worker start wakeup time might be in the past, and we
+			 * can't know for sure when it will be updated again.  Rather than
+			 * spinning in a tight loop in this case, bump this wakeup time by
+			 * a second.
+			 */
+			now = GetCurrentTimestamp();
+			if (wakeup[LRW_WAKEUP_SYNC_START] < now)
+				wakeup[LRW_WAKEUP_SYNC_START] = TimestampTzPlusSeconds(wakeup[LRW_WAKEUP_SYNC_START], 1);
+
+			/* Find soonest wakeup time, to limit our nap. */
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				nextWakeup = Min(wakeup[i], nextWakeup);
+
+			/*
+			 * Calculate the nap time, clamping as necessary.
+			 */
+			wait_time = TimestampDifferenceMilliseconds(now, nextWakeup);
+		}
 
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
@@ -3625,6 +3670,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			/* recompute wakeup times */
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * LogRepWorkerComputeNextWakeup() will have cleared the tablesync
+			 * worker start wakeup time, so we might not wake up to start a new
+			 * worker at the appropriate time.  To deal with this, we set the
+			 * wakeup time to right now so that
+			 * process_syncing_tables_for_apply() recalculates it as soon as
+			 * possible.
+			 */
+			if (!am_tablesync_worker())
+				LogRepWorkerUpdateSyncStartWakeup(now);
 		}
 
 		if (rc & WL_TIMEOUT)
@@ -3643,31 +3703,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from primary has reached the
 			 * configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
+			now = GetCurrentTimestamp();
+			if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating logical replication worker due to timeout")));
+
+			/* Check to see if it's time for a ping. */
+			if (now >= wakeup[LRW_WAKEUP_PING])
 			{
-				TimestampTz now = GetCurrentTimestamp();
-				TimestampTz timeout;
-
-				timeout =
-					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
-
-				if (now >= timeout)
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("terminating logical replication worker due to timeout")));
-
-				/* Check to see if it's time for a ping. */
-				if (!ping_sent)
-				{
-					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
-					if (now >= timeout)
-					{
-						requestReply = true;
-						ping_sent = true;
-					}
-				}
+				requestReply = true;
+				wakeup[LRW_WAKEUP_PING] = TIMESTAMP_INFINITY;
 			}
 
 			send_feedback(last_received, requestReply, requestReply);
@@ -3703,7 +3749,6 @@ static void
 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 {
 	static StringInfo reply_message = NULL;
-	static TimestampTz send_time = 0;
 
 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3746,10 +3791,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force &&
 		writepos == last_writepos &&
 		flushpos == last_flushpos &&
-		!TimestampDifferenceExceeds(send_time, now,
-									wal_receiver_status_interval * 1000))
+		now < wakeup[LRW_WAKEUP_STATUS])
 		return;
-	send_time = now;
+
+	/* Make sure we wake up when it's time to send another status update. */
+	LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
 
 	if (!reply_message)
 	{
@@ -5048,3 +5094,74 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_APPLY;
 	}
 }
+
+/*
+ * Compute the next wakeup time for a given wakeup reason.  Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.  We ask the caller to pass in the
+ * value of "now" because this frequently avoids multiple calls of
+ * GetCurrentTimestamp().  It had better be a reasonably up-to-date value
+ * though.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = TIMESTAMP_INFINITY;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
+			break;
+		case LRW_WAKEUP_PING:
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = TIMESTAMP_INFINITY;
+			else
+				wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
+			break;
+		case LRW_WAKEUP_STATUS:
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = TIMESTAMP_INFINITY;
+			else
+				wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
+			break;
+		case LRW_WAKEUP_SYNC_START:
+			/*
+			 * This wakeup time is manually set as needed.  This function can
+			 * only be used to initialize its value.
+			 */
+			wakeup[reason] = TIMESTAMP_INFINITY;
+			break;
+			/* there's intentionally no default: here */
+	}
+}
+
+/*
+ * Retrieve the current wakeup time for starting tablesync workers.
+ */
+TimestampTz
+LogRepWorkerGetSyncStartWakeup(void)
+{
+	return wakeup[LRW_WAKEUP_SYNC_START];
+}
+
+/*
+ * Update the current wakeup time for starting tablesync workers.  If the
+ * current wakeup time is <= next_sync_start, no action is taken.
+ */
+void
+LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start)
+{
+	if (next_sync_start < wakeup[LRW_WAKEUP_SYNC_START])
+		wakeup[LRW_WAKEUP_SYNC_START] = next_sync_start;
+}
+
+/*
+ * Clear the current wakeup time for starting tablesync workers.
+ */
+void
+LogRepWorkerClearSyncStartWakeup(void)
+{
+	wakeup[LRW_WAKEUP_SYNC_START] = TIMESTAMP_INFINITY;
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dc87a4edd1..ae44717588 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+extern TimestampTz LogRepWorkerGetSyncStartWakeup(void);
+extern void LogRepWorkerUpdateSyncStartWakeup(TimestampTz next_sync_start);
+extern void LogRepWorkerClearSyncStartWakeup(void);
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 097f42e1b3..66b699852a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1443,6 +1443,7 @@ LockViewRecurse_context
 LockWaitPolicy
 LockingClause
 LogOpts
+LogRepWorkerWakeupReason
 LogStmtLevel
 LogicalDecodeBeginCB
 LogicalDecodeBeginPrepareCB
-- 
2.25.1

Reply via email to