On Thu, Oct 21, 2021 at 10:36:42AM +0200, Ronan Dunklau wrote:
> Done. I haven't touched the timeline switch test patch for now, but I still 
> include it here for completeness.

Thanks.  I have applied and back-patched 0001, then looked again at
0002 that adds READ_REPLICATION_SLOT:
- Change the TLI to use int8 rather than int4, so as we will always be
right with TimelineID which is unsigned (this was discussed upthread
but I got back on it after more thoughts, to avoid any future
issues).
- Added an extra initialization for the set of Datum values, just as
an extra safety net.
- There was a bug with the timeline returned when executing the
command while in recovery as ThisTimeLineID is 0 in the context of a
standby, but we need to support the case of physical slots even when
streaming archives from a standby.  The fix is similar to what we do
for IDENTIFY_SYSTEM, where we need to use the timeline currently
replayed from GetXLogReplayRecPtr(), before looking at the past
timeline history using restart_lsn and the replayed TLI.

With that in place, I think that we are good now for this part.
--
Michael
From 7e20a294d18c280b8031ee6cc862ba6a661cf40f Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Sat, 23 Oct 2021 16:28:10 +0900
Subject: [PATCH v10] Add replication command READ_REPLICATION_SLOT

The command is supported for physical slots for now, and returns the
type of slot, its restart_lsn and its restart_tli.

This will be useful for an upcoming patch related to pg_receivewal, to
allow the tool to be able to stream from the position of a slot, rather
than the last WAL position flushed by the backend (as reported by
IDENTIFY_SYSTEM), if the archive directory is found as empty, which
would be an advantage in the case of switching to a different archive
location with the same slot used to avoid holes in what gets backed up.

Author: Ronan Dunklau
Reviewed-by: Kyotaro Horiguchi, Michael Paquier, Bharath Rupireddy
Discussion: https://postgr.es/m/18708360.4lzOvYHigE@aivenronan
---
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/replnodes.h               |  11 ++
 src/backend/replication/repl_gram.y         |  16 ++-
 src/backend/replication/repl_scanner.l      |   1 +
 src/backend/replication/walsender.c         | 105 ++++++++++++++++++++
 src/test/recovery/t/001_stream_rep.pl       |  32 +++++-
 src/test/recovery/t/006_logical_decoding.pl |  11 +-
 doc/src/sgml/protocol.sgml                  |  48 +++++++++
 src/tools/pgindent/typedefs.list            |   1 +
 9 files changed, 223 insertions(+), 3 deletions(-)

diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index e0057daa06..541e9861ba 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -496,6 +496,7 @@ typedef enum NodeTag
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
+	T_ReadReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
 	T_SQLCmd,
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..a746fafc12 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -87,6 +87,17 @@ typedef struct StartReplicationCmd
 } StartReplicationCmd;
 
 
