From 3f08b8cc6346aabba5226d060823fcdf71e6b1f8 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 16 Mar 2024 03:47:15 +0000
Subject: [PATCH v11 3/4] Track inactive replication slot information

Until now, PostgreSQL has not tracked when a replication slot became
inactive or how many times it has become inactive over its lifetime.
This commit adds two new fields to ReplicationSlotPersistentData:
last_inactive_at (TimestampTz) and inactive_count (uint64, exposed as
numeric through pg_replication_slots). Whenever a walsender releases a
persistent slot, the current timestamp and the incremented count are
persisted to disk.

These metrics are useful in the following ways:
- To improve replication slot monitoring tools. For instance, one
can build a monitoring tool that signals a) when a replication slot
has been inactive for a day or so, using the last_inactive_at metric,
and b) when a replication slot is becoming inactive too frequently,
using the inactive_count metric.

- To implement timeout-based inactive replication slot management
capability in postgres.

SLOT_VERSION is bumped because the two new fields are added to the
persistent slot data.
---
 doc/src/sgml/system-views.sgml       | 20 +++++++++++++
 src/backend/catalog/system_views.sql |  4 ++-
 src/backend/replication/slot.c       | 43 ++++++++++++++++++++++------
 src/backend/replication/slotfuncs.c  | 15 +++++++++-
 src/include/catalog/pg_proc.dat      |  6 ++--
 src/include/replication/slot.h       |  6 ++++
 src/test/regress/expected/rules.out  |  6 ++--
 7 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 56252b12ee..365c0fd52d 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2758,6 +2758,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        ID of role
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_inactive_at</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+        The time at which the slot last became inactive.
+        <literal>NULL</literal> if the slot is currently in active
+        use.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inactive_count</structfield> <type>numeric</type>
+      </para>
+      <para>
+        The total number of times the slot became inactive in its lifetime.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index cd22dad959..de9f1d5506 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1026,7 +1026,9 @@ CREATE VIEW pg_replication_slots AS
             L.conflicting,
             L.failover,
             L.synced,
-            L.invalidation_reason
+            L.invalidation_reason,
+            L.last_inactive_at,
+            L.inactive_count
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dc37586dcc..6b6e5141f7 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -130,7 +130,7 @@ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	5		/* version for new files */
+#define SLOT_VERSION	6		/* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -401,6 +401,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.two_phase_at = InvalidXLogRecPtr;
 	slot->data.failover = failover;
 	slot->data.synced = synced;
+	slot->data.last_inactive_at = 0;
+	slot->data.inactive_count = 0;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
@@ -627,6 +629,17 @@ retry:
 
 	if (am_walsender)
 	{
+		if (s->data.persistency == RS_PERSISTENT)
+		{
+			SpinLockAcquire(&s->mutex);
+			s->data.last_inactive_at = 0;
+			SpinLockRelease(&s->mutex);
+
+			/* Write this slot to disk */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
 		ereport(log_replication_commands ? LOG : DEBUG1,
 				SlotIsLogical(s)
 				? errmsg("acquired logical replication slot \"%s\"",
@@ -694,16 +707,20 @@ ReplicationSlotRelease(void)
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
 
-	MyReplicationSlot = NULL;
-
-	/* might not have been set when we've been a plain slot */
-	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
-	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
-	LWLockRelease(ProcArrayLock);
-
 	if (am_walsender)
 	{
+		if (slot->data.persistency == RS_PERSISTENT)
+		{
+			SpinLockAcquire(&slot->mutex);
+			slot->data.last_inactive_at = GetCurrentTimestamp();
+			slot->data.inactive_count++;
+			SpinLockRelease(&slot->mutex);
+
+			/* Write this slot to disk */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
 		ereport(log_replication_commands ? LOG : DEBUG1,
 				is_logical
 				? errmsg("released logical replication slot \"%s\"",
@@ -713,6 +730,14 @@ ReplicationSlotRelease(void)
 
 		pfree(slotname);
 	}
+
+	MyReplicationSlot = NULL;
+
+	/* might not have been set when we've been a plain slot */
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
+	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
+	LWLockRelease(ProcArrayLock);
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index b5a638edea..4c7a120df1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 18
+#define PG_GET_REPLICATION_SLOTS_COLS 20
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -264,6 +264,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		WALAvailability walstate;
 		int			i;
 		ReplicationSlotInvalidationCause cause;
+		char		buf[256];
 
 		if (!slot->in_use)
 			continue;
@@ -436,6 +437,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
 
+		if (slot_contents.data.last_inactive_at > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.data.last_inactive_at);
+		else
+			nulls[i++] = true;
+
+		/* Convert to numeric. */
+		snprintf(buf, sizeof buf, UINT64_FORMAT, slot_contents.data.inactive_count);
+		values[i++] = DirectFunctionCall3(numeric_in,
+										  CStringGetDatum(buf),
+										  ObjectIdGetDatum(0),
+										  Int32GetDatum(-1));
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 63fd0b4cd7..c7ab0893eb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11123,9 +11123,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text,timestamptz,numeric}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason,last_inactive_at,inactive_count}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 614ba0e30b..780767a819 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -129,6 +129,12 @@ typedef struct ReplicationSlotPersistentData
 	 * for logical slots on the primary server.
 	 */
 	bool		failover;
+
+	/* When did this slot last become inactive? */
+	TimestampTz last_inactive_at;
+
+	/* How many times has this slot become inactive? */
+	uint64		inactive_count;
 } ReplicationSlotPersistentData;
 
 /*
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 055bec068d..c0bdfe76d8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1476,8 +1476,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.conflicting,
     l.failover,
     l.synced,
-    l.invalidation_reason
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason)
+    l.invalidation_reason,
+    l.last_inactive_at,
+    l.inactive_count
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason, last_inactive_at, inactive_count)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

