From a9907426dfda7ceb8e28a45f8711cca676d91949 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 25 Mar 2024 09:47:05 +0000
Subject: [PATCH v20] Track last_inactive_time in pg_replication_slots.

Till now, the time at which the replication slot became inactive
is not tracked directly in pg_replication_slots. This commit adds
a new property called last_inactive_time for this. It is set to 0
whenever a slot is made active/acquired and set to current
timestamp whenever the slot is inactive/released or restored from
the disk.

The new property will be useful on production servers to debug and
analyze inactive replication slots. It will also help to know the
lifetime of a replication slot - one can know how long a streaming
standby, logical subscriber, or replication slot consumer is down.

The new property will be useful to implement inactive timeout based
replication slot invalidation in a future commit.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila
Reviewed-by: Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/system-views.sgml            |  10 ++
 src/backend/catalog/system_views.sql      |   1 +
 src/backend/replication/slot.c            |  34 +++++
 src/backend/replication/slotfuncs.c       |   7 +-
 src/include/catalog/pg_proc.dat           |   6 +-
 src/include/replication/slot.h            |   3 +
 src/test/recovery/t/019_replslot_limit.pl | 152 ++++++++++++++++++++++
 src/test/regress/expected/rules.out       |   3 +-
 8 files changed, 211 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index b5da476c20..5f4165a945 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2523,6 +2523,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_inactive_time</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+        The time at which the slot became inactive.
+        <literal>NULL</literal> if the slot is currently being used.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>conflicting</structfield> <type>bool</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f69b7f5580..bc70ff193e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,6 +1023,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
+            L.last_inactive_time,
             L.conflicting,
             L.invalidation_reason,
             L.failover,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cdf0c450c5..0919e4e56b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
 	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
+	slot->last_inactive_time = 0;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -622,6 +623,11 @@ retry:
 	if (SlotIsLogical(s))
 		pgstat_acquire_replslot(s);
 
