From 53ffd09c7a3b339c7dd242a2d57cb94c02c90b43 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 6 Jan 2024 14:44:23 +0000
Subject: [PATCH v1] Add inactive_timeout based replication slot invalidation

Currently postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky. Because the amount of WAL a
customer generates, and their allocated storage will vary greatly
in production, making it difficult to pin down a one-size-fits-all
value. It is often easy for developers to set a timeout of say 1
or 2 or 3 days, after which the inactive slots get dropped.

To achieve the above, postgres uses replication slot metric
inactive_at (the time at which the slot became inactive), and a
new GUC inactive_replication_slot_timeout. The checkpointer then
looks at all replication slots invalidating the inactive slots
based on the timeout set.
---
 doc/src/sgml/config.sgml                      | 18 ++++
 src/backend/access/transam/xlog.c             | 10 +++
 src/backend/replication/slot.c                | 24 ++++-
 src/backend/replication/slotfuncs.c           |  3 +
 src/backend/utils/misc/guc_tables.c           | 12 +++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/replication/slot.h                |  3 +
 src/test/recovery/meson.build                 |  1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 87 +++++++++++++++++++
 9 files changed, 156 insertions(+), 3 deletions(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f323bba018..4293b3c182 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4404,6 +4404,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-inactive-replication-slot-timeout" xreflabel="inactive_replication_slot_timeout">
+      <term><varname>inactive_replication_slot_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>inactive_replication_slot_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that are inactive for longer than this
+        amount of time at the next checkpoint. If this value is specified
+        without units, it is taken as seconds. A value of zero (which is
+        default) disables the timeout mechanism. This parameter can only be
+        set in the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 478377c4a2..f7ce2cbbb4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7051,6 +7051,11 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7495,6 +7500,11 @@ CreateRestartPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f4a884d96e..d921ac051f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -98,9 +98,9 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 /* My backend's replication slot in the shared memory array */
 ReplicationSlot *MyReplicationSlot = NULL;
 
-/* GUC variable */
-int			max_replication_slots = 10; /* the maximum number of replication
-										 * slots */
+/* GUC variables */
+int			max_replication_slots = 10;
+int			inactive_replication_slot_timeout = 0;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropAcquired(void);
@@ -1346,6 +1346,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by inactive_replication_slot_timeout."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1444,6 +1447,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						conflict = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+					if (s->data.inactive_at > 0)
+					{
+						TimestampTz now;
+
+						Assert(s->data.persistency == RS_PERSISTENT);
+						Assert(s->active_pid == 0);
+
+						now = GetCurrentTimestamp();
+						if (TimestampDifferenceExceeds(s->data.inactive_at, now,
+													   inactive_replication_slot_timeout * 1000))
+							conflict = cause;
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1589,6 +1606,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 89262da486..e094225764 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -420,6 +420,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			case RS_INVAL_WAL_LEVEL:
 				values[i++] = CStringGetTextDatum("wal_level_insufficient");
 				break;
+			case RS_INVAL_INACTIVE_TIMEOUT:
+				values[i++] = CStringGetTextDatum("inactive_timeout");
+				break;
 		}
 
 		if (slot_contents.data.inactive_at > 0)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index e53ebc6dc2..c7fa14ed6b 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2892,6 +2892,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"inactive_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time to wait before invalidating an "
+						 "inactive replication slot."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&inactive_replication_slot_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b2809c711a..7984873533 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -325,6 +325,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#inactive_replication_slot_timeout = 0	# in seconds; 0 disables
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index dfd2f82a67..ace946de62 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -50,6 +50,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -216,6 +218,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT int inactive_replication_slot_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 88fb0306f5..22e5e2e45c 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -45,6 +45,7 @@ tests += {
       't/037_invalid_database.pl',
       't/038_save_logical_slots_shutdown.pl',
       't/039_end_of_wal.pl',
+      't/050_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..bf1cd4bbcc
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,87 @@
+
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Initialize primary node, setting wal-segsize to 1MB
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$primary->append_conf('postgresql.conf', q{
+checkpoint_timeout = 1h
+});
+$primary->start;
+$primary->safe_psql('postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb1_slot');
+]);
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name,
+	has_streaming => 1);
+$standby1->append_conf('postgresql.conf', q{
+primary_slot_name = 'sb1_slot'
+});
+$standby1->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# The inactive replication slot info should be null when the slot is active
+my $result = $primary->safe_psql('postgres', qq[
+	SELECT inactive_at IS NULL, inactive_count = 0 AS OK
+		FROM pg_replication_slots WHERE slot_name = 'sb1_slot';
+]);
+is($result, "t|t", 'check the inactive replication slot info for an active slot');
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql('postgres', qq[
+    ALTER SYSTEM SET inactive_replication_slot_timeout TO '1s';
+]);
+$primary->reload;
+
+my $logstart = -s $primary->logfile;
+
+# Stop standby to make the replication slot on primary inactive
+$standby1->stop;
+
+# Wait for the inactive replication slot info to be updated
+$primary->poll_query_until('postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE inactive_at IS NOT NULL AND
+		inactive_count = 1 AND slot_name = 'sb1_slot';
+]) or die "Timed out while waiting for inactive replication slot info to be updated";
+
+my $invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb1_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb1_slot invalidation has been logged');
+
+# Wait for the inactive replication slots to be invalidated.
+$primary->poll_query_until('postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb1_slot' AND
+		invalidation_reason = 'inactive_timeout';
+]) or die "Timed out while waiting for inactive replication slot sb1_slot to be invalidated";
+
+done_testing();
-- 
2.34.1

