On Thu, Oct 21, 2021 at 10:36:42AM +0200, Ronan Dunklau wrote: > Done. I haven't touched the timeline switch test patch for now, but I still > include it here for completeness.
Thanks. I have applied and back-patched 0001, then looked again at 0002 that adds READ_REPLICATION_SLOT: - Change the TLI to use int8 rather than int4, so as we will always be right with TimelineID which is unsigned (this was discussed upthread but I got back on it after more thoughts, to avoid any future issues). - Added an extra initialization for the set of Datum values, just as an extra safety net. - There was a bug with the timeline returned when executing the command while in recovery as ThisTimeLineID is 0 in the context of a standby, but we need to support the case of physical slots even when streaming archives from a standby. The fix is similar to what we do for IDENTIFY_SYSTEM, where we need to use the timeline currently replayed from GetXLogReplayRecPtr(), before looking at the past timeline history using restart_lsn and the replayed TLI. With that in place, I think that we are good now for this part. -- Michael
From 7e20a294d18c280b8031ee6cc862ba6a661cf40f Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Sat, 23 Oct 2021 16:28:10 +0900 Subject: [PATCH v10] Add replication command READ_REPLICATION_SLOT The command is supported for physical slots for now, and returns the type of slot, its restart_lsn and its restart_tli. This will be useful for an upcoming patch related to pg_receivewal, to allow the tool to be able to stream from the position of a slot, rather than the last WAL position flushed by the backend (as reported by IDENTIFY_SYSTEM), if the archive directory is found as empty, which would be an advantage in the case of switching to a different archive location with the same slot used to avoid holes in what gets backed up. Author: Ronan Dunklau Reviewed-by: Kyotaro Horiguchi, Michael Paquier, Bharath Rupireddy Discussion: https://postgr.es/m/18708360.4lzOvYHigE@aivenronan --- src/include/nodes/nodes.h | 1 + src/include/nodes/replnodes.h | 11 ++ src/backend/replication/repl_gram.y | 16 ++- src/backend/replication/repl_scanner.l | 1 + src/backend/replication/walsender.c | 105 ++++++++++++++++++++ src/test/recovery/t/001_stream_rep.pl | 32 +++++- src/test/recovery/t/006_logical_decoding.pl | 11 +- doc/src/sgml/protocol.sgml | 48 +++++++++ src/tools/pgindent/typedefs.list | 1 + 9 files changed, 223 insertions(+), 3 deletions(-) diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index e0057daa06..541e9861ba 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -496,6 +496,7 @@ typedef enum NodeTag T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, + T_ReadReplicationSlotCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_SQLCmd, diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index faa3a251f2..a746fafc12 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -87,6 +87,17 @@ typedef struct StartReplicationCmd } StartReplicationCmd; +/* ---------------------- + * READ_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct ReadReplicationSlotCmd +{ + NodeTag type; + char *slotname; +} ReadReplicationSlotCmd; + + /* ---------------------- * TIMELINE_HISTORY command * ---------------------- diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 126380e2df..dcb1108579 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void); /* Keyword tokens. */ %token K_BASE_BACKUP %token K_IDENTIFY_SYSTEM +%token K_READ_REPLICATION_SLOT %token K_SHOW %token K_START_REPLICATION %token K_CREATE_REPLICATION_SLOT @@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void); %type <node> command %type <node> base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - timeline_history show sql_cmd + read_replication_slot timeline_history show sql_cmd %type <list> base_backup_legacy_opt_list generic_option_list %type <defelt> base_backup_legacy_opt generic_option %type <uintval> opt_timeline @@ -125,6 +126,7 @@ command: | start_logical_replication | create_replication_slot | drop_replication_slot + | read_replication_slot | timeline_history | show | sql_cmd @@ -140,6 +142,18 @@ identify_system: } ; +/* + * READ_REPLICATION_SLOT %s + */ +read_replication_slot: + K_READ_REPLICATION_SLOT var_name + { + ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd); + n->slotname = $2; + $$ = (Node *) n; + } + ; + /* * SHOW setting */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index c038a636c3..1b599c255e 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -85,6 +85,7 @@ identifier {ident_start}{ident_cont}* BASE_BACKUP { return K_BASE_BACKUP; } FAST { return K_FAST; } IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; } +READ_REPLICATION_SLOT { return K_READ_REPLICATION_SLOT; } SHOW { return K_SHOW; } LABEL { return K_LABEL; } NOWAIT { return K_NOWAIT; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index b811a5c0ef..ef3e64846d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -232,6 +232,7 @@ static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); static XLogRecPtr GetStandbyFlushRecPtr(void); static void IdentifySystem(void); +static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); static void DropReplicationSlot(DropReplicationSlotCmd *cmd); static void StartReplication(StartReplicationCmd *cmd); @@ -457,6 +458,103 @@ IdentifySystem(void) end_tup_output(tstate); } +/* Handle READ_REPLICATION_SLOT command */ +static void +ReadReplicationSlot(ReadReplicationSlotCmd *cmd) +{ +#define READ_REPLICATION_SLOT_COLS 3 + ReplicationSlot *slot; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[READ_REPLICATION_SLOT_COLS]; + bool nulls[READ_REPLICATION_SLOT_COLS]; + + tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn", + TEXTOID, -1, 0); + /* TimeLineID is unsigned, so int4 is not wide enough. */ + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli", + INT8OID, -1, 0); + + MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum)); + MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool)); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + slot = SearchNamedReplicationSlot(cmd->slotname, false); + if (slot == NULL || !slot->in_use) + { + LWLockRelease(ReplicationSlotControlLock); + } + else + { + ReplicationSlot slot_contents; + int i = 0; + + /* Copy slot contents while holding spinlock */ + SpinLockAcquire(&slot->mutex); + slot_contents = *slot; + SpinLockRelease(&slot->mutex); + LWLockRelease(ReplicationSlotControlLock); + + if (OidIsValid(slot_contents.data.database)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use \"%s\" with logical replication slots", + "READ_REPLICATION_SLOT")); + + /* slot type */ + values[i] = CStringGetTextDatum("physical"); + nulls[i] = false; + i++; + + /* start LSN */ + if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn)) + { + char xloc[64]; + + snprintf(xloc, sizeof(xloc), "%X/%X", + LSN_FORMAT_ARGS(slot_contents.data.restart_lsn)); + values[i] = CStringGetTextDatum(xloc); + nulls[i] = false; + } + i++; + + /* timeline this WAL was produced on */ + if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn)) + { + TimeLineID slots_position_timeline; + TimeLineID current_timeline; + List *timeline_history = NIL; + + /* + * While in recovery, use as timeline the currently-replaying + * one to get the LSN position's history. + */ + if (RecoveryInProgress()) + (void) GetXLogReplayRecPtr(¤t_timeline); + else + current_timeline = ThisTimeLineID; + + timeline_history = readTimeLineHistory(current_timeline); + slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn, + timeline_history); + values[i] = Int64GetDatum((int64) slots_position_timeline); + nulls[i] = false; + } + i++; + + Assert(i == READ_REPLICATION_SLOT_COLS); + } + + dest = CreateDestReceiver(DestRemoteSimple); + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + do_tup_output(tstate, values, nulls); + end_tup_output(tstate); +} + /* * Handle TIMELINE_HISTORY command. @@ -1622,6 +1720,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_ReadReplicationSlotCmd: + cmdtag = "READ_REPLICATION_SLOT"; + set_ps_display(cmdtag); + ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_BaseBackupCmd: cmdtag = "BASE_BACKUP"; set_ps_display(cmdtag); diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl index 9916a36012..8fb7487857 100644 --- a/src/test/recovery/t/001_stream_rep.pl +++ b/src/test/recovery/t/001_stream_rep.pl @@ -6,7 +6,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 49; +use Test::More tests => 53; # Initialize primary node my $node_primary = PostgresNode->new('primary'); @@ -254,6 +254,36 @@ ok( $ret == 0, "SHOW with superuser-settable parameter, replication role and logical replication" ); +note "testing READ_REPLICATION_SLOT command for replication connection"; + +my $slotname = 'test_read_replication_slot_physical'; + +($ret, $stdout, $stderr) = $node_primary->psql( + 'postgres', + 'READ_REPLICATION_SLOT non_existent_slot;', + extra_params => [ '-d', $connstr_rep ]); +ok($ret == 0, "READ_REPLICATION_SLOT exit code 0 on success"); +like($stdout, qr/^||$/, + "READ_REPLICATION_SLOT returns NULL values if slot does not exist"); + +$node_primary->psql( + 'postgres', + "CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;", + extra_params => [ '-d', $connstr_rep ]); + +($ret, $stdout, $stderr) = $node_primary->psql( + 'postgres', + "READ_REPLICATION_SLOT $slotname;", + extra_params => [ '-d', $connstr_rep ]); +ok($ret == 0, "READ_REPLICATION_SLOT success with existing slot"); +like($stdout, qr/^physical\|[^|]*\|1$/, + "READ_REPLICATION_SLOT returns tuple with slot information"); + +$node_primary->psql( + 'postgres', + "DROP_REPLICATION_SLOT $slotname;", + extra_params => [ '-d', $connstr_rep ]); + note "switching to physical replication slot"; # Switch to using a physical replication slot. We can do this without a new diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index cc116062c2..1b74f38f10 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -10,7 +10,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 14; +use Test::More tests => 15; use Config; # Initialize primary node @@ -39,6 +39,15 @@ ok( $stderr =~ m/replication slot "test_slot" was not created in this database/, "Logical decoding correctly fails to start"); +($result, $stdout, $stderr) = $node_primary->psql( + 'template1', + qq[READ_REPLICATION_SLOT test_slot;], + replication => 'database'); +like( + $stderr, + qr/cannot use "READ_REPLICATION_SLOT" with logical replication slots/, + 'READ_REPLICATION_SLOT not supported for logical slots'); + # Check case of walsender not using a database connection. Logical # decoding should not be allowed. ($result, $stdout, $stderr) = $node_primary->psql( diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index b95cc88599..132436c6e6 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2067,6 +2067,54 @@ The commands accepted in replication mode are: </listitem> </varlistentry> + <varlistentry> + <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> + <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm> + </term> + <listitem> + <para> + Read some information associated to a replication slot. Returns a tuple + with <literal>NULL</literal> values if the replication slot does not + exist. This command is currently only supported for physical replication + slots. + </para> + <para> + In response to this command, the server will return a one-row result set, + containing the following fields: + <variablelist> + <varlistentry> + <term><literal>slot_type</literal> (<type>text</type>)</term> + <listitem> + <para> + The replication slot's type, either <literal>physical</literal> or + <literal>NULL</literal>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>restart_lsn</literal> (<type>text</type>)</term> + <listitem> + <para> + The replication slot's <literal>restart_lsn</literal>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>restart_tli</literal> (<type>int8</type>)</term> + <listitem> + <para> + The timeline ID associated to <literal>restart_lsn</literal>, + following the current timeline history. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + <varlistentry> <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ] <indexterm><primary>START_REPLICATION</primary></indexterm> diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index cb5b5ec74c..a8e4a2afd6 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2128,6 +2128,7 @@ ReadBufferMode ReadBytePtrType ReadExtraTocPtrType ReadFunc +ReadReplicationSlotCmd ReassignOwnedStmt RecheckForeignScan_function RecordCacheEntry -- 2.33.0
signature.asc
Description: PGP signature