On Thu, Mar 28, 2024 at 3:13 PM Bertrand Drouvot
<bertranddrouvot...@gmail.com> wrote:
>
> Regarding 0002:

Thanks for reviewing it.

> Some testing:
>
> T1 ===
>
> When the slot is invalidated on the primary, then the reason is propagated to
> the sync slot (if any). That's fine but we are losing the inactive_since on
> the standby:
>
> Primary:
>
> postgres=# select slot_name,inactive_since,conflicting,invalidation_reason 
> from pg_replication_slots where slot_name='lsub29_slot';
>   slot_name  |        inactive_since         | conflicting | invalidation_reason
> -------------+-------------------------------+-------------+---------------------
>  lsub29_slot | 2024-03-28 08:24:51.672528+00 | f           | inactive_timeout
> (1 row)
>
> Standby:
>
> postgres=# select slot_name,inactive_since,conflicting,invalidation_reason 
> from pg_replication_slots where slot_name='lsub29_slot';
>   slot_name  | inactive_since | conflicting | invalidation_reason
> -------------+----------------+-------------+---------------------
>  lsub29_slot |                | f           | inactive_timeout
> (1 row)
>
> I think in this case it should always reflect the value from the primary (so
> that one can understand why it is invalidated).

I'll come back to this as soon as we all agree on inactive_since
behavior for synced slots.

> T2 ===
>
> And it is set to a value during promotion:
>
> postgres=# select pg_promote();
>  pg_promote
> ------------
>  t
> (1 row)
>
> postgres=# select slot_name,inactive_since,conflicting,invalidation_reason 
> from pg_replication_slots where slot_name='lsub29_slot';
>   slot_name  |        inactive_since        | conflicting | invalidation_reason
> -------------+------------------------------+-------------+---------------------
>  lsub29_slot | 2024-03-28 08:30:11.74505+00 | f           | inactive_timeout
> (1 row)
>
> I think when it is invalidated it should always reflect the value from the
> primary (so that one can understand why it is invalidated).

I'll come back to this as soon as we all agree on inactive_since
behavior for synced slots.

> T3 ===
>
> As for the slot invalidation on the primary:
>
> postgres=# SELECT * FROM pg_logical_slot_get_changes('lsub29_slot', NULL, NULL, 'include-xids', '0');
> ERROR:  cannot acquire invalidated replication slot "lsub29_slot"
>
> Can we make the message more consistent with what can be found in
> CreateDecodingContext() for example?

Hm, that makes sense, since slot acquisition and release are
internal to the server.

> T4 ===
>
> Also, it looks like querying pg_replication_slots does not trigger an
> invalidation: I think it should if the slot is not invalidated yet (and
> matches the invalidation criteria).

There's a different opinion on this, check comment #3 from
https://www.postgresql.org/message-id/CAA4eK1LLj%2BeaMN-K8oeOjfG%2BUuzTY%3DL5PXbcMJURZbFm%2B_aJSA%40mail.gmail.com.

> Code review:
>
> CR1 ===
>
> +        Invalidate replication slots that are inactive for longer than this
> +        amount of time. If this value is specified without units, it is taken
>
> s/Invalidate/Invalidates/?

Done.

> Should we mention the relationship with inactive_since?

Done.

> CR2 ===
>
> + *
> + * If check_for_invalidation is true, the slot is checked for invalidation
> + * based on replication_slot_inactive_timeout GUC and an error is raised after making the slot ours.
>   */
>  void
> -ReplicationSlotAcquire(const char *name, bool nowait)
> +ReplicationSlotAcquire(const char *name, bool nowait,
> +                                          bool check_for_invalidation)
>
>
> s/check_for_invalidation/check_for_timeout_invalidation/?

Done.

> CR3 ===
>
> +       if (slot->inactive_since == 0 ||
> +               replication_slot_inactive_timeout == 0)
> +               return false;
>
> Better to test replication_slot_inactive_timeout first? (I mean there is no
> point of testing inactive_since if replication_slot_inactive_timeout == 0)
>
> CR4 ===
>
> +       if (slot->inactive_since > 0 &&
> +               replication_slot_inactive_timeout > 0)
> +       {
>
> Same.
>
> So, instead of CR3 === and CR4 ===, I wonder if it wouldn't be better to do
> something like:
>
> if (replication_slot_inactive_timeout == 0)
>         return false;
> else if (slot->inactive_since > 0)
> .
> else
>         return false;
>
> That would avoid checking replication_slot_inactive_timeout and inactive_since
> multiple times.

Done.

> CR5 ===
>
> +        * held to avoid race conditions -- for example the restart_lsn could move
> +        * forward, or the slot could be dropped.
>
> Does the restart_lsn example make sense here?

No, it doesn't. Modified that.

> CR6 ===
>
> +static bool
> +InvalidateSlotForInactiveTimeout(ReplicationSlot *slot, bool need_locks)
> +{
>
> InvalidatePossiblyInactiveSlot() maybe?

I think the suggested name loses the essence, i.e. the timeout;
"inactive" alone doesn't convey the meaning clearly. I've kept the
current name unless anyone suggests otherwise.

> CR7 ===
>
> +       /* Make sure the invalidated state persists across server restart */
> +       slot->just_dirtied = true;
> +       slot->dirty = true;
> +       SpinLockRelease(&slot->mutex);
>
> Maybe we could create a new function say MarkGivenReplicationSlotDirty()
> with a slot as parameter, that ReplicationSlotMarkDirty could call too?

Done that.

> Then maybe we could set slot->data.invalidated = RS_INVAL_INACTIVE_TIMEOUT in
> InvalidateSlotForInactiveTimeout()? (to avoid multiple
> SpinLockAcquire/SpinLockRelease).

Done that.
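
The shape of that refactoring, as a standalone sketch: one function
takes the slot as a parameter, and the existing entry point becomes a
thin wrapper around it. DemoSlot, MyDemoSlot and both function names
are placeholders invented for this example, not the names used in the
patch, and the spinlock that the real code holds around the two stores
is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical trimmed-down slot with only the two dirty flags. */
typedef struct DemoSlot
{
	bool		just_dirtied;
	bool		dirty;
} DemoSlot;

/* Stand-in for the backend-local "my slot" pointer. */
static DemoSlot *MyDemoSlot = NULL;

/* Mark the given slot dirty; callers that already have a slot use this. */
static void
DemoMarkGivenSlotDirty(DemoSlot *slot)
{
	assert(slot != NULL);

	/* The real code would hold the slot's mutex around these stores. */
	slot->just_dirtied = true;
	slot->dirty = true;
}

/* Existing entry point, now a wrapper operating on the backend's own slot. */
static void
DemoMarkMySlotDirty(void)
{
	DemoMarkGivenSlotDirty(MyDemoSlot);
}
```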

> CR8 ===
>
> +       if (persist_state)
> +       {
> +               char            path[MAXPGPATH];
> +
> +               sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
> +               SaveSlotToPath(slot, path, ERROR);
> +       }
>
> Maybe we could create a new function say GivenReplicationSlotSave()
> with a slot as parameter, that ReplicationSlotSave() could call too?

Done that.

> CR9 ===
>
> +       if (check_for_invalidation)
> +       {
> +               /* The slot is ours by now */
> +               Assert(s->active_pid == MyProcPid);
> +
> +               /*
> +                * Well, the slot is not yet ours really unless we check for the
> +                * invalidation below.
> +                */
> +               s->active_pid = 0;
> +               if (InvalidateReplicationSlotForInactiveTimeout(s, true, true))
> +               {
> +                       /*
> +                        * If the slot has been invalidated, recalculate the resource
> +                        * limits.
> +                        */
> +                       ReplicationSlotsComputeRequiredXmin(false);
> +                       ReplicationSlotsComputeRequiredLSN();
> +
> +                       /* Might need it for slot clean up on error, so restore it */
> +                       s->active_pid = MyProcPid;
> +                       ereport(ERROR,
> +                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                                        errmsg("cannot acquire invalidated replication slot \"%s\"",
> +                                                       NameStr(MyReplicationSlot->data.name))));
> +               }
> +               s->active_pid = MyProcPid;
>
> Are we not missing some SpinLockAcquire/Release on the slot's mutex here? (the
> places where we set the active_pid).

Hm, yes. But should I acquire the mutex and set active_pid to 0 for a
moment just to satisfy Assert(slot->active_pid == 0) in
InvalidateReplicationSlotForInactiveTimeout and
InvalidateSlotForInactiveTimeout? I've removed those assertions
instead, because replication_slot_inactive_timeout > 0 and
inactive_since > 0 are enough for these functions to decide on
inactive timeout invalidation.

> CR10 ===
>
> @@ -1628,6 +1674,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
>                                         if (SlotIsLogical(s))
>                                                 invalidation_cause = cause;
>                                         break;
> +                               case RS_INVAL_INACTIVE_TIMEOUT:
> +                                       if (InvalidateReplicationSlotForInactiveTimeout(s, false, false))
> +                                               invalidation_cause = cause;
> +                                       break;
>
> InvalidatePossiblyObsoleteSlot() is not called with such a reason, better to
> use an Assert here and in the caller too?

Done.

> CR11 ===
>
> +++ b/src/test/recovery/t/050_invalidate_slots.pl
>
> why not using 019_replslot_limit.pl?

I understand that 019_replslot_limit covers wal_removed related
invalidations, but I don't want to clutter it with a bunch of
unrelated tests. The new tests need several new nodes and a couple of
helper functions anyway. Any future invalidation mechanisms can be
tested in this new file too. Also, a separate file makes it quicker to
isolate any test failures that buildfarm animals might report in
future. I don't think a separate test file hurts anyone unless there's
a strong reason against it.

Please see the attached v30 patch. 0002 is where all of the above
review comments have been addressed.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From b84cf9dc5d20e202e08c372e0aa7850966ed7271 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 30 Mar 2024 20:52:48 +0000
Subject: [PATCH v30 1/2] Maintain inactive_since for synced slots correctly.

The slot's inactive_since isn't currently maintained for
synced slots on the standby. Commit a11f330b55 prevents
updating inactive_since with a RecoveryInProgress() check in
RestoreSlotFromDisk(). The issue is that
RecoveryInProgress() always returns true in
RestoreSlotFromDisk(), because 'xlogctl->SharedRecoveryState' is
always 'RECOVERY_STATE_CRASH' at that time. Because of this,
inactive_since is always NULL on a promoted standby for all
synced slots even after server restart.

This issue raised the question of why we can't just update
inactive_since for synced slots on the standby with the value
received from the remote slot on the primary. That is consistent
with every other slot parameter, i.e. all of them are synced from
the primary.

This commit does two things:
1) Updates inactive_since for sync slots with the value
received from the primary's slot.

2) Ensures the value is set to current timestamp during the
shutdown of slot sync machinery to help correctly interpret the
time if the standby gets promoted without a restart.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACWLctoiH-pSjWnEpR54q4DED6rw_BRJm5pCx86_Y01MoQ%40mail.gmail.com
---
 doc/src/sgml/system-views.sgml                |  9 +++
 src/backend/replication/logical/slotsync.c    | 61 ++++++++++++++++++-
 src/backend/replication/slot.c                | 40 ++++++++----
 src/test/perl/PostgreSQL/Test/Cluster.pm      | 34 +++++++++++
 src/test/recovery/t/019_replslot_limit.pl     | 26 +-------
 .../t/040_standby_failover_slots_sync.pl      | 43 +++++++++++++
 6 files changed, 173 insertions(+), 40 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 3c8dca8ca3..c8d97ab375 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2530,6 +2530,15 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       <para>
         The time since the slot has become inactive.
         <literal>NULL</literal> if the slot is currently being used.
+        Note that the slots on the standbys that are being synced from a
+        primary server (whose <structfield>synced</structfield> field is
+        <literal>true</literal>), will get the
+        <structfield>inactive_since</structfield> value from the
+        corresponding remote slot on the primary. Also, after the standby
+        starts up, the <structfield>inactive_since</structfield> value
+        (for such synced slots) will remain <literal>NULL</literal> until
+        the next synchronization (see
+        <xref linkend="logicaldecoding-replication-slots-synchronization"/>).
       </para></entry>
      </row>
 
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..9c95a4b062 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -137,9 +137,12 @@ typedef struct RemoteSlot
 
 	/* RS_INVAL_NONE if valid, or the reason of invalidation */
 	ReplicationSlotInvalidationCause invalidated;
+
+	TimestampTz inactive_since; /* when the remote slot became inactive */
 } RemoteSlot;
 
 static void slotsync_failure_callback(int code, Datum arg);
+static void update_synced_slots_inactive_since(void);
 
 /*
  * If necessary, update the local synced slot's metadata based on the data
@@ -167,6 +170,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		remote_slot->two_phase == slot->data.two_phase &&
 		remote_slot->failover == slot->data.failover &&
 		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
+		remote_slot->inactive_since == slot->inactive_since &&
 		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
 		return false;
 
@@ -182,6 +186,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 	slot->data.catalog_xmin = remote_slot->catalog_xmin;
 	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+	slot->inactive_since = remote_slot->inactive_since;
 	SpinLockRelease(&slot->mutex);
 
 	if (xmin_changed)
@@ -652,9 +657,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 static bool
 synchronize_slots(WalReceiverConn *wrconn)
 {
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
-	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID, TIMESTAMPTZOID};
 
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
@@ -663,7 +668,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, failover,"
-		" database, invalidation_reason"
+		" database, invalidation_reason, inactive_since"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
@@ -743,6 +748,13 @@ synchronize_slots(WalReceiverConn *wrconn)
 		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
 			GetSlotInvalidationCause(TextDatumGetCString(d));
 
+		/*
+		 * It is possible to get null value for inactive_since if the slot is
+		 * active on the primary server, so handle accordingly.
+		 */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->inactive_since = isnull ? 0 : DatumGetTimestampTz(d);
+
 		/* Sanity check */
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
@@ -1296,6 +1308,46 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	Assert(false);
 }
 
+/*
+ * Update the inactive_since property for synced slots.
+ */
+static void
+update_synced_slots_inactive_since(void)
+{
+	TimestampTz now = 0;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* Check if it is a synchronized slot */
+		if (s->in_use && s->data.synced)
+		{
+			Assert(SlotIsLogical(s));
+
+			/*
+			 * We get the current time beforehand and only once to avoid
+			 * system calls overhead while holding the lock.
+			 */
+			if (now == 0)
+				now = GetCurrentTimestamp();
+
+			/*
+			 * Set the time since the slot has become inactive after shutting
+			 * down slot sync machinery. This helps correctly interpret the
+			 * time if the standby gets promoted without a restart.
+			 */
+			SpinLockAcquire(&s->mutex);
+			s->inactive_since = now;
+			SpinLockRelease(&s->mutex);
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Shut down the slot sync worker.
  */
@@ -1309,6 +1361,7 @@ ShutDownSlotSync(void)
 	if (SlotSyncCtx->pid == InvalidPid)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
+		update_synced_slots_inactive_since();
 		return;
 	}
 	SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1341,6 +1394,8 @@ ShutDownSlotSync(void)
 	}
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	update_synced_slots_inactive_since();
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d778c0b921..7dbb44b7b0 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -42,6 +42,7 @@
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -655,6 +656,7 @@ ReplicationSlotRelease(void)
 	char	   *slotname = NULL;	/* keep compiler quiet */
 	bool		is_logical = false; /* keep compiler quiet */
 	TimestampTz now = 0;
+	bool		is_slot_being_synced = false;
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
@@ -690,12 +692,15 @@ ReplicationSlotRelease(void)
 	}
 
 	/*
-	 * Set the last inactive time after marking the slot inactive. We don't
-	 * set it for the slots currently being synced from the primary to the
-	 * standby because such slots are typically inactive as decoding is not
-	 * allowed on those.
+	 * Set the time since the slot has become inactive.
+	 *
+	 * Note that we don't set it for the slots currently being synced from the
+	 * primary to the standby, because such slots typically sync the data from
+	 * the remote slot.
 	 */
-	if (!(RecoveryInProgress() && slot->data.synced))
+	if (RecoveryInProgress() && slot->data.synced)
+		is_slot_being_synced = true;
+	else
 		now = GetCurrentTimestamp();
 
 	if (slot->data.persistency == RS_PERSISTENT)
@@ -706,11 +711,12 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
-		slot->inactive_since = now;
+		if (!is_slot_being_synced)
+			slot->inactive_since = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
-	else
+	else if (!is_slot_being_synced)
 	{
 		SpinLockAcquire(&slot->mutex);
 		slot->inactive_since = now;
@@ -2369,13 +2375,21 @@ RestoreSlotFromDisk(const char *name)
 		slot->active_pid = 0;
 
 		/*
-		 * We set the last inactive time after loading the slot from the disk
-		 * into memory. Whoever acquires the slot i.e. makes the slot active
-		 * will reset it. We don't set it for the slots currently being synced
-		 * from the primary to the standby because such slots are typically
-		 * inactive as decoding is not allowed on those.
+		 * Set the time since the slot has become inactive after loading the
+		 * slot from the disk into memory. Whoever acquires the slot i.e.
+		 * makes the slot active will reset it.
+		 *
+		 * Note that we don't set it for the slots currently being synced from
+		 * the primary to the standby, because such slots typically sync the
+		 * data from the remote slot. We use InRecovery flag instead of
+		 * RecoveryInProgress() as the latter always returns true at this time
+		 * even on primary.
+		 *
+		 * Note that for synced slots after the standby starts up (i.e. after
+		 * the slots are loaded from the disk), the inactive_since will remain
+		 * zero until the next slot sync cycle.
 		 */
-		if (!(RecoveryInProgress() && slot->data.synced))
+		if (!(InRecovery && slot->data.synced))
 			slot->inactive_since = GetCurrentTimestamp();
 		else
 			slot->inactive_since = 0;
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index b08296605c..ddfc3236f3 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3276,6 +3276,40 @@ sub create_logical_slot_on_standby
 
 =pod
 
+=item $node->get_slot_inactive_since_value(self, slot_name, reference_time)
+
+Get inactive_since column value for a given replication slot validating it
+against optional reference time.
+
+=cut
+
+sub get_slot_inactive_since_value
+{
+	my ($self, $slot_name, $reference_time) = @_;
+	my $name = $self->name;
+
+	my $inactive_since = $self->safe_psql('postgres',
+		qq(SELECT inactive_since FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
+		);
+
+	# Check that the captured time is sane
+	if (defined $reference_time)
+	{
+		is($self->safe_psql('postgres',
+			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
+					'$inactive_since'::timestamptz >= '$reference_time'::timestamptz;]
+			),
+			't',
+			"last inactive time for slot $slot_name is valid on node $name")
+			or die "could not validate captured inactive_since for slot $slot_name";
+	}
+
+	return $inactive_since;
+}
+
+=pod
+
 =item $node->advance_wal(num)
 
 Advance WAL of node by given number of segments.
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index 3b9a306a8b..c8e5e5054d 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -443,7 +443,7 @@ $primary4->safe_psql(
 # Get inactive_since value after the slot's creation. Note that the slot is
 # still inactive till it's used by the standby below.
 my $inactive_since =
-	capture_and_validate_slot_inactive_since($primary4, $sb4_slot, $slot_creation_time);
+	$primary4->get_slot_inactive_since_value($sb4_slot, $slot_creation_time);
 
 $standby4->start;
 
@@ -502,7 +502,7 @@ $publisher4->safe_psql('postgres',
 # Get inactive_since value after the slot's creation. Note that the slot is
 # still inactive till it's used by the subscriber below.
 $inactive_since =
-	capture_and_validate_slot_inactive_since($publisher4, $lsub4_slot, $slot_creation_time);
+	$publisher4->get_slot_inactive_since_value($lsub4_slot, $slot_creation_time);
 
 $subscriber4->start;
 $subscriber4->safe_psql('postgres',
@@ -540,26 +540,4 @@ is( $publisher4->safe_psql(
 $publisher4->stop;
 $subscriber4->stop;
 
-# Capture and validate inactive_since of a given slot.
-sub capture_and_validate_slot_inactive_since
-{
-	my ($node, $slot_name, $slot_creation_time) = @_;
-
-	my $inactive_since = $node->safe_psql('postgres',
-		qq(SELECT inactive_since FROM pg_replication_slots
-			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
-		);
-
-	# Check that the captured time is sane
-	is( $node->safe_psql(
-			'postgres',
-			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
-				'$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;]
-		),
-		't',
-		"last inactive time for an active slot $slot_name is sane");
-
-	return $inactive_since;
-}
-
 done_testing();
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..33e3a8dcf0 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -35,6 +35,13 @@ my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
 $subscriber1->init;
 $subscriber1->start;
 
+# Capture the time before the logical failover slot is created on the
+# primary. This publisher is referred to as the primary later on.
+my $slot_creation_time_on_primary = $publisher->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 # Create a slot on the publisher with failover disabled
 $publisher->safe_psql('postgres',
 	"SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
@@ -174,6 +181,10 @@ $primary->poll_query_until(
 	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
 	1);
 
+# Capture the inactive_since of the slot from the primary
+my $inactive_since_on_primary =
+	$primary->get_slot_inactive_since_value('lsub1_slot', $slot_creation_time_on_primary);
+
 # Wait for the standby to catch up so that the standby is not lagging behind
 # the subscriber.
 $primary->wait_for_replay_catchup($standby1);
@@ -190,6 +201,18 @@ is( $standby1->safe_psql(
 	"t",
 	'logical slots have synced as true on standby');
 
+# Capture the inactive_since of the synced slot on the standby
+my $inactive_since_on_standby =
+	$standby1->get_slot_inactive_since_value('lsub1_slot', $slot_creation_time_on_primary);
+
+# Synced slots on the standby must get the inactive_since from the primary.
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT '$inactive_since_on_primary'::timestamptz = '$inactive_since_on_standby'::timestamptz;"
+	),
+	"t",
+	'synchronized slot has got the inactive_since from the primary');
+
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
@@ -750,8 +773,28 @@ $primary->reload;
 $standby1->start;
 $primary->wait_for_replay_catchup($standby1);
 
+# Capture the time before the standby is promoted
+my $promotion_time_on_primary = $standby1->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 $standby1->promote;
 
+# Capture the inactive_since of the synced slot after the promotion.
+# The expectation here is that the slot gets its own inactive_since as part
+# of the promotion. We do this check before the slot is enabled on the new
+# primary below; otherwise the slot becomes active and inactive_since is NULL.
+my $inactive_since_on_new_primary =
+	$standby1->get_slot_inactive_since_value('lsub1_slot', $promotion_time_on_primary);
+
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
+	),
+	"t",
+	'synchronized slot has got its own inactive_since on the new primary');
+
 # Update subscription with the new primary's connection info
 my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
 $subscriber1->safe_psql('postgres',
-- 
2.34.1

From 64bb4f8396595dae62ee07726dfacadb6e87f119 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sun, 31 Mar 2024 04:40:42 +0000
Subject: [PATCH v30 2/2] Add inactive_timeout based replication slot
 invalidation.

Till now, postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky, because the amount of WAL a
customer generates and their allocated storage vary greatly
in production, making it difficult to pin down a one-size-fits-all
value. It is often easier for developers to set a timeout of say 1
or 2 or 3 days at slot level, after which the inactive slots get
invalidated.

To achieve the above, postgres introduces a GUC allowing users
to set an inactive timeout; when a slot stays inactive for this
amount of time, it is invalidated. The invalidation check
happens at various locations so that it is as timely as possible.
These locations include the following:
- Whenever the slot is acquired; the slot acquisition errors
out if the slot is invalidated.
- During checkpoint.

Note that this new invalidation mechanism won't kick in for the
slots that are currently being synced from the primary to the
standby.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  25 ++
 doc/src/sgml/system-views.sgml                |   7 +
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |   4 +-
 src/backend/replication/slot.c                | 196 +++++++++++-
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/backend/utils/misc/guc_tables.c           |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |   8 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 278 ++++++++++++++++++
 13 files changed, 518 insertions(+), 24 deletions(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f65c17e5ae..126b461bb1 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4545,6 +4545,31 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replication-slot-inactive-timeout" xreflabel="replication_slot_inactive_timeout">
+      <term><varname>replication_slot_inactive_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_slot_inactive_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidates replication slots that are inactive for longer than the
+        specified amount of time. If this value is specified without units,
+        it is taken as seconds. A value of zero (which is the default) disables
+        the timeout mechanism. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+
+       <para>
+        The timeout is measured from the time since the slot has become
+        inactive (known from its
+        <structfield>inactive_since</structfield> value) until it gets
+        used (i.e., its <structfield>active</structfield> is set to true).
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index c8d97ab375..5c05fd1c07 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2582,6 +2582,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>inactive_timeout</literal> means that the slot has been
+          inactive for the duration specified by
+          <xref linkend="guc-replication-slot-inactive-timeout"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 9c95a4b062..71b6a254cf 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -321,7 +321,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -531,7 +531,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
 		 * if the slot is not acquired by other processes.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 7dbb44b7b0..7182d89b58 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -108,10 +108,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -141,6 +142,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+int			replication_slot_inactive_timeout = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -159,6 +161,7 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static bool InvalidateSlotForInactiveTimeout(ReplicationSlot *slot);
 
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
@@ -536,9 +539,14 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * If check_for_timeout_invalidation is true, the slot is checked, once it is
+ * ours, for invalidation based on the replication_slot_inactive_timeout GUC,
+ * and an error is raised if it has been invalidated.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait,
+					   bool check_for_timeout_invalidation)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -616,6 +624,34 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * Check if the given slot can be invalidated based on its inactive
+	 * timeout. If yes, persist the invalidated state to disk and then error
+	 * out. We do this only after making the slot ours to avoid anyone else
+	 * acquiring it while we check for its invalidation.
+	 */
+	if (check_for_timeout_invalidation)
+	{
+		/* The slot is ours by now */
+		Assert(s->active_pid == MyProcPid);
+
+		if (InvalidateReplicationSlotForInactiveTimeout(s, true))
+		{
+			/*
+			 * If the slot has been invalidated, recalculate the resource
+			 * limits.
+			 */
+			ReplicationSlotsComputeRequiredXmin(false);
+			ReplicationSlotsComputeRequiredLSN();
+
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("can no longer get changes from replication slot \"%s\"",
+							NameStr(MyReplicationSlot->data.name)),
+					 errdetail("This slot has been invalidated because it was inactive for more than the time specified by replication_slot_inactive_timeout parameter.")));
+		}
+	}
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -790,7 +826,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -813,7 +849,7 @@ ReplicationSlotAlter(const char *name, bool failover)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -989,6 +1025,20 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	LWLockRelease(ReplicationSlotAllocationLock);
 }
 
+/*
+ * Helper for ReplicationSlotSave
+ */
+static inline void
+SaveGivenReplicationSlot(ReplicationSlot *slot, int elevel)
+{
+	char		path[MAXPGPATH];
+
+	Assert(slot != NULL);
+
+	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+	SaveSlotToPath(slot, path, elevel);
+}
+
 /*
  * Serialize the currently acquired slot's state from memory to disk, thereby
  * guaranteeing the current state will survive a crash.
@@ -996,12 +1046,21 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 void
 ReplicationSlotSave(void)
 {
-	char		path[MAXPGPATH];
+	SaveGivenReplicationSlot(MyReplicationSlot, ERROR);
+}
 
-	Assert(MyReplicationSlot != NULL);
+/*
+ * Helper for ReplicationSlotMarkDirty
+ */
+static inline void
+MarkGivenReplicationSlotDirty(ReplicationSlot *slot)
+{
+	Assert(slot != NULL);
 
-	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
-	SaveSlotToPath(MyReplicationSlot, path, ERROR);
+	SpinLockAcquire(&slot->mutex);
+	slot->just_dirtied = true;
+	slot->dirty = true;
+	SpinLockRelease(&slot->mutex);
 }
 
 /*
@@ -1014,14 +1073,7 @@ ReplicationSlotSave(void)
 void
 ReplicationSlotMarkDirty(void)
 {
-	ReplicationSlot *slot = MyReplicationSlot;
-
-	Assert(MyReplicationSlot != NULL);
-
-	SpinLockAcquire(&slot->mutex);
-	MyReplicationSlot->just_dirtied = true;
-	MyReplicationSlot->dirty = true;
-	SpinLockRelease(&slot->mutex);
+	MarkGivenReplicationSlotDirty(MyReplicationSlot);
 }
 
 /*
@@ -1515,6 +1567,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by replication_slot_inactive_timeout parameter."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1559,6 +1614,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
 
+	/*
+	 * This function isn't expected to be called for inactive timeout based
+	 * invalidation. A separate function
+	 * InvalidateReplicationSlotForInactiveTimeout is to be used for that.
+	 */
+	Assert(cause != RS_INVAL_INACTIVE_TIMEOUT);
+
 	for (;;)
 	{
 		XLogRecPtr	restart_lsn;
@@ -1628,6 +1690,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+					/* not reachable */
+					Assert(false);
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1781,6 +1847,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1796,6 +1863,13 @@ InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
 	Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
 	Assert(cause != RS_INVAL_NONE);
 
+	/*
+	 * This function isn't expected to be called for inactive timeout based
+	 * invalidation. A separate function
+	 * InvalidateReplicationSlotForInactiveTimeout is to be used for that.
+	 */
+	Assert(cause != RS_INVAL_INACTIVE_TIMEOUT);
+
 	if (max_replication_slots == 0)
 		return invalidated;
 
@@ -1832,6 +1906,81 @@ restart:
 	return invalidated;
 }
 
+/*
+ * Invalidate given slot based on replication_slot_inactive_timeout GUC.
+ *
+ * Returns true if the slot has been invalidated.
+ *
+ * NB - this function also runs as part of checkpoint, so avoid raising errors
+ * if possible.
+ */
+bool
+InvalidateReplicationSlotForInactiveTimeout(ReplicationSlot *slot,
+											bool persist_state)
+{
+	if (!InvalidateSlotForInactiveTimeout(slot))
+		return false;
+
+	/* Make sure the invalidated state persists across server restart */
+	MarkGivenReplicationSlotDirty(slot);
+
+	if (persist_state)
+		SaveGivenReplicationSlot(slot, ERROR);
+
+	ReportSlotInvalidation(RS_INVAL_INACTIVE_TIMEOUT, false, 0,
+						   slot->data.name, InvalidXLogRecPtr,
+						   InvalidXLogRecPtr, InvalidTransactionId);
+
+	return true;
+}
+
+/*
+ * Helper for InvalidateReplicationSlotForInactiveTimeout
+ */
+static bool
+InvalidateSlotForInactiveTimeout(ReplicationSlot *slot)
+{
+	ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+
+	if (replication_slot_inactive_timeout == 0)
+		return false;
+	else if (slot->inactive_since > 0)
+	{
+		TimestampTz now;
+
+		/*
+		 * Do not invalidate the slots which are currently being synced from
+		 * the primary to the standby.
+		 */
+		if (RecoveryInProgress() && slot->data.synced)
+			return false;
+
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		SpinLockAcquire(&slot->mutex);
+
+		/*
+		 * Check if the slot needs to be invalidated due to
+		 * replication_slot_inactive_timeout GUC. We do this with the spinlock
+		 * held to avoid race conditions -- for example the inactive_since
+		 * could change, or the slot could be dropped.
+		 */
+		now = GetCurrentTimestamp();
+		if (TimestampDifferenceExceeds(slot->inactive_since, now,
+									   replication_slot_inactive_timeout * 1000))
+		{
+			invalidation_cause = RS_INVAL_INACTIVE_TIMEOUT;
+			slot->data.invalidated = RS_INVAL_INACTIVE_TIMEOUT;
+		}
+
+		SpinLockRelease(&slot->mutex);
+		LWLockRelease(ReplicationSlotControlLock);
+
+		return (invalidation_cause == RS_INVAL_INACTIVE_TIMEOUT);
+	}
+
+	return false;
+}
+
 /*
  * Flush all replication slots to disk.
  *
@@ -1844,6 +1993,7 @@ void
 CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
+	bool		invalidated = false;
 
 	elog(DEBUG1, "performing replication slot checkpoint");
 
@@ -1867,6 +2017,13 @@ CheckPointReplicationSlots(bool is_shutdown)
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
 
+		/*
+		 * Here's an opportunity to invalidate inactive replication slots
+		 * based on timeout, so let's do it.
+		 */
+		if (InvalidateReplicationSlotForInactiveTimeout(s, false))
+			invalidated = true;
+
 		/*
 		 * Slot's data is not flushed each time the confirmed_flush LSN is
 		 * updated as that could lead to frequent writes.  However, we decide
@@ -1893,6 +2050,13 @@ CheckPointReplicationSlots(bool is_shutdown)
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	/* If the slot has been invalidated, recalculate the resource limits */
+	if (invalidated)
+	{
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index da57177c25..677c0bf0a2 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -651,7 +651,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc40c454de..96eeb8b7d2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -846,7 +846,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1459,7 +1459,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index c54b08fe18..82956d58d3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -299,7 +299,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, false);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 92fcd5fa4d..c63f76505f 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2971,6 +2971,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"replication_slot_inactive_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time to wait before invalidating an "
+						 "inactive replication slot."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&replication_slot_inactive_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index adcc0257f9..18dd57e589 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -334,6 +334,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#replication_slot_inactive_timeout = 0	# in seconds; 0 disables
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0c..8f5e602745 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -230,6 +232,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *standby_slot_names;
+extern PGDLLIMPORT int replication_slot_inactive_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -245,7 +248,8 @@ extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool check_for_timeout_invalidation);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
@@ -264,6 +268,8 @@ extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause
 											   XLogSegNo oldestSegno,
 											   Oid dboid,
 											   TransactionId snapshotConflictHorizon);
+extern bool InvalidateReplicationSlotForInactiveTimeout(ReplicationSlot *slot,
+														bool persist_state);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..708a2a3798 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/050_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..8e919915f1
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,278 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# =============================================================================
+# Testcase start: Invalidate streaming standby's slot as well as logical
+# failover slot on primary due to inactive timeout GUC. Also, check the logical
+# failover slot synced on to the standby doesn't invalidate the slot on its own,
+# but gets the invalidated state from the remote slot on the primary.
+
+# Initialize primary node
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Avoid checkpoints during the test; otherwise the test can become unpredictable
+$primary->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+my $connstr_1 = $primary->connstr;
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+# Create sync slot on the primary
+$primary->psql('postgres',
+	q{SELECT pg_create_logical_replication_slot('lsub1_sync_slot', 'test_decoding', false, false, true);}
+);
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb1_slot');
+]);
+
+$standby1->start;
+
+my $standby1_logstart = -s $standby1->logfile;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# Synchronize the primary server slots to the standby.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slot is created on the standby and is
+# flagged as 'synced'.
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'lsub1_sync_slot' AND synced AND NOT temporary;}
+	),
+	"t",
+	'logical slot has synced as true on standby');
+
+my $logstart = -s $primary->logfile;
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '2s';
+]);
+$primary->reload;
+
+# Wait for the logical failover slot to become inactive on the primary. Note
+# that nobody has acquired that slot yet, so due to inactive timeout setting
+# above it must get invalidated.
+wait_for_slot_invalidation($primary, 'lsub1_sync_slot', $logstart);
+
+# Set timeout on the standby also to check the synced slots don't get
+# invalidated due to timeout on the standby.
+$standby1->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '2s';
+]);
+$standby1->reload;
+
+# Now, sync the logical failover slot from the remote slot on the primary.
+# Note that the remote slot has already been invalidated due to inactive
+# timeout. Now, the standby must also see it as invalidated.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Wait for the inactive replication slot to be invalidated.
+$standby1->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'lsub1_sync_slot' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for replication slot lsub1_sync_slot invalidation to be synced on standby";
+
+# A synced slot must not get invalidated on the standby itself; it must sync the
+# invalidation from the primary. So, we must not see the slot's invalidation
+# message in the server log.
+ok( !$standby1->log_contains(
+		"invalidating obsolete replication slot \"lsub1_sync_slot\"",
+		$standby1_logstart),
+	'check that synced slot has not been invalidated on the standby');
+
+# Stop standby to make the standby's replication slot on the primary inactive
+$standby1->stop;
+
+# Wait for the standby's replication slot to become inactive
+wait_for_slot_invalidation($primary, 'sb1_slot', $logstart);
+
+# Testcase end: Invalidate streaming standby's slot as well as logical failover
+# slot on primary due to inactive timeout GUC. Also, check the logical failover
+# slot synced on to the standby doesn't invalidate the slot on its own, but
+# gets the invalidated state from the remote slot on the primary.
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical subscriber's slot due to inactive timeout
+# GUC.
+
+my $publisher = $primary;
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '0';
+]);
+$publisher->reload;
+
+# Create subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$subscriber->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$publisher->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_logical_replication_slot(slot_name := 'lsub1_slot', plugin := 'pgoutput');
+]);
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot', create_slot = false)"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+my $result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '2s';
+]);
+$publisher->reload;
+
+$logstart = -s $publisher->logfile;
+
+# Stop subscriber to make the replication slot on publisher inactive
+$subscriber->stop;
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# timeout.
+wait_for_slot_invalidation($publisher, 'lsub1_slot', $logstart);
+
+# Testcase end: Invalidate logical subscriber's slot due to inactive timeout
+# GUC.
+# =============================================================================
+
+# =============================================================================
+# Start: Helper functions used for this test file
+
+sub wait_for_slot_invalidation
+{
+	my ($node, $slot_name, $offset) = @_;
+	my $name = $node->name;
+
+	# Wait for the replication slot to become inactive
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND active = 'f';
+	])
+	  or die
+	  "Timed out while waiting for replication slot to become inactive";
+
+	# Wait for the replication slot info to be updated
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE inactive_since IS NOT NULL
+				AND slot_name = '$slot_name' AND active = 'f';
+	])
+	  or die
+	  "Timed out while waiting for info of replication slot $slot_name to be updated on node $name";
+
+	check_for_slot_invalidation_in_server_log($node, $slot_name, $offset);
+
+	# Wait for the inactive replication slot to be invalidated.
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND
+			invalidation_reason = 'inactive_timeout';
+	])
+	  or die
+	  "Timed out while waiting for inactive replication slot $slot_name to be invalidated on node $name";
+
+	# Check that the invalidated slot cannot be acquired
+	my ($result, $stdout, $stderr);
+
+	($result, $stdout, $stderr) = $node->psql(
+		'postgres', qq[
+			SELECT pg_replication_slot_advance('$slot_name', '0/1');
+	]);
+
+	ok( $stderr =~ /can no longer get changes from replication slot "$slot_name"/,
+		"detected error upon trying to acquire invalidated slot $slot_name on node $name"
+	  )
+	  or die
+	  "could not detect error upon trying to acquire invalidated slot $slot_name";
+}
+
+# Check for invalidation of slot in server log.
+sub check_for_slot_invalidation_in_server_log
+{
+	my ($node, $slot_name, $offset) = @_;
+	my $invalidated = 0;
+
+	for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+	{
+		$node->safe_psql('postgres', "CHECKPOINT");
+		if ($node->log_contains(
+				"invalidating obsolete replication slot \"$slot_name\"",
+				$offset))
+		{
+			$invalidated = 1;
+			last;
+		}
+		usleep(100_000);
+	}
+	ok($invalidated,
+		"check that slot $slot_name invalidation has been logged");
+}
+
+# =============================================================================
+# End: Helper functions used for this test file
+
+done_testing();
-- 
2.34.1