+	/* Reset the last inactive time as the slot is active now. */
+	SpinLockAcquire(&s->mutex);
+	s->last_inactive_time = 0;
+	SpinLockRelease(&s->mutex);
+
 	if (am_walsender)
 	{
 		ereport(log_replication_commands ? LOG : DEBUG1,
@@ -645,6 +651,7 @@ ReplicationSlotRelease(void)
 	ReplicationSlot *slot = MyReplicationSlot;
 	char	   *slotname = NULL;	/* keep compiler quiet */
 	bool		is_logical = false; /* keep compiler quiet */
+	TimestampTz now = 0;
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
@@ -679,6 +686,15 @@ ReplicationSlotRelease(void)
 		ReplicationSlotsComputeRequiredXmin(false);
 	}
 
+	/*
+	 * Set the last inactive time after marking the slot inactive. Exempted
+	 * are the slots that are currently being synced from the primary to the
+	 * standby, because such slots are typically inactive in the sense that
+	 * they don't have associated walsender process to replicate.
+	 */
+	if (!(RecoveryInProgress() && slot->data.synced))
+		now = GetCurrentTimestamp();
+
 	if (slot->data.persistency == RS_PERSISTENT)
 	{
 		/*
@@ -687,9 +703,16 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
+		slot->last_inactive_time = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
+	else
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->last_inactive_time = now;
+		SpinLockRelease(&slot->mutex);
+	}
 
 	MyReplicationSlot = NULL;
 
@@ -2342,6 +2365,17 @@ RestoreSlotFromDisk(const char *name)
 		slot->in_use = true;
 		slot->active_pid = 0;
 
+		/*
+		 * We set the last inactive time after loading the slot from the disk
+		 * into memory. Whoever acquires the slot i.e. makes the slot active
+		 * will reset it. Exempted are the slots that are currently being
+		 * synced from the primary to the standby, because such slots are
+		 * typically inactive in the sense that they don't have associated
+		 * walsender process to replicate.
+		 */
+		if (!(RecoveryInProgress() && slot->data.synced))
+			slot->last_inactive_time = GetCurrentTimestamp();
+
 		restored = true;
 		break;
 	}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 4232c1e52e..24f5e6d90a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 18
+#define PG_GET_REPLICATION_SLOTS_COLS 19
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -410,6 +410,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		if (slot_contents.last_inactive_time > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.last_inactive_time);
+		else
+			nulls[i++] = true;
+
 		cause = slot_contents.data.invalidated;
 
 		if (SlotIsPhysical(&slot_contents))
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 71c74350a0..0d26e5b422 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11133,9 +11133,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,invalidation_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,last_inactive_time,conflicting,invalidation_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7f25a083ee..eefd7abd39 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -201,6 +201,9 @@ typedef struct ReplicationSlot
 	 * forcibly flushed or not.
 	 */
 	XLogRecPtr	last_saved_confirmed_flush;
+
+	/* The time at which this slot becomes inactive */
+	TimestampTz last_inactive_time;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index fe00370c3e..3409cf88cd 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -410,4 +410,156 @@ kill 'CONT', $receiverpid;
 $node_primary3->stop;
 $node_standby3->stop;
 
+# =============================================================================
+# Testcase start: Check last_inactive_time property of the streaming standby's slot
+#
+
+# Initialize primary node
+my $primary4 = PostgreSQL::Test::Cluster->new('primary4');
+$primary4->init(allows_streaming => 'logical');
+$primary4->start;
+
+# Take backup
+$backup_name = 'my_backup4';
+$primary4->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby4 = PostgreSQL::Test::Cluster->new('standby4');
+$standby4->init_from_backup($primary4, $backup_name, has_streaming => 1);
+
+my $sb4_slot = 'sb4_slot';
+$standby4->append_conf('postgresql.conf', "primary_slot_name = '$sb4_slot'");
+
+my $slot_creation_time = $primary4->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
+$primary4->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := '$sb4_slot');
+]);
+
+# Get last_inactive_time value after the slot's creation. Note that the slot
+# is still inactive till it's used by the standby below.
+my $last_inactive_time =
+	capture_and_validate_slot_last_inactive_time($primary4, $sb4_slot, $slot_creation_time);
+
+$standby4->start;
+
+# Wait until standby has replayed enough data
+$primary4->wait_for_catchup($standby4);
+
+# Now the slot is active so last_inactive_time value must be NULL
+is( $primary4->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$sb4_slot';]
+	),
+	't',
+	'last inactive time for an active physical slot is NULL');
+
+# Stop the standby to check its last_inactive_time value is updated
+$standby4->stop;
+
+# Let's restart the primary so that the last_inactive_time is set upon
+# loading the slot from the disk.
+$primary4->restart;
+
+is( $primary4->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb4_slot' AND last_inactive_time IS NOT NULL;]
+	),
+	't',
+	'last inactive time for an inactive physical slot is updated correctly');
+
+$standby4->stop;
+
+# Testcase end: Check last_inactive_time property of the streaming standby's slot
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Check last_inactive_time property of the logical subscriber's slot
+my $publisher4 = $primary4;
+
+# Create subscriber node
+my $subscriber4 = PostgreSQL::Test::Cluster->new('subscriber4');
+$subscriber4->init;
+
+# Setup logical replication
+my $publisher4_connstr = $publisher4->connstr . ' dbname=postgres';
+$publisher4->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+
+$slot_creation_time = $publisher4->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
+my $lsub4_slot = 'lsub4_slot';
+$publisher4->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot(slot_name := '$lsub4_slot', plugin := 'pgoutput');"
+);
+
+# Get last_inactive_time value after the slot's creation. Note that the slot
+# is still inactive till it's used by the subscriber below.
+$last_inactive_time =
+	capture_and_validate_slot_last_inactive_time($publisher4, $lsub4_slot, $slot_creation_time);
+
+$subscriber4->start;
+$subscriber4->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher4_connstr' PUBLICATION pub WITH (slot_name = '$lsub4_slot', create_slot = false)"
+);
+
+# Wait until subscriber has caught up
+$subscriber4->wait_for_subscription_sync($publisher4, 'sub');
+
+# Now the slot is active so last_inactive_time value must be NULL
+is( $publisher4->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$lsub4_slot';]
+	),
+	't',
+	'last inactive time for an active logical slot is NULL');
+
+# Stop the subscriber to check its last_inactive_time value is updated
+$subscriber4->stop;
+
+# Let's restart the publisher so that the last_inactive_time is set upon
+# loading the slot from the disk.
+$publisher4->restart;
+
+is( $publisher4->safe_psql(
+		'postgres',
+		qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$lsub4_slot' AND last_inactive_time IS NOT NULL;]
+	),
+	't',
+	'last inactive time for an inactive logical slot is updated correctly');
+
+# Testcase end: Check last_inactive_time property of the logical subscriber's slot
+# =============================================================================
+
+$publisher4->stop;
+$subscriber4->stop;
+
+# Capture and validate last_inactive_time of a given slot.
+sub capture_and_validate_slot_last_inactive_time
+{
+	my ($node, $slot_name, $slot_creation_time) = @_;
+
+	my $last_inactive_time = $node->safe_psql('postgres',
+		qq(SELECT last_inactive_time FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND last_inactive_time IS NOT NULL;)
+		);
+
+	# Check that the captured time is sane
+	is( $node->safe_psql(
+			'postgres',
+			qq[SELECT '$last_inactive_time'::timestamptz > to_timestamp(0) AND
+				'$last_inactive_time'::timestamptz >= '$slot_creation_time'::timestamptz;]
+		),
+		't',
+		"last inactive time for an active slot $slot_name is sane");
+
+	return $last_inactive_time;
+}
+
 done_testing();
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 18829ea586..dfcbaec387 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,11 +1473,12 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
+    l.last_inactive_time,
     l.conflicting,
     l.invalidation_reason,
     l.failover,
     l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, invalidation_reason, failover, synced)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, last_inactive_time, conflicting, invalidation_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

