On Wednesday, 20 October 2021, 07:13:15 CEST, Michael Paquier wrote:
> On Tue, Oct 19, 2021 at 05:32:55PM +0200, Ronan Dunklau wrote:
> > Following recommendations, I stripped most of the features from the patch.
> > For now we support only physical replication slots, and only provide the
> > two fields of interest (restart_lsn, restart_tli) in addition to the slot
> > type (fixed at physical) to not paint ourselves in a corner.
> > 
> > I also removed the part about pg_basebackup since other fixes have been
> > proposed for that case.
> 
> Patch 0001 looks rather clean.  I have a couple of comments.

Thank you for the quick review!


> 
> +       if (OidIsValid(slot_contents.data.database))
> +           elog(ERROR, "READ_REPLICATION_SLOT is only supported for physical slots");
> 
> elog() can only be used for internal errors.  Errors that can be
> triggered by a user should use ereport() instead.

Ok.
> 
> +ok($stdout eq '||',
> +   "READ_REPLICATION_SLOT returns NULL values if slot does not exist");
> [...]
> +ok($stdout =~ 'physical\|[^|]*\|1',
> +   "READ_REPLICATION_SLOT returns tuple corresponding to the slot");
> Isn't result pattern matching something we usually test with like()?

Ok.
> 
> +($ret, $stdout, $stderr) = $node_primary->psql(
> +   'postgres',
> +   "READ_REPLICATION_SLOT $slotname;",
> +   extra_params => [ '-d', $connstr_rep ]);
> No need for extra_params in this test.  You can just pass down
> "replication => 1" instead, no?

In that test file, every replication connection is obtained by using 
connstr_rep so I thought it would be best to use the same thing.

> 
> --- a/src/test/recovery/t/006_logical_decoding.pl
> +++ b/src/test/recovery/t/006_logical_decoding.pl
> [...]
> +ok($stderr=~ 'READ_REPLICATION_SLOT is only supported for physical slots',
> +   'Logical replication slot is not supported');
> This one should use like().

Ok.

> 
> +        <para>
> +        The slot's <literal>restart_lsn</literal> can also be used as a starting
> +        point if the target directory is empty.
> +        </para>
> I am not sure that there is a need for this addition as the same thing
> is said when describing the lookup ordering.

Ok, removed.

> 
> +      If nothing is found and a slot is specified, use the
> +      <command>READ_REPLICATION_SLOT</command>
> +      command.
> It may be clearer to say that the position is retrieved from the
> command.

Ok, done. The doc also uses the active voice here now.
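
For readers following the lookup ordering discussed here, the fallback chain can be sketched as a small standalone function. This is only an illustration under stated assumptions: `Lsn`, `INVALID_LSN` and `choose_start_lsn` are hypothetical names, not identifiers from the patch, and the real code works on `XLogRecPtr` values obtained from the directory scan, the slot and IDENTIFY_SYSTEM.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;            /* hypothetical stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)    /* hypothetical stand-in for InvalidXLogRecPtr */

/*
 * Pick the streaming start position: the directory scan wins, then the
 * slot's restart_lsn (only when a slot was given and it has one), then
 * the flush position reported by the server.
 */
Lsn
choose_start_lsn(Lsn from_directory, bool have_slot, Lsn from_slot,
				 Lsn server_flush)
{
	if (from_directory != INVALID_LSN)
		return from_directory;
	if (have_slot && from_slot != INVALID_LSN)
		return from_slot;
	return server_flush;
}
```

The slot is consulted only when the directory scan found nothing, so existing archives keep taking precedence over the slot's position.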

> 
> +bool
> +GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID* restart_tli)
> +{
> Could you extend that so as we still run the command but don't crash
> if the caller specifies NULL for any of the result fields?  This would
> be handy.

Done.
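
The idea of tolerating NULL output pointers can be shown in isolation. The sketch below is not the code from the patch: `parse_slot_fields` is a hypothetical helper that takes the two text cells directly (a NULL string standing for SQL NULL) instead of a `PGresult`, so that it can be compiled without libpq.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/*
 * Parse the restart_lsn and restart_tli cells of a slot row.
 * lsn_text and tli_text may be NULL to represent SQL NULL.
 * Either output pointer may be NULL if the caller does not need the field;
 * in that case the corresponding cell is not parsed at all.
 */
bool
parse_slot_fields(const char *lsn_text, const char *tli_text,
				  uint64_t *restart_lsn, uint32_t *restart_tli)
{
	if (restart_lsn != NULL)
	{
		if (lsn_text == NULL)
			*restart_lsn = 0;	/* plays the role of InvalidXLogRecPtr */
		else
		{
			unsigned int hi,
						lo;

			if (sscanf(lsn_text, "%X/%X", &hi, &lo) != 2)
				return false;
			*restart_lsn = ((uint64_t) hi << 32) | lo;
		}
	}

	if (restart_tli != NULL)
		*restart_tli = (tli_text == NULL) ? 0 : (uint32_t) strtoul(tli_text, NULL, 10);

	return true;
}
```

A caller that only wants to know whether the row is usable can pass NULL for both outputs; a caller interested in one field passes a pointer for that field only.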

> 
> +   if (PQgetisnull(res, 0, 0))
> +   {
> +       PQclear(res);
> +       pg_log_error("replication slot \"%s\" does not exist", slot_name);
> +       return false;
> +   }
> +   if (PQntuples(res) != 1 || PQnfields(res) < 3)
> +   {
> +       pg_log_error("could not fetch replication slot: got %d rows and %d fields, expected %d rows and %d or more fields",
> +                    PQntuples(res), PQnfields(res), 1, 3);
> +       PQclear(res);
> +       return false;
> +   }
> Wouldn't it be better to reverse the order of these two checks?

Yes it is, and the PQntuples condition should be removed from the first error 
test.
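
To make the reordering concrete, here is a minimal standalone sketch of the intended sequence: validate the shape of the result first, and only then look at the first cell. `MockResult`, `SlotCheck` and `check_slot_result` are hypothetical names used for illustration; the patch itself uses libpq's `PQntuples()`, `PQnfields()` and `PQgetisnull()` on a real `PGresult`.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a query result; this is not libpq's PGresult. */
typedef struct
{
	int			ntuples;
	int			nfields;
	const char **row0;			/* cells of the first row; NULL cell = SQL NULL */
} MockResult;

typedef enum
{
	SLOT_OK,
	SLOT_BAD_SHAPE,
	SLOT_MISSING
} SlotCheck;

SlotCheck
check_slot_result(const MockResult *res)
{
	/* Shape first: row 0 is only inspected once we know it is there. */
	if (res->ntuples != 1 || res->nfields < 3)
		return SLOT_BAD_SHAPE;

	/* An all-NULL row means the slot does not exist; column 0 is enough. */
	if (res->row0[0] == NULL)
		return SLOT_MISSING;

	return SLOT_OK;
}
```

With this ordering, an unexpected result shape is reported as such instead of being mistaken for a missing slot.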

> 
> I don't mind the addition of the slot type being part of the result of
> READ_REPLICATION_SLOT even if it is not mandatory (?), but at least
> GetSlotInformation() should check after it.

Ok.

> 
> +# Setup the slot, and connect to it a first time
> +$primary->run_log(
> +   [ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
> +   'creating a replication slot');
> +$primary->psql('postgres',
> +   'INSERT INTO test_table VALUES (generate_series(1,100));');
> +$primary->psql('postgres', 'SELECT pg_switch_wal();');
> +$nextlsn =
> +  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
> +chomp($nextlsn);
> Wouldn't it be simpler to use CREATE_REPLICATION_SLOT with RESERVE_WAL
> here, rather than going through pg_receivewal?  It seems to me that
> this would be cheaper without really impacting the coverage.

You're right, we can skip two invocations of pg_receivewal like this (for the 
slot creation + for starting the slot a first time).


-- 
Ronan Dunklau
From 845b2fdd33318f0d7528ef0ecbb69e4aecf43c46 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Mon, 6 Sep 2021 11:08:54 +0200
Subject: [PATCH v7 3/3] Add documentation for pg_receivewal

---
 doc/src/sgml/ref/pg_receivewal.sgml | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 45b544cf49..5fb2b61d34 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -37,6 +37,32 @@ PostgreSQL documentation
    <xref linkend="continuous-archiving"/>).
   </para>
 
+  <para>
+    When <application>pg_receivewal</application> is launched, it tries to determine the
+    starting LSN from the following sources:
+    <orderedlist>
+      <listitem>
+      <para>
+      It scans the target directory to determine the next file it should archive.
+      </para>
+      </listitem>
+
+      <listitem>
+      <para>
+      If it doesn't find that file, and a slot was specified, it uses the
+      <command>READ_REPLICATION_SLOT</command> command to retrieve the slot's <literal>restart_lsn</literal>.
+      </para>
+      </listitem>
+
+      <listitem>
+      <para>
+      If it still doesn't find a valid start position via <command>READ_REPLICATION_SLOT</command>,
+      it uses the last flush location as reported by the server.
+      </para>
+      </listitem>
+    </orderedlist>
+  </para>
+
   <para>
    <application>pg_receivewal</application> streams the write-ahead
    log in real time as it's being generated on the server, and does not wait
-- 
2.33.0

From 5a47f17a17594cc171f744ce383ba820d44b6446 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 2 Sep 2021 16:25:25 +0900
Subject: [PATCH v7 1/3] Add READ_REPLICATION_SLOT command

---
 doc/src/sgml/protocol.sgml                  | 47 +++++++++++
 src/backend/replication/repl_gram.y         | 16 +++-
 src/backend/replication/repl_scanner.l      |  1 +
 src/backend/replication/walsender.c         | 89 +++++++++++++++++++++
 src/include/nodes/nodes.h                   |  1 +
 src/include/nodes/replnodes.h               | 10 +++
 src/test/recovery/t/001_stream_rep.pl       | 47 ++++++++++-
 src/test/recovery/t/006_logical_decoding.pl |  9 ++-
 src/tools/pgindent/typedefs.list            |  1 +
 9 files changed, 218 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b95cc88599..51a15cc3da 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2067,6 +2067,53 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+     <para>
+      Read the information of a replication slot. Returns a tuple with
+      <literal>NULL</literal> values if the replication slot does not
+      exist. This command is currently only supported for <literal>physical</literal> slots.
+     </para>
+     <para>
+      In response to this command, the server will return a one-row result set,
+      containing the following fields:
+      <variablelist>
+       <varlistentry>
+        <term><literal>slot_type</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's type, either <literal>physical</literal> or
+          <literal>NULL</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's <literal>restart_lsn</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+        <listitem>
+         <para>
+          The timeline ID for the <literal>restart_lsn</literal> position,
+          when following the current timeline history.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 126380e2df..913a99da5a 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_legacy_opt_list generic_option_list
 %type <defelt>	base_backup_legacy_opt generic_option
 %type <uintval>	opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon:	';'
 
 command:
 			identify_system
+			| read_replication_slot
 			| base_backup
 			| start_replication
 			| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b811a5c0ef..5d68a7e66a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,87 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 3
+	ReplicationSlot *slot;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[READ_REPLICATION_SLOT_COLS];
+	bool		nulls[READ_REPLICATION_SLOT_COLS];
+
+	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
+							  INT4OID, -1, 0);
+
+	MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+	else
+	{
+		List	   *timeline_history = NIL;
+		ReplicationSlot slot_contents;
+		int			i = 0;
+		char		xloc[MAXFNAMELEN];
+		TimeLineID	slots_position_timeline;
+
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (OidIsValid(slot_contents.data.database))
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("READ_REPLICATION_SLOT is only supported for physical slots"));
+		values[i] = CStringGetTextDatum("physical");
+		nulls[i] = false;
+		i++;
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		/*
+		 * Now get the timeline this WAL was produced on, following the history
+		 * of the current timeline
+		 */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+														  timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		i++;
+
+		Assert(i == READ_REPLICATION_SLOT_COLS);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1622,6 +1704,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index e0057daa06..6201940637 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -493,6 +493,7 @@ typedef enum NodeTag
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
 	T_IdentifySystemCmd,
+	T_ReadReplicationSlotCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..46384ea074 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char	   *slotname;
+} ReadReplicationSlotCmd;
+
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 9916a36012..2317f55149 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 55;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -254,6 +254,51 @@ ok( $ret == 0,
 	"SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	'READ_REPLICATION_SLOT non_existent_slot;',
+	extra_params => [ '-d', $connstr_rep ]);
+ok( $ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with non existent slot");
+ok($stdout eq '||',
+	"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+	extra_params => [ '-d', $connstr_rep ],
+	0,
+	'physical slot created on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with existing slot");
+like($stdout, qr/^physical\|[^|]*\|1$/,
+	"READ_REPLICATION_SLOT returns tuple corresponding to the slot");
+
+$node_primary->psql(
+	'postgres',
+	"DROP_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ],
+	0,
+	'physical slot dropped on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+	"READ_REPLICATION_SLOT does not produce an error with dropped slot");
+ok($stdout eq '||',
+	"READ_REPLICATION_SLOT returns NULL values if slot has been dropped");
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..1e08afca74 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 15;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,13 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql(
+	'template1',
+	qq[READ_REPLICATION_SLOT test_slot;],
+	replication => 'database');
+like($stderr, qr/READ_REPLICATION_SLOT is only supported for physical slots/,
+	'Logical replication slot is not supported');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cb5b5ec74c..a8e4a2afd6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2128,6 +2128,7 @@ ReadBufferMode
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
+ReadReplicationSlotCmd
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheEntry
-- 
2.33.0

From 8065443324067a4b5a5da522a1a52f25a9b5f21a Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 1 Sep 2021 15:52:32 +0200
Subject: [PATCH v7 2/3] Use READ_REPLICATION_SLOT command in pg_receivewal.

Prior to this patch, when running pg_receivewal, the start LSN is
determined by looking at the WAL files currently stored on disk, then
falling back to the current flush LSN from the server.

If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left off, which is the replication slot's restart_lsn,
instead of skipping right to the current flush location.

To keep compatibility with prior server versions, we only attempt it if
the server version is >= 15.
---
 src/bin/pg_basebackup/pg_receivewal.c        | 20 ++++-
 src/bin/pg_basebackup/streamutil.c           | 87 ++++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h           |  1 +
 src/bin/pg_basebackup/t/020_pg_receivewal.pl | 44 +++++++++-
 4 files changed, 148 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index d5140a79fe..1128389839 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -409,10 +409,26 @@ StreamLog(void)
 	stream.startpos = FindStreamingStart(&stream.timeline);
 	if (stream.startpos == InvalidXLogRecPtr)
 	{
-		stream.startpos = serverpos;
-		stream.timeline = servertli;
+		/* Try to get it from the slot if any, and the server supports it */
+		if (replication_slot != NULL)
+		{
+			/* Error is logged by GetSlotInformation, so just return. */
+			if (!GetSlotInformation(conn, replication_slot, &stream.startpos, &stream.timeline))
+				return;
+		}
+
+		/*
+		 * If it is still unknown, use the current flush value from the server
+		 */
+		if (stream.startpos == InvalidXLogRecPtr)
+		{
+			stream.startpos = serverpos;
+			stream.timeline = servertli;
+		}
 	}
 
+	Assert(stream.startpos != InvalidXLogRecPtr);
+
 	/*
 	 * Always start streaming at the beginning of a segment
 	 */
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index a9bc1ce214..1257c7b536 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -479,6 +479,93 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 	return true;
 }
 
+
+/*
+ * Returns whether a replication slot exists through a given connection,
+ * and fills in restart_lsn and restart_tli with the results if passed by the caller.
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID* restart_tli)
+{
+	PGresult   *res;
+	PQExpBuffer query;
+
+	Assert(slot_name != NULL);
+
+	if (PQserverVersion(conn) < 150000)
+		return false;
+
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+	/* The command should always return precisely one tuple */
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not read replication slot: %s",
+					 PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		pg_log_error("could not fetch replication slot: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * When the slot doesn't exist, the command returns an all-null tuple. The
+	 * first column (slot_type) will only be null if the slot doesn't exist.
+	 */
+	if (PQgetisnull(res, 0, 0))
+	{
+		PQclear(res);
+		pg_log_error("replication slot \"%s\" does not exist", slot_name);
+		return false;
+	}
+
+	/* This cannot happen for now, but future versions may accept
+	 * READ_REPLICATION_SLOT for other slot types.
+	 */
+	if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
+		pg_log_error("expected a physical slot, got type \"%s\" instead",
+					 PQgetvalue(res, 0, 0));
+
+	/* Restart LSN */
+	if (restart_lsn)
+	{
+		if (PQgetisnull(res, 0, 1))
+			*restart_lsn = InvalidXLogRecPtr;
+		else
+		{
+			uint32		hi,
+						lo;
+
+			if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+			{
+				pg_log_error("could not parse slot's restart_lsn \"%s\"",
+							 PQgetvalue(res, 0, 1));
+				PQclear(res);
+				return false;
+			}
+			*restart_lsn = ((uint64) hi) << 32 | lo;
+		}
+	}
+	if (restart_tli)
+	{
+		if (PQgetisnull(res, 0, 2))
+			*restart_tli = 0;
+		else
+			*restart_tli = atoi(PQgetvalue(res, 0, 2));
+	}
+	PQclear(res);
+	return true;
+}
+
 /*
  * Create a replication slot for the given connection. This function
  * returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 65135c79e0..907457b8ae 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -52,6 +52,7 @@ extern void AppendIntegerCommandOption(PQExpBuffer buf,
 									   bool use_new_option_syntax,
 									   char *option_name, int32 option_value);
 
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli);
 extern bool RetrieveWalSegSize(PGconn *conn);
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..bb7cef4f72 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 32;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
 
 # Generate some WAL.  Use --synchronous at the same time to add more
 # code coverage.  Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full.
 $primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 my $nextlsn =
@@ -146,6 +146,46 @@ $primary->command_ok(
 $partial_wals[0] =~ s/(\.gz)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
+# Verify that if we use a replication slot, we resume where we left off even
+# in the absence of WALs
+unlink glob "'${stream_dir}/*'";
+# Setup the slot
+$primary->psql('', "CREATE_REPLICATION_SLOT $slot_name PHYSICAL (RESERVE_WAL)",
+  replication => 1);
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+
+# Get the slot restart_lsn and make sure we retrieve the associated WAL file
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+# Add one so that the restart_lsn doesn't correspond to the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn');");
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+# First verify what happens if we try with a non-existing slot.
+$primary->command_fails_like(
+  [ 'pg_receivewal', '-D', $stream_dir, '--slot', 'nonexistentslot', '-n', '--verbose', '--endpos', $nextlsn ],
+  qr/pg_receivewal: error: replication slot "nonexistentslot" does not exist/,
+  'pg_receivewal fails with a non-existing slot');
+$primary->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"Stream some wal starting from the slot's restart_lsn");
+$slot = $primary->slot($slot_name);
+my @walfiles = glob "${stream_dir}/*";
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
 # Permissions on WAL files should be default
 SKIP:
 {
-- 
2.33.0
