On Fri, Dec 6, 2024 at 11:04 AM vignesh C <vignes...@gmail.com> wrote:
>
>
> Determining the correct time may be challenging for users, as it
> depends on when the active_since value is set, as well as when the
> checkpoint_timeout occurs and the subsequent checkpoint is triggered.
> Even if the user sets it to an appropriate value, there is still a
> possibility of delayed identification due to the timing of when the
> slot's active_timeout is being set. Including this information in the
> documentation should be sufficient.
>

+1
v54 documents this information as suggested.

Attached is the v54 patch set, which addresses all the comments received so
far in [1], [2], and [3].

[1] 
https://www.postgresql.org/message-id/CALDaNm0mTWwg0z4v-sorq08S2CdZmL2s%2Brh4nHpWeJaBQ2F%2Bmg%40mail.gmail.com
[2] 
https://www.postgresql.org/message-id/CALDaNm1STyk%3DS_EAihWP9SowBkS5dJ32JfEqmG5tTeC2Ct39yg%40mail.gmail.com
[3] 
https://www.postgresql.org/message-id/CAHut%2BPtHbYNxPvtMfs7jARbsVcFXL1%3DC9SO3Q93NgVDgbKN7LQ%40mail.gmail.com

--
Thanks,
Nisha
From 713871d8cda02f2b70c63983fc49dede3097f016 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Mon, 18 Nov 2024 16:13:26 +0530
Subject: [PATCH v54 1/2] Enhance replication slot error handling, slot
 invalidation, and inactive_since setting logic

In ReplicationSlotAcquire(), raise an error for invalid slots if the
caller specifies error_if_invalid=true.

In InvalidatePossiblyObsoleteSlot(), if the slot is already acquired by the
current process, mark it invalidated directly.

Ensure same inactive_since time for all slots in update_synced_slots_inactive_since()
and RestoreSlotFromDisk().
---
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/slotsync.c    | 13 ++--
 src/backend/replication/slot.c                | 61 ++++++++++++++++---
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |  2 +-
 src/include/replication/slot.h                |  3 +-
 src/test/recovery/t/019_replslot_limit.pl     |  2 +-
 8 files changed, 67 insertions(+), 22 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f4f80b2312..e3645aea53 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
@@ -1508,7 +1508,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 static void
 update_synced_slots_inactive_since(void)
 {
-	TimestampTz now = 0;
+	TimestampTz now;
 
 	/*
 	 * We need to update inactive_since only when we are promoting standby to
@@ -1523,6 +1523,9 @@ update_synced_slots_inactive_since(void)
 	/* The slot sync worker or SQL function mustn't be running by now */
 	Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
 
+	/* Use same inactive_since time for all slots */
+	now = GetCurrentTimestamp();
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1537,10 +1540,6 @@ update_synced_slots_inactive_since(void)
 			/* The slot must not be acquired by any process */
 			Assert(s->active_pid == 0);
 
-			/* Use the same inactive_since time for all the slots. */
-			if (now == 0)
-				now = GetCurrentTimestamp();
-
 			SpinLockAcquire(&s->mutex);
 			s->inactive_since = now;
 			SpinLockRelease(&s->mutex);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4a206f9527..db94cec5c3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -535,9 +535,12 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -615,6 +618,43 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * An error is raised if error_if_invalid is true and the slot is found to
+	 * be invalid.
+	 */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+	{
+		StringInfoData err_detail;
+
+		initStringInfo(&err_detail);
+
+		switch (s->data.invalidated)
+		{
+			case RS_INVAL_WAL_REMOVED:
+				appendStringInfo(&err_detail, _("This slot has been invalidated because the required WAL has been removed."));
+				break;
+
+			case RS_INVAL_HORIZON:
+				appendStringInfo(&err_detail, _("This slot has been invalidated because the required rows have been removed."));
+				break;
+
+			case RS_INVAL_WAL_LEVEL:
+				/* translator: %s is a GUC variable name */
+				appendStringInfo(&err_detail, _("This slot has been invalidated because \"%s\" is insufficient for slot."),
+								 "wal_level");
+				break;
+
+			case RS_INVAL_NONE:
+				pg_unreachable();
+		}
+
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("can no longer get changes from replication slot \"%s\"",
+					   NameStr(s->data.name)),
+				errdetail_internal("%s", err_detail.data));
+	}
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +825,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +852,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, false);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1676,11 +1716,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		active_pid = s->active_pid;
 
 		/*
-		 * If the slot can be acquired, do so and mark it invalidated
-		 * immediately.  Otherwise we'll signal the owning process, below, and
-		 * retry.
+		 * If the slot can be acquired, do so and mark it as invalidated. If
+		 * the slot is already ours, mark it as invalidated. Otherwise, we'll
+		 * signal the owning process below and retry.
 		 */
