Le mercredi 20 octobre 2021 11:40:18 CEST, vous avez écrit :
> > +# Setup the slot, and connect to it a first time
> > +$primary->run_log(
> > + [ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
> > + 'creating a replication slot');
> > +$primary->psql('postgres',
> > + 'INSERT INTO test_table VALUES (generate_series(1,100));');
> > +$primary->psql('postgres', 'SELECT pg_switch_wal();');
> > +$nextlsn =
> > + $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
> > +chomp($nextlsn);
> > Wouldn't it be simpler to use CREATE_REPLICATION_SLOT with RESERVE_WAL
> > here, rather than going through pg_receivewal? It seems to me that
> > this would be cheaper without really impacting the coverage.
>
> You're right, we can skip two invocations of pg_receivewal like this (for
> the slot creation + for starting the slot a first time).
After sending the previous patch series, I figured it would be worthwhile to
also have tests covering timeline switches, which were not covered before.
So please find attached a new version with an additional patch for those tests,
covering both the "resume from the last known archive" and "resume from the
replication slot's position" cases.
--
Ronan Dunklau
>From 5a47f17a17594cc171f744ce383ba820d44b6446 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 2 Sep 2021 16:25:25 +0900
Subject: [PATCH v8 1/4] Add READ_REPLICATION_SLOT command
---
doc/src/sgml/protocol.sgml | 47 +++++++++++
src/backend/replication/repl_gram.y | 16 +++-
src/backend/replication/repl_scanner.l | 1 +
src/backend/replication/walsender.c | 89 +++++++++++++++++++++
src/include/nodes/nodes.h | 1 +
src/include/nodes/replnodes.h | 10 +++
src/test/recovery/t/001_stream_rep.pl | 47 ++++++++++-
src/test/recovery/t/006_logical_decoding.pl | 9 ++-
src/tools/pgindent/typedefs.list | 1 +
9 files changed, 218 insertions(+), 3 deletions(-)
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b95cc88599..51a15cc3da 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2067,6 +2067,53 @@ The commands accepted in replication mode are:
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+ <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+ </term>
+ <listitem>
+ <para>
+ Read information about a replication slot. Returns a tuple with
+ <literal>NULL</literal> values if the replication slot does not
+ exist. This command is currently only supported for <literal>physical</literal> slots.
+ </para>
+ <para>
+ In response to this command, the server will return a one-row result set,
+ containing the following fields:
+ <variablelist>
+ <varlistentry>
+ <term><literal>slot_type</literal> (<type>text</type>)</term>
+ <listitem>
+ <para>
+ The replication slot's type, either <literal>physical</literal> or
+ <literal>NULL</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+ <listitem>
+ <para>
+ The replication slot's <literal>restart_lsn</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+ <listitem>
+ <para>
+ The timeline ID for the <literal>restart_lsn</literal> position,
+ when following the current timeline history.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
<indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 126380e2df..913a99da5a 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
/* Keyword tokens. */
%token K_BASE_BACKUP
%token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
%token K_SHOW
%token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot identify_system
- timeline_history show sql_cmd
+ read_replication_slot timeline_history show sql_cmd
%type <list> base_backup_legacy_opt_list generic_option_list
%type <defelt> base_backup_legacy_opt generic_option
%type <uintval> opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon: ';'
command:
identify_system
+ | read_replication_slot
| base_backup
| start_replication
| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
}
;
+/*
+ * READ_REPLICATION_SLOT slot_name
+ *
+ * Builds a ReadReplicationSlotCmd node carrying the requested slot name.
+ */
+read_replication_slot:
+ K_READ_REPLICATION_SLOT var_name
+ {
+ ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+ n->slotname = $2;
+ $$ = (Node *) n;
+ }
+ ;
+
/*
* SHOW setting
*/
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}*
BASE_BACKUP { return K_BASE_BACKUP; }
FAST { return K_FAST; }
IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; }
SHOW { return K_SHOW; }
LABEL { return K_LABEL; }
NOWAIT { return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b811a5c0ef..5d68a7e66a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,87 @@ IdentifySystem(void)
end_tup_output(tstate);
}
+/*
+ * Handle READ_REPLICATION_SLOT command.
+ *
+ * Sends a single row describing the named slot: its type, its restart_lsn,
+ * and the timeline that restart_lsn belongs to according to the history of
+ * the current timeline.  All columns are NULL if the slot does not exist;
+ * restart_lsn and restart_tli are NULL if the slot has not reserved WAL.
+ * Logical slots are rejected with an error.
+ */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 3
+	ReplicationSlot *slot;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[READ_REPLICATION_SLOT_COLS];
+	bool		nulls[READ_REPLICATION_SLOT_COLS];
+
+	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
+							  INT4OID, -1, 0);
+
+	/* Every column stays NULL unless it is filled in below. */
+	MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+	else
+	{
+		ReplicationSlot slot_contents;
+		int			i = 0;
+
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (OidIsValid(slot_contents.data.database))
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("READ_REPLICATION_SLOT is only supported for physical slots"));
+
+		/* slot_type */
+		values[i] = CStringGetTextDatum("physical");
+		nulls[i] = false;
+		i++;
+
+		/* restart_lsn and restart_tli, only if the slot has reserved WAL */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			char		xloc[MAXFNAMELEN];
+			List	   *timeline_history;
+			TimeLineID	slots_position_timeline;
+
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+
+			/*
+			 * In a cascading walsender, ThisTimeLineID is only kept current
+			 * by GetStandbyFlushRecPtr(), which IDENTIFY_SYSTEM and
+			 * START_REPLICATION call.  Refresh it here too, so that the
+			 * history is not read for a stale or unset timeline when this
+			 * is the first command sent on the connection.
+			 */
+			if (RecoveryInProgress())
+				(void) GetStandbyFlushRecPtr();
+
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline =
+				tliOfPointInHistory(slot_contents.data.restart_lsn,
+									timeline_history);
+			list_free_deep(timeline_history);
+
+			values[i + 1] = Int32GetDatum(slots_position_timeline);
+			nulls[i + 1] = false;
+		}
+		i += 2;
+
+		Assert(i == READ_REPLICATION_SLOT_COLS);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
/*
* Handle TIMELINE_HISTORY command.
@@ -1622,6 +1704,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag);
break;
+ case T_ReadReplicationSlotCmd:
+ cmdtag = "READ_REPLICATION_SLOT";
+ set_ps_display(cmdtag);
+ ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
+ break;
+
case T_BaseBackupCmd:
cmdtag = "BASE_BACKUP";
set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index e0057daa06..6201940637 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -493,6 +493,7 @@ typedef enum NodeTag
* TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
*/
T_IdentifySystemCmd,
+ T_ReadReplicationSlotCmd,
T_BaseBackupCmd,
T_CreateReplicationSlotCmd,
T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..46384ea074 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
NodeTag type;
} IdentifySystemCmd;
+/* ----------------------
+ * READ_REPLICATION_SLOT command
+ *
+ * Parse node for "READ_REPLICATION_SLOT slot_name".
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+ NodeTag type;
+ char *slotname; /* name of the slot to read */
+} ReadReplicationSlotCmd;
+
/* ----------------------
* BASE_BACKUP command
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 9916a36012..2317f55149 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 55;
# Initialize primary node
my $node_primary = PostgresNode->new('primary');
@@ -254,6 +254,51 @@ ok( $ret == 0,
"SHOW with superuser-settable parameter, replication role and logical replication"
);
+note "testing READ_REPLICATION_SLOT command";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	'READ_REPLICATION_SLOT non_existent_slot;',
+	extra_params => [ '-d', $connstr_rep ]);
+is($ret, 0,
+	"READ_REPLICATION_SLOT does not produce an error with non existent slot");
+is($stdout, '||',
+	"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+# safe_psql() dies if the command fails, so a failed slot creation stops the
+# test here instead of showing up as a confusing mismatch further down.
+$node_primary->safe_psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+	extra_params => [ '-d', $connstr_rep ]);
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+is($ret, 0,
+	"READ_REPLICATION_SLOT does not produce an error with existing slot");
+like($stdout, qr/^physical\|[^|]*\|1$/,
+	"READ_REPLICATION_SLOT returns tuple corresponding to the slot");
+
+$node_primary->safe_psql(
+	'postgres',
+	"DROP_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+is($ret, 0,
+	"READ_REPLICATION_SLOT does not produce an error with dropped slot");
+is($stdout, '||',
+	"READ_REPLICATION_SLOT returns NULL values if slot has been dropped");
+
note "switching to physical replication slot";
# Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..1e08afca74 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 15;
use Config;
# Initialize primary node
@@ -39,6 +39,13 @@ ok( $stderr =~
m/replication slot "test_slot" was not created in this database/,
"Logical decoding correctly fails to start");
+# READ_REPLICATION_SLOT is restricted to physical slots, so asking about the
+# logical slot "test_slot" must fail with the dedicated error message.
+($result, $stdout, $stderr) = $node_primary->psql(
+ 'template1',
+ qq[READ_REPLICATION_SLOT test_slot;],
+ replication => 'database');
+like($stderr, qr/READ_REPLICATION_SLOT is only supported for physical slots/,
+ 'Logical replication slot is not supported');
+
# Check case of walsender not using a database connection. Logical
# decoding should not be allowed.
($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cb5b5ec74c..a8e4a2afd6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2128,6 +2128,7 @@ ReadBufferMode
ReadBytePtrType
ReadExtraTocPtrType
ReadFunc
+ReadReplicationSlotCmd
ReassignOwnedStmt
RecheckForeignScan_function
RecordCacheEntry
--
2.33.0
>From 08718ca8563fa1de809fa6e95b65dbd54629f1e9 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 1 Sep 2021 15:52:32 +0200
Subject: [PATCH v8 2/4] Use READ_REPLICATION_SLOT command in pg_receivewal.
Prior to this patch, when running pg_receivewal, the start LSN is determined by looking at
the WAL files currently stored on disk, falling back to the current flush LSN
reported by the server.
If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left off, which is the replication slot's restart_lsn,
instead of skipping right to the current flush location.
To keep compatibility with prior server versions, we only attempt it if
the server version is >= 15.
---
src/bin/pg_basebackup/pg_receivewal.c | 20 ++++-
src/bin/pg_basebackup/streamutil.c | 87 ++++++++++++++++++++
src/bin/pg_basebackup/streamutil.h | 1 +
src/bin/pg_basebackup/t/020_pg_receivewal.pl | 43 +++++++++-
4 files changed, 147 insertions(+), 4 deletions(-)
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index d5140a79fe..1128389839 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -409,10 +409,26 @@ StreamLog(void)
stream.startpos = FindStreamingStart(&stream.timeline);
if (stream.startpos == InvalidXLogRecPtr)
{
- stream.startpos = serverpos;
- stream.timeline = servertli;
+ /* Try to get it from the slot if any, and the server supports it */
+ if (replication_slot != NULL)
+ {
+ /* Error is logged by GetSlotInformation, so just return. */
+ if (!GetSlotInformation(conn, replication_slot, &stream.startpos, &stream.timeline))
+ return;
+ }
+
+ /*
+ * If it is still unknown, use the current flush value from the server
+ */
+ if (stream.startpos == InvalidXLogRecPtr)
+ {
+ stream.startpos = serverpos;
+ stream.timeline = servertli;
+ }
}
+ Assert(stream.startpos != InvalidXLogRecPtr);
+
/*
* Always start streaming at the beginning of a segment
*/
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index a9bc1ce214..1257c7b536 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -479,6 +479,93 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
return true;
}
+
+/*
+ * GetSlotInformation
+ *
+ * Run READ_REPLICATION_SLOT for "slot_name" through "conn", and report the
+ * slot's restart_lsn and the timeline of that position.
+ *
+ * "restart_lsn" and "restart_tli" may be NULL if the caller is not
+ * interested in them.  When non-NULL, they are always set: to
+ * InvalidXLogRecPtr and 0 respectively when the value is unknown, e.g.
+ * because the slot has not reserved WAL yet.
+ *
+ * Servers older than 15 do not support READ_REPLICATION_SLOT.  In that case
+ * nothing is sent, the outputs are left invalid and true is returned, so
+ * that callers fall back to another way of finding a starting point instead
+ * of treating this as a failure.
+ *
+ * Returns false, after logging an error, if the command fails, returns an
+ * unexpected result shape, or reports that the slot does not exist or is
+ * not a physical slot.
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
+{
+	PGresult   *res;
+	PQExpBuffer query;
+	XLogRecPtr	lsn_loc = InvalidXLogRecPtr;
+	TimeLineID	tli_loc = 0;
+
+	Assert(slot_name != NULL);
+
+	/* Make sure every return path leaves the outputs defined. */
+	if (restart_lsn)
+		*restart_lsn = lsn_loc;
+	if (restart_tli)
+		*restart_tli = tli_loc;
+
+	/*
+	 * READ_REPLICATION_SLOT only exists in version 15 and newer.  This is not
+	 * an error: the caller simply gets no information from the slot.
+	 */
+	if (PQserverVersion(conn) < 150000)
+		return true;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+	res = PQexec(conn, query->data);
+	destroyPQExpBuffer(query);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not read replication slot \"%s\": %s",
+					 slot_name, PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+
+	/* The command should always return precisely one tuple */
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		pg_log_error("could not fetch replication slot: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * When the slot doesn't exist, the command returns an all-NULL tuple, so
+	 * a NULL slot_type is enough to detect it.
+	 */
+	if (PQgetisnull(res, 0, 0))
+	{
+		pg_log_error("replication slot \"%s\" does not exist", slot_name);
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * This cannot happen for now, but future versions may accept
+	 * READ_REPLICATION_SLOT for other slot types; refuse anything else.
+	 */
+	if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
+	{
+		pg_log_error("expected a physical slot, got type \"%s\" instead",
+					 PQgetvalue(res, 0, 0));
+		PQclear(res);
+		return false;
+	}
+
+	/* restart_lsn is NULL when the slot has not reserved any WAL */
+	if (!PQgetisnull(res, 0, 1))
+	{
+		uint32		hi,
+					lo;
+
+		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+		{
+			pg_log_error("could not parse slot's restart_lsn \"%s\"",
+						 PQgetvalue(res, 0, 1));
+			PQclear(res);
+			return false;
+		}
+		lsn_loc = ((uint64) hi) << 32 | lo;
+	}
+
+	/* restart_tli is NULL exactly when restart_lsn is */
+	if (!PQgetisnull(res, 0, 2))
+		tli_loc = atoi(PQgetvalue(res, 0, 2));
+
+	PQclear(res);
+
+	if (restart_lsn)
+		*restart_lsn = lsn_loc;
+	if (restart_tli)
+		*restart_tli = tli_loc;
+
+	return true;
+}
+
/*
* Create a replication slot for the given connection. This function
* returns true in case of success.
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 65135c79e0..907457b8ae 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -52,6 +52,7 @@ extern void AppendIntegerCommandOption(PQExpBuffer buf,
bool use_new_option_syntax,
char *option_name, int32 option_value);
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli);
extern bool RetrieveWalSegSize(PGconn *conn);
extern TimestampTz feGetCurrentTimestamp(void);
extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 0b33d73900..42749c3a40 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
use warnings;
use TestLib;
use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 32;
program_help_ok('pg_receivewal');
program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
# Generate some WAL. Use --synchronous at the same time to add more
# code coverage. Switch to the next segment first so that subsequent
-# restarts of pg_receivewal will see this segment as full..
+# restarts of pg_receivewal will see this segment as full
$primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
$primary->psql('postgres', 'SELECT pg_switch_wal();');
my $nextlsn =
@@ -146,6 +146,45 @@ $primary->command_ok(
$partial_wals[0] =~ s/(\.gz)?.partial//;
ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
+# Verify that if we use a replication slot, we resume where we left off even
+# in the absence of WAL files in the target directory.
+unlink glob "'${stream_dir}/*'";
+# Setup the slot.  safe_psql() stops the test right away if this fails.
+$primary->safe_psql('',
+	"CREATE_REPLICATION_SLOT $slot_name PHYSICAL (RESERVE_WAL)",
+	replication => 1);
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+
+# Get the slot's restart_lsn and the WAL file containing it: streaming from
+# the slot must retrieve that file.
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+# Force the last hex digit to 1 so that the LSN never sits exactly on a
+# segment boundary, where pg_walfile_name() would report the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+	"SELECT pg_walfile_name('$restart_lsn');");
+
+$primary->psql('postgres',
+	'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+	$primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+# First verify what happens if we try with a non-existing slot.
+$primary->command_fails_like(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', 'nonexistentslot', '-n', '--verbose', '--endpos', $nextlsn ],
+	qr/pg_receivewal: error: replication slot "nonexistentslot" does not exist/,
+	'pg_receivewal fails with a non-existing slot');
+$primary->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+	"Stream some wal starting from the slot's restart_lsn");
+$slot = $primary->slot($slot_name);
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
# Permissions on WAL files should be default
SKIP:
{
--
2.33.0
>From bf9db289aed59b4e67e5491e08feded66638c617 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Mon, 6 Sep 2021 11:08:54 +0200
Subject: [PATCH v8 3/4] Add documentation for pg_receivewal
---
doc/src/sgml/ref/pg_receivewal.sgml | 26 ++++++++++++++++++++++++++
1 file changed, 26 insertions(+)
diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 45b544cf49..5fb2b61d34 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -37,6 +37,32 @@ PostgreSQL documentation
<xref linkend="continuous-archiving"/>).
</para>
+ <para>
+ When <application>pg_receivewal</application> is launched, it tries to determine the
+ starting LSN from the following sources:
+ <orderedlist>
+ <listitem>
+ <para>
+ It scans the target directory to determine the next file it should archive.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ If no WAL file is found there and a slot was specified, it uses the
+ <command>READ_REPLICATION_SLOT</command> command to retrieve the slot's <literal>restart_lsn</literal>
+ (this requires a server running <productname>PostgreSQL</productname> 15 or later).
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ If it still doesn't find a valid start position via <command>READ_REPLICATION_SLOT</command>,
+ </para>
+ </listitem>
+ </orderedlist>
+ </para>
+
<para>
<application>pg_receivewal</application> streams the write-ahead
log in real time as it's being generated on the server, and does not wait
--
2.33.0
>From f85b402491dd22ecc2521bbb7a3f75bfe47c60a8 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 20 Oct 2021 14:52:13 +0200
Subject: [PATCH v8 4/4] Add tests making sure pg_receivewal can follow a
timeline switch.
---
src/bin/pg_basebackup/t/020_pg_receivewal.pl | 61 +++++++++++++++++++-
1 file changed, 60 insertions(+), 1 deletion(-)
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 42749c3a40..a5e19c5616 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
use warnings;
use TestLib;
use PostgresNode;
-use Test::More tests => 32;
+use Test::More tests => 38;
program_help_ok('pg_receivewal');
program_version_ok('pg_receivewal');
@@ -185,6 +185,65 @@ $primary->command_ok(
$slot = $primary->slot($slot_name);
ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+# Check what happens on a timeline switch
+sub change_timeline{
+ # Setup a new standby, start pg_receivewal once, then promote it.
+ my ($primary, $backup_name, $standby_name, $slot_name, $stream_dir) = @_;
+ my $standby = PostgresNode->new($standby_name);
+ $standby->init_from_backup($primary, $backup_name, has_streaming => 1);
+ $standby->start;
+ $primary->wait_for_catchup($standby, 'replay', $primary->lsn('write'));
+ $standby->psql('', "CREATE_REPLICATION_SLOT $slot_name PHYSICAL (RESERVE_WAL)",
+ replication => 1);
+ $primary->psql('postgres',
+ 'INSERT INTO test_table VALUES (generate_series(1,100));');
+ # Get a position which can act as an end position.
+ my $nextlsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+ $primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
+ # Stream some wal, this will populate the stream directory for the
+ # resume-from-archive test case.
+ $standby->run_log(
+ [ 'pg_receivewal', '-D', $stream_dir, '--verbose', '--endpos', $nextlsn, "--slot", $slot_name],
+ "Stream some wal before promoting");
+ $standby->psql(
+ 'postgres',
+ "SELECT pg_promote(wait_seconds => 300)");
+ $standby->psql('postgres',
+ 'INSERT INTO test_table VALUES (generate_series(1,100));');
+ $standby->safe_psql('postgres', 'SELECT pg_switch_wal();');
+ # Get a full walfilename that will be generated.
+ my $walfilename = $standby->safe_psql('postgres', "SELECT pg_walfile_name(pg_current_wal_insert_lsn())");
+ $standby->psql('postgres',
+ 'INSERT INTO test_table VALUES (generate_series(1,100));');
+ $standby->safe_psql('postgres', 'SELECT pg_switch_wal();');
+ $nextlsn = $standby->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+ return $standby, $nextlsn, $walfilename;
+}
+
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+my $standby;
+unlink glob "'${stream_dir}/*'";
+# First test: change timeline, and check we can resume from the slot.
+($standby, $nextlsn, $walfile_to_be_archived) =
+  change_timeline($primary, $backup_name, 'standby1', $slot_name, $stream_dir);
+# To resume from the slot, the target directory must be empty
+unlink glob "'${stream_dir}/*'";
+$standby->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--verbose', '--endpos', $nextlsn, "--slot", $slot_name],
+	"Stream some wal after promoting, resuming from the slot's position");
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the new timeline has been archived");
+ok(-e "$stream_dir/00000002.history", "Timeline history file has been archived");
+
+# Now do the same, but resume from the latest archived file.
+# Clean up leftovers from the first standby.
+unlink glob "'${stream_dir}/*'";
+($standby, $nextlsn, $walfile_to_be_archived) =
+  change_timeline($primary, $backup_name, 'standby2', $slot_name, $stream_dir);
+$standby->command_ok(
+	[ 'pg_receivewal', '-D', $stream_dir, '--verbose', '--endpos', $nextlsn],
+	"Stream some wal after promoting, resuming from the latest archived file");
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the new timeline has been archived");
+ok(-e "$stream_dir/00000002.history", "Timeline history file has been archived");
+
# Permissions on WAL files should be default
SKIP:
{
--
2.33.0