Le jeudi 29 juillet 2021, 11:09:40 CEST Ronan Dunklau a écrit :
> Patch 0001 adds the new READ_REPLICATION_SLOT command.
> It returns for a given slot the type, restart_lsn, flush_lsn,
> restart_lsn_timeline and flush_lsn_timeline.
> The timelines are determined by reading the current timeline history, and
> finding the timeline where we may find the record. I didn't find explicit
> test for eg IDENTIFY_SYSTEM so didn't write one either for this new
> command, but it is tested indirectly in patch 0002.
> 
> Patch 0002 makes pg_receivewal use that command if we use a replication slot
> and the command is available, and use the restart_lsn and
> restart_lsn_timeline as a starting point. It also adds a small test to
> check that we start back from the previous restart_lsn instead of the
> current flush position when our destination directory does not contain any
> WAL file.
> 
> I also noticed we don't test following a timeline switch. It would probably
> be good to add that, both for the case where we determine the previous
> timeline from the archived segments and when it comes from the new command.
> What do you think ?

Following the discussion at [1], I moved the implementation into streamutil
and added a third patch that uses it in pg_basebackup itself, so that it fails
early if the replication slot doesn't exist. Please find v2 attached.

Best regards,

[1]: https://www.postgresql.org/message-id/flat/CAD21AoDYmv0yJMQnWtCx_kZGwVZnkQSTQ1re2JNSgM0k37afYQ%40mail.gmail.com

-- 
Ronan Dunklau
>From fff8786049326864d3ef8fe4539e1829f933f32f Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 28 Jul 2021 16:34:54 +0200
Subject: [PATCH v2 1/3] Add READ_REPLICATION_SLOT command.

This commit introduces a new READ_REPLICATION_SLOT <slot_name> command.
This command is used to read information about a replication slot when
using a physical replication connection.

In this first version it returns the slot type, restart_lsn,
confirmed_flush_lsn and the timelines of restart_lsn and confirmed_flush_lsn,
which are obtained by following the current timeline history.
---
 doc/src/sgml/protocol.sgml             |  62 +++++++++++++++
 src/backend/replication/repl_gram.y    |  18 ++++-
 src/backend/replication/repl_scanner.l |   1 +
 src/backend/replication/walsender.c    | 106 +++++++++++++++++++++++++
 src/include/nodes/nodes.h              |   1 +
 src/include/nodes/replnodes.h          |  10 +++
 6 files changed, 197 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a232546b1d..6207171426 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2052,6 +2052,68 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+      <para>
+      Read information about the named replication slot.  This is useful to determine which WAL location to ask the server to start streaming from.
+      </para>
+      <para>
+      In response to this command, the server will return a one-row result set, containing the following fields:
+        <variablelist>
+          <varlistentry>
+            <term><literal>type</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's type, either <literal>physical</literal> or <literal>logical</literal>.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's restart_lsn.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>confirmed_flush_lsn</literal> (<type>text</type>)</term>
+            <listitem>
+              <para>
+               The replication slot's confirmed_flush LSN.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>restart_lsn_timeline</literal> (<type>int4</type>)</term>
+            <listitem>
+              <para>
+               The timeline ID of the restart_lsn position, following the current timeline
+               history; null if the slot has no valid restart_lsn.
+              </para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term><literal>confirmed_flush_lsn_timeline</literal> (<type>int4</type>)</term>
+            <listitem>
+              <para>
+               The timeline ID of the confirmed_flush_lsn position, following the current timeline
+               history; null if the slot has no valid confirmed_flush_lsn.
+              </para>
+            </listitem>
+          </varlistentry>
+        </variablelist>
+      </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index e1e8ec29cc..7298f44008 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_opt_list
 %type <defelt>	base_backup_opt
 %type <uintval>	opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon:	';'
 
 command:
 			identify_system
+			| read_replication_slot
 			| base_backup
 			| start_replication
 			| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
@@ -361,6 +375,8 @@ timeline_history:
 				}
 			;
 
+
+
 opt_physical:
 			K_PHYSICAL
 			| /* EMPTY */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..9a13d1c186 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,104 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT: send slot type, LSNs and their timelines */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+	ReplicationSlot *slot;