-		if (active_pid == 0)
+		if (active_pid == 0 ||
+			(MyReplicationSlot == s && active_pid == MyProcPid))
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
@@ -2208,6 +2249,7 @@ RestoreSlotFromDisk(const char *name)
 	bool		restored = false;
 	int			readBytes;
 	pg_crc32c	checksum;
+	TimestampTz now;
 
 	/* no need to lock here, no concurrent access allowed yet */
 
@@ -2368,6 +2410,9 @@ RestoreSlotFromDisk(const char *name)
 						NameStr(cp.slotdata.name)),
 				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
 
+	/* Use same inactive_since time for all slots */
+	now = GetCurrentTimestamp();
+
 	/* nothing can be active yet, don't lock anything */
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -2400,7 +2445,7 @@ RestoreSlotFromDisk(const char *name)
 		 * slot from the disk into memory. Whoever acquires the slot i.e.
 		 * makes the slot active will reset it.
 		 */
-		slot->inactive_since = GetCurrentTimestamp();
+		slot->inactive_since = now;
 
 		restored = true;
 		break;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 488a161b3e..578cff64c8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 371eef3ddd..b36ae90b2c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 8a45b5827e..e8bc986c07 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index d2cf786fd5..f5f2d22163 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index efb4ba3af1..333e040e7f 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This slot has been invalidated because the required WAL has been removed",
 			$logstart))
 	{
 		$failed = 1;
-- 
2.34.1

From 7fc20252d8828083613e7948b9d5a48349af0b26 Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Wed, 4 Dec 2024 12:51:22 +0530
Subject: [PATCH v54 2/2] Introduce inactive_timeout based replication slot
 invalidation

Till now, postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
this GUC is a bit tricky. Because the amount of WAL a database
generates, and the allocated storage per instance will vary
greatly in production, making it difficult to pin down a
one-size-fits-all value.

It is often easier for users to set a timeout of, say, 1 or 2 or n
days, after which all the inactive slots get invalidated. This
commit introduces a GUC named idle_replication_slot_timeout.
When set, postgres invalidates slots (during non-shutdown
checkpoints) that are idle for longer than this amount of
time.

Note that the idle timeout invalidation mechanism is not
applicable for slots on the standby server that are being synced
from the primary server (i.e., standby slots having 'synced' field
'true'). Synced slots are always considered to be inactive because
they don't perform logical decoding to produce changes.
---
 doc/src/sgml/config.sgml                      |  39 ++++
 doc/src/sgml/logical-replication.sgml         |   5 +
 doc/src/sgml/system-views.sgml                |  10 +-
 src/backend/replication/logical/slotsync.c    |   4 +-
 src/backend/replication/slot.c                | 155 +++++++++++++--
 src/backend/utils/misc/guc_tables.c           |  12 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/bin/pg_basebackup/pg_createsubscriber.c   |   4 +
 src/bin/pg_upgrade/server.c                   |   7 +
 src/include/replication/slot.h                |  22 ++
 src/include/utils/guc_hooks.h                 |   2 +
 src/test/recovery/meson.build                 |   1 +
 .../t/043_invalidate_inactive_slots.pl        | 188 ++++++++++++++++++
 13 files changed, 434 insertions(+), 16 deletions(-)
 create mode 100644 src/test/recovery/t/043_invalidate_inactive_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e0c8325a39..a888c709fd 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4593,6 +4593,45 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-idle-replication-slot-timeout" xreflabel="idle_replication_slot_timeout">
+      <term><varname>idle_replication_slot_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>idle_replication_slot_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that are idle for longer than this
+        amount of time. If this value is specified without units,
+        it is taken as milliseconds. A value of zero (the default) disables
+        the idle timeout invalidation mechanism. This parameter can only
+        be set in the <filename>postgresql.conf</filename> file or on the
+        server command line.
+       </para>
+
+       <para>
+        Slot invalidation due to idle timeout occurs during checkpoint.
+        If the <varname>checkpoint_timeout</varname> exceeds
+        <varname>idle_replication_slot_timeout</varname>, the slot
+        invalidation will be delayed until the next checkpoint is triggered.
+        To avoid delays, users can force a checkpoint to promptly invalidate
+        inactive slots. The duration of slot inactivity is calculated using the slot's
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>inactive_since</structfield>
+        value.
+       </para>
+
+       <para>
+        Note that the idle timeout invalidation mechanism is not
+        applicable for slots on the standby server that are being synced
+        from the primary server (i.e., standby slots having
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
+        value <literal>true</literal>).
+        Synced slots are always considered to be inactive because they don't
+        perform logical decoding to produce changes.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 8290cd1a08..158ec18211 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -2163,6 +2163,11 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
     plus some reserve for table synchronization.
    </para>
 
+   <para>
+    Logical replication slots are also affected by
+    <link linkend="guc-idle-replication-slot-timeout"><varname>idle_replication_slot_timeout</varname></link>.
+   </para>
+
    <para>
     <link linkend="guc-max-wal-senders"><varname>max_wal_senders</varname></link>
     should be set to at least the same as
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index a586156614..611db4e539 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2566,7 +2566,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para>
       <para>
         The time when the slot became inactive. <literal>NULL</literal> if the
-        slot is currently being streamed.
+        slot is currently being streamed. If the slot becomes invalidated,
+        this value will remain unchanged until server shutdown.
         Note that for slots on the standby that are being synced from a
         primary server (whose <structfield>synced</structfield> field is
         <literal>true</literal>), the <structfield>inactive_since</structfield>
@@ -2620,6 +2621,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>idle_timeout</literal> means that the slot has remained
+          idle beyond the duration specified by the
+          <xref linkend="guc-idle-replication-slot-timeout"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index e3645aea53..9777c6a9cc 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -1540,9 +1540,7 @@ update_synced_slots_inactive_since(void)
 			/* The slot must not be acquired by any process */
 			Assert(s->active_pid == 0);
 
-			SpinLockAcquire(&s->mutex);
-			s->inactive_since = now;
-			SpinLockRelease(&s->mutex);
+			ReplicationSlotSetInactiveSince(s, now, true);
 		}
 	}
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index db94cec5c3..41ef8ecc89 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_IDLE_TIMEOUT] = "idle_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_IDLE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -141,6 +142,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 
+/* Invalidate replication slots idle beyond this time; '0' disables it */
+int			idle_replication_slot_timeout_ms = 0;
+
 /*
  * This GUC lists streaming replication standby server slot names that
  * logical WAL sender processes will wait for.
@@ -644,6 +648,12 @@ retry:
 								 "wal_level");
 				break;
 
+			case RS_INVAL_IDLE_TIMEOUT:
+				/* translator: %s is a GUC variable name */
+				appendStringInfo(&err_detail, _("This slot has been invalidated because inactivity exceeded the time limit set by \"%s\"."),
+								 "idle_replication_slot_timeout");
+				break;
+
 			case RS_INVAL_NONE:
 				pg_unreachable();
 		}
