On Thu, Oct 21, 2021 at 10:36:42AM +0200, Ronan Dunklau wrote:
> Done. I haven't touched the timeline switch test patch for now, but I still 
> include it here for completeness.

As 0001 and 0002 have been applied, I have put my hands on 0003, and
found a couple of issues upon review.

+       Assert(slot_name != NULL);
+       Assert(restart_lsn != NULL);
There is no need for those asserts, as we should support the case
where the caller gives NULL for those variables.

+       if (PQserverVersion(conn) < 150000)
+               return false;
Returning false is incorrect for older server versions, as we would not
fall back to the old method when streaming from an older server.  What
this needs to do is return true and set restart_lsn to
InvalidXLogRecPtr, so that pg_receivewal just streams from the
current flush location.  "false" should only be returned on error,
with pg_log_error().
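
As a rough illustration of that contract, here is a minimal standalone
sketch, not the patch itself.  The names get_slot_info_sketch() and
choose_start(), the query_fails flag and the stand-in types are
hypothetical; only the 150000 version check and the InvalidXLogRecPtr
convention come from the patch.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-ins for the PostgreSQL types, to keep the sketch self-contained. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Hypothetical model of the contract: "false" only on error, "true" with
 * InvalidXLogRecPtr when the server is too old to know the command.
 * restart_lsn may be NULL if the caller is not interested in it.
 */
static bool
get_slot_info_sketch(int server_version, bool query_fails,
					 XLogRecPtr slot_lsn, XLogRecPtr *restart_lsn)
{
	if (restart_lsn != NULL)
		*restart_lsn = InvalidXLogRecPtr;

	/* Older server: not an error, there is just nothing to report. */
	if (server_version < 150000)
		return true;

	/* Real failure: log it, the caller is expected to give up. */
	if (query_fails)
	{
		fprintf(stderr, "could not read replication slot\n");
		return false;
	}

	if (restart_lsn != NULL)
		*restart_lsn = slot_lsn;
	return true;
}

/* Caller side: use the flush location when the slot gave nothing. */
static XLogRecPtr
choose_start(XLogRecPtr slot_result, XLogRecPtr flush_lsn)
{
	return (slot_result != InvalidXLogRecPtr) ? slot_result : flush_lsn;
}
```

With a server version of 140000, the first function returns true and
leaves the LSN invalid, so choose_start() picks the flush location.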

+$primary->psql('postgres',
+       'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
There is no need to switch twice to a new WAL segment as we just need
to be sure that the WAL segment of the restart_lsn is the one
archived.  Note that RESERVE_WAL uses the last redo point, so it is
better to use a checkpoint and reduce the number of logs we stream
into the new location.
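
To see which segment should show up in the archive for a given
restart_lsn, the mapping from an LSN to a segment name can be sketched
as below.  This assumes the default 16MB segment size and ignores
segment-boundary handling; parse_lsn() and wal_file_name() are
illustrative names, not functions from the tree.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;

/* Assumption: default 16MB segments; real code asks the server. */
#define WAL_SEG_SIZE ((uint64_t) 16 * 1024 * 1024)

/* Parse the "hi/lo" text form of an LSN, e.g. "0/3000028". */
static bool
parse_lsn(const char *str, XLogRecPtr *lsn)
{
	unsigned int hi,
				lo;

	if (sscanf(str, "%X/%X", &hi, &lo) != 2)
		return false;
	*lsn = ((uint64_t) hi) << 32 | lo;
	return true;
}

/* Build the 24-character name of the segment holding the given LSN. */
static void
wal_file_name(char *buf, size_t len, uint32_t tli, XLogRecPtr lsn)
{
	uint64_t	segno = lsn / WAL_SEG_SIZE;
	uint64_t	segs_per_id = UINT64_C(0x100000000) / WAL_SEG_SIZE;

	snprintf(buf, len, "%08X%08X%08X",
			 (unsigned int) tli,
			 (unsigned int) (segno / segs_per_id),
			 (unsigned int) (segno % segs_per_id));
}
```

With timeline 1 and an LSN of 0/3000028, this yields
000000010000000000000003.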

Better to add some --no-sync to the new commands of pg_receivewal, to
not stress the I/O more than necessary.  I have added some extra -n
while on it to avoid loops on failure.

Attached is the updated patch I am finishing with, which is rather
clean now.  I have tweaked a couple of things while on it, and
documented better the new GetSlotInformation() in streamutil.c.
--
Michael
From 3b9c5336866881361ecaa428ae8e57f5ba9aee90 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Mon, 25 Oct 2021 15:34:46 +0900
Subject: [PATCH v11] Allow pg_receivewal to stream from a slot's restart LSN

Prior to this patch, when running pg_receivewal, the streaming start
point would be the current location of the archives if anything is
found, and pg_receivewal would fall back to the current WAL flush
location if there are no archives, as reported by an
IDENTIFY_SYSTEM command.

If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left off, which is the replication slot's restart_lsn,
instead of skipping right to the current flush location.  So this makes
pg_receivewal use the following sequence of methods to determine the
starting streaming LSN:
- Scan the local archives.
- Use the slot's restart_lsn, if supported.
- Fall back to the current flush LSN.

To keep compatibility with prior server versions, we only attempt to use
READ_REPLICATION_SLOT if the backend version is at least 15, and
fall back to the older behavior of streaming from the current flush
LSN if the command is not supported.

Some new TAP tests are added to cover this feature.
---
 src/bin/pg_basebackup/pg_receivewal.c        |  29 +++++-
 src/bin/pg_basebackup/streamutil.c           | 104 +++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h           |   1 +
 src/bin/pg_basebackup/t/020_pg_receivewal.pl |  53 +++++++++-
 doc/src/sgml/ref/pg_receivewal.sgml          |  11 ++
 5 files changed, 194 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index d5140a79fe..85f2ce53a6 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -404,15 +404,38 @@ StreamLog(void)
 		exit(1);
 
 	/*
-	 * Figure out where to start streaming.
+	 * Figure out where to start streaming.  First scan the local directory.
 	 */
 	stream.startpos = FindStreamingStart(&stream.timeline);
 	if (stream.startpos == InvalidXLogRecPtr)
 	{
-		stream.startpos = serverpos;
-		stream.timeline = servertli;
+		/*
+		 * Try to get the starting point from the slot if any, the upstream
+		 * server may not support this option.
+		 */
+		if (replication_slot != NULL)
+		{
+			if (!GetSlotInformation(conn, replication_slot, &stream.startpos,
+									&stream.timeline))
+			{
+				/* Error is logged by GetSlotInformation() */
+				return;
+			}
+		}
+
+		/*
+		 * If the starting point is still not known, use the current WAL
+		 * flush value as last resort.
+		 */
+		if (stream.startpos == InvalidXLogRecPtr)
+		{
+			stream.startpos = serverpos;
+			stream.timeline = servertli;
+		}
 	}
 
+	Assert(stream.startpos != InvalidXLogRecPtr);
+
 	/*
 	 * Always start streaming at the beginning of a segment
 	 */
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index a9bc1ce214..3d3250a3f5 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -479,6 +479,110 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 	return true;
 }
 
+
+/*
+ * Run READ_REPLICATION_SLOT through a given connection and give back to
+ * caller some result information if requested for this slot:
+ * - Start LSN position, InvalidXLogRecPtr if unknown.
+ * - Current timeline ID, 0 if unknown.
+ * Returns false on failure, and true otherwise.
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
+{
+	PGresult   *res;
+	PQExpBuffer query;
+	XLogRecPtr	lsn_loc = InvalidXLogRecPtr;
+	TimeLineID	tli_loc = 0;
+
+	if (restart_lsn != NULL)
+		*restart_lsn = lsn_loc;
+	if (restart_tli != NULL)
+		*restart_tli = tli_loc;
+
+	/*
+	 * If the command is not supported, let the caller know that no
+	 * information was found.
+	 */
+	if (PQserverVersion(conn) < 150000)
+		return true;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not read replication slot: %s",
+					 PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+
+	/* The command should always return precisely one tuple and three fields */
+	if (PQntuples(res) != 1 || PQnfields(res) != 3)
+	{
+		pg_log_error("could not read replication slot: got %d rows and %d fields, expected %d rows and %d fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * When the slot doesn't exist, the command returns a tuple with NULL
+	 * values.  This checks only the slot type field.
+	 */
+	if (PQgetisnull(res, 0, 0))
+	{
+		pg_log_error("replication slot \"%s\" does not exist", slot_name);
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * Note that this cannot happen as READ_REPLICATION_SLOT supports only
+	 * physical slots, but play it safe.
+	 */
+	if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
+	{
+		pg_log_error("expected a physical replication slot, got type \"%s\" instead",
+					 PQgetvalue(res, 0, 0));
+		PQclear(res);
+		return false;
+	}
+
+	/* restart LSN */
+	if (!PQgetisnull(res, 0, 1))
+	{
+		uint32		hi,
+					lo;
+
+		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+		{
+			pg_log_error("could not parse slot's restart_lsn \"%s\"",
+						 PQgetvalue(res, 0, 1));
+			PQclear(res);
+			return false;
+		}
+		lsn_loc = ((uint64) hi) << 32 | lo;
+	}
+
+	/* current TLI */
+	if (!PQgetisnull(res, 0, 2))
+		tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2));
+
+	PQclear(res);
+
+	/* Assign results if requested */
+	if (restart_lsn)
+		*restart_lsn = lsn_loc;
+	if (restart_tli)
+		*restart_tli = tli_loc;
+
+	return true;
+}
+
 /*
  * Create a replication slot for the given connection. This function
  * returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 65135c79e0..907457b8ae 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -52,6 +52,7 @@ extern void AppendIntegerCommandOption(PQExpBuffer buf,
 									   bool use_new_option_syntax,
 									   char *option_name, int32 option_value);
 
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli);
 extern bool RetrieveWalSegSize(PGconn *conn);
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index b93493b5e9..c7801f9f2f 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use PostgreSQL::Test::Utils;
 use PostgreSQL::Test::Cluster;
-use Test::More tests => 27;
+use Test::More tests => 31;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -72,6 +72,8 @@ $primary->command_ok(
 my @partial_wals = glob "$stream_dir/*\.partial";
 is(scalar(@partial_wals), 1, "one partial WAL segment was created");
 
+note "Testing pg_receivewal with compression methods";
+
 # Check ZLIB compression if available.
 SKIP:
 {
@@ -155,3 +157,52 @@ SKIP:
 	ok(check_mode_recursive($stream_dir, 0700, 0600),
 		"check stream dir permissions");
 }
+
+note "Testing pg_receivewal with slot as starting streaming point";
+
+# When using a replication slot, archiving should be resumed from the slot's
+# restart LSN.  Use a new archive location and new slot for this test.
+my $slot_dir = $primary->basedir . '/slot_wal';
+mkdir($slot_dir);
+$slot_name = 'archive_slot';
+
+# Setup the slot, reserving WAL at creation (corresponding to the
+# last redo LSN here, actually).
+$primary->psql('postgres',
+	"SELECT pg_create_physical_replication_slot('$slot_name', true);");
+
+# Get the segment name associated with the slot's restart LSN, that should
+# be archived.
+my $walfile_streamed = $primary->safe_psql(
+	'postgres',
+	"SELECT pg_walfile_name(restart_lsn)
+  FROM pg_replication_slots
+  WHERE slot_name = '$slot_name';");
+
+# Switch to a new segment, to make sure that the segment retained by the
+# slot is still streamed.  This may not be necessary, but play it safe.
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+
+# Check case where the slot does not exist.
+$primary->command_fails_like(
+	[
+		'pg_receivewal',   '-D', $slot_dir,   '--slot',
+		'nonexistentslot', '-n', '--no-sync', '--verbose',
+		'--endpos',        $nextlsn
+	],
+	qr/pg_receivewal: error: replication slot "nonexistentslot" does not exist/,
+	'pg_receivewal fails with non-existing slot');
+$primary->command_ok(
+	[
+		'pg_receivewal', '-D', $slot_dir,   '--slot',
+		$slot_name,      '-n', '--no-sync', '--verbose',
+		'--endpos',      $nextlsn
+	],
+	"WAL streamed from the slot's restart_lsn");
+ok(-e "$slot_dir/$walfile_streamed",
+	"WAL from the slot's restart_lsn has been archived");
diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 6da8b2be8c..5fff503fe5 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -88,6 +88,17 @@ PostgreSQL documentation
      </para>
     </listitem>
 
+    <listitem>
+     <para>
+      If a starting point could not be calculated with the previous method,
+      and if a replication slot is used, an extra
+      <command>READ_REPLICATION_SLOT</command> is issued to retrieve the
+      slot's <literal>restart_lsn</literal> to use as starting point.
+      This option is only available when streaming write-ahead logs from
+      <productname>PostgreSQL</productname> 15 and up.
+     </para>
+    </listitem>
+
     <listitem>
      <para>
       If a starting point cannot be calculated with the previous method,
-- 
2.33.0

