Hello,
Following recommendations, I stripped most of the features from the patch. For
now we support only physical replication slots, and only provide the two fields
of interest (restart_lsn, restart_tli) in addition to the slot type (fixed at
physical) to not paint ourselves in a corner.
I also removed the part about pg_basebackup since other fixes have been
proposed for that case.
Le vendredi 1 octobre 2021, 09:05:18 CEST Michael Paquier a écrit :
> On Mon, Sep 06, 2021 at 04:17:28PM +0900, Michael Paquier wrote:
> > Using READ_REPLICATION_SLOT as the command name is fine, and it could
> > be extended with more fields if necessary, implemented now with only
> > what we think is useful. Returning errors on cases that are still not
> > supported yet is fine, say for logical slots if we decide to reject
> > the case for now, and testrictions can always be lifted in the
> > future.
>
> And marked as RwF as this was three weeks ago. Please feel free to
> register a new entry if this is being worked on.
> --
> Michael
--
Ronan Dunklau
>From 4147665664164eb597fdcc86637ec9c497c36660 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 2 Sep 2021 16:25:25 +0900
Subject: [PATCH v6 1/3] Add READ_REPLICATION_SLOT command
---
doc/src/sgml/protocol.sgml | 47 +++++++++++
src/backend/replication/repl_gram.y | 16 +++-
src/backend/replication/repl_scanner.l | 1 +
src/backend/replication/walsender.c | 87 +++++++++++++++++++++
src/include/nodes/nodes.h | 1 +
src/include/nodes/replnodes.h | 10 +++
src/test/recovery/t/001_stream_rep.pl | 47 ++++++++++-
src/test/recovery/t/006_logical_decoding.pl | 9 ++-
src/tools/pgindent/typedefs.list | 1 +
9 files changed, 216 insertions(+), 3 deletions(-)
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b95cc88599..51a15cc3da 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2067,6 +2067,53 @@ The commands accepted in replication mode are:
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+ <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+ </term>
+ <listitem>
+ <para>
+ Read the information of a replication slot. Returns a tuple with
+ <literal>NULL</literal> values if the replication slot does not
+ exist. This command is currently only supported for <literal>physical</literal> slots.
+ </para>
+ <para>
+ In response to this command, the server will return a one-row result set,
+ containing the following fields:
+ <variablelist>
+ <varlistentry>
+ <term><literal>slot_type</literal> (<type>text</type>)</term>
+ <listitem>
+ <para>
+ The replication slot's type, either <literal>physical</literal> or
+ <literal>NULL</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+ <listitem>
+ <para>
+ The replication slot's <literal>restart_lsn</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+ <listitem>
+ <para>
+ The timeline ID for the <literal>restart_lsn</literal> position,
+ when following the current timeline history.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
<indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 126380e2df..913a99da5a 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
/* Keyword tokens. */
%token K_BASE_BACKUP
%token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
%token K_SHOW
%token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot identify_system
- timeline_history show sql_cmd
+ read_replication_slot timeline_history show sql_cmd
%type <list> base_backup_legacy_opt_list generic_option_list
%type <defelt> base_backup_legacy_opt generic_option
%type <uintval> opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon: ';'
command:
identify_system
+ | read_replication_slot
| base_backup
| start_replication
| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
}
;
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+ K_READ_REPLICATION_SLOT var_name
+ {
+ ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+ n->slotname = $2;
+ $$ = (Node *) n;
+ }
+ ;
+
/*
* SHOW setting
*/
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}*
BASE_BACKUP { return K_BASE_BACKUP; }
FAST { return K_FAST; }
IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; }
SHOW { return K_SHOW; }
LABEL { return K_LABEL; }
NOWAIT { return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b811a5c0ef..329ff97ffc 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,85 @@ IdentifySystem(void)
end_tup_output(tstate);
}
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 3
+ ReplicationSlot *slot;
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ Datum values[READ_REPLICATION_SLOT_COLS];
+ bool nulls[READ_REPLICATION_SLOT_COLS];
+
+ tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
+ INT4OID, -1, 0);
+
+ MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ slot = SearchNamedReplicationSlot(cmd->slotname, false);
+ if (slot == NULL || !slot->in_use)
+ {
+ LWLockRelease(ReplicationSlotControlLock);
+ }
+ else
+ {
+ List *timeline_history = NIL;
+ ReplicationSlot slot_contents;
+ int i = 0;
+ char xloc[MAXFNAMELEN];
+ TimeLineID slots_position_timeline;
+
+ /* Copy slot contents while holding spinlock */
+ SpinLockAcquire(&slot->mutex);
+ slot_contents = *slot;
+ SpinLockRelease(&slot->mutex);
+ LWLockRelease(ReplicationSlotControlLock);
+
+ if (OidIsValid(slot_contents.data.database))
+ elog(ERROR, "READ_REPLICATION_SLOT is only supported for physical slots");
+ values[i] = CStringGetTextDatum("physical");
+ nulls[i] = false;
+ i++;
+ if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+ {
+ snprintf(xloc, sizeof(xloc), "%X/%X",
+ LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+ values[i] = CStringGetTextDatum(xloc);
+ nulls[i] = false;
+ }
+ i++;
+
+ /*
+ * Now get the timeline this wal was produced on, to get to the
+ * current timeline
+ */
+ if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+ {
+ timeline_history = readTimeLineHistory(ThisTimeLineID);
+ slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+ timeline_history);
+ values[i] = Int32GetDatum(slots_position_timeline);
+ nulls[i] = false;
+ }
+ i++;
+
+ Assert(i == READ_REPLICATION_SLOT_COLS);
+ }
+
+ dest = CreateDestReceiver(DestRemoteSimple);
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+ do_tup_output(tstate, values, nulls);
+ end_tup_output(tstate);
+}
+
/*
* Handle TIMELINE_HISTORY command.
@@ -1622,6 +1702,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag);
break;
+ case T_ReadReplicationSlotCmd:
+ cmdtag = "READ_REPLICATION_SLOT";
+ set_ps_display(cmdtag);
+ ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
+ break;
+
case T_BaseBackupCmd:
cmdtag = "BASE_BACKUP";
set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index e0057daa06..6201940637 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -493,6 +493,7 @@ typedef enum NodeTag
* TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
*/
T_IdentifySystemCmd,
+ T_ReadReplicationSlotCmd,
T_BaseBackupCmd,
T_CreateReplicationSlotCmd,
T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..46384ea074 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
NodeTag type;
} IdentifySystemCmd;
+/* ----------------------
+ * READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+ NodeTag type;
+ char *slotname;
+} ReadReplicationSlotCmd;
+
/* ----------------------
* BASE_BACKUP command
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 9916a36012..7d4f01d909 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 55;
# Initialize primary node
my $node_primary = PostgresNode->new('primary');
@@ -254,6 +254,51 @@ ok( $ret == 0,
"SHOW with superuser-settable parameter, replication role and logical replication"
);
+note "testing READ_REPLICATION_SLOT command";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+ 'postgres',
+ 'READ_REPLICATION_SLOT non_existent_slot;',
+ extra_params => [ '-d', $connstr_rep ]);
+ok( $ret == 0,
+ "READ_REPLICATION_SLOT does not produce an error with non existent slot");
+ok($stdout eq '||',
+ "READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+ 'postgres',
+ "CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+ extra_params => [ '-d', $connstr_rep ],
+ 0,
+ 'physical slot created on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+ 'postgres',
+ "READ_REPLICATION_SLOT $slotname;",
+ extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+ "READ_REPLICATION_SLOT does not produce an error with existing slot");
+ok($stdout =~ 'physical\|[^|]*\|1',
+ "READ_REPLICATION_SLOT returns tuple corresponding to the slot");
+
+$node_primary->psql(
+ 'postgres',
+ "DROP_REPLICATION_SLOT $slotname;",
+ extra_params => [ '-d', $connstr_rep ],
+ 0,
+ 'physical slot dropped on primary');
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+ 'postgres',
+ "READ_REPLICATION_SLOT $slotname;",
+ extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0,
+ "READ_REPLICATION_SLOT does not produce an error with dropped slot");
+ok($stdout eq '||',
+ "READ_REPLICATION_SLOT returns NULL values if slot has been dropped");
+
note "switching to physical replication slot";
# Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..802375519b 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 15;
use Config;
# Initialize primary node
@@ -39,6 +39,13 @@ ok( $stderr =~
m/replication slot "test_slot" was not created in this database/,
"Logical decoding correctly fails to start");
+($result, $stdout, $stderr) = $node_primary->psql(
+ 'template1',
+ qq[READ_REPLICATION_SLOT test_slot;],
+ replication => 'database');
+ok($stderr=~ 'READ_REPLICATION_SLOT is only supported for physical slots',
+ 'Logical replication slot is not supported');
+
# Check case of walsender not using a database connection. Logical
# decoding should not be allowed.
($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cb5b5ec74c..a8e4a2afd6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2128,6 +2128,7 @@ ReadBufferMode
ReadBytePtrType
ReadExtraTocPtrType
ReadFunc
+ReadReplicationSlotCmd
ReassignOwnedStmt
RecheckForeignScan_function
RecordCacheEntry
--
2.33.0
>From 68ed56a86b0da17cadfddd939c8e2430b59c4f53 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Mon, 6 Sep 2021 11:08:54 +0200
Subject: [PATCH v6 3/3] Add documentation for pg_receivewal
---
doc/src/sgml/ref/pg_receivewal.sgml | 33 +++++++++++++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 45b544cf49..46e141bc21 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -37,6 +37,34 @@ PostgreSQL documentation
<xref linkend="continuous-archiving"/>).
</para>
+ <para>
+ When <application>pg_receivewal</application> is launched, it tries to determine the
+ starting LSN from the following sources:
+ <orderedlist>
+ <listitem>
+ <para>
+ The target directory is scanned to determine what is the next file we
+ should archive.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ If nothing is found and a slot is specified, use the
+ <command>READ_REPLICATION_SLOT</command>
+ command.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ If we still don't have a valid start position, use the last flush
+ location as reported by the server.
+ </para>
+ </listitem>
+ </orderedlist>
+ </para>
+
<para>
<application>pg_receivewal</application> streams the write-ahead
log in real time as it's being generated on the server, and does not wait
@@ -198,6 +226,11 @@ PostgreSQL documentation
satisfactorily. The option <literal>--synchronous</literal> (see
below) must be specified in addition to make this work correctly.
</para>
+
+ <para>
+ The slot's <literal>restart_lsn</literal> can also be used as a starting
+ point if the target directory is empty.
+ </para>
</listitem>
</varlistentry>
--
2.33.0
>From b4e3b1f4341321c3c5d013e3ff15ff4f7dab58d7 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 1 Sep 2021 15:52:32 +0200
Subject: [PATCH v6 2/3] Use READ_REPLICATION_SLOT command in pg_receivewal.
Prior to this patch, when running pg_receivewal, the start LSN is determined by looking at
the WAL files currently stored on disk, then using the current flush lsn
from the server.
If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left at, which is the replication slot's restart_lsn
instead of skipping right to the current flush location.
To keep compatibility with prior server versions, we only attempt it if
the version is < 15.
---
src/bin/pg_basebackup/pg_receivewal.c | 20 +++++-
src/bin/pg_basebackup/streamutil.c | 73 ++++++++++++++++++++
src/bin/pg_basebackup/streamutil.h | 1 +
src/bin/pg_basebackup/t/020_pg_receivewal.pl | 51 +++++++++++++-
4 files changed, 141 insertions(+), 4 deletions(-)
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index d5140a79fe..1128389839 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -409,10 +409,26 @@ StreamLog(void)
stream.startpos = FindStreamingStart(&stream.timeline);
if (stream.startpos == InvalidXLogRecPtr)
{
- stream.startpos = serverpos;
- stream.timeline = servertli;
+ /* Try to get it from the slot if any, and the server supports it */
+ if (replication_slot != NULL)
+ {
+ /* Error is logged by GetSlotInformation, so just return. */
+ if (!GetSlotInformation(conn, replication_slot, &stream.startpos, &stream.timeline))
+ return;
+ }
+
+ /*
+ * If it is still unknown, use the current flush value from the server
+ */
+ if (stream.startpos == InvalidXLogRecPtr)
+ {
+ stream.startpos = serverpos;
+ stream.timeline = servertli;
+ }
}
+ Assert(stream.startpos != InvalidXLogRecPtr);
+
/*
* Always start streaming at the beginning of a segment
*/
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index a9bc1ce214..807ec5d022 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -479,6 +479,79 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
return true;
}
+
+/*
+ * Returns wether a replication slot exists through a given connection,
+ * and fills in the slot_info with the results if passed by the caller.
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID* restart_tli)
+{
+ PGresult *res;
+ PQExpBuffer query;
+
+ Assert(slot_name != NULL);
+
+ if (PQserverVersion(conn) < 150000)
+ return false;
+
+
+ query = createPQExpBuffer();
+ appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+ res = PQexec(conn, query->data);
+ destroyPQExpBuffer(query);
+ /* The commpand should always return precisely one tuple */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1)
+ {
+ pg_log_error("could not read replication slot : %s",
+ PQerrorMessage(conn));
+ PQclear(res);
+ return false;
+ }
+
+ /*
+ * When the slot doesn't exist, the command returns an all-null tuple. The
+ * first column (slot_type) will only be null if the slot doesn't exists.
+ */
+ if (PQgetisnull(res, 0, 0))
+ {
+ PQclear(res);
+ pg_log_error("replication slot \"%s\" does not exist", slot_name);
+ return false;
+ }
+ if (PQntuples(res) != 1 || PQnfields(res) < 3)
+ {
+ pg_log_error("could not fetch replication slot: got %d rows and %d fields, expected %d rows and %d or more fields",
+ PQntuples(res), PQnfields(res), 1, 3);
+ PQclear(res);
+ return false;
+ }
+
+ /* Restart LSN */
+ if (PQgetisnull(res, 0, 1))
+ *restart_lsn = InvalidXLogRecPtr;
+ else
+ {
+ uint32 hi,
+ lo;
+
+ if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+ {
+ pg_log_error("could not parse slot's restart_lsn \"%s\"",
+ PQgetvalue(res, 0, 1));
+ PQclear(res);
+ return false;
+ }
+ *restart_lsn = ((uint64) hi) << 32 | lo;
+ }
+ if (PQgetisnull(res, 0, 2))
+ *restart_tli = 0;
+ else
+ *restart_tli = atoi(PQgetvalue(res, 0, 2));
+ PQclear(res);
+ return true;
+}
+
/*
* Create a replication slot for the given connection. This function
* returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 65135c79e0..907457b8ae 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -52,6 +52,7 @@ extern void AppendIntegerCommandOption(PQExpBuffer buf,
bool use_new_option_syntax,
char *option_name, int32 option_value);
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli);
extern bool RetrieveWalSegSize(PGconn *conn);
extern TimestampTz feGetCurrentTimestamp(void);
extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..e79fd9d7f0 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
use warnings;
use TestLib;
use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 32;
program_help_ok('pg_receivewal');
program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
# Generate some WAL. Use --synchronous at the same time to add more
# code coverage. Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full
$primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
$primary->psql('postgres', 'SELECT pg_switch_wal();');
my $nextlsn =
@@ -146,6 +146,53 @@ $primary->command_ok(
$partial_wals[0] =~ s/(\.gz)?.partial//;
ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
+# Verify that if we use a replication slot, we resume where we left even in the
+# absence of WALs
+
+# Setup the slot, and connect to it a first time
+$primary->run_log(
+ [ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
+ 'creating a replication slot');
+$primary->psql('postgres',
+ 'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+ $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->run_log(
+ [ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+ "streaming some WAL");
+
+# Get the slot restart_lsn and make sure we retrieve the associated WAL file
+# even after deletion of the previous stored WAL files.
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+# Add one so that the restart_lsn doesn't correspond to the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+ "SELECT pg_walfile_name('$restart_lsn');");
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+
+unlink glob "'${stream_dir}/*'";
+
+$primary->psql('postgres',
+ 'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+ $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+# First verify what happens if we try with a non-existing slot.
+$primary->command_fails_like(
+ [ 'pg_receivewal', '-D', $stream_dir, '--slot', 'nonexistentslot', '-n', '--verbose', '--endpos', $nextlsn ],
+ qr/pg_receivewal: error: replication slot "nonexistentslot" does not exist/,
+ 'pg_receivewal fails with a non-existing slot');
+$primary->command_ok(
+ [ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+ "Stream some wal starting from the slot's restart_lsn");
+$slot = $primary->slot($slot_name);
+my @walfiles = glob "${stream_dir}/*";
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
# Permissions on WAL files should be default
SKIP:
{
--
2.33.0