@@ -743,16 +753,12 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
-		slot->inactive_since = now;
+		ReplicationSlotSetInactiveSince(slot, now, false);
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
 	else
-	{
-		SpinLockAcquire(&slot->mutex);
-		slot->inactive_since = now;
-		SpinLockRelease(&slot->mutex);
-	}
+		ReplicationSlotSetInactiveSince(slot, now, true);
 
 	MyReplicationSlot = NULL;
 
@@ -1548,7 +1554,8 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   NameData slotname,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
-					   TransactionId snapshotConflictHorizon)
+					   TransactionId snapshotConflictHorizon,
+					   TimestampTz inactive_since)
 {
 	StringInfoData err_detail;
 	bool		hint = false;
@@ -1578,6 +1585,16 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
 			break;
+
+		case RS_INVAL_IDLE_TIMEOUT:
+			Assert(inactive_since > 0);
+			/* translator: second %s is a GUC variable name */
+			appendStringInfo(&err_detail,
+							 _("The slot has been inactive since %s, exceeding the time limit set by \"%s\"."),
+							 timestamptz_to_str(inactive_since),
+							 "idle_replication_slot_timeout");
+			break;
+
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1594,6 +1611,30 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 	pfree(err_detail.data);
 }
 
+/*
+ * Report whether slot "s" is eligible for idle timeout invalidation.
+ *
+ * Returns true only if all of these hold, tested in this order:
+ * - idle_replication_slot_timeout_ms is greater than zero
+ * - the slot's inactive_since is greater than zero
+ * - it is not the case that the server is in recovery and the slot is
+ *   marked as synced
+ *
+ * Synced slots on a standby are excluded because they are always considered
+ * inactive: they don't perform logical decoding to produce changes.
+ */
+static inline bool
+IsSlotIdleTimeoutPossible(ReplicationSlot *s)
+{
+	if (idle_replication_slot_timeout_ms <= 0)
+		return false;
+
+	if (s->inactive_since <= 0)
+		return false;
+
+	return !RecoveryInProgress() || !s->data.synced;
+}
+
 /*
  * Helper for InvalidateObsoleteReplicationSlots
  *
@@ -1621,6 +1662,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
+	TimestampTz inactive_since = 0;
 
 	for (;;)
 	{
@@ -1628,6 +1670,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		NameData	slotname;
 		int			active_pid = 0;
 		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+		TimestampTz now = 0;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1638,6 +1681,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			break;
 		}
 
+		if (cause == RS_INVAL_IDLE_TIMEOUT)
+		{
+			/*
+			 * We get the current time beforehand to avoid system call while
+			 * holding the spinlock.
+			 */
+			now = GetCurrentTimestamp();
+		}
+
 		/*
 		 * Check if the slot needs to be invalidated. If it needs to be
 		 * invalidated, and is not currently acquired, acquire it and mark it
@@ -1691,6 +1743,21 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_IDLE_TIMEOUT:
+					Assert(now > 0);
+
+					/*
+					 * Check if the slot needs to be invalidated due to
+					 * idle_replication_slot_timeout GUC.
+					 */
+					if (IsSlotIdleTimeoutPossible(s) &&
+						TimestampDifferenceExceeds(s->inactive_since, now,
+												   idle_replication_slot_timeout_ms))
+					{
+						invalidation_cause = cause;
+						inactive_since = s->inactive_since;
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1776,7 +1843,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			{
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
-									   oldestLSN, snapshotConflictHorizon);
+									   oldestLSN, snapshotConflictHorizon,
+									   inactive_since);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SendProcSignal(active_pid,
@@ -1822,7 +1890,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
-								   oldestLSN, snapshotConflictHorizon);
+								   oldestLSN, snapshotConflictHorizon,
+								   inactive_since);
 
 			/* done with this slot for now */
 			break;
