On Wed, Oct 20, 2021 at 02:58:26PM +0200, Ronan Dunklau wrote:
> After sending the previous patch suite, I figured it would be worthwhile to
> also have tests covering timeline switches, which were not covered before.
That seems independent to me. I'll take a look.

> So please find attached a new version with an additional patch for those
> tests, covering both "resume from last known archive" and "resume from the
> replication slots position" cases.

So, taking things in order, I have looked at 0003 and 0001, and attached
are refined versions for both of them.

0003 is an existing hole in the docs, which I think we had better address
first and backpatch, taking into account that the starting point
calculation considers compressed segments when looking for completed
segments.

Regarding 0001, I found that the last test, which checks for NULL values
returned by READ_REPLICATION_SLOT after dropping the slot, overlaps with
the first test, so I have removed it. I have expanded the use of like() a
bit, and there was some confusion with PostgresNode::psql and some extra
arguments (see DROP_REPLICATION_SLOT and CREATE_REPLICATION_SLOT; there is
no need for return values in the CREATE case either). Some comments, docs
and code have been slightly tweaked.

Here are some comments about 0002.

+	/* The commpand should always return precisely one tuple */
s/commpand/command/

+		pg_log_error("could not fetch replication slot: got %d rows and %d fields, expected %d rows and %d or more fields",
+					 PQntuples(res), PQnfields(res), 1, 3);
Should this be "could not read" instead?

+		if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+		{
+			pg_log_error("could not parse slot's restart_lsn \"%s\"",
+						 PQgetvalue(res, 0, 1));
+			PQclear(res);
+			return false;
+		}
Wouldn't it be saner to initialize *restart_lsn and *restart_tli to some
default values at the top of GetSlotInformation() instead, if they are
specified by the caller? And I think that we should still complain even
if restart_lsn is NULL.

On a quick read of 0004, I find the split of the logic with
change_timeline() a bit hard to understand. It looks like we should be
able to make a cleaner split, but I am not sure how that would look.
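To make the GetSlotInformation() suggestion more concrete, here is a minimal standalone sketch. It is not code from 0002: the helper name parse_slot_info(), its signature, and the local XLogRecPtr/TimeLineID typedefs are assumptions, and "restart_lsn is NULL" is read as the caller passing a NULL output pointer. Defaults are set first for every output argument the caller supplies, and the restart_lsn column is validated whether or not the caller asked for it.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Local stand-ins for the backend typedefs (assumption for this sketch). */
typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Hypothetical helper: parse the restart_lsn and restart_tli columns of a
 * READ_REPLICATION_SLOT result.  SQL NULL column values are passed in as
 * NULL strings.  Returns false if restart_lsn is malformed.
 */
static bool
parse_slot_info(const char *lsn_str, const char *tli_str,
				XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
{
	unsigned int hi,
				lo;

	/* Set defaults first, so callers never see uninitialized values. */
	if (restart_lsn)
		*restart_lsn = InvalidXLogRecPtr;
	if (restart_tli)
		*restart_tli = 0;

	/* A NULL restart_lsn column: nothing to parse, defaults stay. */
	if (lsn_str == NULL)
		return true;

	/* Validate the value even when the caller did not ask for it. */
	if (sscanf(lsn_str, "%X/%X", &hi, &lo) != 2)
	{
		fprintf(stderr, "could not parse slot's restart_lsn \"%s\"\n", lsn_str);
		return false;
	}
	if (restart_lsn)
		*restart_lsn = ((XLogRecPtr) hi << 32) | lo;

	if (tli_str != NULL && restart_tli)
		*restart_tli = (TimeLineID) strtoul(tli_str, NULL, 10);

	return true;
}
```

With this shape, a caller that only cares about whether the slot exists can pass NULL for both output pointers and still gets an error on a malformed restart_lsn.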
-- Michael
From afca5e895e07279049f272396e40bdb78ae61d0e Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 21 Oct 2021 13:21:43 +0900
Subject: [PATCH v9 1/2] doc: Describe calculation method of streaming start for pg_receivewal

The documentation was imprecise about the fact that the current WAL
flush location is used if nothing can be found in the local archive
directory, independently of the compression used by each segment (ZLIB
or uncompressed).
---
 doc/src/sgml/ref/pg_receivewal.sgml | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 45b544cf49..9adbddb657 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -75,6 +75,29 @@ PostgreSQL documentation
    one session available for the stream.
   </para>
 
+  <para>
+   The starting point of the write-ahead log streaming is calculated when
+   <application>pg_receivewal</application> starts:
+   <orderedlist>
+    <listitem>
+     <para>
+      First, scan the directory where the WAL segment files are written and
+      find the newest completed segment, using as starting point the beginning
+      of the next WAL segment file. This is calculated independently of the
+      compression method used to compress each segment.
+     </para>
+    </listitem>
+
+    <listitem>
+     <para>
+      If a starting point cannot be calculated with the previous method,
+      the latest WAL flush location is used as reported by the server from
+      an <literal>IDENTIFY_SYSTEM</literal> command.
+     </para>
+    </listitem>
+   </orderedlist>
+  </para>
+
   <para>
    If the connection is lost, or if it cannot be initially established,
    with a non-fatal error, <application>pg_receivewal</application> will
-- 
2.33.0
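The two-step rule described in the documentation patch above can be sketched in standalone C. This is an illustration, not pg_receivewal's own implementation: the function names, the fixed 16MB segment size, and the simplified file name handling (24 uppercase hex digits with an optional ".gz" suffix, with ".partial" and any other files skipped) are assumptions. When no completed segment is found, the caller is expected to fall back to the flush location returned by IDENTIFY_SYSTEM.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;

/* Assumed default segment size; the real value comes from the server. */
#define WAL_SEG_SIZE ((uint64_t) 16 * 1024 * 1024)
#define SEGS_PER_XLOGID (UINT64_C(0x100000000) / WAL_SEG_SIZE)

/* True if name is 24 hex digits, optionally followed by ".gz". */
static bool
is_completed_segment(const char *name)
{
	size_t		len = strlen(name);

	if (len != 24 && !(len == 27 && strcmp(name + 24, ".gz") == 0))
		return false;
	return strspn(name, "0123456789ABCDEF") == 24;
}

/*
 * Hypothetical sketch: compute the streaming start point from the file
 * names found in the target directory.  Returns false if no completed
 * segment exists, compressed or not.
 */
static bool
find_streaming_start(const char *const *names, int n, XLogRecPtr *start)
{
	bool		found = false;
	uint64_t	high_segno = 0;

	for (int i = 0; i < n; i++)
	{
		unsigned int tli,
					xlogid,
					seg;
		uint64_t	segno;

		if (!is_completed_segment(names[i]))
			continue;			/* ".partial" files and others are skipped */
		if (sscanf(names[i], "%8X%8X%8X", &tli, &xlogid, &seg) != 3)
			continue;
		segno = (uint64_t) xlogid * SEGS_PER_XLOGID + seg;
		if (!found || segno > high_segno)
			high_segno = segno;
		found = true;
	}

	if (!found)
		return false;

	/* Start at the beginning of the segment after the newest completed one. */
	*start = (high_segno + 1) * WAL_SEG_SIZE;
	return true;
}
```

Compression only affects the name filter here, which mirrors the point of the documentation change: a ".gz" segment counts as completed just like an uncompressed one.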
From 3ef08cc14e4365eaec320294275bee9f19c7dbef Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 21 Oct 2021 14:23:41 +0900
Subject: [PATCH v9 2/2] Add READ_REPLICATION_SLOT

---
 src/include/nodes/nodes.h                   |  1 +
 src/include/nodes/replnodes.h               | 11 +++
 src/backend/replication/repl_gram.y         | 16 +++-
 src/backend/replication/repl_scanner.l      |  1 +
 src/backend/replication/walsender.c         | 93 +++++++++++++++++++++
 src/test/recovery/t/001_stream_rep.pl       | 32 ++++++-
 src/test/recovery/t/006_logical_decoding.pl | 11 ++-
 doc/src/sgml/protocol.sgml                  | 48 +++++++++++
 src/tools/pgindent/typedefs.list            |  1 +
 9 files changed, 211 insertions(+), 3 deletions(-)

diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index e0057daa06..541e9861ba 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -496,6 +496,7 @@ typedef enum NodeTag
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
+	T_ReadReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
 	T_SQLCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..a746fafc12 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -87,6 +87,17 @@ typedef struct StartReplicationCmd
 } StartReplicationCmd;
 
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char	   *slotname;
+} ReadReplicationSlotCmd;
+
+
 /* ----------------------
  *		TIMELINE_HISTORY command
  * ----------------------
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 126380e2df..dcb1108579 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_legacy_opt_list generic_option_list
 %type <defelt>	base_backup_legacy_opt generic_option
 %type <uintval>	opt_timeline
@@ -125,6 +126,7 @@ command:
 			| start_logical_replication
 			| create_replication_slot
 			| drop_replication_slot
+			| read_replication_slot
 			| timeline_history
 			| show
 			| sql_cmd
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b811a5c0ef..7b3d16f731 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,91 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 3
+	ReplicationSlot *slot;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[READ_REPLICATION_SLOT_COLS];
+	bool		nulls[READ_REPLICATION_SLOT_COLS];
+
+	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
+							  INT4OID, -1, 0);
+
+	MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+	else
+	{
+		ReplicationSlot slot_contents;
+		int			i = 0;
+
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (OidIsValid(slot_contents.data.database))
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use \"%s\" with logical replication slots",
+						   "READ_REPLICATION_SLOT"));
+
+		/* slot type */
+		values[i] = CStringGetTextDatum("physical");
+		nulls[i] = false;
+		i++;
+
+		/* start LSN */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			char		xloc[MAXFNAMELEN];
+
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		/* timeline this WAL was produced on */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			TimeLineID	slots_position_timeline;
+			List	   *timeline_history = NIL;
+
+			timeline_history = readTimeLineHistory(ThisTimeLineID);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+														  timeline_history);
+			values[i] = Int32GetDatum(slots_position_timeline);
+			nulls[i] = false;
+		}
+		i++;
+
+		Assert(i == READ_REPLICATION_SLOT_COLS);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1622,6 +1708,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 9916a36012..8fb7487857 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 53;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -254,6 +254,36 @@ ok( $ret == 0,
 	"SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command for replication connection";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	'READ_REPLICATION_SLOT non_existent_slot;',
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0, "READ_REPLICATION_SLOT exit code 0 on success");
+like($stdout, qr/^\|\|$/,
+	"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+$node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+	extra_params => [ '-d', $connstr_rep ]);
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0, "READ_REPLICATION_SLOT success with existing slot");
+like($stdout, qr/^physical\|[^|]*\|1$/,
+	"READ_REPLICATION_SLOT returns tuple with slot information");
+
+$node_primary->psql(
+	'postgres',
+	"DROP_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..1b74f38f10 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 15;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,15 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql(
+	'template1',
+	qq[READ_REPLICATION_SLOT test_slot;],
+	replication => 'database');
+like(
+	$stderr,
+	qr/cannot use "READ_REPLICATION_SLOT" with logical replication slots/,
+	'READ_REPLICATION_SLOT not supported for logical slots');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b95cc88599..37c26ec8ae 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2067,6 +2067,54 @@ The commands accepted in replication mode are:
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+     </term>
+     <listitem>
+      <para>
+       Read some information associated with a replication slot. Returns a tuple
+       with <literal>NULL</literal> values if the replication slot does not
+       exist. This command is currently only supported for physical replication
+       slots.
+      </para>
+      <para>
+       In response to this command, the server will return a one-row result set,
+       containing the following fields:
+       <variablelist>
+        <varlistentry>
+         <term><literal>slot_type</literal> (<type>text</type>)</term>
+         <listitem>
+          <para>
+           The replication slot's type, either <literal>physical</literal> or
+           <literal>NULL</literal>.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+         <listitem>
+          <para>
+           The replication slot's <literal>restart_lsn</literal>.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>restart_tli</literal> (<type>int4</type>)</term>
+         <listitem>
+          <para>
+           The timeline ID associated with <literal>restart_lsn</literal>,
+           following the current timeline history.
+          </para>
+         </listitem>
+        </varlistentry>
+       </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cb5b5ec74c..a8e4a2afd6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2128,6 +2128,7 @@ ReadBufferMode
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
+ReadReplicationSlotCmd
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheEntry
-- 
2.33.0
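The restart_tli column maps restart_lsn onto the timeline that produced it by walking the server's timeline history, which the patch does with readTimeLineHistory() and tliOfPointInHistory(). Below is a standalone sketch of that lookup, assuming a simplified history entry (tli, begin, end) modeled loosely on the backend's timeline history entries, with an invalid (zero) LSN meaning an open-ended boundary. The struct and function names are hypothetical, and the sketch returns 0 where the backend would raise an error.

```c
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Simplified history entry: WAL in [begin, end) was produced on tli. */
typedef struct HistoryEntry
{
	TimeLineID	tli;
	XLogRecPtr	begin;			/* InvalidXLogRecPtr: since the beginning */
	XLogRecPtr	end;			/* InvalidXLogRecPtr: still the current one */
} HistoryEntry;

/* Return the timeline whose range contains ptr, or 0 if none matches. */
static TimeLineID
timeline_of_point(XLogRecPtr ptr, const HistoryEntry *history, size_t n)
{
	for (size_t i = 0; i < n; i++)
	{
		const HistoryEntry *e = &history[i];

		if ((e->begin == InvalidXLogRecPtr || e->begin <= ptr) &&
			(e->end == InvalidXLogRecPtr || ptr < e->end))
			return e->tli;
	}
	return 0;
}
```

On a primary that has never been promoted, the history holds a single open-ended entry for timeline 1, which is why the test in 001_stream_rep.pl expects the result row to end with "|1".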