From 9229ef9e28694a55906e92f42e966280c1beffea Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 6 Jan 2024 14:19:05 +0000
Subject: [PATCH v1] Track inactive replication slot information

Currently postgres doesn't track when a replication slot became
inactive, or how many times it has become inactive over its
lifetime. This commit adds two new fields to
ReplicationSlotPersistentData: inactive_at (timestamptz) and
inactive_count (stored as uint64, exposed as numeric in
pg_replication_slots). Whenever a walsender releases a persistent
slot, the current timestamp and the incremented count are
persisted to disk; inactive_at is reset when a walsender acquires
the slot again.

These metrics are useful in the following ways:

- To improve replication slot monitoring tools. For instance, one
can build a monitoring tool that signals a) when a replication slot
has been lying inactive for a day or so, using the inactive_at
metric, b) when a replication slot is becoming inactive too
frequently, using the inactive_count metric.

- To implement timeout-based inactive replication slot management
capability in postgres.

Bumps SLOT_VERSION because the two new fields change the on-disk
slot format.
---
 doc/src/sgml/system-views.sgml       | 20 +++++++++++
 src/backend/catalog/system_views.sql |  4 ++-
 src/backend/replication/slot.c       | 50 +++++++++++++++++++++++-----
 src/backend/replication/slotfuncs.c  | 15 ++++++++-
 src/include/catalog/pg_proc.dat      |  6 ++--
 src/include/replication/slot.h       |  6 ++++
 src/test/regress/expected/rules.out  |  6 ++--
 7 files changed, 91 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 104bd2fb1f..b6914a3197 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2556,6 +2556,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        </itemizedlist>
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inactive_at</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+        The time at which the slot last became inactive.
+        <literal>NULL</literal> if the slot is currently in use or if no
+        such time has been recorded.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inactive_count</structfield> <type>numeric</type>
+      </para>
+      <para>
+        The total number of times the slot became inactive in its lifetime.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 7d40e9549b..611682a1b5 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,7 +1023,9 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
-            L.invalidation_reason
+            L.invalidation_reason,
+            L.inactive_at,
+            L.inactive_count
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 52da694c79..f4a884d96e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk
 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	3		/* version for new files */
+#define SLOT_VERSION	4		/* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -311,6 +311,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.persistency = persistency;
 	slot->data.two_phase = two_phase;
 	slot->data.two_phase_at = InvalidXLogRecPtr;
+	slot->data.inactive_at = 0;
+	slot->data.inactive_count = 0;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
@@ -540,6 +542,17 @@ retry:
 
 	if (am_walsender)
 	{
+		if (s->data.persistency == RS_PERSISTENT)
+		{
+			SpinLockAcquire(&s->mutex);
+			s->data.inactive_at = 0;
+			SpinLockRelease(&s->mutex);
+
+			/* Write this slot to disk */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
 		ereport(log_replication_commands ? LOG : DEBUG1,
 				SlotIsLogical(s)
 				? errmsg("acquired logical replication slot \"%s\"",
@@ -607,16 +620,27 @@ ReplicationSlotRelease(void)
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
 
-	MyReplicationSlot = NULL;
-
-	/* might not have been set when we've been a plain slot */
-	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
-	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
-	LWLockRelease(ProcArrayLock);
-
 	if (am_walsender)
 	{
+		if (slot->data.persistency == RS_PERSISTENT)
+		{
+			SpinLockAcquire(&slot->mutex);
+			slot->data.inactive_at = GetCurrentTimestamp();
+
+			/*
+			 * XXX: Can inactive_count of type uint64 ever overflow? It would
+			 * take about half a billion years for inactive_count to overflow
+			 * even if the slot became inactive once every millisecond. So,
+			 * using pg_add_u64_overflow would likely be overkill.
+			 */
+			slot->data.inactive_count++;
+			SpinLockRelease(&slot->mutex);
+
+			/* Write this slot to disk */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
 		ereport(log_replication_commands ? LOG : DEBUG1,
 				is_logical
 				? errmsg("released logical replication slot \"%s\"",
@@ -626,6 +650,14 @@ ReplicationSlotRelease(void)
 
 		pfree(slotname);
 	}
+
+	MyReplicationSlot = NULL;
+
+	/* might not have been set when we've been a plain slot */
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
+	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
+	LWLockRelease(ProcArrayLock);
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 77f7134872..89262da486 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -232,10 +232,11 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 15
+#define PG_GET_REPLICATION_SLOTS_COLS 17
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
+	char		buf[256];
 
 	/*
 	 * We don't require any special permission to see this function's data
@@ -421,6 +422,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 				break;
 		}
 
+		if (slot_contents.data.inactive_at > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.data.inactive_at);
+		else
+			nulls[i++] = true;
+
+		/* Convert to numeric. */
+		snprintf(buf, sizeof buf, UINT64_FORMAT, slot_contents.data.inactive_count);
+		values[i++] = DirectFunctionCall3(numeric_in,
+										  CStringGetDatum(buf),
+										  ObjectIdGetDatum(0),
+										  Int32GetDatum(-1));
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 51e0f8f264..c6995876ed 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11115,9 +11115,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,invalidation_reason}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,timestamptz,numeric}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,invalidation_reason,inactive_at,inactive_count}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 9e39aaf303..dfd2f82a67 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData
 
 	/* plugin name */
 	NameData	plugin;
+
+	/* When did this slot become inactive last time? */
+	TimestampTz inactive_at;
+
+	/* How many times has this slot become inactive? */
+	uint64		inactive_count;
 } ReplicationSlotPersistentData;
 
 /*
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 7cca0fbc87..16807eea46 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,8 +1473,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
-    l.invalidation_reason
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, invalidation_reason)
+    l.invalidation_reason,
+    l.inactive_at,
+    l.inactive_count
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, invalidation_reason, inactive_at, inactive_count)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