@@ -1845,6 +1914,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_IDLE_TIMEOUT: idle slot timeout has occurred
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1897,7 +1967,8 @@ restart:
 }
 
 /*
- * Flush all replication slots to disk.
+ * Flush all replication slots to disk. Also, invalidate obsolete slots during
+ * non-shutdown checkpoint.
  *
  * It is convenient to flush dirty replication slots at the time of checkpoint.
  * Additionally, in case of a shutdown checkpoint, we also identify the slots
@@ -1955,6 +2026,45 @@ CheckPointReplicationSlots(bool is_shutdown)
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	if (!is_shutdown)
+	{
+		elog(DEBUG1, "performing replication slot invalidation checks");
+
+		/*
+		 * NB: We will make another pass over replication slots for
+		 * invalidation checks to keep the code simple. Testing shows that
+		 * there is no noticeable overhead (when compared with wal_removed
+		 * invalidation) even if we were to do idle_timeout invalidation of
+		 * thousands of replication slots here. If it is ever proven that this
+		 * assumption is wrong, we will have to perform the invalidation
+		 * checks in the above for loop with the following changes:
+		 *
+		 * - Acquire ControlLock lock once before the loop.
+		 *
+		 * - Call InvalidatePossiblyObsoleteSlot for each slot.
+		 *
+		 * - Handle the cases in which ControlLock gets released just like
+		 * InvalidateObsoleteReplicationSlots does.
+		 *
+		 * - Avoid saving slot info to disk two times for each invalidated
+		 * slot.
+		 *
+		 * XXX: Should we move idle_timeout invalidation check closer to
+		 * wal_removed in CreateCheckPoint and CreateRestartPoint?
+		 *
+		 * XXX: Slot invalidation due to 'idle_timeout' occurs only for
+		 * released slots, based on 'idle_replication_slot_timeout'. Active
+		 * slots in use for replication are excluded, preventing accidental
+		 * invalidation. Slots where communication between the publisher and
+		 * subscriber is down are also excluded, as they are managed by the
+		 * 'wal_sender_timeout'.
+		 */
+		InvalidateObsoleteReplicationSlots(RS_INVAL_IDLE_TIMEOUT,
+										   0,
+										   InvalidOid,
+										   InvalidTransactionId);
+	}
 }
 
 /*
@@ -2443,7 +2553,9 @@ RestoreSlotFromDisk(const char *name)
 		/*
 		 * Set the time since the slot has become inactive after loading the
 		 * slot from the disk into memory. Whoever acquires the slot i.e.
-		 * makes the slot active will reset it.
+		 * makes the slot active will reset it. Avoid calling
+		 * ReplicationSlotSetInactiveSince() here, as it will not set the time
+		 * for invalid slots.
 		 */
 		slot->inactive_since = now;
 
