Rebased for cfbot.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 2466001a3ae6f94aac8eff45b488765e619bea1b Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Thu, 1 Dec 2022 20:50:21 -0800
Subject: [PATCH v2 1/1] suppress unnecessary wakeups in logical/worker.c

---
 src/backend/replication/logical/tablesync.c |  20 +++
 src/backend/replication/logical/worker.c    | 156 +++++++++++++++-----
 src/include/replication/worker_internal.h   |   3 +
 3 files changed, 142 insertions(+), 37 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38dfce7129..88218e1fed 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -419,6 +419,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/*
+	 * If we've made it past our previously-stored special wakeup time, reset
+	 * it so that it can be recalculated as needed.
+	 */
+	if (next_sync_start <= GetCurrentTimestamp())
+		next_sync_start = PG_INT64_MAX;
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	FetchTableStates(&started_tx);
 
@@ -592,6 +599,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 												 DSM_HANDLE_INVALID);
 						hentry->last_start_time = now;
 					}
+					else if (found)
+					{
+						TimestampTz retry_time = hentry->last_start_time +
+												 (wal_retrieve_retry_interval *
+												  INT64CONST(1000));
+
+						/*
+						 * Store when we can start the sync worker so that we
+						 * know how long to sleep.
+						 */
+						if (retry_time < next_sync_start)
+							next_sync_start = retry_time;
+					}
 				}
 			}
 		}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 79cda39445..284f11428c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -207,8 +207,6 @@
 #include "utils/syscache.h"
 #include "utils/timeout.h"
 
-#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
-
 typedef struct FlushPosition
 {
 	dlist_node	node;
@@ -348,6 +346,26 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * Reasons to wake up and perform periodic tasks; each one indexes wakeup[].
+ */
+typedef enum LogRepWorkerWakeupReason
+{
+	LRW_WAKEUP_TERMINATE,		/* wal_receiver_timeout reached: give up */
+	LRW_WAKEUP_PING,			/* half the timeout reached: request a reply */
+	LRW_WAKEUP_STATUS,			/* time to send another status update */
+	NUM_LRW_WAKEUPS				/* number of reasons; sizes wakeup[] */
+} LogRepWorkerWakeupReason;
+
+/*
+ * Wake up times for periodic tasks, indexed by LogRepWorkerWakeupReason.
+ */
+static TimestampTz wakeup[NUM_LRW_WAKEUPS];
+TimestampTz next_sync_start;	/* sync worker retry time; PG_INT64_MAX if unset */
+
+static void LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason,
+										  TimestampTz now);
+
 typedef struct SubXactInfo
 {
 	TransactionId xid;			/* XID of the subxact */
@@ -3446,10 +3464,9 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 LogicalRepApplyLoop(XLogRecPtr last_received)
 {
-	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
-	bool		ping_sent = false;
 	TimeLineID	tli;
 	ErrorContextCallback errcallback;
+	TimestampTz now;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -3479,6 +3496,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = &errcallback;
 	apply_error_context_stack = error_context_stack;
 
+	/* Initialize nap wakeup times. */
+	now = GetCurrentTimestamp();
+	for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+		LogRepWorkerComputeNextWakeup(i, now);
+	next_sync_start = PG_INT64_MAX;
+
 	/* This outer loop iterates once per wait. */
 	for (;;)
 	{
@@ -3495,6 +3518,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
+		now = GetCurrentTimestamp();
 		if (len != 0)
 		{
 			/* Loop to process all available data (without blocking). */
@@ -3518,9 +3542,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					int			c;
 					StringInfoData s;
 
-					/* Reset timeout. */
-					last_recv_timestamp = GetCurrentTimestamp();
-					ping_sent = false;
+					/* Adjust the ping and terminate wakeup times. */
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_TERMINATE, now);
+					LogRepWorkerComputeNextWakeup(LRW_WAKEUP_PING, now);
 
 					/* Ensure we are reading the data into our memory context. */
 					MemoryContextSwitchTo(ApplyMessageContext);
@@ -3574,6 +3598,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+				now = GetCurrentTimestamp();
 			}
 		}
 
@@ -3612,14 +3637,43 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (!dlist_is_empty(&lsn_mapping))
 			wait_time = WalWriterDelay;
 		else
-			wait_time = NAPTIME_PER_CYCLE;
+		{
+			TimestampTz nextWakeup = PG_INT64_MAX;
+
+			/* Find soonest wakeup time, to limit our nap. */
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				nextWakeup = Min(wakeup[i], nextWakeup);
+
+			/* Also consider special wakeup time for starting sync workers. */
+			if (next_sync_start < now)
+			{
+				/*
+				 * Instead of spinning while we wait for the sync worker to
+				 * start, wait a bit before retrying (unless there's an earlier
+				 * wakeup time).
+				 */
+				nextWakeup = Min(now + INT64CONST(100000), nextWakeup);
+			}
+			else
+				nextWakeup = Min(next_sync_start, nextWakeup);
+
+			/*
+			 * Calculate the nap time.  WaitLatchOrSocket() doesn't accept
+			 * timeouts longer than INT_MAX milliseconds, so we limit the
+			 * result accordingly.  Also, we round up to the next millisecond
+			 * to avoid waking up too early and spinning until one of the
+			 * wakeup times.
+			 */
+			wait_time = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+		}
 
 		rc = WaitLatchOrSocket(MyLatch,
 							   WL_SOCKET_READABLE | WL_LATCH_SET |
 							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 							   fd, wait_time,
 							   WAIT_EVENT_LOGICAL_APPLY_MAIN);
-
+		now = GetCurrentTimestamp();
 		if (rc & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
@@ -3630,6 +3684,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			now = GetCurrentTimestamp();
+			for (int i = 0; i < NUM_LRW_WAKEUPS; i++)
+				LogRepWorkerComputeNextWakeup(i, now);
+
+			/*
+			 * If a wakeup time for starting sync workers was set, just set it
+			 * to right now.  It will be recalculated as needed.
+			 */
+			if (next_sync_start != PG_INT64_MAX)
+				next_sync_start = now;
 		}
 
 		if (rc & WL_TIMEOUT)
@@ -3648,31 +3712,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			 * Check if time since last receive from primary has reached the
 			 * configured limit.
 			 */
-			if (wal_receiver_timeout > 0)
-			{
-				TimestampTz now = GetCurrentTimestamp();
-				TimestampTz timeout;
+			if (now >= wakeup[LRW_WAKEUP_TERMINATE])
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("terminating logical replication worker due to timeout")));
 
