On Tue, Mar 26, 2024 at 11:26 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> Review comments on v18_0002 and v18_0005
> =======================================
>
> 1.
> We have decided to update inactive_since for temporary slots. So,
> unless there is some reason, we should allow inactive_timeout to also
> be set for temporary slots.

WFM. A temporary slot that stays inactive for a long time, even before
the server is shut down, can utilize this inactive_timeout based
invalidation mechanism. And, I'd also vote for being consistent across
temporary and synced slots.
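
For instance, with this change a temporary slot can carry a timeout
too. A quick usage sketch (the slot name and value are illustrative):

    -- inactive_timeout is in seconds; 0 (the default) disables it
    SELECT 'init' FROM pg_create_physical_replication_slot(
        slot_name := 'tmp_slot',
        immediately_reserve := true,
        temporary := true,
        inactive_timeout := 120);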

>              L.last_inactive_time,
> +            L.inactive_timeout,
>
> Shall we keep inactive_timeout before
> last_inactive_time/inactive_since? I don't have any strong reason to
> propose that way apart from that the former is provided by the user.

Done.

> + if (InvalidateReplicationSlotForInactiveTimeout(slot, false, true, true))
> + invalidated = true;
>
> I don't think we should try to invalidate the slots in
> pg_get_replication_slots. This function's purpose is to get the
> current information on slots and has no intention to perform any work
> for slots. Any error due to invalidation won't be what the user would
> be expecting here.

Agree. Removed.

> 4.
> +static bool
> +InvalidateSlotForInactiveTimeout(ReplicationSlot *slot,
> + bool need_control_lock,
> + bool need_mutex)
> {
> ...
> ...
> + if (need_control_lock)
> + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> +
> + Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
> +
> + /*
> + * Check if the slot needs to be invalidated due to inactive_timeout. We
> + * do this with the spinlock held to avoid race conditions -- for example
> + * the restart_lsn could move forward, or the slot could be dropped.
> + */
> + if (need_mutex)
> + SpinLockAcquire(&slot->mutex);
> ...
>
> I find this combination of parameters a bit strange. Because, say if
> need_mutex is false and need_control_lock is true then that means this
> function will acquire LWlock after acquiring spinlock which is
> unacceptable. Now, this may not happen in practice as the callers
> won't pass such a combination but still, this functionality should be
> improved.

Right. Either we need both locks or neither. So, changed it to use just
one bool need_locks; when set, both the control lock and the spinlock
are acquired and released.

On Mon, Mar 25, 2024 at 10:33 AM shveta malik <shveta.ma...@gmail.com> wrote:
>
> patch 002:
>
> 2)
> slotsync.c:
>
>   ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
>     remote_slot->two_phase,
>     remote_slot->failover,
> -   true);
> +   true, 0);
>
> + slot->data.inactive_timeout = remote_slot->inactive_timeout;
>
> Is there a reason we are not passing 'remote_slot->inactive_timeout'
> to ReplicationSlotCreate() directly?

The slot there gets created as a temporary slot, for which we were not
supporting setting inactive_timeout. But the latest v22 patch does
support it, so we now pass remote_slot->inactive_timeout directly.

> 3)
> slotfuncs.c
> pg_create_logical_replication_slot():
> + int inactive_timeout = PG_GETARG_INT32(5);
>
> Can we mention here that timeout is in seconds either in comment or
> rename variable to inactive_timeout_secs?
>
> Please do this for create_physical_replication_slot(),
> create_logical_replication_slot(),
> pg_create_physical_replication_slot() as well.

Added /* in seconds */ next to the variable declarations.

> ---------
> 4)
> + int inactive_timeout; /* The amount of time in seconds the slot
> + * is allowed to be inactive. */
>  } LogicalSlotInfo;
>
>  Do we need to mention "before getting invalidated" like other places
> (in last patch)?

Done.

>  5)
> Same at these two places. "before getting invalidated" to be added in
> the last patch otherwise the info is incomplete.
>
> +
> + /* The amount of time in seconds the slot is allowed to be inactive */
> + int inactive_timeout;
>  } ReplicationSlotPersistentData;
>
>
> + * inactive_timeout: The amount of time in seconds the slot is allowed to be
> + *     inactive.
>   */
>  void
>  ReplicationSlotCreate(const char *name, bool db_specific,
>  Same here. "before getting invalidated" ?

Done.

On Tue, Mar 26, 2024 at 12:04 PM shveta malik <shveta.ma...@gmail.com> wrote:
>
> > Please find the attached v21 patch implementing the above idea. It
> > also has changes for renaming last_inactive_time to inactive_since.
>
> Thanks for the patch. I have tested this patch alone, and it does what
> it says. One additional thing which I noticed is that now it sets
> inactive_since for temp slots as well, but that idea looks fine to me.

Right. Let's be consistent by treating all slots the same.

> I could not test 'invalidation on promotion bug' with this change, as
> that needed rebasing of the rest of the patches.

Please use the v22 patch set.

> Few trivial things:
>
> 1)
> Commit msg:
>
> ensures the value is set to current timestamp during the
> shutdown to help correctly interpret the time if the standby gets
> promoted without a restart.
>
> shutdown --> shutdown of slot sync worker   (as it was not clear if it
> is instance shutdown or something else)

Changed it to "shutdown of slot sync machinery" to be consistent with
the comments.

> 2)
> 'The time since the slot has became inactive'.
>
> has became-->has become
> or just became
>
> Please check it in all the files. There are multiple places.

Fixed.

Please see the attached v23 patches. I've addressed all the review
comments received so far from Amit and Shveta.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From a19a324c057994025f0486f5016dd67ca39b731b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 26 Mar 2024 07:56:25 +0000
Subject: [PATCH v22 1/3] Fix review comments for slot's last_inactive_time
 property

This commit addresses review comments received for the slot's
last_inactive_time property added by commit a11f330b55. It does
the following:

1. The name last_inactive_time seems confusing. With that, one
expects it to tell the last time that the slot was inactive. But,
it actually tells the last time that a currently-inactive slot
previously *WAS* active.

This commit renames the property to the less confusing
inactive_since. Other names considered were released_time and
deactivated_at, but inactive_since won out since the word
inactive is predominant as far as replication slots are
concerned.

2. The slot's last_inactive_time isn't currently maintained for
synced slots on the standby. Commit a11f330b55 prevents
updating last_inactive_time with a RecoveryInProgress() check in
RestoreSlotFromDisk(). But, the issue is that RecoveryInProgress()
always returns true in RestoreSlotFromDisk() as
'xlogctl->SharedRecoveryState' is always 'RECOVERY_STATE_CRASH' at
that time. The impact is that, on a promoted standby,
last_inactive_time is always NULL for all synced slots even after
a server restart.

The above issue led us to question why we can't maintain
last_inactive_time for synced slots on the standby. There's a
use-case for having it: apart from fixing the above issue, it can
tell the last synced time on the standby. So, this commit does
two things: a) maintains last_inactive_time for such slots,
b) ensures the value is set to the current timestamp during the
shutdown of the slot sync machinery, to help correctly interpret
the time if the standby gets promoted without a restart.
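
For instance, after promotion the last synced time of the failover
slots can be checked like this (an illustrative query against the
renamed column):

    SELECT slot_name, inactive_since
    FROM pg_replication_slots
    WHERE synced;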

Reported-by: Robert Haas, Shveta Malik
Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/ZgGrCBQoktdLi1Ir%40ip-10-97-1-34.eu-west-3.compute.internal
Discussion: https://www.postgresql.org/message-id/CAJpy0uB-yE%2BRiw7JQ4hW0%2BigJxvPc%2Brq%2B9c7WyTa1Jz7%2B2gAiA%40mail.gmail.com
---
 doc/src/sgml/system-views.sgml                |  4 +-
 src/backend/catalog/system_views.sql          |  2 +-
 src/backend/replication/logical/slotsync.c    | 43 +++++++++++++
 src/backend/replication/slot.c                | 38 +++++-------
 src/backend/replication/slotfuncs.c           |  4 +-
 src/include/catalog/pg_proc.dat               |  2 +-
 src/include/replication/slot.h                |  4 +-
 src/test/recovery/t/019_replslot_limit.pl     | 62 +++++++++----------
 .../t/040_standby_failover_slots_sync.pl      | 34 ++++++++++
 src/test/regress/expected/rules.out           |  4 +-
 10 files changed, 135 insertions(+), 62 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 5f4165a945..3c8dca8ca3 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2525,10 +2525,10 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>last_inactive_time</structfield> <type>timestamptz</type>
+       <structfield>inactive_since</structfield> <type>timestamptz</type>
       </para>
       <para>
-        The time at which the slot became inactive.
+        The time since the slot has become inactive.
         <literal>NULL</literal> if the slot is currently being used.
       </para></entry>
      </row>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index bc70ff193e..401fb35947 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,7 +1023,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
-            L.last_inactive_time,
+            L.inactive_since,
             L.conflicting,
             L.invalidation_reason,
             L.failover,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..bbf9a2c485 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -140,6 +140,7 @@ typedef struct RemoteSlot
 } RemoteSlot;
 
 static void slotsync_failure_callback(int code, Datum arg);
+static void reset_synced_slots_info(void);
 
 /*
  * If necessary, update the local synced slot's metadata based on the data
@@ -1296,6 +1297,45 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	Assert(false);
 }
 
+/*
+ * Reset the synced slots info such as inactive_since after shutting
+ * down the slot sync machinery.
+ */
+static void
+reset_synced_slots_info(void)
+{
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* Check if it is a synchronized slot */
+		if (s->in_use && s->data.synced)
+		{
+			TimestampTz now;
+
+			Assert(SlotIsLogical(s));
+			Assert(s->active_pid == 0);
+
+			/*
+			 * Set the time since the slot has become inactive after shutting
+			 * down slot sync machinery. This helps correctly interpret the
+			 * time if the standby gets promoted without a restart. We get the
+			 * current time beforehand to avoid a system call while holding
+			 * the lock.
+			 */
+			now = GetCurrentTimestamp();
+
+			SpinLockAcquire(&s->mutex);
+			s->inactive_since = now;
+			SpinLockRelease(&s->mutex);
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Shut down the slot sync worker.
  */
@@ -1309,6 +1349,7 @@ ShutDownSlotSync(void)
 	if (SlotSyncCtx->pid == InvalidPid)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
+		reset_synced_slots_info();
 		return;
 	}
 	SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1341,6 +1382,8 @@ ShutDownSlotSync(void)
 	}
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	reset_synced_slots_info();
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 45f7a28f7d..d0a2f440ef 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -409,7 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
 	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
-	slot->last_inactive_time = 0;
+	slot->inactive_since = 0;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -623,9 +623,12 @@ retry:
 	if (SlotIsLogical(s))
 		pgstat_acquire_replslot(s);
 
-	/* Reset the last inactive time as the slot is active now. */
+	/*
+	 * Reset the time since the slot has become inactive as the slot is active
+	 * now.
+	 */
 	SpinLockAcquire(&s->mutex);
-	s->last_inactive_time = 0;
+	s->inactive_since = 0;
 	SpinLockRelease(&s->mutex);
 
 	if (am_walsender)
@@ -651,7 +654,7 @@ ReplicationSlotRelease(void)
 	ReplicationSlot *slot = MyReplicationSlot;
 	char	   *slotname = NULL;	/* keep compiler quiet */
 	bool		is_logical = false; /* keep compiler quiet */
-	TimestampTz now = 0;
+	TimestampTz now;
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
@@ -687,13 +690,11 @@ ReplicationSlotRelease(void)
 	}
 
 	/*
-	 * Set the last inactive time after marking the slot inactive. We don't
-	 * set it for the slots currently being synced from the primary to the
-	 * standby because such slots are typically inactive as decoding is not
-	 * allowed on those.
+	 * Set the time since the slot has become inactive after marking it
+	 * inactive. We get the current time beforehand to avoid a system call
+	 * while holding the lock.
 	 */
-	if (!(RecoveryInProgress() && slot->data.synced))
-		now = GetCurrentTimestamp();
+	now = GetCurrentTimestamp();
 
 	if (slot->data.persistency == RS_PERSISTENT)
 	{
@@ -703,14 +704,14 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
-		slot->last_inactive_time = now;
+		slot->inactive_since = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
 	else
 	{
 		SpinLockAcquire(&slot->mutex);
-		slot->last_inactive_time = now;
+		slot->inactive_since = now;
 		SpinLockRelease(&slot->mutex);
 	}
 
@@ -2366,16 +2367,11 @@ RestoreSlotFromDisk(const char *name)
 		slot->active_pid = 0;
 
 		/*
-		 * We set the last inactive time after loading the slot from the disk
-		 * into memory. Whoever acquires the slot i.e. makes the slot active
-		 * will reset it. We don't set it for the slots currently being synced
-		 * from the primary to the standby because such slots are typically
-		 * inactive as decoding is not allowed on those.
+		 * We set the time since the slot has become inactive after loading
+		 * the slot from the disk into memory. Whoever acquires the slot i.e.
+		 * makes the slot active will reset it.
 		 */
-		if (!(RecoveryInProgress() && slot->data.synced))
-			slot->last_inactive_time = GetCurrentTimestamp();
-		else
-			slot->last_inactive_time = 0;
+		slot->inactive_since = GetCurrentTimestamp();
 
 		restored = true;
 		break;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 24f5e6d90a..da57177c25 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -410,8 +410,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
-		if (slot_contents.last_inactive_time > 0)
-			values[i++] = TimestampTzGetDatum(slot_contents.last_inactive_time);
+		if (slot_contents.inactive_since > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
 		else
 			nulls[i++] = true;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 0d26e5b422..2f7cfc02c6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11135,7 +11135,7 @@
   proargtypes => '',
   proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,last_inactive_time,conflicting,invalidation_reason,failover,synced}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,inactive_since,conflicting,invalidation_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index eefd7abd39..7b937d1a0c 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -202,8 +202,8 @@ typedef struct ReplicationSlot
 	 */
 	XLogRecPtr	last_saved_confirmed_flush;
 
-	/* The time at which this slot becomes inactive */
-	TimestampTz last_inactive_time;
+	/* The time since the slot has become inactive */
+	TimestampTz inactive_since;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index 3409cf88cd..3b9a306a8b 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -411,7 +411,7 @@ $node_primary3->stop;
 $node_standby3->stop;
 
 # =============================================================================
-# Testcase start: Check last_inactive_time property of the streaming standby's slot
+# Testcase start: Check inactive_since property of the streaming standby's slot
 #
 
 # Initialize primary node
@@ -440,45 +440,45 @@ $primary4->safe_psql(
     SELECT pg_create_physical_replication_slot(slot_name := '$sb4_slot');
 ]);
 
-# Get last_inactive_time value after the slot's creation. Note that the slot
-# is still inactive till it's used by the standby below.
-my $last_inactive_time =
-	capture_and_validate_slot_last_inactive_time($primary4, $sb4_slot, $slot_creation_time);
+# Get inactive_since value after the slot's creation. Note that the slot is
+# still inactive till it's used by the standby below.
+my $inactive_since =
+	capture_and_validate_slot_inactive_since($primary4, $sb4_slot, $slot_creation_time);
 
 $standby4->start;
 
 # Wait until standby has replayed enough data
 $primary4->wait_for_catchup($standby4);
 
-# Now the slot is active so last_inactive_time value must be NULL
+# Now the slot is active so inactive_since value must be NULL
 is( $primary4->safe_psql(
 		'postgres',
-		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$sb4_slot';]
+		qq[SELECT inactive_since IS NULL FROM pg_replication_slots WHERE slot_name = '$sb4_slot';]
 	),
 	't',
 	'last inactive time for an active physical slot is NULL');
 
-# Stop the standby to check its last_inactive_time value is updated
+# Stop the standby to check its inactive_since value is updated
 $standby4->stop;
 
-# Let's restart the primary so that the last_inactive_time is set upon
-# loading the slot from the disk.
+# Let's restart the primary so that the inactive_since is set upon loading the
+# slot from the disk.
 $primary4->restart;
 
 is( $primary4->safe_psql(
 		'postgres',
-		qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb4_slot' AND last_inactive_time IS NOT NULL;]
+		qq[SELECT inactive_since > '$inactive_since'::timestamptz FROM pg_replication_slots WHERE slot_name = '$sb4_slot' AND inactive_since IS NOT NULL;]
 	),
 	't',
 	'last inactive time for an inactive physical slot is updated correctly');
 
 $standby4->stop;
 
-# Testcase end: Check last_inactive_time property of the streaming standby's slot
+# Testcase end: Check inactive_since property of the streaming standby's slot
 # =============================================================================
 
 # =============================================================================
-# Testcase start: Check last_inactive_time property of the logical subscriber's slot
+# Testcase start: Check inactive_since property of the logical subscriber's slot
 my $publisher4 = $primary4;
 
 # Create subscriber node
@@ -499,10 +499,10 @@ $publisher4->safe_psql('postgres',
 	"SELECT pg_create_logical_replication_slot(slot_name := '$lsub4_slot', plugin := 'pgoutput');"
 );
 
-# Get last_inactive_time value after the slot's creation. Note that the slot
-# is still inactive till it's used by the subscriber below.
-$last_inactive_time =
-	capture_and_validate_slot_last_inactive_time($publisher4, $lsub4_slot, $slot_creation_time);
+# Get inactive_since value after the slot's creation. Note that the slot is
+# still inactive till it's used by the subscriber below.
+$inactive_since =
+	capture_and_validate_slot_inactive_since($publisher4, $lsub4_slot, $slot_creation_time);
 
 $subscriber4->start;
 $subscriber4->safe_psql('postgres',
@@ -512,54 +512,54 @@ $subscriber4->safe_psql('postgres',
 # Wait until subscriber has caught up
 $subscriber4->wait_for_subscription_sync($publisher4, 'sub');
 
-# Now the slot is active so last_inactive_time value must be NULL
+# Now the slot is active so inactive_since value must be NULL
 is( $publisher4->safe_psql(
 		'postgres',
-		qq[SELECT last_inactive_time IS NULL FROM pg_replication_slots WHERE slot_name = '$lsub4_slot';]
+		qq[SELECT inactive_since IS NULL FROM pg_replication_slots WHERE slot_name = '$lsub4_slot';]
 	),
 	't',
 	'last inactive time for an active logical slot is NULL');
 
-# Stop the subscriber to check its last_inactive_time value is updated
+# Stop the subscriber to check its inactive_since value is updated
 $subscriber4->stop;
 
-# Let's restart the publisher so that the last_inactive_time is set upon
+# Let's restart the publisher so that the inactive_since is set upon
 # loading the slot from the disk.
 $publisher4->restart;
 
 is( $publisher4->safe_psql(
 		'postgres',
-		qq[SELECT last_inactive_time > '$last_inactive_time'::timestamptz FROM pg_replication_slots WHERE slot_name = '$lsub4_slot' AND last_inactive_time IS NOT NULL;]
+		qq[SELECT inactive_since > '$inactive_since'::timestamptz FROM pg_replication_slots WHERE slot_name = '$lsub4_slot' AND inactive_since IS NOT NULL;]
 	),
 	't',
 	'last inactive time for an inactive logical slot is updated correctly');
 
-# Testcase end: Check last_inactive_time property of the logical subscriber's slot
+# Testcase end: Check inactive_since property of the logical subscriber's slot
 # =============================================================================
 
 $publisher4->stop;
 $subscriber4->stop;
 
-# Capture and validate last_inactive_time of a given slot.
-sub capture_and_validate_slot_last_inactive_time
+# Capture and validate inactive_since of a given slot.
+sub capture_and_validate_slot_inactive_since
 {
 	my ($node, $slot_name, $slot_creation_time) = @_;
 
-	my $last_inactive_time = $node->safe_psql('postgres',
-		qq(SELECT last_inactive_time FROM pg_replication_slots
-			WHERE slot_name = '$slot_name' AND last_inactive_time IS NOT NULL;)
+	my $inactive_since = $node->safe_psql('postgres',
+		qq(SELECT inactive_since FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
 		);
 
 	# Check that the captured time is sane
 	is( $node->safe_psql(
 			'postgres',
-			qq[SELECT '$last_inactive_time'::timestamptz > to_timestamp(0) AND
-				'$last_inactive_time'::timestamptz >= '$slot_creation_time'::timestamptz;]
+			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
+				'$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;]
 		),
 		't',
 		"last inactive time for an active slot $slot_name is sane");
 
-	return $last_inactive_time;
+	return $inactive_since;
 }
 
 done_testing();
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..e7c33c0066 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -178,6 +178,13 @@ $primary->poll_query_until(
 # the subscriber.
 $primary->wait_for_replay_catchup($standby1);
 
+# Capture the time before which the logical failover slots are synced/created
+# on the standby.
+my $slots_creation_time = $standby1->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 # Synchronize the primary server slots to the standby.
 $standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
 
@@ -190,6 +197,11 @@ is( $standby1->safe_psql(
 	"t",
 	'logical slots have synced as true on standby');
 
+# Confirm that the logical failover slots that have synced on the standby has
+# got a valid inactive_since value representing the last slot sync time.
+capture_and_validate_slot_inactive_since($standby1, 'lsub1_slot', $slots_creation_time);
+capture_and_validate_slot_inactive_since($standby1, 'lsub2_slot', $slots_creation_time);
+
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
@@ -773,4 +785,26 @@ is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
 	"20",
 	'data replicated from the new primary');
 
+# Capture and validate inactive_since of a given slot.
+sub capture_and_validate_slot_inactive_since
+{
+	my ($node, $slot_name, $slot_creation_time) = @_;
+
+	my $inactive_since = $node->safe_psql('postgres',
+		qq(SELECT inactive_since FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
+		);
+
+	# Check that the captured time is sane
+	is( $node->safe_psql(
+			'postgres',
+			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
+				'$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;]
+		),
+		't',
+		"last inactive time for a synced slot $slot_name is sane");
+
+	return $inactive_since;
+}
+
 done_testing();
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index dfcbaec387..f53c3036a6 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,12 +1473,12 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
-    l.last_inactive_time,
+    l.inactive_since,
     l.conflicting,
     l.invalidation_reason,
     l.failover,
     l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, last_inactive_time, conflicting, invalidation_reason, failover, synced)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, inactive_since, conflicting, invalidation_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

From bfb5031edf2efefc9461ab649342099925b671e8 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 26 Mar 2024 08:45:01 +0000
Subject: [PATCH v22 2/3] Allow setting inactive_timeout for replication slots
 via SQL API.

This commit adds a new replication slot property called
inactive_timeout, specifying the amount of time in seconds the
slot is allowed to be inactive. It is added to the slot's
persistent data structure so that it survives server restarts.
It is synced to failover slots on the standby, and is also
carried over to the new cluster as part of pg_upgrade.

In particular, this commit lets one specify the inactive_timeout
for a slot via the SQL functions pg_create_physical_replication_slot
and pg_create_logical_replication_slot.
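
For example (a usage sketch mirroring the added tests; the slot
name and timeout value are illustrative):

    SELECT 'init' FROM pg_create_logical_replication_slot(
        slot_name := 'my_slot',
        plugin := 'test_decoding',
        inactive_timeout := 600);  -- in seconds; 0 (default) means none

    -- the new property is exposed via the pg_replication_slots view
    SELECT slot_name, inactive_timeout FROM pg_replication_slots;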

The new property will be useful to implement inactive timeout based
replication slot invalidation in a future commit.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 contrib/test_decoding/expected/slot.out       | 97 +++++++++++++++++++
 contrib/test_decoding/sql/slot.sql            | 30 ++++++
 doc/src/sgml/func.sgml                        | 18 ++--
 doc/src/sgml/system-views.sgml                |  9 ++
 src/backend/catalog/system_functions.sql      |  2 +
 src/backend/catalog/system_views.sql          |  1 +
 src/backend/replication/logical/slotsync.c    | 17 +++-
 src/backend/replication/slot.c                |  8 +-
 src/backend/replication/slotfuncs.c           | 31 +++++-
 src/backend/replication/walsender.c           |  4 +-
 src/bin/pg_upgrade/info.c                     |  6 +-
 src/bin/pg_upgrade/pg_upgrade.c               |  5 +-
 src/bin/pg_upgrade/pg_upgrade.h               |  2 +
 src/bin/pg_upgrade/t/003_logical_slots.pl     | 11 ++-
 src/include/catalog/pg_proc.dat               | 22 ++---
 src/include/replication/slot.h                |  5 +-
 .../t/040_standby_failover_slots_sync.pl      | 13 ++-
 src/test/regress/expected/rules.out           |  3 +-
 18 files changed, 243 insertions(+), 41 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 349ab2d380..c318eceefd 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -466,3 +466,100 @@ SELECT pg_drop_replication_slot('physical_slot');
  
 (1 row)
 
+-- Test negative value for inactive_timeout option for slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', inactive_timeout := -300);  -- error
+ERROR:  "inactive_timeout" must not be negative
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', inactive_timeout := -600);  -- error
+ERROR:  "inactive_timeout" must not be negative
+-- Test inactive_timeout option of physical slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot1', immediately_reserve := true, inactive_timeout := 300);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot2');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Copy physical slot with inactive_timeout option set.
+SELECT 'copy' FROM pg_copy_physical_replication_slot(src_slot_name := 'it_phy_slot1', dst_slot_name := 'it_phy_slot3');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;
+  slot_name   | slot_type | inactive_timeout 
+--------------+-----------+------------------
+ it_phy_slot1 | physical  |              300
+ it_phy_slot2 | physical  |                0
+ it_phy_slot3 | physical  |              300
+(3 rows)
+
+SELECT pg_drop_replication_slot('it_phy_slot1');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('it_phy_slot2');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('it_phy_slot3');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+-- Test inactive_timeout option of logical slots.
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot1', plugin := 'test_decoding', inactive_timeout := 600);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot2', plugin := 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Copy logical slot with inactive_timeout option set.
+SELECT 'copy' FROM pg_copy_logical_replication_slot(src_slot_name := 'it_log_slot1', dst_slot_name := 'it_log_slot3');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;
+  slot_name   | slot_type | inactive_timeout 
+--------------+-----------+------------------
+ it_log_slot1 | logical   |              600
+ it_log_slot2 | logical   |                0
+ it_log_slot3 | logical   |              600
+(3 rows)
+
+SELECT pg_drop_replication_slot('it_log_slot1');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('it_log_slot2');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('it_log_slot3');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 580e3ae3be..e5c7b3d359 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -190,3 +190,33 @@ SELECT pg_drop_replication_slot('failover_true_slot');
 SELECT pg_drop_replication_slot('failover_false_slot');
 SELECT pg_drop_replication_slot('failover_default_slot');
 SELECT pg_drop_replication_slot('physical_slot');
+
+-- Test negative value for inactive_timeout option for slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', inactive_timeout := -300);  -- error
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', inactive_timeout := -600);  -- error
+
+-- Test inactive_timeout option of physical slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot1', immediately_reserve := true, inactive_timeout := 300);
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot2');
+
+-- Copy physical slot with inactive_timeout option set.
+SELECT 'copy' FROM pg_copy_physical_replication_slot(src_slot_name := 'it_phy_slot1', dst_slot_name := 'it_phy_slot3');
+
+SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;
+
+SELECT pg_drop_replication_slot('it_phy_slot1');
+SELECT pg_drop_replication_slot('it_phy_slot2');
+SELECT pg_drop_replication_slot('it_phy_slot3');
+
+-- Test inactive_timeout option of logical slots.
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot1', plugin := 'test_decoding', inactive_timeout := 600);
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot2', plugin := 'test_decoding');
+
+-- Copy logical slot with inactive_timeout option set.
+SELECT 'copy' FROM pg_copy_logical_replication_slot(src_slot_name := 'it_log_slot1', dst_slot_name := 'it_log_slot3');
+
+SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;
+
+SELECT pg_drop_replication_slot('it_log_slot1');
+SELECT pg_drop_replication_slot('it_log_slot2');
+SELECT pg_drop_replication_slot('it_log_slot3');
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8ecc02f2b9..2cc26e927a 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -28373,7 +28373,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_create_physical_replication_slot</primary>
         </indexterm>
-        <function>pg_create_physical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</parameter> <type>boolean</type>, <parameter>temporary</parameter> <type>boolean</type> </optional> )
+        <function>pg_create_physical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</parameter> <type>boolean</type>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>inactive_timeout</parameter> <type>integer</type> </optional>)
         <returnvalue>record</returnvalue>
         ( <parameter>slot_name</parameter> <type>name</type>,
         <parameter>lsn</parameter> <type>pg_lsn</type> )
@@ -28390,9 +28390,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         parameter, <parameter>temporary</parameter>, when set to true, specifies that
         the slot should not be permanently stored to disk and is only meant
         for use by the current session. Temporary slots are also
-        released upon any error. This function corresponds
-        to the replication protocol command <literal>CREATE_REPLICATION_SLOT
-        ... PHYSICAL</literal>.
+        released upon any error. The optional fourth
+        parameter, <parameter>inactive_timeout</parameter>, when set to a
+        non-zero value, specifies the amount of time in seconds the slot is
+        allowed to be inactive. This function corresponds to the replication
+        protocol command
+        <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
        </para></entry>
       </row>
 
@@ -28417,7 +28420,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_create_logical_replication_slot</primary>
         </indexterm>
-        <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type>, <parameter>failover</parameter> <type>boolean</type> </optional> )
+        <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type>, <parameter>failover</parameter> <type>boolean</type>, <parameter>inactive_timeout</parameter> <type>integer</type> </optional> )
         <returnvalue>record</returnvalue>
         ( <parameter>slot_name</parameter> <type>name</type>,
         <parameter>lsn</parameter> <type>pg_lsn</type> )
@@ -28436,7 +28439,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <parameter>failover</parameter>, when set to true,
         specifies that this slot is enabled to be synced to the
         standbys so that logical replication can be resumed after
-        failover. A call to this function has the same effect as
+        failover. The optional sixth parameter,
+        <parameter>inactive_timeout</parameter>, when set to a
+        non-zero value, specifies the amount of time in seconds the slot is
+        allowed to be inactive. A call to this function has the same effect as
         the replication protocol command
         <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
        </para></entry>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 3c8dca8ca3..a6cb13fd9d 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2523,6 +2523,15 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inactive_timeout</structfield> <type>integer</type>
+      </para>
+      <para>
+        The amount of time in seconds the slot is allowed to be inactive.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>inactive_since</structfield> <type>timestamptz</type>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index fe2bb50f46..af27616657 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -469,6 +469,7 @@ AS 'pg_logical_emit_message_bytea';
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
     IN temporary boolean DEFAULT false,
+    IN inactive_timeout int DEFAULT 0,
     OUT slot_name name, OUT lsn pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
@@ -480,6 +481,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
     IN temporary boolean DEFAULT false,
     IN twophase boolean DEFAULT false,
     IN failover boolean DEFAULT false,
+    IN inactive_timeout int DEFAULT 0,
     OUT slot_name name, OUT lsn pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 401fb35947..7d9d743dd5 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,6 +1023,7 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
+            L.inactive_timeout,
             L.inactive_since,
             L.conflicting,
             L.invalidation_reason,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index bbf9a2c485..79a968373c 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -131,6 +131,7 @@ typedef struct RemoteSlot
 	char	   *database;
 	bool		two_phase;
 	bool		failover;
+	int			inactive_timeout;	/* in seconds */
 	XLogRecPtr	restart_lsn;
 	XLogRecPtr	confirmed_lsn;
 	TransactionId catalog_xmin;
@@ -168,7 +169,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		remote_slot->two_phase == slot->data.two_phase &&
 		remote_slot->failover == slot->data.failover &&
 		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
-		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
+		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0 &&
+		remote_slot->inactive_timeout == slot->data.inactive_timeout)
 		return false;
 
 	/* Avoid expensive operations while holding a spinlock. */
@@ -183,6 +185,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 	slot->data.catalog_xmin = remote_slot->catalog_xmin;
 	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+	slot->data.inactive_timeout = remote_slot->inactive_timeout;
 	SpinLockRelease(&slot->mutex);
 
 	if (xmin_changed)
@@ -608,7 +611,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
 							  remote_slot->two_phase,
 							  remote_slot->failover,
-							  true);
+							  true,
+							  remote_slot->inactive_timeout);
 
 		/* For shorter lines. */
 		slot = MyReplicationSlot;
@@ -653,9 +657,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 static bool
 synchronize_slots(WalReceiverConn *wrconn)
 {
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
-	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID, INT4OID};
 
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
@@ -664,7 +668,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, failover,"
-		" database, invalidation_reason"
+		" database, invalidation_reason, inactive_timeout"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
@@ -744,6 +748,9 @@ synchronize_slots(WalReceiverConn *wrconn)
 		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
 			GetSlotInvalidationCause(TextDatumGetCString(d));
 
+		remote_slot->inactive_timeout = DatumGetInt32(slot_getattr(tupslot, ++col,
+																   &isnull));
+
 		/* Sanity check */
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d0a2f440ef..bc7424bac3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -129,7 +129,7 @@ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	5		/* version for new files */
+#define SLOT_VERSION	6		/* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -304,11 +304,14 @@ ReplicationSlotValidateName(const char *name, int elevel)
  * failover: If enabled, allows the slot to be synced to standbys so
  *     that logical replication can be resumed after failover.
  * synced: True if the slot is synchronized from the primary server.
+ * inactive_timeout: The amount of time in seconds the slot is allowed to be
+ *     inactive.
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
 					  ReplicationSlotPersistency persistency,
-					  bool two_phase, bool failover, bool synced)
+					  bool two_phase, bool failover, bool synced,
+					  int inactive_timeout)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -398,6 +401,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.two_phase_at = InvalidXLogRecPtr;
 	slot->data.failover = failover;
 	slot->data.synced = synced;
+	slot->data.inactive_timeout = inactive_timeout;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index da57177c25..6e1d8d1f9a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -38,14 +38,15 @@
  */
 static void
 create_physical_replication_slot(char *name, bool immediately_reserve,
-								 bool temporary, XLogRecPtr restart_lsn)
+								 bool temporary, int inactive_timeout,
+								 XLogRecPtr restart_lsn)
 {
 	Assert(!MyReplicationSlot);
 
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
 						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-						  false, false);
+						  false, false, inactive_timeout);
 
 	if (immediately_reserve)
 	{
@@ -71,6 +72,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	Name		name = PG_GETARG_NAME(0);
 	bool		immediately_reserve = PG_GETARG_BOOL(1);
 	bool		temporary = PG_GETARG_BOOL(2);
+	int			inactive_timeout = PG_GETARG_INT32(3);	/* in seconds */
 	Datum		values[2];
 	bool		nulls[2];
 	TupleDesc	tupdesc;
@@ -84,9 +86,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotRequirements();
 
+	if (inactive_timeout < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				 errmsg("\"inactive_timeout\" must not be negative")));
+
 	create_physical_replication_slot(NameStr(*name),
 									 immediately_reserve,
 									 temporary,
+									 inactive_timeout,
 									 InvalidXLogRecPtr);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
@@ -120,7 +128,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 static void
 create_logical_replication_slot(char *name, char *plugin,
 								bool temporary, bool two_phase,
-								bool failover,
+								bool failover, int inactive_timeout,
 								XLogRecPtr restart_lsn,
 								bool find_startpoint)
 {
@@ -138,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin,
 	 */
 	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-						  failover, false);
+						  failover, false, inactive_timeout);
 
 	/*
 	 * Create logical decoding context to find start point or, if we don't
@@ -177,6 +185,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	bool		temporary = PG_GETARG_BOOL(2);
 	bool		two_phase = PG_GETARG_BOOL(3);
 	bool		failover = PG_GETARG_BOOL(4);
+	int			inactive_timeout = PG_GETARG_INT32(5);	/* in seconds */
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
@@ -190,11 +199,17 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckLogicalDecodingRequirements();
 
+	if (inactive_timeout < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				 errmsg("\"inactive_timeout\" must not be negative")));
+
 	create_logical_replication_slot(NameStr(*name),
 									NameStr(*plugin),
 									temporary,
 									two_phase,
 									failover,
+									inactive_timeout,
 									InvalidXLogRecPtr,
 									true);
 
@@ -239,7 +254,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 19
+#define PG_GET_REPLICATION_SLOTS_COLS 20
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -410,6 +425,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		values[i++] = Int32GetDatum(slot_contents.data.inactive_timeout);
+
 		if (slot_contents.inactive_since > 0)
 			values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
 		else
@@ -720,6 +737,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 	XLogRecPtr	src_restart_lsn;
 	bool		src_islogical;
 	bool		temporary;
+	int			inactive_timeout;	/* in seconds */
 	char	   *plugin;
 	Datum		values[2];
 	bool		nulls[2];
@@ -776,6 +794,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 	src_restart_lsn = first_slot_contents.data.restart_lsn;
 	temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
 	plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
+	inactive_timeout = first_slot_contents.data.inactive_timeout;
 
 	/* Check type of replication slot */
 	if (src_islogical != logical_slot)
@@ -823,6 +842,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 										temporary,
 										false,
 										false,
+										inactive_timeout,
 										src_restart_lsn,
 										false);
 	}
@@ -830,6 +850,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		create_physical_replication_slot(NameStr(*dst_name),
 										 true,
 										 temporary,
+										 inactive_timeout,
 										 src_restart_lsn);
 
 	/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc40c454de..5315c08650 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1221,7 +1221,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false, false, false);
+							  false, false, false, 0);
 
 		if (reserve_wal)
 		{
@@ -1252,7 +1252,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase, failover, false);
+							  two_phase, failover, false, 0);
 
 		/*
 		 * Do options check early so that we can bail before calling the
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 95c22a7200..12626987f0 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -676,7 +676,8 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	 * removed.
 	 */
 	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-							"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+							"%s as caught_up, invalidation_reason IS NOT NULL as invalid, "
+							"inactive_timeout "
 							"FROM pg_catalog.pg_replication_slots "
 							"WHERE slot_type = 'logical' AND "
 							"database = current_database() AND "
@@ -696,6 +697,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 		int			i_failover;
 		int			i_caught_up;
 		int			i_invalid;
+		int			i_inactive_timeout;
 
 		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
 
@@ -705,6 +707,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 		i_failover = PQfnumber(res, "failover");
 		i_caught_up = PQfnumber(res, "caught_up");
 		i_invalid = PQfnumber(res, "invalid");
+		i_inactive_timeout = PQfnumber(res, "inactive_timeout");
 
 		for (int slotnum = 0; slotnum < num_slots; slotnum++)
 		{
@@ -716,6 +719,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 			curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0);
 			curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
 			curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
+			curr->inactive_timeout = atoi(PQgetvalue(res, slotnum, i_inactive_timeout));
 		}
 	}
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index f6143b6bc4..2656056103 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -931,9 +931,10 @@ create_logical_replication_slots(void)
 			appendPQExpBuffer(query, ", ");
 			appendStringLiteralConn(query, slot_info->plugin, conn);
 
-			appendPQExpBuffer(query, ", false, %s, %s);",
+			appendPQExpBuffer(query, ", false, %s, %s, %d);",
 							  slot_info->two_phase ? "true" : "false",
-							  slot_info->failover ? "true" : "false");
+							  slot_info->failover ? "true" : "false",
+							  slot_info->inactive_timeout);
 
 			PQclear(executeQueryOrDie(conn, "%s", query->data));
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 92bcb693fb..eb86d000b1 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -162,6 +162,8 @@ typedef struct
 	bool		invalid;		/* if true, the slot is unusable */
 	bool		failover;		/* is the slot designated to be synced to the
 								 * physical standby? */
+	int			inactive_timeout;	/* The amount of time in seconds the slot
+									 * is allowed to be inactive. */
 } LogicalSlotInfo;
 
 typedef struct
diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl b/src/bin/pg_upgrade/t/003_logical_slots.pl
index 83d71c3084..6e82d2cb7b 100644
--- a/src/bin/pg_upgrade/t/003_logical_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_slots.pl
@@ -153,14 +153,17 @@ like(
 # TEST: Successful upgrade
 
 # Preparations for the subsequent test:
-# 1. Setup logical replication (first, cleanup slots from the previous tests)
+# 1. Setup logical replication (first, cleanup slots from the previous tests,
+# and then create slot for this test with inactive_timeout set).
 my $old_connstr = $oldpub->connstr . ' dbname=postgres';
 
+my $inactive_timeout = 3600;
 $oldpub->start;
 $oldpub->safe_psql(
 	'postgres', qq[
 	SELECT * FROM pg_drop_replication_slot('test_slot1');
 	SELECT * FROM pg_drop_replication_slot('test_slot2');
+	SELECT pg_create_logical_replication_slot(slot_name := 'regress_sub', plugin := 'pgoutput', inactive_timeout := $inactive_timeout);
 	CREATE PUBLICATION regress_pub FOR ALL TABLES;
 ]);
 
@@ -172,7 +175,7 @@ $sub->start;
 $sub->safe_psql(
 	'postgres', qq[
 	CREATE TABLE tbl (a int);
-	CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true', failover = 'true')
+	CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (slot_name = 'regress_sub', create_slot = false, two_phase = 'true', failover = 'true')
 ]);
 $sub->wait_for_subscription_sync($oldpub, 'regress_sub');
 
@@ -192,8 +195,8 @@ command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old cluster');
 # Check that the slot 'regress_sub' has migrated to the new cluster
 $newpub->start;
 my $result = $newpub->safe_psql('postgres',
-	"SELECT slot_name, two_phase, failover FROM pg_replication_slots");
-is($result, qq(regress_sub|t|t), 'check the slot exists on new cluster');
+	"SELECT slot_name, two_phase, failover, inactive_timeout = $inactive_timeout FROM pg_replication_slots");
+is($result, qq(regress_sub|t|t|t), 'check the slot exists on new cluster');
 
 # Update the connection
 my $new_connstr = $newpub->connstr . ' dbname=postgres';
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 2f7cfc02c6..ea4ffb509a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11105,10 +11105,10 @@
 # replication slots
 { oid => '3779', descr => 'create a physical replication slot',
   proname => 'pg_create_physical_replication_slot', provolatile => 'v',
-  proparallel => 'u', prorettype => 'record', proargtypes => 'name bool bool',
-  proallargtypes => '{name,bool,bool,name,pg_lsn}',
-  proargmodes => '{i,i,i,o,o}',
-  proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name bool bool int4',
+  proallargtypes => '{name,bool,bool,int4,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{slot_name,immediately_reserve,temporary,inactive_timeout,slot_name,lsn}',
   prosrc => 'pg_create_physical_replication_slot' },
 { oid => '4220',
   descr => 'copy a physical replication slot, changing temporality',
@@ -11133,17 +11133,17 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,inactive_since,conflicting,invalidation_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,int4,timestamptz,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,inactive_timeout,inactive_since,conflicting,invalidation_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
   proparallel => 'u', prorettype => 'record',
-  proargtypes => 'name name bool bool bool',
-  proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
-  proargmodes => '{i,i,i,i,i,o,o}',
-  proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
+  proargtypes => 'name name bool bool bool int4',
+  proallargtypes => '{name,name,bool,bool,bool,int4,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,i,i,o,o}',
+  proargnames => '{slot_name,plugin,temporary,twophase,failover,inactive_timeout,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
 { oid => '4222',
   descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0c..5a812ef528 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -127,6 +127,9 @@ typedef struct ReplicationSlotPersistentData
 	 * for logical slots on the primary server.
 	 */
 	bool		failover;
+
+	/* The amount of time in seconds the slot is allowed to be inactive */
+	int			inactive_timeout;
 } ReplicationSlotPersistentData;
 
 /*
@@ -239,7 +242,7 @@ extern void ReplicationSlotsShmemInit(void);
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 								  ReplicationSlotPersistency persistency,
 								  bool two_phase, bool failover,
-								  bool synced);
+								  bool synced, int inactive_timeout);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index e7c33c0066..a6fb0f1040 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -152,8 +152,9 @@ log_min_messages = 'debug2'
 $primary->append_conf('postgresql.conf', "log_min_messages = 'debug2'");
 $primary->reload;
 
+my $inactive_timeout = 3600;
 $primary->psql('postgres',
-	q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
+	"SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true, $inactive_timeout);"
 );
 
 $primary->psql('postgres',
@@ -202,6 +203,16 @@ is( $standby1->safe_psql(
 capture_and_validate_slot_inactive_since($standby1, 'lsub1_slot', $slots_creation_time);
 capture_and_validate_slot_inactive_since($standby1, 'lsub2_slot', $slots_creation_time);
 
+# Confirm that the synced slot on the standby has inherited inactive_timeout
+# from the primary.
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT inactive_timeout = $inactive_timeout FROM pg_replication_slots
+			WHERE slot_name = 'lsub2_slot' AND synced AND NOT temporary;"
+	),
+	"t",
+	'synced logical slot has inherited inactive_timeout on standby');
+
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index f53c3036a6..7f3b70f598 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,12 +1473,13 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
+    l.inactive_timeout,
     l.inactive_since,
     l.conflicting,
     l.invalidation_reason,
     l.failover,
     l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, inactive_since, conflicting, invalidation_reason, failover, synced)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, inactive_timeout, inactive_since, conflicting, invalidation_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

From 0701c1bc0d61a812e052697d81f49cb9ca23f194 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Tue, 26 Mar 2024 08:54:13 +0000
Subject: [PATCH v22 3/3] Add inactive_timeout based replication slot
 invalidation.

Until now, postgres has been able to invalidate inactive replication
slots based on the amount of WAL (set via the max_slot_wal_keep_size
GUC) that would be needed if the slots became active again. However,
choosing a suitable value for max_slot_wal_keep_size is tricky: the
amount of WAL a deployment generates and the storage allocated to it
vary greatly in production, making it difficult to pin down a
one-size-fits-all value. It is often easier for users to set a
timeout of, say, 1, 2, or 3 days at the slot level, after which
inactive slots get invalidated.

To achieve this, postgres uses the replication slot property
inactive_since (the time at which the slot became inactive) together
with a new slot-level parameter inactive_timeout, and invalidates the
slot once the timeout has elapsed. The invalidation check happens at
multiple locations so that slots are invalidated as promptly as
possible:
- Whenever the slot is acquired; if the slot gets invalidated at this
  point, an error is emitted.
- During checkpoint.

Note that this new invalidation mechanism won't kick in for slots
that are currently being synced from the primary to the standby.
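
For illustration, a minimal sketch of the user-visible behavior (slot
name and timeout are arbitrary):

    -- Invalidate the slot after one hour of inactivity.
    SELECT pg_create_physical_replication_slot(
        slot_name := 'my_slot', inactive_timeout := 3600);

    -- After invalidation (e.g. at the next checkpoint once the
    -- timeout has elapsed), the reason shows up in the view.
    SELECT slot_name, invalidation_reason
      FROM pg_replication_slots WHERE slot_name = 'my_slot';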

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/func.sgml                        |   8 +-
 doc/src/sgml/system-views.sgml                |  10 +-
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |   4 +-
 src/backend/replication/slot.c                | 176 +++++++++++++++++-
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/bin/pg_upgrade/pg_upgrade.h               |   3 +-
 src/include/replication/slot.h                |   8 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 169 +++++++++++++++++
 12 files changed, 370 insertions(+), 19 deletions(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 2cc26e927a..fb1640ae12 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -28393,8 +28393,8 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         released upon any error. The optional fourth
         parameter, <parameter>inactive_timeout</parameter>, when set to a
         non-zero value, specifies the amount of time in seconds the slot is
-        allowed to be inactive. This function corresponds to the replication
-        protocol command
+        allowed to be inactive before being invalidated.
+        This function corresponds to the replication protocol command
         <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
        </para></entry>
       </row>
@@ -28442,8 +28442,8 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         failover. The optional sixth parameter,
         <parameter>inactive_timeout</parameter>, when set to a
         non-zero value, specifies the amount of time in seconds the slot is
-        allowed to be inactive. A call to this function has the same effect as
-        the replication protocol command
+        allowed to be inactive before being invalidated. A call to this
+        function has the same effect as the replication protocol command
         <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
        </para></entry>
       </row>
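
As a usage sketch against the signatures above (names arbitrary), the
new argument can be given positionally or by name:

    SELECT pg_create_logical_replication_slot('my_slot', 'test_decoding',
        false, false, false, 120);

    SELECT pg_create_logical_replication_slot(slot_name := 'my_slot2',
        plugin := 'test_decoding', inactive_timeout := 120);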
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index a6cb13fd9d..3b09838a0b 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2528,7 +2528,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        <structfield>inactive_timeout</structfield> <type>integer</type>
       </para>
       <para>
-        The amount of time in seconds the slot is allowed to be inactive.
+        The amount of time in seconds the slot is allowed to be inactive
+        before being invalidated.
       </para></entry>
      </row>
 
@@ -2582,6 +2583,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>inactive_timeout</literal> means that the slot has been
+          inactive for longer than the duration specified by the slot's
+          <literal>inactive_timeout</literal> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
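
For instance, slots invalidated by this mechanism could be listed with
a query like this (sketch):

    SELECT slot_name, inactive_since, inactive_timeout
      FROM pg_replication_slots
     WHERE invalidation_reason = 'inactive_timeout';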
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 79a968373c..e94ac0f13f 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -320,7 +320,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -530,7 +530,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
 		 * if the slot is not acquired by other processes.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index bc7424bac3..0d7f2c0f50 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -158,6 +159,8 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static bool InvalidateSlotForInactiveTimeout(ReplicationSlot *slot,
+											 bool need_locks);
 
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
@@ -305,7 +308,7 @@ ReplicationSlotValidateName(const char *name, int elevel)
  *     that logical replication can be resumed after failover.
  * synced: True if the slot is synchronized from the primary server.
  * inactive_timeout: The amount of time in seconds the slot is allowed to be
- *     inactive.
+ *     inactive before being invalidated.
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
@@ -539,9 +542,14 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * If check_for_invalidation is true, the slot is checked for invalidation
+ * based on its inactive_timeout parameter after making it ours, and an error
+ * is raised if it has been invalidated.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait,
+					   bool check_for_invalidation)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -619,6 +627,42 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * Check if the given slot can be invalidated based on its
+	 * inactive_timeout parameter. If yes, persist the invalidated state to
+	 * disk and then error out. We do this only after making the slot ours to
+	 * avoid anyone else acquiring it while we check for its invalidation.
+	 */
+	if (check_for_invalidation)
+	{
+		/* The slot is ours by now */
+		Assert(s->active_pid == MyProcPid);
+
+		/*
+		 * The invalidation check below expects the slot to look unowned, so
+		 * temporarily clear active_pid while we perform it.
+		 */
+		s->active_pid = 0;
+		if (InvalidateReplicationSlotForInactiveTimeout(s, true, true))
+		{
+			/*
+			 * If the slot has been invalidated, recalculate the resource
+			 * limits.
+			 */
+			ReplicationSlotsComputeRequiredXmin(false);
+			ReplicationSlotsComputeRequiredLSN();
+
+			/* Might need it for slot cleanup on error, so restore it */
+			s->active_pid = MyProcPid;
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot acquire invalidated replication slot \"%s\"",
+							NameStr(MyReplicationSlot->data.name)),
+					 errdetail("This slot has been invalidated because of its inactive_timeout parameter.")));
+		}
+		s->active_pid = MyProcPid;
+	}
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
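
With this change, acquiring a slot that has timed out would fail
roughly as in this hypothetical session (slot name arbitrary):

    SELECT pg_replication_slot_advance('my_slot', pg_current_wal_lsn());
    ERROR:  cannot acquire invalidated replication slot "my_slot"
    DETAIL:  This slot has been invalidated because of its inactive_timeout parameter.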
@@ -786,7 +830,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -809,7 +853,7 @@ ReplicationSlotAlter(const char *name, bool failover)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1511,6 +1555,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by the slot's inactive_timeout parameter."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1624,6 +1671,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+					if (InvalidateReplicationSlotForInactiveTimeout(s, false, false))
+						invalidation_cause = cause;
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1777,6 +1828,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: the slot's inactive_timeout has elapsed
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1828,6 +1880,105 @@ restart:
 	return invalidated;
 }
 
+/*
+ * Invalidate the given slot based on its inactive_timeout parameter.
+ *
+ * Returns true if the slot was invalidated.
+ *
+ * NB - this function also runs as part of checkpoint, so avoid raising errors
+ * if possible.
+ */
+bool
+InvalidateReplicationSlotForInactiveTimeout(ReplicationSlot *slot,
+											bool need_locks,
+											bool persist_state)
+{
+	if (!InvalidateSlotForInactiveTimeout(slot, need_locks))
+		return false;
+
+	Assert(slot->active_pid == 0);
+
+	SpinLockAcquire(&slot->mutex);
+	slot->data.invalidated = RS_INVAL_INACTIVE_TIMEOUT;
+
+	/* Make sure the invalidated state persists across server restart */
+	slot->just_dirtied = true;
+	slot->dirty = true;
+	SpinLockRelease(&slot->mutex);
+
+	if (persist_state)
+	{
+		char		path[MAXPGPATH];
+
+		sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+		SaveSlotToPath(slot, path, ERROR);
+	}
+
+	ReportSlotInvalidation(RS_INVAL_INACTIVE_TIMEOUT, false, 0,
+						   slot->data.name, InvalidXLogRecPtr,
+						   InvalidXLogRecPtr, InvalidTransactionId);
+
+	return true;
+}
+
+/*
+ * Helper for InvalidateReplicationSlotForInactiveTimeout
+ */
+static bool
+InvalidateSlotForInactiveTimeout(ReplicationSlot *slot, bool need_locks)
+{
+	ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+
+	if (slot->inactive_since == 0 ||
+		slot->data.inactive_timeout == 0)
+		return false;
+
+	/*
+	 * Do not invalidate the slots which are currently being synced from the
+	 * primary to the standby.
+	 */
+	if (RecoveryInProgress() && slot->data.synced)
+		return false;
+
+	if (need_locks)
+	{
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+		/*
+		 * Check if the slot needs to be invalidated due to inactive_timeout.
+		 * We do this with the spinlock held to avoid race conditions -- for
+		 * example, the slot could be acquired again or dropped while we are
+		 * checking.
+		 */
+
+		SpinLockAcquire(&slot->mutex);
+	}
+
+	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+	if (slot->inactive_since > 0 &&
+		slot->data.inactive_timeout > 0)
+	{
+		TimestampTz now;
+
+		/* inactive_since is only tracked for inactive slots */
+		Assert(slot->active_pid == 0);
+
+		now = GetCurrentTimestamp();
+		if (TimestampDifferenceExceeds(slot->inactive_since, now,
+									   slot->data.inactive_timeout * 1000))
+			invalidation_cause = RS_INVAL_INACTIVE_TIMEOUT;
+	}
+
+	if (need_locks)
+	{
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+
+	return (invalidation_cause == RS_INVAL_INACTIVE_TIMEOUT);
+}
+
 /*
  * Flush all replication slots to disk.
  *
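
For reference, the comparison in InvalidateSlotForInactiveTimeout above
is inactive_since versus the current time with the timeout scaled to
milliseconds; the same condition can be eyeballed from SQL with
something like this sketch (assumes the slot is currently inactive):

    SELECT now() - inactive_since > make_interval(secs => inactive_timeout)
           AS timed_out
      FROM pg_replication_slots WHERE slot_name = 'my_slot';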
@@ -1840,6 +1991,7 @@ void
 CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
+	bool		invalidated = false;
 
 	elog(DEBUG1, "performing replication slot checkpoint");
 
@@ -1863,6 +2015,13 @@ CheckPointReplicationSlots(bool is_shutdown)
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
 
+		/*
+		 * Checkpoint time is a convenient opportunity to invalidate
+		 * replication slots whose inactive_timeout has elapsed, so do it.
+		 */
+		if (InvalidateReplicationSlotForInactiveTimeout(s, true, false))
+			invalidated = true;
+
 		/*
 		 * Slot's data is not flushed each time the confirmed_flush LSN is
 		 * updated as that could lead to frequent writes.  However, we decide
@@ -1889,6 +2048,13 @@ CheckPointReplicationSlots(bool is_shutdown)
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	/* If any slot was invalidated, recalculate the resource limits */
+	if (invalidated)
+	{
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
 }
 
 /*
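
Since the check also runs during checkpoint, a pending invalidation can
be forced for testing with a manual checkpoint (sketch; assumes the
slot's timeout has already elapsed):

    CHECKPOINT;
    SELECT slot_name, invalidation_reason
      FROM pg_replication_slots WHERE slot_name = 'my_slot';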
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6e1d8d1f9a..4ea4db0f87 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -668,7 +668,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5315c08650..7dda2f5a66 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -846,7 +846,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1459,7 +1459,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index c54b08fe18..82956d58d3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -299,7 +299,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, false);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index eb86d000b1..38d105c5d6 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -163,7 +163,8 @@ typedef struct
 	bool		failover;		/* is the slot designated to be synced to the
 								 * physical standby? */
 	int			inactive_timeout;	/* The amount of time in seconds the slot
-									 * is allowed to be inactive. */
+									 * is allowed to be inactive before
+									 * getting invalidated. */
 } LogicalSlotInfo;
 
 typedef struct
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 5a812ef528..75b0bad083 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -248,7 +250,8 @@ extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool check_for_invalidation);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
@@ -267,6 +270,9 @@ extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause
 											   XLogSegNo oldestSegno,
 											   Oid dboid,
 											   TransactionId snapshotConflictHorizon);
+extern bool InvalidateReplicationSlotForInactiveTimeout(ReplicationSlot *slot,
+														bool need_locks,
+														bool persist_state);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..708a2a3798 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/050_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..b906d240a5
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,169 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Check for the slot invalidation message in the server log.
+sub check_slots_invalidation_in_server_log
+{
+	my ($node, $slot_name, $offset) = @_;
+	my $invalidated = 0;
+
+	for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+	{
+		$node->safe_psql('postgres', "CHECKPOINT");
+		if ($node->log_contains(
+				"invalidating obsolete replication slot \"$slot_name\"", $offset))
+		{
+			$invalidated = 1;
+			last;
+		}
+		usleep(100_000);
+	}
+	ok($invalidated, "check that slot $slot_name invalidation has been logged");
+}
+
+# =============================================================================
+# Testcase start: Invalidate streaming standby's slot due to inactive_timeout
+#
+
+# Initialize primary node
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Avoid automatic checkpoints during the test; otherwise, it can get unpredictable
+$primary->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+$standby1->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb1_slot'
+});
+
+# Set the timeout so that the slot, once inactive, gets invalidated after
+# the timeout elapses.
+my $inactive_timeout = 5;
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb1_slot', inactive_timeout := $inactive_timeout);
+]);
+
+$standby1->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# Check inactive_timeout is what we've set above
+my $result = $primary->safe_psql(
+	'postgres', qq[
+	SELECT inactive_timeout = $inactive_timeout
+		FROM pg_replication_slots WHERE slot_name = 'sb1_slot';
+]);
+is($result, "t",
+	'check inactive_timeout is set for the active slot');
+
+my $logstart = -s $primary->logfile;
+
+# Stop standby to make the replication slot on primary inactive
+$standby1->stop;
+
+# Wait for the inactive replication slot info to be updated
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE inactive_since IS NOT NULL
+            AND slot_name = 'sb1_slot'
+            AND inactive_timeout = $inactive_timeout;
+])
+  or die
+  "Timed out while waiting for inactive replication slot info to be updated";
+
+check_slots_invalidation_in_server_log($primary, 'sb1_slot', $logstart);
+
+# Wait for the inactive replication slots to be invalidated.
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb1_slot' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for inactive replication slot sb1_slot to be invalidated";
+
+# Testcase end: Invalidate streaming standby's slot due to inactive_timeout
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical subscriber's slot due to inactive_timeout
+my $publisher = $primary;
+
+# Create subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$publisher->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_logical_replication_slot(slot_name := 'lsub1_slot', plugin := 'pgoutput', inactive_timeout := $inactive_timeout);
+]);
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot', create_slot = false)"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+$logstart = -s $publisher->logfile;
+
+# Stop subscriber to make the replication slot on publisher inactive
+$subscriber->stop;
+
+# Wait for the inactive replication slot info to be updated
+$publisher->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE inactive_since IS NOT NULL
+            AND slot_name = 'lsub1_slot'
+            AND inactive_timeout = $inactive_timeout;
+])
+  or die
+  "Timed out while waiting for inactive replication slot info to be updated";
+
+check_slots_invalidation_in_server_log($publisher, 'lsub1_slot', $logstart);
+
+# Testcase end: Invalidate logical subscriber's slot due to inactive_timeout
+# =============================================================================
+
+done_testing();
-- 
2.34.1