@@ -2838,3 +2950,22 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
 
 	ConditionVariableCancelSleep();
 }
+
+/*
+ * check_hook for the idle_replication_slot_timeout GUC.
+ *
+ * Accepts any value outside binary upgrade mode; while IsBinaryUpgrade is
+ * set, only 0 is accepted and any other value is rejected with an errdetail.
+ */
+bool
+check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
+{
+	/* Nothing to enforce unless a nonzero value arrives during upgrade */
+	if (!IsBinaryUpgrade || *newval == 0)
+		return true;
+
+	GUC_check_errdetail("The value of \"%s\" must be set to 0 during binary upgrade mode.",
+						"idle_replication_slot_timeout");
+
+	return false;
+}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 8cf1afbad2..6a4f15b832 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3047,6 +3047,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"idle_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time a replication slot can remain idle before "
+						 "it will be invalidated."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&idle_replication_slot_timeout_ms,
+		0, 0, INT_MAX,
+		check_idle_replication_slot_timeout, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a2ac7575ca..6b5c246e8d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -337,6 +337,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#idle_replication_slot_timeout = 0	# in milliseconds; 0 disables
 
 # - Primary Server -
 
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index e96370a9ec..a9ac00f44f 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -1438,6 +1438,10 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
 	appendShellString(pg_ctl_cmd, subscriber_dir);
 	appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