+	ReplicationSlot slot_contents;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc tupdesc;
+	Datum values[5];
+	bool  nulls[5];
+	char  xloc[MAXFNAMELEN];
+	int i = 0;
+	List *timeline_history = NIL;
+	TimeLineID slots_position_timeline;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("replication slot \"%s\" does not exist",
+						cmd->slotname)));
+	}
+	/* Copy the slot contents under its spinlock, then drop both locks */
+	SpinLockAcquire(&slot->mutex);
+	slot_contents = *slot;
+	SpinLockRelease(&slot->mutex);
+	LWLockRelease(ReplicationSlotControlLock);
+
+	tupdesc = CreateTemplateTupleDesc(5);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_lsn_timeline",
+							  INT4OID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_lsn_timeline",
+							  INT4OID, -1, 0);
+
+	if (slot_contents.data.database == InvalidOid)
+		values[i] = CStringGetTextDatum("physical");
+	else
+		values[i] = CStringGetTextDatum("logical");
+	nulls[i] = false;
+	i++;
+
+	snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+	values[i] = CStringGetTextDatum(xloc);
+	nulls[i] = false;
+	i++;
+
+	snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush));
+	values[i] = CStringGetTextDatum(xloc);
+	nulls[i] = false;
+	i++;
+
+	/*
+	 * Timelines of valid LSNs, from ThisTimeLineID's history; NULL otherwise.
+	 * NOTE(review): confirm ThisTimeLineID is current on a cascading walsender.
+	 * XXX: should we allow the caller to specify the target timeline?
+	 */
+	if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+	{
+		timeline_history = readTimeLineHistory(ThisTimeLineID);
+		slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, timeline_history);
+		values[i] = Int32GetDatum(slots_position_timeline);
+		nulls[i] = false;
+	} else {
+		values[i] = 0;
+		nulls[i] = true;
+	}
+	i++;
+
+	if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+	{
+		if (!timeline_history)
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+		slots_position_timeline = tliOfPointInHistory(slot_contents.data.confirmed_flush, timeline_history);
+		values[i] = Int32GetDatum(slots_position_timeline);
+		nulls[i] = false;
+	} else {
+		values[i] = 0;
+		nulls[i] = true;
+	}
+	i++;
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1618,6 +1717,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 6a4d82f0a8..5f78bdd573 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -495,6 +495,7 @@ typedef enum NodeTag
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
 	T_IdentifySystemCmd,
