From 2147194165ebc66232b6e7ff9e3a8b3d07d50a72 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 11 Aug 2022 15:35:20 +0000
Subject: [PATCH v2] Switch WAL source to stream from archive

This patch enables standby to switch to stream mode i.e. get
WAL from primary even before it fails to receive from archive
location. Currently, the standby switches to stream mode, only
when receive from archive location fails.

The standby makes an attempt to read WAL from primary after
wal_source_switch_interval milliseconds of reading from archive.
Reading WAL from archive may not always be as efficient or cheap
as streaming, because network latencies and disk IO costs might
differ on the archive as compared to the primary and often the
archive may sit far from the standbys - all impacting the recovery
performance on the standbys.

Hence reading WAL from primary as opposed to archive enables
standbys to catch up with the primary sooner thus reducing
replication lag and avoiding WAL files accumulation on the primary.

This can benefit in any of the following situations:
1) standby in initial recovery after start/restart.
2) standby stopped streaming from primary because of connectivity
issues with the primary (either due to network issues or crash in
the primary or something else) or walreceiver got killed or
crashed for whatever reasons.
---
 doc/src/sgml/config.sgml                      |  31 +++++
 src/backend/access/transam/xlogrecovery.c     |  96 ++++++++++++-
 src/backend/utils/misc/guc.c                  |  11 ++
 src/backend/utils/misc/postgresql.conf.sample |   4 +
 src/include/access/xlogrecovery.h             |   1 +
 src/test/recovery/t/034_wal_source_switch.pl  | 126 ++++++++++++++++++
 6 files changed, 265 insertions(+), 4 deletions(-)
 create mode 100644 src/test/recovery/t/034_wal_source_switch.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 2522f4c8c5..d9a6a2ec78 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4837,6 +4837,37 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+    <varlistentry id="guc-wal-source-switch-interval" xreflabel="wal_source_switch_interval">
