From a2c66c711da7118eae48446c87e5595f85c8e45a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 6 Apr 2024 11:13:02 +0000
Subject: [PATCH v38] Add inactive_timeout based replication slot invalidation.

Till now, postgres has had the ability to invalidate inactive
replication slots based on the amount of WAL (set via the
max_slot_wal_keep_size GUC) that would be needed for the slots if
they were to become active again. However, choosing a default value
for max_slot_wal_keep_size is tricky: the amount of WAL a customer
generates and their allocated storage vary greatly in production,
making it difficult to pin down a one-size-fits-all value. It is
often easier for developers to set a timeout of, say, 1, 2, or 3
days at the slot level, after which the inactive slots get
invalidated.

To achieve the above, this commit introduces a GUC that allows users
to set an inactive timeout. Replication slots that are inactive for
longer than the specified amount of time get invalidated.
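
As a minimal illustration (a sketch, not part of this patch), with

    replication_slot_inactive_timeout = '1d'

set in postgresql.conf, a slot that stays unacquired for more than a
day gets invalidated, which can be observed with:

    SELECT slot_name, inactive_since, invalidation_reason
    FROM pg_replication_slots;

where invalidation_reason reports 'inactive_timeout' for such slots.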

The invalidation check happens at various locations so that it kicks
in as promptly as possible. These locations include the following
(see the example below):
- Whenever the slot is acquired; the slot acquisition errors out if
  the slot is invalidated.
- During checkpoint.
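
For instance (using a hypothetical slot name), acquiring an
invalidated slot fails as follows:

    SELECT pg_replication_slot_advance('my_slot', '0/1');
    ERROR:  can no longer get changes from replication slot "my_slot"
    DETAIL:  This slot has been invalidated because it was inactive for more than replication_slot_inactive_timeout.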

Note that this new invalidation mechanism won't kick in for the
slots that are currently being synced from the primary to the
standby.
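
For example, such exempt slots can be listed on the standby with:

    SELECT slot_name, synced FROM pg_replication_slots WHERE synced;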

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  33 ++
 doc/src/sgml/system-views.sgml                |   7 +
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |  11 +-
 src/backend/replication/slot.c                | 174 ++++++++++-
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/backend/utils/misc/guc_tables.c           |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |   6 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/044_invalidate_slots.pl   | 283 ++++++++++++++++++
 13 files changed, 523 insertions(+), 15 deletions(-)
 create mode 100644 src/test/recovery/t/044_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 624518e0b0..42f7b15aa7 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4547,6 +4547,39 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replication-slot-inactive-timeout" xreflabel="replication_slot_inactive_timeout">
+      <term><varname>replication_slot_inactive_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_slot_inactive_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidates replication slots that are inactive for longer than the
+        specified amount of time. If this value is specified without units,
+        it is taken as seconds. A value of zero (the default) disables
+        the timeout mechanism. This parameter can only be set in
+        the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+
+       <para>
+        This invalidation check happens either when the slot is acquired
+        for use or during a checkpoint. The timeout is measured against the
+        slot's <structfield>inactive_since</structfield> value, that is,
+        against the time the slot became inactive.
+       </para>
+
+       <para>
+        Note that the inactive timeout invalidation mechanism does not
+        apply to slots on the standby that are being synced from a
+        primary server (those whose <structfield>synced</structfield>
+        field is <literal>true</literal>).
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 7ed617170f..063638beda 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2580,6 +2580,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>inactive_timeout</literal> means that the slot has been
+          inactive for longer than the duration specified by the
+          <xref linkend="guc-replication-slot-inactive-timeout"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index d18e2c7342..e92f559539 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -362,7 +362,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -575,6 +575,13 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 						   " name slot \"%s\" already exists on the standby",
 						   remote_slot->name));
 
+		/*
+		 * Skip the sync if the local slot is already invalidated. We check
+		 * this beforehand to avoid the overhead of acquiring and releasing
+		 * the slot.
+		 */
+		if (slot->data.invalidated != RS_INVAL_NONE)
+			return false;
+
 		/*
 		 * The slot has been synchronized before.
 		 *
@@ -591,7 +598,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 3bddaae022..767d818706 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -140,6 +141,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+int			replication_slot_inactive_timeout = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -159,6 +161,14 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
+static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+										   ReplicationSlot *s,
+										   XLogRecPtr oldestLSN,
+										   Oid dboid,
+										   TransactionId snapshotConflictHorizon,
+										   bool need_lock,
+										   bool *invalidated);
+
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
@@ -535,12 +545,18 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * If check_for_timeout_invalidation is true, the slot is checked for
+ * invalidation based on the replication_slot_inactive_timeout GUC; if it is
+ * invalidated, an error is raised after making the slot ours.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait,
+					   bool check_for_timeout_invalidation)
 {
 	ReplicationSlot *s;
 	int			active_pid;
+	bool		released_lock = false;
 
 	Assert(name != NULL);
 
@@ -583,7 +599,60 @@ retry:
 	}
 	else
 		active_pid = MyProcPid;
-	LWLockRelease(ReplicationSlotControlLock);
+
+	/*
+	 * When the slot is not active in another process, check whether it
+	 * needs to be invalidated based on the inactive timeout before we
+	 * acquire it.
+	 */
+	if (active_pid == MyProcPid &&
+		check_for_timeout_invalidation)
+	{
+		bool		invalidated = false;
+
+		released_lock = InvalidatePossiblyObsoleteSlot(RS_INVAL_INACTIVE_TIMEOUT,
+													   s, 0, InvalidOid,
+													   InvalidTransactionId,
+													   false,
+													   &invalidated);
+
+		/*
+		 * If the slot has been invalidated, recalculate the resource limits.
+		 */
+		if (invalidated)
+		{
+			ReplicationSlotsComputeRequiredXmin(false);
+			ReplicationSlotsComputeRequiredLSN();
+		}
+
+		/*
+		 * If the slot has been invalidated now or previously, error out, as
+		 * there's no point in acquiring the slot.
+		 *
+		 * XXX: We might need to check for all invalidations and error out
+		 * here.
+		 */
+		if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT)
+		{
+			/*
+			 * Release the lock if it hasn't been released yet, and make this
+			 * slot ours so that the error cleanup path works correctly.
+			 */
+			if (!released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+
+			MyReplicationSlot = s;
+
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("can no longer get changes from replication slot \"%s\"",
+							NameStr(s->data.name)),
+					 errdetail("This slot has been invalidated because it was inactive for more than replication_slot_inactive_timeout.")));
+		}
+	}
+
+	if (!released_lock)
+		LWLockRelease(ReplicationSlotControlLock);
 
 	/*
 	 * If we found the slot but it's already active in another process, we
@@ -781,7 +850,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -804,7 +873,7 @@ ReplicationSlotAlter(const char *name, bool failover)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1506,6 +1575,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than replication_slot_inactive_timeout."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1540,7 +1612,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 							   ReplicationSlot *s,
 							   XLogRecPtr oldestLSN,
 							   Oid dboid, TransactionId snapshotConflictHorizon,
-							   bool *invalidated)
+							   bool need_lock, bool *invalidated)
 {
 	int			last_signaled_pid = 0;
 	bool		released_lock = false;
@@ -1550,12 +1622,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
 
+	if (need_lock)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
 	for (;;)
 	{
 		XLogRecPtr	restart_lsn;
 		NameData	slotname;
 		int			active_pid = 0;
 		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+		TimestampTz now = 0;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1566,6 +1642,18 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			break;
 		}
 
+		if (cause == RS_INVAL_INACTIVE_TIMEOUT &&
+			(replication_slot_inactive_timeout > 0 &&
+			 s->inactive_since > 0 &&
+			 !(RecoveryInProgress() && s->data.synced)))
+		{
+			/*
+			 * We get the current time beforehand to avoid a system call while
+			 * holding the spinlock.
+			 */
+			now = GetCurrentTimestamp();
+		}
+
 		/*
 		 * Check if the slot needs to be invalidated. If it needs to be
 		 * invalidated, and is not currently acquired, acquire it and mark it
@@ -1619,6 +1707,41 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+					{
+						/*
+						 * Quick exit if the inactive timeout invalidation
+						 * mechanism is disabled, the slot is currently in
+						 * use, or the slot on the standby is currently
+						 * being synced from the primary.
+						 *
+						 * Note that we don't invalidate synced slots because
+						 * they are typically considered inactive, as they
+						 * don't perform logical decoding to produce changes.
+						 */
+						if (replication_slot_inactive_timeout == 0 ||
+							s->inactive_since == 0 ||
+							(RecoveryInProgress() && s->data.synced))
+							break;
+
+						/*
+						 * Check if the slot needs to be invalidated due to
+						 * the replication_slot_inactive_timeout GUC.
+						 */
+						if (TimestampDifferenceExceeds(s->inactive_since, now,
+													   replication_slot_inactive_timeout * 1000))
+						{
+							invalidation_cause = cause;
+
+							/*
+							 * Invalidation due to inactive timeout implies
+							 * that no one is using the slot.
+							 */
+							Assert(s->active_pid == 0);
+						}
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1756,6 +1879,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		}
 	}
 
+	/*
+	 * Release the lock if we acquired it at the beginning and have not yet
+	 * released it.
+	 */
+	if (need_lock && !released_lock)
+	{
+		LWLockRelease(ReplicationSlotControlLock);
+		released_lock = true;
+	}
+
 	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
 
 	return released_lock;
@@ -1772,6 +1905,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: the slot's inactive timeout has elapsed
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1803,7 +1937,7 @@ restart:
 
 		if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
 										   snapshotConflictHorizon,
-										   &invalidated))
+										   false, &invalidated))
 		{
 			/* if the lock was released, start from scratch */
 			goto restart;
@@ -1835,6 +1969,7 @@ void
 CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
+	bool		invalidated = false;
 
 	elog(DEBUG1, "performing replication slot checkpoint");
 
@@ -1851,10 +1986,26 @@ CheckPointReplicationSlots(bool is_shutdown)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		char		path[MAXPGPATH];
+		bool		released_lock PG_USED_FOR_ASSERTS_ONLY = false;
 
 		if (!s->in_use)
 			continue;
 
+		/*
+		 * Take this opportunity to invalidate replication slots that have
+		 * been inactive for longer than the timeout.
+		 */
+		released_lock = InvalidatePossiblyObsoleteSlot(RS_INVAL_INACTIVE_TIMEOUT, s,
+													   0, InvalidOid,
+													   InvalidTransactionId,
+													   true, &invalidated);
+
+		/*
+		 * InvalidatePossiblyObsoleteSlot must have released the lock, as we
+		 * explicitly told it to acquire the lock.
+		 */
+		Assert(released_lock);
+
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
 
@@ -1884,6 +2035,15 @@ CheckPointReplicationSlots(bool is_shutdown)
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	/*
+	 * If any slots have been invalidated, recalculate the resource limits.
+	 */
+	if (invalidated)
+	{
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index dd6c1d5a7e..9ad3e55704 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -539,7 +539,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc40c454de..96eeb8b7d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -846,7 +846,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1459,7 +1459,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index c54b08fe18..82956d58d3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -299,7 +299,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, false);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c12784cbec..4149ff1ffe 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2971,6 +2971,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"replication_slot_inactive_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time to wait before invalidating an "
+						 "inactive replication slot."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&replication_slot_inactive_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index baecde2841..2e1ad2eaca 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -335,6 +335,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#replication_slot_inactive_timeout = 0	# in seconds; 0 disables
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0c..3f3bdf7441 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -230,6 +232,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *standby_slot_names;
+extern PGDLLIMPORT int replication_slot_inactive_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -245,7 +248,8 @@ extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool check_for_timeout_invalidation);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 712924c2fa..0437ab5c46 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -52,6 +52,7 @@ tests += {
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
       't/043_wal_replay_wait.pl',
+      't/044_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/044_invalidate_slots.pl b/src/test/recovery/t/044_invalidate_slots.pl
new file mode 100644
index 0000000000..7796d87963
--- /dev/null
+++ b/src/test/recovery/t/044_invalidate_slots.pl
@@ -0,0 +1,283 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# =============================================================================
+# Testcase start: Invalidate the streaming standby's slot as well as the
+# logical failover slot on the primary due to
+# replication_slot_inactive_timeout. Also, check that the logical failover
+# slot synced onto the standby is not invalidated on its own, but instead
+# gets the invalidated state from the remote slot on the primary.
+
+# Initialize primary node
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Avoid checkpoints during the test; otherwise, it can become unpredictable.
+$primary->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+my $connstr_1 = $primary->connstr;
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+# Create sync slot on the primary
+$primary->psql('postgres',
+	q{SELECT pg_create_logical_replication_slot('lsub1_sync_slot', 'test_decoding', false, false, true);}
+);
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb1_slot');
+]);
+
+$standby1->start;
+
+my $standby1_logstart = -s $standby1->logfile;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# Synchronize the primary server slots to the standby
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slot is created on the standby and is
+# flagged as 'synced'.
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'lsub1_sync_slot' AND synced AND NOT temporary;}
+	),
+	"t",
+	'logical slot has synced as true on standby');
+
+my $logstart = -s $primary->logfile;
+my $inactive_timeout = 2;
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$primary->reload;
+
+# Wait for the logical failover slot on the primary to be invalidated. Note
+# that nobody has acquired the slot yet, so given the
+# replication_slot_inactive_timeout setting above, it must get invalidated.
+wait_for_slot_invalidation($primary, 'lsub1_sync_slot', $logstart,
+	$inactive_timeout);
+
+# Set the timeout on the standby as well, to check that synced slots don't
+# get invalidated due to the timeout on the standby.
+$standby1->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$standby1->reload;
+
+# Now, sync the logical failover slot from the remote slot on the primary.
+# The remote slot has already been invalidated due to the inactive timeout,
+# so the standby must also see it as invalidated.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Wait for the inactive replication slot to be invalidated.
+$standby1->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'lsub1_sync_slot' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for replication slot lsub1_sync_slot invalidation to be synced on standby";
+
+# The synced slot mustn't get invalidated on the standby even after a
+# checkpoint; it must instead sync the invalidation from the primary. So, we
+# must not see the slot's invalidation message in the server log.
+$standby1->safe_psql('postgres', "CHECKPOINT");
+ok( !$standby1->log_contains(
+		"invalidating obsolete replication slot \"lsub1_sync_slot\"",
+		$standby1_logstart),
+	'check that synced slot has not been invalidated on the standby');
+
+# Stop standby to make the standby's replication slot on the primary inactive
+$standby1->stop;
+
+# Wait for the standby's replication slot to become inactive and then be
+# invalidated due to the timeout
+wait_for_slot_invalidation($primary, 'sb1_slot', $logstart,
+	$inactive_timeout);
+
+# Testcase end: Invalidate streaming standby's slot as well as logical failover
+# slot on primary due to replication_slot_inactive_timeout. Also, check the
+# logical failover slot synced on to the standby doesn't invalidate the slot on
+# its own, but gets the invalidated state from the remote slot on the primary.
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical subscriber's slot due to
+# replication_slot_inactive_timeout.
+
+my $publisher = $primary;
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '0';
+]);
+$publisher->reload;
+
+# Create subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$subscriber->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$publisher->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_logical_replication_slot(slot_name := 'lsub1_slot', plugin := 'pgoutput');
+]);
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot', create_slot = false)"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+my $result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$publisher->reload;
+
+$logstart = -s $publisher->logfile;
+
+# Stop subscriber to make the replication slot on publisher inactive
+$subscriber->stop;
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# timeout.
+wait_for_slot_invalidation($publisher, 'lsub1_slot', $logstart,
+	$inactive_timeout);
+
+# Testcase end: Invalidate logical subscriber's slot due to
+# replication_slot_inactive_timeout.
+# =============================================================================
+
+sub wait_for_slot_invalidation
+{
+	my ($node, $slot_name, $offset, $inactive_timeout) = @_;
+	my $name = $node->name;
+
+	# Wait for the replication slot to become inactive
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND active = 'f';
+	])
+	  or die
+	  "Timed out while waiting for replication slot to become inactive";
+
+	# Wait for the replication slot info to be updated
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE inactive_since IS NOT NULL
+				AND slot_name = '$slot_name' AND active = 'f';
+	])
+	  or die
+	  "Timed out while waiting for info of replication slot $slot_name to be updated on node $name";
+
+	# Sleep for at least the inactive timeout duration so that the slot gets
+	# invalidated without needing multiple checkpoints.
+	sleep($inactive_timeout);
+
+	check_for_slot_invalidation_in_server_log($node, $slot_name, $offset);
+
+	# Wait for the inactive replication slot to be invalidated
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND
+			invalidation_reason = 'inactive_timeout';
+	])
+	  or die
+	  "Timed out while waiting for inactive replication slot $slot_name to be invalidated on node $name";
+
+	# Check that the invalidated slot cannot be acquired
+	my ($result, $stdout, $stderr);
+
+	($result, $stdout, $stderr) = $node->psql(
+		'postgres', qq[
+			SELECT pg_replication_slot_advance('$slot_name', '0/1');
+	]);
+
+	ok( $stderr =~
+		  /can no longer get changes from replication slot "$slot_name"/,
+		"detected error upon trying to acquire invalidated slot $slot_name on node $name"
+	  )
+	  or die
+	  "could not detect error upon trying to acquire invalidated slot $slot_name";
+}
+
+# Check the server log for the slot's invalidation message
+sub check_for_slot_invalidation_in_server_log
+{
+	my ($node, $slot_name, $offset) = @_;
+	my $invalidated = 0;
+
+	for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+	{
+		$node->safe_psql('postgres', "CHECKPOINT");
+		if ($node->log_contains(
+				"invalidating obsolete replication slot \"$slot_name\"",
+				$offset))
+		{
+			$invalidated = 1;
+			last;
+		}
+		usleep(100_000);
+	}
+	ok($invalidated,
+		"check that slot $slot_name invalidation has been logged");
+}
+
+done_testing();
-- 
2.34.1