+
+	/* Prevent unintended slot invalidation */
+	appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
+
 	if (restricted_access)
 	{
 		appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
diff --git a/src/bin/pg_upgrade/server.c b/src/bin/pg_upgrade/server.c
index 91bcb4dbc7..90e2b9f188 100644
--- a/src/bin/pg_upgrade/server.c
+++ b/src/bin/pg_upgrade/server.c
@@ -252,6 +252,13 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
 	if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
 		appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1");
 
+	/*
+	 * Use idle_replication_slot_timeout=0 to prevent slot invalidation due to
+	 * idle_timeout by the checkpointer process during upgrade.
+	 */
+	if (GET_MAJOR_VERSION(cluster->major_version) >= 1800)
+		appendPQExpBufferStr(&pgoptions, " -c idle_replication_slot_timeout=0");
+
 	/*
 	 * Use -b to disable autovacuum and logical replication launcher
 	 * (effective in PG17 or later for the latter).
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index f5f2d22163..98625f9d13 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -56,6 +56,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* idle slot timeout has occurred */
+	RS_INVAL_IDLE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -228,6 +230,25 @@ typedef struct ReplicationSlotCtlData
 	ReplicationSlot replication_slots[1];
 } ReplicationSlotCtlData;
 
+/*
+ * Set slot's inactive_since property unless it was previously invalidated.
+ * The invalidation check is made under the slot's spinlock when acquire_lock
+ * is true; otherwise the caller is presumably already holding it.
+ */
+static inline void
+ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz now,
+								bool acquire_lock)
+{
+	if (acquire_lock)
+		SpinLockAcquire(&s->mutex);
+
+	if (s->data.invalidated == RS_INVAL_NONE)
+		s->inactive_since = now;
+
+	if (acquire_lock)
+		SpinLockRelease(&s->mutex);
+}
+
 /*
  * Pointers to shared memory
  */
@@ -237,6 +258,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
+extern PGDLLIMPORT int idle_replication_slot_timeout_ms;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 5813dba0a2..d7a7dffab5 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -174,5 +174,7 @@ extern void assign_wal_sync_method(int new_wal_sync_method, void *extra);
 extern bool check_synchronized_standby_slots(char **newval, void **extra,
 											 GucSource source);
 extern void assign_synchronized_standby_slots(const char *newval, void *extra);