+      <term><varname>wal_source_switch_interval</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>wal_source_switch_interval</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies how long the standby server should wait before switching WAL
+        source from WAL archive to primary (streaming replication). This can
+        happen either during the standby initial recovery or after a previous
+        failed attempt to stream WAL from the primary.
+        If this value is specified without units, it is taken as milliseconds.
+        The default value is 5 seconds. A setting of <literal>0</literal>
+        disables the feature.
+        This parameter can only be set in
+        the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+       <para>
+        Reading WAL from archive may not always be efficient and cheaper
+        because network latencies, disk IO cost might differ on the archive as
+        compared to primary and often the archive may sit far from standbys
+        impacting the recovery performance on the standbys. Hence reading WAL
+        from the primary, by setting this parameter, as opposed to the archive
+        enables the standbys to catch up with the primary sooner thus reducing
+        replication lag and avoiding WAL files accumulation on the primary.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-recovery-min-apply-delay" xreflabel="recovery_min_apply_delay">
       <term><varname>recovery_min_apply_delay</varname> (<type>integer</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index a59a0e826b..f22af22ba7 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -88,6 +88,7 @@ TimestampTz recoveryTargetTime;
 const char *recoveryTargetName;
 XLogRecPtr	recoveryTargetLSN;
 int			recovery_min_apply_delay = 0;
+int			wal_source_switch_interval = 5000;
 
 /* options formerly taken from recovery.conf for XLOG streaming */
 char	   *PrimaryConnInfo = NULL;
@@ -3397,6 +3398,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 							bool nonblocking)
 {
 	static TimestampTz last_fail_time = 0;
+	static bool first_time = true;
+	static TimestampTz last_switch_time = 0;
+	bool	intentionalSourceSwitch = false;
 	TimestampTz now;
 	bool		streaming_reply_sent = false;
 
@@ -3418,6 +3422,11 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 	 * those actions are taken when reading from the previous source fails, as
 	 * part of advancing to the next state.
 	 *
+	 * Try reading WAL from primary after every wal_source_switch_interval
+	 * milliseconds, when state machine is in XLOG_FROM_ARCHIVE state. If
+	 * successful, the state machine moves to XLOG_FROM_STREAM state, otherwise
+	 * it falls back to XLOG_FROM_ARCHIVE state.
+	 *
 	 * If standby mode is turned off while reading WAL from stream, we move
 	 * to XLOG_FROM_ARCHIVE and reset lastSourceFailed, to force fetching
 	 * the files (which would be required at end of recovery, e.g., timeline
@@ -3452,8 +3461,11 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 			 * Don't allow any retry loops to occur during nonblocking
 			 * readahead.  Let the caller process everything that has been
 			 * decoded already first.
+			 *
+			 * Continue retrying for requested WAL when there was an
+			 * intentional source switch from archive to stream.
 			 */
-			if (nonblocking)
+			if (nonblocking && !intentionalSourceSwitch)
 				return XLREAD_WOULDBLOCK;
 
 			switch (currentSource)
@@ -3583,15 +3595,30 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 		}
 
 		if (currentSource != oldSource)
-			elog(DEBUG2, "switched WAL source from %s to %s after %s",
-				 xlogSourceNames[oldSource], xlogSourceNames[currentSource],
-				 lastSourceFailed ? "failure" : "success");
+		{
+			/* Report the switch; timed switches get their own message. */
+			if (intentionalSourceSwitch)
+			{
+				/* The message names the new source first, then the old. */
+				elog(DEBUG2,
+					 "switched WAL source to %s after fetching WAL from %s for %d milliseconds",
+					 xlogSourceNames[currentSource],
+					 xlogSourceNames[oldSource],
+					 wal_source_switch_interval);
+			}
+			else
+				elog(DEBUG2, "switched WAL source from %s to %s after %s",
+					 xlogSourceNames[oldSource],
+					 xlogSourceNames[currentSource],
+					 lastSourceFailed ? "failure" : "success");
+		}
 
 		/*
 		 * We've now handled possible failure. Try to read from the chosen
 		 * source.
 		 */
 		lastSourceFailed = false;
+		intentionalSourceSwitch = false;
 
 		switch (currentSource)
 		{
@@ -3614,6 +3641,67 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 				if (randAccess)
 					curFileTLI = 0;
 
+				/*
+				 * Make an attempt to read WAL from primary after
+				 * wal_source_switch_interval milliseconds of reading from
+				 * archive. A setting of 0 disables this behavior.
+				 *
+				 * Reading WAL from archive may not always be efficient and
+				 * cheaper because network latencies, disk IO cost might differ
+				 * on the archive as compared to the primary and often the
+				 * archive may sit far from the standbys - all impacting the
+				 * recovery performance on the standbys.
+				 *
+				 * Hence reading WAL from primary as opposed to archive enables
+				 * standbys to catch up with the primary sooner thus reducing
+				 * replication lag and avoiding WAL files accumulation on the
+				 * primary.
+				 *
+				 * We are here for any of the following reasons:
+				 * 1) standby in initial recovery after start/restart.
+				 * 2) standby stopped streaming from primary because of
+				 * connectivity issues with the primary (either due to network
+				 * issues or crash in the primary or something else) or
+				 * walreceiver got killed or crashed for whatever reasons.
+				 */
+				if (StandbyMode && currentSource == XLOG_FROM_ARCHIVE &&
+					wal_source_switch_interval > 0)
+				{
+					TimestampTz curr_time;
+
+					curr_time = GetCurrentTimestamp();
+
+					/* Assume last_switch_time as curr_time for the first time */
+					if (first_time)
+						last_switch_time = curr_time;
+
+					if (!first_time &&
+						TimestampDifferenceExceeds(last_switch_time, curr_time,
+												   wal_source_switch_interval))
+					{
+						elog(DEBUG2,
+							 "trying to switch WAL source to %s after fetching WAL from %s for %d milliseconds",
+							 xlogSourceNames[XLOG_FROM_STREAM],
+							 xlogSourceNames[currentSource],
+							 wal_source_switch_interval);
+
+						last_switch_time = curr_time;
+
+						/*
+						 * Treat this as a failure to read from archive, even
+						 * though it is actually not, so that the state machine
+						 * will move on to stream the WAL from primary.
+						 */
+						lastSourceFailed = true;
+						intentionalSourceSwitch = true;
+
+						break;
+					}
+
+					/* We're not here for the first time any more */
+					first_time = false;
+				}
+
 				/*
 				 * Try to restore the file from archive, or read an existing
 				 * file from pg_wal.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 5db5df6285..ce50e6596d 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3324,6 +3324,17 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_source_switch_interval", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the time to wait before switching WAL source from archive to primary"),
+			gettext_noop("0 turns this feature off."),
+			GUC_UNIT_MS
+		},
+		&wal_source_switch_interval,
+		5000, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_segment_size", PGC_INTERNAL, PRESET_OPTIONS,
 			gettext_noop("Shows the size of write ahead log segments."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 90bec0502c..ec70a76a11 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -351,6 +351,10 @@
 					# in milliseconds; 0 disables
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
+#wal_source_switch_interval = 5s	# time to wait before switching WAL
+					# source from archive to primary
+					# 0 disables the feature, > 0 indicates the
+					# interval in milliseconds.
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
 
 # - Subscribers -
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 0aa85d90e8..a4f8e9c804 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -57,6 +57,7 @@ extern PGDLLIMPORT char *PrimarySlotName;
 extern PGDLLIMPORT char *recoveryRestoreCommand;
 extern PGDLLIMPORT char *recoveryEndCommand;
 extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int wal_source_switch_interval;
 
 /* indirectly set via GUC system */
 extern PGDLLIMPORT TransactionId recoveryTargetXid;
diff --git a/src/test/recovery/t/034_wal_source_switch.pl b/src/test/recovery/t/034_wal_source_switch.pl
new file mode 100644
index 0000000000..b5579e745d
--- /dev/null
+++ b/src/test/recovery/t/034_wal_source_switch.pl
@@ -0,0 +1,126 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test for WAL source switch feature
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize primary node with archiving enabled and 1MB WAL segments
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(
+    allows_streaming => 1,
+    has_archiving => 1,
+    extra => ['--wal-segsize=1']);
+# Keep checkpoints from getting in our way
+$node_primary->append_conf(
+	'postgresql.conf', qq(
+    min_wal_size = 2MB
+    max_wal_size = 1GB
+    checkpoint_timeout=1h
+    wal_recycle = off
+));
+$node_primary->start;
+$node_primary->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('rep1')");
+
+# Take a base backup of the primary
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a standby from the backup, using replication slot 'rep1'
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1,
+    has_restoring => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq(
+primary_slot_name = 'rep1'
+min_wal_size = 2MB
+max_wal_size = 1GB
+checkpoint_timeout=1h
+wal_source_switch_interval = 100ms
+wal_recycle = off
+log_min_messages = 'debug2'
+));
+
+$node_standby->start;
+
+# Wait until the standby has caught up with the primary
+$node_primary->wait_for_catchup($node_standby);
+
+# Stop standby so that it falls behind while the primary generates WAL
+$node_standby->stop;
+
+# Advance WAL by 100 segments (= 100MB) on primary
+advance_wal($node_primary, 100);
+
+# Wait for primary to generate requested WAL files
+$node_primary->poll_query_until('postgres',
+	q|SELECT COUNT(*) >= 100 FROM pg_ls_waldir()|, 't');
+
+# The standby now connects to primary during initial recovery after
+# fetching WAL from archive for about wal_source_switch_interval milliseconds.
+$node_standby->start;
+
+$node_primary->wait_for_catchup($node_standby);
+
+ok( find_in_log(
+		$node_standby,
+		qr/restored log file ".*" from archive/),
+	'check that some of WAL segments were fetched from archive');
+
+ok( find_in_log(
+		$node_standby,
+		qr/trying to switch WAL source to .* after fetching WAL from .* for .* milliseconds/),
+	'check that standby tried to switch WAL source to primary from archive');
+
+ok( find_in_log(
+		$node_standby,
+		qr/switched WAL source to .* after fetching WAL from .* for .* milliseconds/),
+	'check that standby actually switched WAL source to primary from archive');
+
+ok( find_in_log(
+		$node_standby,
+		qr/started streaming WAL from primary at .* on timeline .*/),
+	'check that standby started streaming from primary');
+
+# Stop standby
+$node_standby->stop;
+
+# Stop primary
+$node_primary->stop;
+#####################################
+# Generate $n WAL segment switches on $node
+sub advance_wal
+{
+	my ($node, $n) = @_;
+	my $remaining = $n;
+
+	# Each round writes a little WAL and then forces a segment switch.
+	while ($remaining-- > 0)
+	{
+		my $sql = "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();";
+		$node->safe_psql('postgres', $sql);
+	}
+	return;
+}
+
+# Report whether $pat matches the log file of $node past its first $off
+# bytes; $off defaults to 0 when not given.
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+	$off = 0 if !defined $off;
+
+	my $contents = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 unless length($contents) > $off;
+
+	my $tail = substr($contents, $off);
+	return $tail =~ m/$pat/;
+}
+
+done_testing();
-- 
2.34.1

