On Thursday, 29 July 2021, 07:32:37 CEST, Kyotaro Horiguchi wrote:
> I hadn't thought it through in detail. But I forgot that ordinary SQL commands
> have been prohibited on physical replication connections. So we need a
> new replication command, but it's not that big a deal.
Thank you for your feedback!
>
> > I don't see any reason not to make it work for logical replication
> > connections / slots, but it wouldn't be that useful since we can query
> > the database in that case.
>
> Ordinary SQL queries are usable on a logical replication slot so
> I'm not sure how logical replication connection uses the command.
> However, like you, I wouldn't bother restricting the command to
> physical replication, but perhaps the new command should return the
> slot type.
>
Ok done in the attached patch.
>
> I'm not sure it's worth adding complexity for such strictness.
> START_REPLICATION safely fails if someone steals the slot meanwhile.
> In the first place there's no means to protect a slot from others
> while idle. One possible problem is the case where START_REPLICATION
> successfully acquire the slot after the new command failed. But that
> case doesn't seem worse than the case someone advances the slot while
> absence. So I think READ_REPLICATION_SLOT is sufficient.
>
OK, I implemented it like this. I tried to follow the pg_get_replication_slots
approach with regard to preventing concurrent modification while reading
the slot.
> > From pg_receivewal point of view, this would amount to:
> > - check if we currently have wal in the target directory.
> >
> > - if we do, proceed as currently done, by computing the start lsn and
> >
> > timeline from the last archived wal
> >
> > - if we don't, and we have a slot, run ACQUIRE_REPLICATION_SLOT. Use the
> >
> > restart_lsn as the start lsn if there is one, and don't provide a timeline
> >
> > - if we still don't have a start_lsn, fallback to using the current
> > server
> >
> > wal position as is done.
>
> That's pretty much it.
Great.
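To make the order of precedence concrete, here is a minimal standalone sketch
of that decision in C. It is not the patch code: lsn_t, INVALID_LSN,
start_point and choose_start_point are hypothetical stand-ins for XLogRecPtr,
InvalidXLogRecPtr and the logic that lives in StreamLog().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t lsn_t;              /* stand-in for XLogRecPtr */
#define INVALID_LSN ((lsn_t) 0)      /* stand-in for InvalidXLogRecPtr */

/* A candidate streaming start point (hypothetical helper type). */
typedef struct start_point
{
    lsn_t    lsn;
    uint32_t timeline;
} start_point;

/*
 * Pick the streaming start point in priority order:
 *   1. the position computed from WAL already present locally,
 *   2. the slot's restart_lsn, if a slot is used and it has one,
 *   3. the server's current position.
 */
start_point
choose_start_point(start_point from_local_wal,
                   bool have_slot,
                   start_point from_slot,
                   start_point from_server)
{
    /* The fallback must always be usable. */
    assert(from_server.lsn != INVALID_LSN);

    if (from_local_wal.lsn != INVALID_LSN)
        return from_local_wal;
    if (have_slot && from_slot.lsn != INVALID_LSN)
        return from_slot;
    return from_server;
}
```

With no local WAL and a slot that has a restart_lsn, this returns the slot's
position; with no slot, or a slot without a restart_lsn, it falls back to the
server's position.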
>
> > What do you think ? Which information should we provide about the slot ?
>
> We need the timeline id to start with when using restart_lsn. The
> current timeline can be used in most cases but there's a case where
> the LSN is historical.
Ok, see below.
>
> pg_receivewal doesn't send a replication status report when a segment
> is finished. So after pg_receivewal stops just after a segment is
> finished, the slot stays at the beginning of the last segment. Thus
> next time it will start from there, creating a duplicate segment.
I'm not sure I see where the problem is here. If we don't keep the segments in
pg_receivewal's target directory, then it would be the responsibility of
whoever moved them to make sure we don't have duplicates, or to handle them
gracefully.
Even if we forced a feedback message after a segment is finished, there could
still be a problem if the feedback never made it to the server but the segment
was already written. It might still be worth sending feedback anyway.
Please find attached two patches implementing what we've been discussing.
Patch 0001 adds the new READ_REPLICATION_SLOT command.
It returns, for a given slot, the type, restart_lsn, confirmed_flush_lsn,
restart_lsn_timeline and confirmed_flush_lsn_timeline.
The timelines are determined by reading the current timeline history and
finding the timeline on which each position lies. I didn't find an explicit test
for e.g. IDENTIFY_SYSTEM, so I didn't write one for this new command either, but it
is tested indirectly in patch 0002.
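For illustration, the lookup amounts to scanning the history for the entry
whose range covers the LSN. The following is a simplified standalone sketch,
not the backend code: timeline_entry and timeline_of_lsn are hypothetical
names, and I am assuming half-open [begin, end) ranges where an invalid bound
means "unbounded on that side".

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t lsn_t;              /* stand-in for XLogRecPtr */
#define INVALID_LSN ((lsn_t) 0)      /* stand-in for InvalidXLogRecPtr */

/* One entry of a timeline history (hypothetical, simplified layout). */
typedef struct timeline_entry
{
    uint32_t tli;
    lsn_t    begin;   /* inclusive; INVALID_LSN means "from the start" */
    lsn_t    end;     /* exclusive; INVALID_LSN means "still open" */
} timeline_entry;

/*
 * Return the ID of the timeline whose range contains ptr,
 * or 0 if no entry in the history covers it.
 */
uint32_t
timeline_of_lsn(lsn_t ptr, const timeline_entry *history, size_t n)
{
    assert(history != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        const timeline_entry *e = &history[i];

        if ((e->begin == INVALID_LSN || e->begin <= ptr) &&
            (e->end == INVALID_LSN || ptr < e->end))
            return e->tli;
    }
    return 0;
}
```

An LSN that sits exactly on a switch point is attributed to the newer
timeline here, since the end bound is exclusive.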
Patch 0002 makes pg_receivewal use that command if we use a replication slot
and the command is available, and use the restart_lsn and restart_lsn_timeline
as a starting point. It also adds a small test to check that we start back
from the previous restart_lsn instead of the current flush position when our
destination directory does not contain any WAL file.
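On the client side, the LSN comes back as text in the usual hi/lo hexadecimal
form and has to be turned into a 64-bit position. A minimal standalone sketch
of that conversion, assuming the "%X/%X" format used by the patch; parse_lsn
is a hypothetical helper name, not a function from the tree:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse an LSN written as "hi/lo" in hexadecimal into a 64-bit value.
 * Returns false and leaves *out untouched if the text does not match.
 */
bool
parse_lsn(const char *text, uint64_t *out)
{
    unsigned int hi;
    unsigned int lo;

    assert(text != NULL && out != NULL);

    if (sscanf(text, "%X/%X", &hi, &lo) != 2)
        return false;

    /* High 32 bits come from the part before the slash. */
    *out = ((uint64_t) hi << 32) | lo;
    return true;
}
```

A slot without a restart_lsn is reported as 0/0, which parses to 0, so the
caller can treat a zero result as "no position" and fall back to the server's
current position.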
I also noticed we don't test following a timeline switch. It would probably be
good to add that, both for the case where we determine the previous timeline
from the archived segments and when it comes from the new command. What do you
think?
Regards,
--
Ronan Dunklau
From 37d9545c05b9e36aafac751f9dc549e75798413c Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 28 Jul 2021 16:34:54 +0200
Subject: [PATCH v1 1/2] Add READ_REPLICATION_SLOT command.
This commit introduces a new READ_REPLICATION_SLOT <slot_name> command.
This command is used to read information about a replication slot when
using a physical replication connection.
In this first version it returns the slot type, restart_lsn, confirmed_flush_lsn and
the timelines of the restart_lsn and confirmed_flush_lsn, which are obtained by following the
current timeline history.
---
doc/src/sgml/protocol.sgml | 62 +++++++++++++++
src/backend/replication/repl_gram.y | 18 ++++-
src/backend/replication/repl_scanner.l | 1 +
src/backend/replication/walsender.c | 106 +++++++++++++++++++++++++
src/include/nodes/nodes.h | 1 +
src/include/nodes/replnodes.h | 10 +++
6 files changed, 197 insertions(+), 1 deletion(-)
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index e8cb78ff1f..ea15e888d2 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2052,6 +2052,68 @@ The commands accepted in replication mode are:
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+ <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+ </term>
+ <listitem>
+ <para>
+ Read information about the named replication slot. This is useful to determine which WAL location we should be asking the server to start streaming at.
+ </para>
+ <para>
+ In response to this command, the server will return a one-row result set, containing the following fields:
+ <variablelist>
+ <varlistentry>
+ <term><literal>type</literal> (<type>text</type>)</term>
+ <listitem>
+ <para>
+ The replication slot's type, either <literal>physical</literal> or <literal>logical</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+ <listitem>
+ <para>
+ The replication slot's restart_lsn.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>confirmed_flush_lsn</literal> (<type>text</type>)</term>
+ <listitem>
+ <para>
+ The replication slot's confirmed_flush LSN.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>restart_lsn_timeline</literal> (<type>int4</type>)</term>
+ <listitem>
+ <para>
+ The timeline ID for the restart_lsn position, when following the current timeline
+ history.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>confirmed_flush_lsn_timeline</literal> (<type>int4</type>)</term>
+ <listitem>
+ <para>
+ The timeline ID for the confirmed_flush_lsn position, when following the current timeline
+ history.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
<indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index e1e8ec29cc..7298f44008 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
/* Keyword tokens. */
%token K_BASE_BACKUP
%token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
%token K_SHOW
%token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot identify_system
- timeline_history show sql_cmd
+ identify_replication_slot timeline_history show sql_cmd
%type <list> base_backup_opt_list
%type <defelt> base_backup_opt
%type <uintval> opt_timeline
@@ -120,6 +121,7 @@ opt_semicolon: ';'
command:
identify_system
+ | identify_replication_slot
| base_backup
| start_replication
| start_logical_replication
@@ -140,6 +142,18 @@ identify_system:
}
;
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+identify_replication_slot:
+ K_READ_REPLICATION_SLOT var_name
+ {
+ ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+ n->slotname = $2;
+ $$ = (Node *) n;
+ }
+ ;
+
/*
* SHOW setting
*/
@@ -361,6 +375,8 @@ timeline_history:
}
;
+
+
opt_physical:
K_PHYSICAL
| /* EMPTY */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}*
BASE_BACKUP { return K_BASE_BACKUP; }
FAST { return K_FAST; }
IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; }
SHOW { return K_SHOW; }
LABEL { return K_LABEL; }
NOWAIT { return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..9a13d1c186 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,104 @@ IdentifySystem(void)
end_tup_output(tstate);
}
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+ ReplicationSlot *slot;
+ ReplicationSlot slot_contents;
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ Datum values[5];
+ bool nulls[5];
+ char xloc[MAXFNAMELEN];
+ int i = 0;
+ List *timeline_history = NIL;
+ TimeLineID slots_position_timeline;
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ slot = SearchNamedReplicationSlot(cmd->slotname, false);
+ if (slot == NULL || !slot->in_use)
+ {
+ LWLockRelease(ReplicationSlotControlLock);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("replication slot \"%s\" does not exist",
+ cmd->slotname)));
+ }
+ /* Copy slot contents while holding spinlock */
+ SpinLockAcquire(&slot->mutex);
+ slot_contents = *slot;
+ SpinLockRelease(&slot->mutex);
+ LWLockRelease(ReplicationSlotControlLock);
+
+ tupdesc = CreateTemplateTupleDesc(5);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "type",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "confirmed_flush_lsn",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "restart_lsn_timeline",
+ INT4OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "confirmed_flush_lsn_timeline",
+ INT4OID, -1, 0);
+
+ if (slot_contents.data.database == InvalidOid)
+ values[i] = CStringGetTextDatum("physical");
+ else
+ values[i] = CStringGetTextDatum("logical");
+ nulls[i] = false;
+ i++;
+
+ snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+ values[i] = CStringGetTextDatum(xloc);
+ nulls[i] = false;
+ i++;
+
+ snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(slot_contents.data.confirmed_flush));
+ values[i] = CStringGetTextDatum(xloc);
+ nulls[i] = false;
+ i++;
+
+ /* Now get the timeline this WAL was produced on, following the current
+ * timeline's history.
+ * XXX: should we allow the caller to specify which target timeline it
+ * wants?
+ */
+ if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+ {
+ timeline_history = readTimeLineHistory(ThisTimeLineID);
+ slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, timeline_history);
+ values[i] = Int32GetDatum(slots_position_timeline);
+ nulls[i] = false;
+ } else {
+ values[i] = 0;
+ nulls[i] = true;
+ }
+ i++;
+
+ if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+ {
+ if (!timeline_history)
+ timeline_history = readTimeLineHistory(ThisTimeLineID);
+ slots_position_timeline = tliOfPointInHistory(slot_contents.data.confirmed_flush, timeline_history);
+ values[i] = Int32GetDatum(slots_position_timeline);
+ nulls[i] = false;
+ } else {
+ values[i] = 0;
+ nulls[i] = true;
+ }
+ i++;
+
+ dest = CreateDestReceiver(DestRemoteSimple);
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+ do_tup_output(tstate, values, nulls);
+ end_tup_output(tstate);
+}
+
/*
* Handle TIMELINE_HISTORY command.
@@ -1618,6 +1717,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag);
break;
+ case T_ReadReplicationSlotCmd:
+ cmdtag = "READ_REPLICATION_SLOT";
+ set_ps_display(cmdtag);
+ ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
+ break;
+
case T_BaseBackupCmd:
cmdtag = "BASE_BACKUP";
set_ps_display(cmdtag);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index f7b009ec43..ade121ad71 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -496,6 +496,7 @@ typedef enum NodeTag
* TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
*/
T_IdentifySystemCmd,
+ T_ReadReplicationSlotCmd,
T_BaseBackupCmd,
T_CreateReplicationSlotCmd,
T_DropReplicationSlotCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..ec85b7d993 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,16 @@ typedef struct IdentifySystemCmd
NodeTag type;
} IdentifySystemCmd;
+/* ----------------------
+ * READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+ NodeTag type;
+ char *slotname;
+} ReadReplicationSlotCmd;
+
/* ----------------------
* BASE_BACKUP command
--
2.32.0
From 0258bb4e628c5dc5fb35b8b289d8599740e42b17 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau <ronan.dunk...@aiven.io>
Date: Wed, 28 Jul 2021 16:35:39 +0200
Subject: [PATCH v1 2/2] Use READ_REPLICATION_SLOT command in pg_receivewal.
Prior to this patch, when running pg_receivewal, the start LSN is determined by looking at
the WAL files currently stored on disk, then falling back to the current flush LSN
from the server.
If for some reason the WAL files from pg_receivewal were moved, we want
to restart where we left off, which is the replication slot's restart_lsn,
instead of skipping right to the current flush location.
To keep compatibility with prior server versions, we only attempt it if
the server version is >= 15.
---
src/bin/pg_basebackup/pg_receivewal.c | 61 +++++++++++++++++++-
src/bin/pg_basebackup/t/020_pg_receivewal.pl | 46 ++++++++++++++-
2 files changed, 103 insertions(+), 4 deletions(-)
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 4474273daf..02c4d4f1a5 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -26,6 +26,7 @@
#include "fe_utils/option_utils.h"
#include "getopt_long.h"
#include "libpq-fe.h"
+#include "pqexpbuffer.h"
#include "receivelog.h"
#include "streamutil.h"
@@ -190,6 +191,48 @@ close_destination_dir(DIR *dest_dir, char *dest_folder)
}
}
+static XLogRecPtr
+GetSlotRestartLSN(const char *slot_name, uint32 *timeline)
+{
+ PGresult *res;
+ PQExpBuffer query;
+ uint32 hi,
+ lo;
+ XLogRecPtr startpos;
+ if (slot_name == NULL)
+ return InvalidXLogRecPtr;
+
+ query = createPQExpBuffer();
+ appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s",
+ slot_name);
+ res = PQexec(conn, query->data);
+ destroyPQExpBuffer(query);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not read replication slot \"%s\": %s",
+ slot_name, PQerrorMessage(conn));
+ PQclear(res);
+ return InvalidXLogRecPtr;
+ }
+ if (PQntuples(res) != 1 || PQnfields(res) < 4)
+ {
+ pg_log_error("could not fetch replication slot LSN: got %d rows and %d fields, expected %d rows and %d or more fields",
+ PQntuples(res), PQnfields(res), 1, 4);
+ PQclear(res);
+ return InvalidXLogRecPtr;
+ }
+ if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+ {
+ pg_log_error("could not parse slot's restart_lsn \"%s\"",
+ PQgetvalue(res, 0, 1));
+ PQclear(res);
+ return InvalidXLogRecPtr;
+ }
+ startpos = ((uint64) hi) << 32 | lo;
+ *timeline = atoi(PQgetvalue(res, 0, 3));
+ PQclear(res);
+ return startpos;
+}
/*
* Determine starting location for streaming, based on any existing xlog
@@ -408,8 +451,22 @@ StreamLog(void)
stream.startpos = FindStreamingStart(&stream.timeline);
if (stream.startpos == InvalidXLogRecPtr)
{
- stream.startpos = serverpos;
- stream.timeline = servertli;
+ /* Try to get it from the slot, if any, and if the server supports it */
+ if (replication_slot)
+ {
+ if (PQserverVersion(conn) >= 150000)
+ stream.startpos = GetSlotRestartLSN(replication_slot, &stream.timeline);
+ else
+ pg_log_warning("Server does not support fetching the slot's position, "
+ "resuming from the current server position instead");
+ }
+ /* If it is still unknown, use the current flush value from the server
+ */
+ if (stream.startpos == InvalidXLogRecPtr)
+ {
+ stream.startpos = serverpos;
+ stream.timeline = servertli;
+ }
}
/*
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 950083d21c..537a0d9602 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
use warnings;
use TestLib;
use PostgresNode;
-use Test::More tests => 27;
+use Test::More tests => 30;
program_help_ok('pg_receivewal');
program_version_ok('pg_receivewal');
@@ -49,7 +49,7 @@ is($primary->slot($slot_name)->{'slot_type'},
# Generate some WAL. Use --synchronous at the same time to add more
# code coverage. Switch to the next segment first so that subsequent
# restarts of pg_receivewal will see this segment as full..
$primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
$primary->psql('postgres', 'SELECT pg_switch_wal();');
my $nextlsn =
@@ -144,6 +144,48 @@ $primary->command_ok(
$partial_wals[0] =~ s/(\.gz)?.partial//;
ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
+# Verify that if we use a replication slot, we resume where we left off even in
+# the absence of WAL files
+
+# Setup the slot, and connect to it a first time
+$primary->run_log(
+ [ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
+ 'creating a replication slot');
+$primary->psql('postgres',
+ 'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+ $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->run_log(
+ [ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+ "streaming some WAL");
+
+# Get the slot restart_lsn and make sure we retrieve the associated WAL file
+# even after deletion of the previous stored WAL files.
+$slot = $primary->slot($slot_name);
+my $restart_lsn = $slot->{'restart_lsn'};
+# Add one so that the restart_lsn doesn't correspond to the previous file.
+$restart_lsn =~ s/.$/1/;
+my $walfile_to_be_archived = $primary->safe_psql('postgres',
+ "SELECT pg_walfile_name('$restart_lsn');");
+isnt($restart_lsn, '', 'restart LSN of new slot is not null');
+
+unlink glob "'${stream_dir}/*'";
+
+$primary->psql('postgres',
+ 'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+ $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+$primary->command_ok(
+ [ 'pg_receivewal', '-D', $stream_dir, '--slot', $slot_name, '--verbose', '--endpos', $nextlsn ],
+ "Stream some wal starting from the slot's restart_lsn");
+$slot = $primary->slot($slot_name);
+my @walfiles = glob "${stream_dir}/*";
+ok(-e "$stream_dir/$walfile_to_be_archived", "WAL from the slot's restart_lsn has been archived");
+
# Permissions on WAL files should be default
SKIP:
{
--
2.32.0