-				timeout =
-					TimestampTzPlusMilliseconds(last_recv_timestamp,
-												wal_receiver_timeout);
-
-				if (now >= timeout)
-					ereport(ERROR,
-							(errcode(ERRCODE_CONNECTION_FAILURE),
-							 errmsg("terminating logical replication worker due to timeout")));
-
-				/* Check to see if it's time for a ping. */
-				if (!ping_sent)
-				{
-					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-														  (wal_receiver_timeout / 2));
-					if (now >= timeout)
-					{
-						requestReply = true;
-						ping_sent = true;
-					}
-				}
+			/* Check to see if it's time for a ping. */
+			if (now >= wakeup[LRW_WAKEUP_PING])
+			{
+				requestReply = true;
+				wakeup[LRW_WAKEUP_PING] = PG_INT64_MAX;
 			}
 
 			send_feedback(last_received, requestReply, requestReply);
@@ -3708,7 +3757,6 @@ static void
 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 {
 	static StringInfo reply_message = NULL;
-	static TimestampTz send_time = 0;
 
 	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
@@ -3751,10 +3799,11 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force &&
 		writepos == last_writepos &&
 		flushpos == last_flushpos &&
-		!TimestampDifferenceExceeds(send_time, now,
-									wal_receiver_status_interval * 1000))
+		now < wakeup[LRW_WAKEUP_STATUS])
 		return;
-	send_time = now;
+
+	/* Make sure we wake up when it's time to send another status update. */
+	LogRepWorkerComputeNextWakeup(LRW_WAKEUP_STATUS, now);
 
 	if (!reply_message)
 	{
@@ -5034,3 +5083,36 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
 		return TRANS_LEADER_SEND_TO_PARALLEL;
 	}
 }
+
+/*
+ * Set wakeup[reason] to the next time that task is due, computed from "now"
+ * and the current GUC values (PG_INT64_MAX if the GUC disables the task).
+ * Used to initialize a wakeup time, to advance it, or after a GUC reload.
+ */
+static void
+LogRepWorkerComputeNextWakeup(LogRepWorkerWakeupReason reason, TimestampTz now)
+{
+	switch (reason)
+	{
+		case LRW_WAKEUP_TERMINATE:	/* full wal_receiver_timeout (ms -> us) */
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+			break;
+		case LRW_WAKEUP_PING:	/* half of wal_receiver_timeout (ms -> us) */
+			if (wal_receiver_timeout <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+			break;
+		case LRW_WAKEUP_STATUS:	/* wal_receiver_status_interval (s -> us) */
+			if (wal_receiver_status_interval <= 0)
+				wakeup[reason] = PG_INT64_MAX;
+			else
+				wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+			break;
+		default:				/* unknown reason: leave wakeup[] untouched */
+			break;
+	}
+}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index db891eea8a..ec39711ed1 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -225,6 +225,9 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+/* Next time to attempt starting sync workers. */
+extern PGDLLIMPORT TimestampTz next_sync_start;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
-- 
2.25.1

Reply via email to