+	T_ReadReplicationSlotCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..ec85b7d993 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char		*slotname;
+} ReadReplicationSlotCmd;
+
 
 /* ----------------------
  *		BASE_BACKUP command
-- 
2.32.0

>From 935cdeef089c985867c9d18df96caca1c751f02d Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 28 Jul 2021 16:35:39 +0200
Subject: [PATCH v2 2/3] Use READ_REPLICATION_SLOT command in pg_receivewal.

Prior to this patch, when running pg_receivewal, the start LSN is determined by looking at
the WAL files currently stored on disk, falling back to the server's current
flush LSN.

If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left off, which is the replication slot's restart_lsn,
instead of skipping right to the current flush location.

To keep compatibility with prior server versions, we only attempt it if
the server version is >= 15.
---
 src/bin/pg_basebackup/pg_receivewal.c        | 26 ++++++++-
 src/bin/pg_basebackup/streamutil.c           | 61 ++++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h           |  2 +
 src/bin/pg_basebackup/t/020_pg_receivewal.pl | 46 ++++++++++++++-
 4 files changed, 131 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 4474273daf..453b3644ca 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -26,6 +26,7 @@
 #include "fe_utils/option_utils.h"
 #include "getopt_long.h"
 #include "libpq-fe.h"
+#include "pqexpbuffer.h"
 #include "receivelog.h"
 #include "streamutil.h"
 
@@ -191,6 +192,7 @@ close_destination_dir(DIR *dest_dir, char *dest_folder)
 }
 
 
+
 /*
  * Determine starting location for streaming, based on any existing xlog
  * segments in the directory. We start at the end of the last one that is
@@ -408,8 +410,28 @@ StreamLog(void)
 	stream.startpos = FindStreamingStart(&stream.timeline);
 	if (stream.startpos == InvalidXLogRecPtr)
 	{
-		stream.startpos = serverpos;
-		stream.timeline = servertli;
+		/* Otherwise, try the slot's restart_lsn (needs server version 15+) */
+		if (replication_slot)
+		{
+			if (PQserverVersion(conn) >= 150000)
+			{
+				if (!GetSlotInformation(conn, replication_slot, &stream.startpos, &stream.timeline))
+					pg_log_warning("could not fetch information for replication slot \"%s\", "
+								   "resuming from the current server position instead", replication_slot);
+			}
+			else
+				pg_log_warning("server does not support fetching the slot's position, "
+							   "resuming from the current server position instead");
+		}
+
+		/*
+		 * If it is still unknown, use the current flush value from the server
+		 */
+		if (stream.startpos == InvalidXLogRecPtr)
+		{
+			stream.startpos = serverpos;
+			stream.timeline = servertli;
+		}
 	}
 
 	/*
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index f5b3b476e5..c902824617 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -397,6 +397,8 @@ RetrieveDataDirCreatePerm(PGconn *conn)
 	return true;
 }
 
+
+
 /*
  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
  * some result information if requested:
@@ -479,6 +481,65 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 	return true;
 }
 
+
+/*
+ * Check that a replication slot exists through a given connection, using
+ * READ_REPLICATION_SLOT, and give back to caller if requested (pass NULL to
+ * skip): the slot's restart_lsn and the timeline of that restart_lsn.
+ * Returns false, after logging an error, if the slot cannot be read.
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, uint32 *restart_lsn_timeline)
+{
+	PGresult   *res;
+	PQExpBuffer query;
+	uint32		hi,
+				lo;
+
+	if (slot_name == NULL)
+		return false;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s",
+					  slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not read replication slot \"%s\": %s",
+					 slot_name, PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+	if (PQntuples(res) == 0)
+	{
+		pg_log_error("replication slot \"%s\" does not exist", slot_name);
+		PQclear(res);
+		return false;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) < 4)
+	{
+		pg_log_error("could not fetch replication slot LSN: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 4);
+		PQclear(res);
+		return false;
+	}
+	if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+	{
+		pg_log_error("could not parse slot's restart_lsn \"%s\"",
+					 PQgetvalue(res, 0, 1));
+		PQclear(res);
+		return false;
+	}
+	if (restart_lsn)
+		*restart_lsn = ((uint64) hi) << 32 | lo;
+	if (restart_lsn_timeline)
+		*restart_lsn_timeline = atoi(PQgetvalue(res, 0, 3));
+	PQclear(res);
+	return true;
+}
+
 /*
  * Create a replication slot for the given connection. This function
  * returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 504803b976..e34f986136 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -40,6 +40,8 @@ extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
 							  XLogRecPtr *startpos,
 							  char **db_name);
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, uint32 *restart_lsn_timeline);
+
 extern bool RetrieveWalSegSize(PGconn *conn);
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..4f0f6956b5 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 30;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
 
 # Generate some WAL.  Use --synchronous at the same time to add more
 # code coverage.  Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full.
 $primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 my $nextlsn =
@@ -146,6 +146,48 @@ $primary->command_ok(
 $partial_wals[0] =~ s/(\.gz)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
+# Verify that if we use a replication slot, we resume where we left off even
+# in the absence of WAL files in the target directory.
+
+# Set up the slot, and stream from it a first time.
+$primary->run_log(
+	[ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
+	'creating a replication slot');
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->run_log(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"streaming some WAL");
+
+# Get the slot's restart_lsn and make sure we retrieve the associated WAL file
+# even after deletion of the previously stored WAL files.
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+# Move off a segment boundary, where pg_walfile_name() gives the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn');");
+
+unlink glob "'${stream_dir}/*'";
+
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"Stream some wal starting from the slot's restart_lsn");
+my @walfiles = glob "${stream_dir}/*";
+note("streamed WAL files: @walfiles");
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
 # Permissions on WAL files should be default
 SKIP:
 {
-- 
2.32.0

>From 5eec8896036da42175f1f58893a87faa8f92d560 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Thu, 26 Aug 2021 14:05:26 +0200
Subject: [PATCH v2 3/3] Check slot existence in pg_basebackup.

Use the newly introduced READ_REPLICATION_SLOT command to check that the
slot exists in pg_basebackup. That way, we can fail early.
---
 src/bin/pg_basebackup/pg_basebackup.c        |  9 +++++++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl | 10 ++++++----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 7296eb97d0..6794994c69 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1867,6 +1867,15 @@ BaseBackup(void)
 	if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
 		exit(1);
 
+	/*
+	 * Fail early if the requested existing slot is missing (needs v15+ server)
+	 */
+	if (replication_slot && !(temp_replication_slot || create_slot) && PQserverVersion(conn) >= 150000)
+	{
+		if (!GetSlotInformation(conn, replication_slot, NULL, NULL))
+			exit(1);
+	}
+
 	/*
 	 * Start the actual backup
 	 */
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index a2cb2a7679..787edd2f4f 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -10,7 +10,7 @@ use File::Path qw(rmtree);
 use Fcntl qw(:seek);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 110;
+use Test::More tests => 111;
 
 program_help_ok('pg_basebackup');
 program_version_ok('pg_basebackup');
@@ -465,14 +465,16 @@ $node->command_ok(
 	'pg_basebackup -X stream runs with --no-slot');
 rmtree("$tempdir/backupnoslot");
 
-$node->command_fails(
+command_fails_like(
 	[
 		'pg_basebackup',             '-D',
 		"$tempdir/backupxs_sl_fail", '-X',
 		'stream',                    '-S',
-		'slot0'
+		'slot0',                     '-p',
+		"$port"
 	],
-	'pg_basebackup fails with nonexistent replication slot');
+	qr/pg_basebackup: error: could not read replication slot/,
+	'pg_basebackup fails early with nonexistent replication slot');
 
 $node->command_fails(
 	[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' ],
-- 
2.32.0

Reply via email to