On Thursday, 21 October 2021, 09:21:44 CEST, Michael Paquier wrote:
> On Thu, Oct 21, 2021 at 08:29:54AM +0200, Ronan Dunklau wrote:
> > Ok, do you want me to propose a different patch for previous versions?
> 
> That's not necessary.  Thanks for proposing.
> 
> > Do you mean restart_lsn as the pointer argument to the function, or
> > restart_lsn as the field returned by the command? If it's the first, I'll
> > change it, but if it's the latter, it is expected that we sometimes run
> > this on a slot where WAL has never been reserved yet.
> 
> restart_lsn as the pointer of the function.

Done. I haven't touched the timeline switch test patch for now, but I still 
include it here for completeness.





-- 
Ronan Dunklau
From b3703868d230f773378742a0deb4360f6cc7343a Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 21 Oct 2021 13:21:43 +0900
Subject: [PATCH v10 1/4] doc: Describe calculation method of streaming start
 for pg_receivewal

The documentation was imprecise about the fact that the current WAL
flush location is used if nothing can be found in the local archive
directory, independently of the compression used by each segment
(ZLIB or uncompressed).
---
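
A minimal sketch of the first step described in this documentation change,
assuming the default 16 MB WAL segment size and the usual 24-character segment
file name (timeline, log, segment, each as 8 hex digits). The helper name
sketch_next_segment_start is hypothetical and is not part of the patch; it only
illustrates how "the beginning of the next WAL segment file" follows from the
name of the newest completed segment.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Assumed default WAL segment size (16 MB); a real server may use another. */
#define SKETCH_WAL_SEGMENT_SIZE ((uint64_t) 16 * 1024 * 1024)

/*
 * Hypothetical helper: given the name of a completed WAL segment, compute
 * its timeline and the LSN at which the next segment starts.  Returns false
 * if the name does not look like a WAL segment file name.
 */
bool
sketch_next_segment_start(const char *fname, uint32_t *tli, uint64_t *startpos)
{
	unsigned int t,
				log,
				seg;
	uint64_t	segs_per_xlogid = UINT64_C(0x100000000) / SKETCH_WAL_SEGMENT_SIZE;
	uint64_t	segno;

	if (strlen(fname) != 24 ||
		sscanf(fname, "%08X%08X%08X", &t, &log, &seg) != 3)
		return false;

	/* Segment number counted from the very start of the WAL stream. */
	segno = (uint64_t) log * segs_per_xlogid + seg;

	*tli = t;
	*startpos = (segno + 1) * SKETCH_WAL_SEGMENT_SIZE;
	return true;
}
```

With these assumptions, 000000010000000000000003 gives timeline 1 and a start
position of 0/4000000.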
 doc/src/sgml/ref/pg_receivewal.sgml | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 45b544cf49..9adbddb657 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -75,6 +75,29 @@ PostgreSQL documentation
    one session available for the stream.
   </para>
 
+  <para>
+   The starting point of the write-ahead log streaming is calculated when
+   <application>pg_receivewal</application> starts:
+   <orderedlist>
+    <listitem>
+     <para>
+      First, scan the directory where the WAL segment files are written and
+      find the newest completed segment, using as starting point the beginning
+      of the next WAL segment file. This is calculated independently of the
+      compression method used to compress each segment.
+     </para>
+    </listitem>
+
+    <listitem>
+     <para>
+      If a starting point cannot be calculated with the previous method,
+      the latest WAL flush location is used as reported by the server from
+      an <literal>IDENTIFY_SYSTEM</literal> command.
+     </para>
+    </listitem>
+   </orderedlist>
+  </para>
+
   <para>
    If the connection is lost, or if it cannot be initially established,
    with a non-fatal error, <application>pg_receivewal</application> will
-- 
2.33.0

From 067353a88803e96b295f425167c1195ccf984352 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 21 Oct 2021 14:23:41 +0900
Subject: [PATCH v10 2/4] Add READ_REPLICATION_SLOT

---
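
A minimal sketch of the text form used for restart_lsn, assuming the
two-part hexadecimal "hi/lo" notation that the server writes with "%X/%X" and
that a client reads back with sscanf. The names sketch_format_lsn and
sketch_parse_lsn are hypothetical and only illustrate the round trip; they are
not functions from this patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Format a 64-bit LSN as "hi/lo" in hexadecimal. */
void
sketch_format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
	snprintf(buf, buflen, "%X/%X",
			 (unsigned int) (lsn >> 32), (unsigned int) lsn);
}

/* Parse "hi/lo" back into a 64-bit LSN; returns false on malformed input. */
bool
sketch_parse_lsn(const char *str, uint64_t *lsn)
{
	unsigned int hi,
				lo;

	if (sscanf(str, "%X/%X", &hi, &lo) != 2)
		return false;

	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}
```

A NULL restart_lsn (slot without reserved WAL) has no text form at all, so a
caller would need to check for NULL before parsing.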
 doc/src/sgml/protocol.sgml                  | 48 +++++++++++
 src/backend/replication/repl_gram.y         | 16 +++-
 src/backend/replication/repl_scanner.l      |  1 +
 src/backend/replication/walsender.c         | 93 +++++++++++++++++++++
 src/include/nodes/nodes.h                   |  1 +
 src/include/nodes/replnodes.h               | 11 +++
 src/test/recovery/t/001_stream_rep.pl       | 32 ++++++-
 src/test/recovery/t/006_logical_decoding.pl | 11 ++-
 src/tools/pgindent/typedefs.list            |  1 +
 9 files changed, 211 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b95cc88599..37c26ec8ae 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2067,6 +2067,54 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+     <para>
+      Read some information associated with a replication slot. Returns a tuple
+      with <literal>NULL</literal> values if the replication slot does not
+      exist. This command is currently only supported for physical replication
+      slots.
+     </para>
+     <para>
+      In response to this command, the server will return a one-row result set,
+      containing the following fields:
+      <variablelist>
+       <varlistentry>
+        <term><literal>slot_type</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's type, either <literal>physical</literal> or
+          <literal>NULL</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's <literal>restart_lsn</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+        <listitem>
+         <para>
+          The timeline ID associated with <literal>restart_lsn</literal>,
+          following the current timeline history.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 126380e2df..dcb1108579 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_legacy_opt_list generic_option_list
 %type <defelt>	base_backup_legacy_opt generic_option
 %type <uintval>	opt_timeline
@@ -125,6 +126,7 @@ command:
 			| start_logical_replication
 			| create_replication_slot
 			| drop_replication_slot
+			| read_replication_slot
 			| timeline_history
 			| show
 			| sql_cmd
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b811a5c0ef..7b3d16f731 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,91 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 3
+	ReplicationSlot *slot;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[READ_REPLICATION_SLOT_COLS];
+	bool		nulls[READ_REPLICATION_SLOT_COLS];
+
+	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
+							  INT4OID, -1, 0);
+
+	MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+	else
+	{
+		ReplicationSlot slot_contents;
+		int			i = 0;
+
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (OidIsValid(slot_contents.data.database))
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use \"%s\" with logical replication slots",
+						   "READ_REPLICATION_SLOT"));
+
+		/* slot type */
+		values[i] = CStringGetTextDatum("physical");
+		nulls[i] = false;
+		i++;
+
+		/* start LSN */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			char		xloc[MAXFNAMELEN];
+
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		/* timeline this WAL was produced on */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			TimeLineID	slots_position_timeline;
+			List	   *timeline_history = NIL;
+
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+														  timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		i++;
+
+		Assert(i == READ_REPLICATION_SLOT_COLS);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1622,6 +1708,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index e0057daa06..541e9861ba 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -496,6 +496,7 @@ typedef enum NodeTag
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
+	T_ReadReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
 	T_SQLCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..a746fafc12 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -87,6 +87,17 @@ typedef struct StartReplicationCmd
 } StartReplicationCmd;
 
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char	   *slotname;
+} ReadReplicationSlotCmd;
+
+
 /* ----------------------
  *		TIMELINE_HISTORY command
  * ----------------------
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 9916a36012..8fb7487857 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 53;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -254,6 +254,36 @@ ok( $ret == 0,
 	"SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command for replication connection";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	'READ_REPLICATION_SLOT non_existent_slot;',
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0, "READ_REPLICATION_SLOT exit code 0 on success");
+like($stdout, qr/^\|\|$/,
+	"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+$node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+	extra_params => [ '-d', $connstr_rep ]);
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0, "READ_REPLICATION_SLOT success with existing slot");
+like($stdout, qr/^physical\|[^|]*\|1$/,
+	"READ_REPLICATION_SLOT returns tuple with slot information");
+
+$node_primary->psql(
+	'postgres',
+	"DROP_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..1b74f38f10 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 15;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,15 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql(
+	'template1',
+	qq[READ_REPLICATION_SLOT test_slot;],
+	replication => 'database');
+like(
+	$stderr,
+	qr/cannot use "READ_REPLICATION_SLOT" with logical replication slots/,
+	'READ_REPLICATION_SLOT not supported for logical slots');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cb5b5ec74c..a8e4a2afd6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2128,6 +2128,7 @@ ReadBufferMode
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
+ReadReplicationSlotCmd
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheEntry
-- 
2.33.0

From f2aaf094a33c3b62bae88ad95816377186c91fa6 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 20 Oct 2021 14:52:13 +0200
Subject: [PATCH v10 4/4] Add tests making sure pg_receivewal can follow a
 timeline switch.

---
 src/bin/pg_basebackup/t/020_pg_receivewal.pl | 61 +++++++++++++++++++-
 1 file changed, 60 insertions(+), 1 deletion(-)

diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 42749c3a40..a5e19c5616 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 32;
+use Test::More tests => 38;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -185,6 +185,65 @@ $primary->command_ok(
 $slot = $primary->slot($slot_name);
 ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
 
+# Check what happens on a timeline switch
+sub change_timeline{
+  # Setup a new standby, start pg_receivewal once, then promote it.
+  my ($primary, $backup_name, $standby_name, $slot_name, $stream_dir) = @_;
+  my $standby = PostgresNode->new($standby_name);
+  $standby->init_from_backup($primary, $backup_name, has_streaming => 1);
+  $standby->start;
+  $primary->wait_for_catchup($standby, 'replay', $primary->lsn('write'));
+  $standby->psql('', "CREATE_REPLICATION_SLOT $slot_name PHYSICAL (RESERVE_WAL)",
+                 replication => 1);
+  $primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+  # Get a position which can act as an end position.
+  my $nextlsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+  $primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
+  # Stream some wal, this will populate the stream directory for the
+  # resume-from-archive test case.
+  $standby->run_log(
+    [ 'pg_receivewal', '-D', $stream_dir, '--verbose', '--endpos', $nextlsn, "--slot", $slot_name],
+    "Stream some wal before promoting");
+  $standby->psql(
+	'postgres',
+	"SELECT pg_promote(wait_seconds => 300)");
+  $standby->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+  $standby->safe_psql('postgres', 'SELECT pg_switch_wal();');
+  # Get a full walfilename that will be generated.
+  my $walfilename = $standby->safe_psql('postgres', "SELECT pg_walfile_name(pg_current_wal_insert_lsn())");
+  $standby->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+  $standby->safe_psql('postgres', 'SELECT pg_switch_wal();');
+  $nextlsn = $standby->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+  return $standby, $nextlsn, $walfilename;
+}
+
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+my $standby;
+unlink glob "'${stream_dir}/*'";
+# First test: change timeline, and check we can resume from the slot.
+($standby, $nextlsn, $walfile_to_be_archived) = change_timeline($primary, 'my_backup', 'standby1', $slot_name, $stream_dir);
+# To resume from the slot, the target directory must be empty
+unlink glob "'${stream_dir}/*'";
+$standby->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--verbose', '--endpos', $nextlsn, "--slot", $slot_name],
+	"Stream some wal after promoting, resuming from the slot's position");
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the new timeline has been archived");
+ok(-e "$stream_dir/00000002.history", "Timeline history file has been archived");
+
+# Now do the same, but resuming from the latest archived file.
+# Cleanup leftover from the first standby
+unlink glob "'${stream_dir}/*'";
+($standby, $nextlsn, $walfile_to_be_archived) = change_timeline($primary, 'my_backup', 'standby2', $slot_name, $stream_dir);
+$standby->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--verbose', '--endpos', $nextlsn],
+	"Stream some wal after promoting, resuming from the latest archived file");
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the new timeline has been archived");
+ok(-e "$stream_dir/00000002.history", "Timeline history file has been archived");
+
 # Permissions on WAL files should be default
 SKIP:
 {
-- 
2.33.0

From a455eb220e34a8fea43056cd14bbda5ea9ea1f27 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 1 Sep 2021 15:52:32 +0200
Subject: [PATCH v10 3/4] Use READ_REPLICATION_SLOT command in pg_receivewal.

Prior to this patch, when running pg_receivewal, the start LSN is
determined by looking at the WAL files currently stored on disk, then
falling back to the current flush LSN from the server.

If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left off, which is the replication slot's restart_lsn,
instead of skipping right to the current flush location.

To keep compatibility with prior server versions, we only attempt it if
the server version is >= 15.
---
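
A minimal sketch of the start-point selection order this patch aims for,
assuming an LSN of 0 means "unknown". SketchStartPoint and sketch_choose_start
are hypothetical names used for illustration only; the sketch leaves out the
error path for a missing slot and the rounding down to a segment boundary that
StreamLog() applies afterwards.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumption for this sketch: LSN 0 stands for "no position known". */
#define SKETCH_INVALID_LSN UINT64_C(0)

typedef struct SketchStartPoint
{
	uint64_t	lsn;			/* position to start streaming from */
	uint32_t	tli;			/* timeline of that position */
} SketchStartPoint;

/*
 * Choose the streaming start point:
 * 1. the position derived from the local archive directory, if any;
 * 2. otherwise the slot's restart_lsn, if a slot is used and WAL is reserved;
 * 3. otherwise the server's current flush position.
 */
SketchStartPoint
sketch_choose_start(SketchStartPoint archive, bool have_slot,
					SketchStartPoint slot, SketchStartPoint server)
{
	if (archive.lsn != SKETCH_INVALID_LSN)
		return archive;

	if (have_slot && slot.lsn != SKETCH_INVALID_LSN)
		return slot;

	return server;
}
```

The slot is consulted only when the archive directory yields nothing, so an
existing archive always wins over the slot's restart_lsn.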
 doc/src/sgml/ref/pg_receivewal.sgml          |  8 ++
 src/bin/pg_basebackup/pg_receivewal.c        | 20 ++++-
 src/bin/pg_basebackup/streamutil.c           | 84 ++++++++++++++++++++
 src/bin/pg_basebackup/streamutil.h           |  1 +
 src/bin/pg_basebackup/t/020_pg_receivewal.pl | 43 +++++++++-
 5 files changed, 152 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 9adbddb657..667efb3cb3 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -88,6 +88,14 @@ PostgreSQL documentation
      </para>
     </listitem>
 
+    <listitem>
+      <para>
+      If the previous method couldn't calculate a starting point, and a
+      replication slot is used, the <command>READ_REPLICATION_SLOT</command>
+      command is issued to retrieve the slot's <literal>restart_lsn</literal>.
+      </para>
+    </listitem>
+
     <listitem>
      <para>
       If a starting point cannot be calculated with the previous method,
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index d5140a79fe..1128389839 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -409,10 +409,26 @@ StreamLog(void)
 	stream.startpos = FindStreamingStart(&stream.timeline);
 	if (stream.startpos == InvalidXLogRecPtr)
 	{
-		stream.startpos = serverpos;
-		stream.timeline = servertli;
+		/* Try to get it from the slot if any, and the server supports it */
+		if (replication_slot != NULL && PQserverVersion(conn) >= 150000)
+		{
+			/* Error is logged by GetSlotInformation, so just return. */
+			if (!GetSlotInformation(conn, replication_slot, &stream.startpos, &stream.timeline))
+				return;
+		}
+
+		/*
+		 * If it is still unknown, use the current flush value from the server
+		 */
+		if (stream.startpos == InvalidXLogRecPtr)
+		{
+			stream.startpos = serverpos;
+			stream.timeline = servertli;
+		}
 	}
 
+	Assert(stream.startpos != InvalidXLogRecPtr);
+
 	/*
 	 * Always start streaming at the beginning of a segment
 	 */
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index a9bc1ce214..f437b65529 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -479,6 +479,90 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 	return true;
 }
 
+
+/*
+ * Returns whether a replication slot exists through a given connection,
+ * and fills in restart_lsn and restart_tli (if not NULL) with the results.
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
+{
+	PGresult   *res;
+	PQExpBuffer query;
+
+	Assert(slot_name != NULL);
+	Assert(restart_lsn != NULL);
+
+	if (PQserverVersion(conn) < 150000)
+		return false;
+
+	*restart_lsn = InvalidXLogRecPtr;
+
+	if (restart_tli != NULL)
+		*restart_tli = 0;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+	/* The command should always return precisely one tuple */
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not read replication slot: %s",
+					 PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		pg_log_error("could not read replication slot: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * When the slot doesn't exist, the command returns an all-null tuple. The
+	 * first column (slot_type) will only be null if the slot doesn't exist.
+	 */
+	if (PQgetisnull(res, 0, 0))
+	{
+		PQclear(res);
+		pg_log_error("replication slot \"%s\" does not exist", slot_name);
+		return false;
+	}
+
+	/* This cannot happen for now, but future versions may accept
+	 * READ_REPLICATION_SLOT for other slot types.
+	 */
+	if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
+		pg_log_error("expected a physical slot, got type \"%s\" instead",
+					 PQgetvalue(res, 0, 0));
+
+	/* Restart LSN */
+	if (!PQgetisnull(res, 0, 1))
+	{
+		uint32		hi,
+					lo;
+
+		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+		{
+			pg_log_error("could not parse slot's restart_lsn \"%s\"",
+						 PQgetvalue(res, 0, 1));
+			PQclear(res);
+			return false;
+		}
+		*restart_lsn = ((uint64) hi) << 32 | lo;
+	}
+	if (restart_tli && !PQgetisnull(res, 0, 2))
+	{
+		*restart_tli = atoi(PQgetvalue(res, 0, 2));
+	}
+	PQclear(res);
+	return true;
+}
+
 /*
  * Create a replication slot for the given connection. This function
  * returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 65135c79e0..907457b8ae 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -52,6 +52,7 @@ extern void AppendIntegerCommandOption(PQExpBuffer buf,
 									   bool use_new_option_syntax,
 									   char *option_name, int32 option_value);
 
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli);
 extern bool RetrieveWalSegSize(PGconn *conn);
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..42749c3a40 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 32;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
 
 # Generate some WAL.  Use --synchronous at the same time to add more
 # code coverage.  Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full.
 $primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 my $nextlsn =
@@ -146,6 +146,45 @@ $primary->command_ok(
 $partial_wals[0] =~ s/(\.gz)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
+# Verify that if we use a replication slot, we resume where we left off even
+# in the absence of WALs
+unlink glob "'${stream_dir}/*'";
+# Setup the slot
+$primary->psql('', "CREATE_REPLICATION_SLOT $slot_name PHYSICAL (RESERVE_WAL)",
+  replication => 1);
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+
+# Get the slot restart_lsn and make sure we retrieve the associated WAL file
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+# Add one so that the restart_lsn doesn't correspond to the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn');");
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+# First verify what happens if we try with a non-existing slot.
+$primary->command_fails_like(
+  [ 'pg_receivewal', '-D', $stream_dir, '--slot', 'nonexistentslot', '-n', '--verbose', '--endpos', $nextlsn ],
+  qr/pg_receivewal: error: replication slot "nonexistentslot" does not exist/,
+  'pg_receivewal fails with a non-existing slot');
+$primary->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"Stream some wal starting from the slot's restart_lsn");
+$slot = $primary->slot($slot_name);
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
 # Permissions on WAL files should be default
 SKIP:
 {
-- 
2.33.0