+extern bool check_idle_replication_slot_timeout(int *newval, void **extra,
+												GucSource source);
 
 #endif							/* GUC_HOOKS_H */
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..3b8f45c93e 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_invalidate_inactive_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_invalidate_inactive_slots.pl b/src/test/recovery/t/043_invalidate_inactive_slots.pl
new file mode 100644
index 0000000000..a947bdf2e5
--- /dev/null
+++ b/src/test/recovery/t/043_invalidate_inactive_slots.pl
@@ -0,0 +1,188 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slot invalidation due to idle timeout
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+
+# =============================================================================
+# Testcase start
+#
+# Test invalidation of a streaming standby slot and a logical failover slot on
+# the primary due to idle timeout. Also, test that a logical failover slot
+# synced to the standby from the primary doesn't get invalidated on its own,
+# but gets the invalidated state from the primary.
+
+# Initialize primary
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Avoid unpredictability
+$primary->append_conf(
+	'postgresql.conf', qq{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create standby
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+my $connstr = $primary->connstr;
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb_slot1'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+# Create sync slot on the primary; safe_psql dies if the creation fails
+$primary->safe_psql('postgres',
+	q{SELECT pg_create_logical_replication_slot('sync_slot1', 'test_decoding', false, false, true);}
+);
+
+# Create standby slot on the primary
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb_slot1', immediately_reserve := true);
+]);
+
+$standby1->start;
+
+# Wait until the standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+my $logstart = -s $standby1->logfile;
+
+# Set timeout GUC on the standby to verify that the next checkpoint will not
+# invalidate synced slots.
+my $idle_timeout_1s = 1;
+$standby1->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET idle_replication_slot_timeout TO '${idle_timeout_1s}s';
+]);
+$standby1->reload;
+
+# Sync the primary slots to the standby
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slot is created on the standby
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'sync_slot1' AND synced
+			AND NOT temporary
+			AND invalidation_reason IS NULL;}
+	),
+	"t",
+	'logical slot sync_slot1 is synced to standby');
+
+# Give enough time for inactive_since to exceed the timeout
+sleep($idle_timeout_1s + 1);
+
+# On standby, synced slots are not invalidated by the idle timeout
+# until the invalidation state is propagated from the primary.
+$standby1->safe_psql('postgres', "CHECKPOINT");
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'sync_slot1'
+			AND invalidation_reason IS NULL;}
+	),
+	"t",
+	'check that synced slot sync_slot1 has not been invalidated on standby');
+
+$logstart = -s $primary->logfile;
+
+# Set timeout GUC so that the next checkpoint will invalidate inactive slots
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET idle_replication_slot_timeout TO '${idle_timeout_1s}s';
+]);
+$primary->reload;
+
+# Wait for logical failover slot to become inactive on the primary. Note that
+# nobody has acquired the slot yet, so it must get invalidated due to
+# idle timeout.
+wait_for_slot_invalidation($primary, 'sync_slot1', $logstart,
+	$idle_timeout_1s);
+
+# Re-sync the primary slots to the standby. Note that the primary slot was
+# already invalidated (above) due to idle timeout. The standby must just
+# sync the invalidated state.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'sync_slot1'
+			AND invalidation_reason = 'idle_timeout';}
+	),
+	"t",
+	'check that invalidation of synced slot sync_slot1 is synced on standby');
+
+# Make the standby slot on the primary inactive and check for invalidation
+$standby1->stop;
+wait_for_slot_invalidation($primary, 'sb_slot1', $logstart, $idle_timeout_1s);
+
+# Testcase end
+# =============================================================================
+
+# Invalidate the given slot via idle timeout (see trigger_slot_invalidation),
+# then verify that the invalidated slot can no longer be acquired.
+sub wait_for_slot_invalidation
+{
+	my ($node, $slot, $offset, $idle_timeout) = @_;
+	my $node_name = $node->name;
+
+	trigger_slot_invalidation($node, $slot, $offset, $idle_timeout);
+
+	# Advancing the slot needs to acquire it, which must now fail
+	my ($result, $stdout, $stderr) = $node->psql(
+		'postgres', qq[
+			SELECT pg_replication_slot_advance('$slot', '0/1');
+	]);
+	ok( $stderr =~ /can no longer get changes from replication slot "$slot"/,
+		"detected error upon trying to acquire invalidated slot $slot on node $node_name"
+	  )
+	  or die
+	  "could not detect error upon trying to acquire invalidated slot $slot on node $node_name";
+}
+
+# Trigger slot invalidation and confirm it in the server log. Sleeps past the
+# idle timeout (in seconds), runs a checkpoint, then checks log and slot state.
+sub trigger_slot_invalidation
+{
+	my ($node, $slot, $offset, $idle_timeout) = @_;
+	my $node_name = $node->name;
+
+	# Give enough time for inactive_since to exceed the timeout
+	sleep($idle_timeout + 1);
+
+	# Run a checkpoint, which is expected to invalidate the idle slot
+	$node->safe_psql('postgres', "CHECKPOINT");
+
+	# The slot's invalidation should be logged
+	$node->wait_for_log(qr/invalidating obsolete replication slot \"$slot\"/,
+		$offset);
+
+	# Check that the invalidation reason is 'idle_timeout'
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot' AND
+			invalidation_reason = 'idle_timeout';
+	])
+	  or die
+	  "Timed out while waiting for invalidation reason of slot $slot to be set on node $node_name";
+}
+
+done_testing();
-- 
2.34.1

Reply via email to