+/* ----------------------
+ *		READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+	NodeTag		type;
+	char	   *slotname;
+} ReadReplicationSlotCmd;
+
+
 /* ----------------------
  *		TIMELINE_HISTORY command
  * ----------------------
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 126380e2df..dcb1108579 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
+				read_replication_slot timeline_history show sql_cmd
 %type <list>	base_backup_legacy_opt_list generic_option_list
 %type <defelt>	base_backup_legacy_opt generic_option
 %type <uintval>	opt_timeline
@@ -125,6 +126,7 @@ command:
 			| start_logical_replication
 			| create_replication_slot
 			| drop_replication_slot
+			| read_replication_slot
 			| timeline_history
 			| show
 			| sql_cmd
@@ -140,6 +142,18 @@ identify_system:
 				}
 			;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+			K_READ_REPLICATION_SLOT var_name
+				{
+					ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+					n->slotname = $2;
+					$$ = (Node *) n;
+				}
+			;
+
 /*
  * SHOW setting
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index c038a636c3..1b599c255e 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -85,6 +85,7 @@ identifier		{ident_start}{ident_cont}*
 BASE_BACKUP			{ return K_BASE_BACKUP; }
 FAST			{ return K_FAST; }
 IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT	{ return K_READ_REPLICATION_SLOT; }
 SHOW		{ return K_SHOW; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b811a5c0ef..ef3e64846d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,103 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 3
+	ReplicationSlot *slot;
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc	tupdesc;
+	Datum		values[READ_REPLICATION_SLOT_COLS];
+	bool		nulls[READ_REPLICATION_SLOT_COLS];
+
+	tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+							  TEXTOID, -1, 0);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+							  TEXTOID, -1, 0);
+	/* TimeLineID is unsigned, so int4 is not wide enough. */
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
+							  INT8OID, -1, 0);
+
+	MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum));
+	MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	slot = SearchNamedReplicationSlot(cmd->slotname, false);
+	if (slot == NULL || !slot->in_use)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+	else
+	{
+		ReplicationSlot slot_contents;
+		int			i = 0;
+
+		/* Copy slot contents while holding spinlock */
+		SpinLockAcquire(&slot->mutex);
+		slot_contents = *slot;
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (OidIsValid(slot_contents.data.database))
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use \"%s\" with logical replication slots",
+						   "READ_REPLICATION_SLOT"));
+
+		/* slot type */
+		values[i] = CStringGetTextDatum("physical");
+		nulls[i] = false;
+		i++;
+
+		/* start LSN */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			char		xloc[64];
+
+			snprintf(xloc, sizeof(xloc), "%X/%X",
+					 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+			values[i] = CStringGetTextDatum(xloc);
+			nulls[i] = false;
+		}
+		i++;
+
+		/* timeline this WAL was produced on */
+		if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+		{
+			TimeLineID	slots_position_timeline;
+			TimeLineID	current_timeline;
+			List	   *timeline_history = NIL;
+
+			/*
+			 * While in recovery, use as timeline the currently-replaying
+			 * one to get the LSN position's history.
+			 */
+			if (RecoveryInProgress())
+				(void) GetXLogReplayRecPtr(&current_timeline);
+			else
+				current_timeline = ThisTimeLineID;
+
+			timeline_history = readTimeLineHistory(current_timeline);
+			slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+														  timeline_history);
+			values[i] = Int64GetDatum((int64) slots_position_timeline);
+			nulls[i] = false;
+		}
+		i++;
+
+		Assert(i == READ_REPLICATION_SLOT_COLS);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+	do_tup_output(tstate, values, nulls);
+	end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1622,6 +1720,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ReadReplicationSlotCmd:
+			cmdtag = "READ_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_BaseBackupCmd:
 			cmdtag = "BASE_BACKUP";
 			set_ps_display(cmdtag);
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 9916a36012..8fb7487857 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 49;
+use Test::More tests => 53;
 
 # Initialize primary node
 my $node_primary = PostgresNode->new('primary');
@@ -254,6 +254,36 @@ ok( $ret == 0,
 	"SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command for replication connection";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	'READ_REPLICATION_SLOT non_existent_slot;',
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0, "READ_REPLICATION_SLOT exit code 0 on success");
+like($stdout, qr/^||$/,
+	"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+$node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+	extra_params => [ '-d', $connstr_rep ]);
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+	'postgres',
+	"READ_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0, "READ_REPLICATION_SLOT success with existing slot");
+like($stdout, qr/^physical\|[^|]*\|1$/,
+	"READ_REPLICATION_SLOT returns tuple with slot information");
+
+$node_primary->psql(
+	'postgres',
+	"DROP_REPLICATION_SLOT $slotname;",
+	extra_params => [ '-d', $connstr_rep ]);
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index cc116062c2..1b74f38f10 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 14;
+use Test::More tests => 15;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,15 @@ ok( $stderr =~
 	  m/replication slot "test_slot" was not created in this database/,
 	"Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql(
+	'template1',
+	qq[READ_REPLICATION_SLOT test_slot;],
+	replication => 'database');
+like(
+	$stderr,
+	qr/cannot use "READ_REPLICATION_SLOT" with logical replication slots/,
+	'READ_REPLICATION_SLOT not supported for logical slots');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index b95cc88599..132436c6e6 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2067,6 +2067,54 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+     <para>
+      Read some information associated to a replication slot. Returns a tuple
+      with <literal>NULL</literal> values if the replication slot does not
+      exist. This command is currently only supported for physical replication
+      slots.
+     </para>
+     <para>
+      In response to this command, the server will return a one-row result set,
+      containing the following fields:
+      <variablelist>
+       <varlistentry>
+        <term><literal>slot_type</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's type, either <literal>physical</literal> or
+          <literal>NULL</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's <literal>restart_lsn</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_tli</literal> (<type>int8</type>)</term>
+        <listitem>
+         <para>
+          The timeline ID associated to <literal>restart_lsn</literal>,
+          following the current timeline history.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cb5b5ec74c..a8e4a2afd6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2128,6 +2128,7 @@ ReadBufferMode
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
+ReadReplicationSlotCmd
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheEntry
-- 
2.33.0

Attachment: signature.asc
Description: PGP signature

Reply via email to