Hi,

Thanks for looking into this.

On Fri, Aug 30, 2024 at 8:13 AM Peter Smith <smithpb2...@gmail.com> wrote:
>
> ======
> Commit message
>
> 1.
> ... introduces a GUC allowing users set inactive timeout.
>
> ~
>
> 1a. You should give the name of the new GUC in the commit message.

Modified.

> 1b. /set/to set/

Reworded the commit message.

> ======
> doc/src/sgml/config.sgml
>
> GUC "replication_slot_inactive_timeout"
>
> 2.
> Invalidates replication slots that are inactive for longer than
> specified amount of time
>
> nit - suggest use similar wording as the prior GUC (wal_sender_timeout):
> Invalidate replication slots that are inactive for longer than this
> amount of time.

Modified.

> 3.
> This invalidation check happens either when the slot is acquired for
> use or during a checkpoint. The time since the slot has become
> inactive is known from its inactive_since value using which the
> timeout is measured.
>
> nit - the wording is too complicated. suggest:
> The timeout check occurs when the slot is next acquired for use, or
> during a checkpoint. The slot's 'inactive_since' field value is when
> the slot became inactive.


> 4.
> Note that the inactive timeout invalidation mechanism is not
> applicable for slots on the standby that are being synced from a
> primary server (whose synced field is true).
>
> nit - that word "whose" seems ambiguous. suggest:
> (e.g. the standby slot has 'synced' field true).

Reworded.

> ======
> doc/src/sgml/system-views.sgml
>
> 5.
> inactive_timeout means that the slot has been inactive for the
> duration specified by replication_slot_inactive_timeout parameter.
>
> nit - suggestion ("longer than"):
> ... the slot has been inactive for longer than the duration specified
> by the replication_slot_inactive_timeout parameter.

Modified.

> ======
> src/backend/replication/slot.c
>
> 6.
>  /* Maximum number of invalidation causes */
> -#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
> +#define RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
>
> IMO this #define belongs in the slot.h, immediately below where the
> enum is defined.

Please check the commit that introduced it -
https://www.postgresql.org/message-id/ZdU3CHqza9XJw4P-%40paquier.xyz.
It is kept in the file where it's used.
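
To make the pattern concrete, here is a minimal standalone sketch of it
(not the slot.c code). The names DemoInvalCause, demo_cause_names,
DEMO_INVAL_MAX_CAUSES and demo_cause_name() are hypothetical stand-ins,
and C11 static_assert takes the place of StaticAssertDecl. It shows the
macro sitting next to the array it guards, so that appending an enum
member without updating both fails at compile time.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for ReplicationSlotInvalidationCause. */
typedef enum DemoInvalCause
{
	DEMO_INVAL_NONE,
	DEMO_INVAL_WAL_REMOVED,
	DEMO_INVAL_HORIZON,
	DEMO_INVAL_WAL_LEVEL,
	DEMO_INVAL_INACTIVE_TIMEOUT,
} DemoInvalCause;

/* Names indexed by cause, using designated initializers. */
const char *const demo_cause_names[] = {
	[DEMO_INVAL_NONE] = "none",
	[DEMO_INVAL_WAL_REMOVED] = "wal_removed",
	[DEMO_INVAL_HORIZON] = "rows_removed",
	[DEMO_INVAL_WAL_LEVEL] = "wal_level_insufficient",
	[DEMO_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
};

/* Must name the last enum member; kept next to the array that uses it. */
#define DEMO_INVAL_MAX_CAUSES DEMO_INVAL_INACTIVE_TIMEOUT

/* Compile-time check that the array covers every cause exactly once. */
static_assert(sizeof(demo_cause_names) / sizeof(demo_cause_names[0]) ==
			  (size_t) (DEMO_INVAL_MAX_CAUSES + 1),
			  "array length mismatch");

/* Return the name for a cause, or NULL if the value is out of range. */
const char *
demo_cause_name(int cause)
{
	if (cause < 0 || cause > DEMO_INVAL_MAX_CAUSES)
		return NULL;
	return demo_cause_names[cause];
}
```

If a new cause were added to the enum without bumping the macro, the
array would grow by one entry and the static assertion would no longer
hold.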

> 7. ReplicationSlotAcquire:
>
> I had a fundamental question about this logic.
>
> IIUC the purpose of the patch was to invalidate replication slots that
> have been inactive for too long.
>
> So, it makes sense to me that some periodic processing (e.g.
> CheckPointReplicationSlots) might do a sweep over all the slots, and
> invalidate the too-long-inactive ones that it finds.
>
> OTOH, it seemed quite strange to me that the patch logic is also
> detecting and invalidating inactive slots during the
> ReplicationSlotAcquire function. This is kind of saying "ERROR -
> sorry, because this was inactive for too long you can't have it" at
> the very moment that you wanted to use it again! IIUC such a slot
> would be invalidated by the function InvalidatePossiblyObsoleteSlot(),
> but therein lies my doubt -- how can the slot be considered as
> "obsolete" when we are in the very act of trying to acquire/use it?
>
> I guess it might be argued this is not so different to the scenario of
> attempting to acquire a slot that had been invalidated momentarily
> before during checkpoint processing. But, somehow that scenario seems
> more like bad luck to me, versus ReplicationSlotAcquire() deliberately
> invalidating something we *know* is wanted.

Hm. TBH, there's no real reason for invalidating the slot in
ReplicationSlotAcquire(). My thinking back then was to take this
opportunity to do some work. I agree to leave the invalidation work to
the checkpointer. However, I still think ReplicationSlotAcquire()
should error out if the slot has already been invalidated, similar to
the "can no longer get changes from replication slot \"%s\"" error for
wal_removed.

> 8.
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("can no longer get changes from replication slot \"%s\"",
> + NameStr(s->data.name)),
> + errdetail("This slot has been invalidated because it was inactive
> since %s for more than %d seconds specified by
> \"replication_slot_inactive_timeout\".",
> +    timestamptz_to_str(s->inactive_since),
> +    replication_slot_inactive_timeout)));
>
> nit - IMO the info should be split into errdetail + errhint. Like this:
> errdetail("The slot became invalid because it was inactive since %s,
> which is more than %d seconds ago."...)
> errhint("You might need to increase \"%s\".",
> "replication_slot_inactive_timeout")

"invalid" is being covered by errmsg "invalidating obsolete
replication slot", so no need to duplicate it in errdetail.

> 9. ReportSlotInvalidation
>
> + appendStringInfo(&err_detail,
> + _("The slot has been inactive since %s for more than %d seconds
> specified by \"replication_slot_inactive_timeout\"."),
> + timestamptz_to_str(inactive_since),
> + replication_slot_inactive_timeout);
> + break;
>
> IMO this error in ReportSlotInvalidation() should be the same as the
> other one from ReplicationSlotAcquire(), which I suggested above
> (comment #8) should include a hint. Also, including a hint here will
> make this new message consistent with the other errhint (for
> "max_slot_wal_keep_size") that is already in this function.

Not exactly the same, but similar: the errmsg in ReportSlotInvalidation()
has an "invalidating" component, whereas the errmsg in
ReplicationSlotAcquire() doesn't. Please check the latest wording.

> 10. InvalidatePossiblyObsoleteSlot
>
> + if (cause == RS_INVAL_INACTIVE_TIMEOUT &&
> + (replication_slot_inactive_timeout > 0 &&
> + s->inactive_since > 0 &&
> + !(RecoveryInProgress() && s->data.synced)))
>
> 10a. Everything here is && so this has some redundant parentheses.

Removed.

> 10b. Actually, IMO this complicated condition is overkill. Won't it be
> better to just unconditionally assign
> now = GetCurrentTimestamp(); here?

GetCurrentTimestamp() can be costly on certain platforms. I think the
field checks in the condition are pretty straightforward - e.g.
!RecoveryInProgress() means the server is not in recovery,
!s->data.synced means the slot is not being synced, and so on. Added a
macro IsInactiveTimeoutSlotInvalidationApplicable() for better
readability in two places.
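
For illustration, the check boils down to something like the standalone
sketch below. It is written under simplifying assumptions: TimeoutSlot,
timeout_check_applicable() and slot_timed_out() are hypothetical names,
timestamps are plain 64-bit microsecond counts, and the GUC and
recovery state are passed as arguments rather than read from globals.
The sketch also widens to 64 bits before scaling the timeout, which is
a choice made here and not something taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the few slot fields the check reads. */
typedef struct TimeoutSlot
{
	int64_t		inactive_since;	/* microseconds; 0 means currently active */
	bool		synced;			/* slot is being synced from a primary */
} TimeoutSlot;

/* Cheap four-way AND, in the same order as the macro. */
bool
timeout_check_applicable(const TimeoutSlot *s, int timeout_secs,
						 bool in_recovery)
{
	return timeout_secs > 0 &&
		s->inactive_since > 0 &&
		!in_recovery &&
		!s->synced;
}

/*
 * Decide whether the slot has exceeded the timeout. "now" is supplied
 * by the caller, so the clock only needs to be read when the cheap
 * checks pass, and never while a lock is held.
 */
bool
slot_timed_out(const TimeoutSlot *s, int timeout_secs, bool in_recovery,
			   int64_t now)
{
	assert(timeout_secs >= 0);

	if (!timeout_check_applicable(s, timeout_secs, in_recovery))
		return false;

	/* Widen before multiplying so large timeouts cannot overflow int. */
	return now - s->inactive_since >= (int64_t) timeout_secs * 1000000;
}
```

A caller would evaluate timeout_check_applicable() first, fetch the
current time only if it returns true, and then call slot_timed_out()
with that value.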

> 11.
> + * Note that we don't invalidate synced slots because,
> + * they are typically considered not active as they don't
> + * perform logical decoding to produce the changes.
>
> nit - tweaked punctuation

Used consistent wording in the commit message, docs and code comments.

> 12.
> + * If the slot can be acquired, do so or if the slot is already ours,
> + * then mark it invalidated.  Otherwise we'll signal the owning
> + * process, below, and retry.
>
> nit - tidied this comment. Suggestion:
> If the slot can be acquired, do so and mark it as invalidated. If the
> slot is already ours, mark it as invalidated. Otherwise, we'll signal
> the owning process below and retry.

Modified.

> 13.
> + if (active_pid == 0 ||
> + (MyReplicationSlot != NULL &&
> + MyReplicationSlot == s &&
> + active_pid == MyProcPid))
>
> You are already checking MyReplicationSlot == s here, so that extra
> check for MyReplicationSlot != NULL is redundant, isn't it?

Removed.

> 14. CheckPointReplicationSlots
>
>  /*
> - * Flush all replication slots to disk.
> + * Flush all replication slots to disk. Also, invalidate slots during
> + * non-shutdown checkpoint.
>   *
>   * It is convenient to flush dirty replication slots at the time of checkpoint.
>   * Additionally, in case of a shutdown checkpoint, we also identify the slots
>
> nit - /Also, invalidate slots/Also, invalidate obsolete slots/

Modified.

> 15.
> + {"replication_slot_inactive_timeout", PGC_SIGHUP, REPLICATION_SENDING,
> + gettext_noop("Sets the amount of time to wait before invalidating an "
> + "inactive replication slot."),
>
> nit - that is maybe a bit misleading because IIUC there is no real
> "waiting" happening anywhere. Suggest:
> Sets the amount of time a replication slot can remain inactive before
> it will be invalidated.

Modified.

Please find the attached v44 patch with the above changes. I will
include the 0002 xid_age based invalidation patch later.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From d0ee643d29f7df6fa39581b4c9304f327c79256a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 31 Aug 2024 07:43:22 +0000
Subject: [PATCH v44] Add inactive_timeout based replication slot invalidation

Till now, postgres has been able to invalidate inactive
replication slots based on the amount of WAL (set via the
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
this GUC is a bit tricky, because the amount of WAL a database
generates and the storage allocated to an instance vary greatly
in production, making it difficult to pin down a
one-size-fits-all value.

It is often easier for users to set a timeout of, say, 1 or 2 or n
days, after which all the inactive slots get invalidated. This
commit introduces a GUC named replication_slot_inactive_timeout.
When set, postgres invalidates slots (during non-shutdown
checkpoints) that are inactive for longer than this amount of
time.

Note that the inactive timeout invalidation mechanism is not
applicable for slots on the standby server that are being synced
from the primary server (i.e., standby slots having the 'synced'
field true), because such synced slots are typically considered not
active (for them to be later considered as inactive) as they don't
perform logical decoding to produce the changes.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila
Reviewed-by: Ajin Cherian, Shveta Malik, Peter Smith
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/CA%2BTgmoZTbaaEjSZUG1FL0mzxAdN3qmXksO3O9_PZhEuXTkVnRQ%40mail.gmail.com
Discussion: https://www.postgresql.org/message-id/202403260841.5jcv7ihniccy%40alvherre.pgsql
---
 doc/src/sgml/config.sgml                      |  36 +++
 doc/src/sgml/system-views.sgml                |   7 +
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |   4 +-
 src/backend/replication/slot.c                | 171 ++++++++++-
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/backend/utils/misc/guc_tables.c           |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |   6 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 283 ++++++++++++++++++
 13 files changed, 508 insertions(+), 23 deletions(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 0aec11f443..970b496e39 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4556,6 +4556,42 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replication-slot-inactive-timeout" xreflabel="replication_slot_inactive_timeout">
+      <term><varname>replication_slot_inactive_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_slot_inactive_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that are inactive for longer than this
+        amount of time. If this value is specified without units,
+        it is taken as seconds. A value of zero (which is the default) disables
+        the timeout mechanism. This parameter can only be set in
+        the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+
+       <para>
+        This invalidation check happens either when the slot is acquired
+        for use or during checkpoint. The time since the slot has become
+        inactive is known from its
+        <structfield>inactive_since</structfield> value using which the
+        timeout is measured.
+       </para>
+
+       <para>
+        Note that the inactive timeout invalidation mechanism is not
+        applicable for slots on the standby server that are being synced
+        from primary server (i.e., standby slots having
+        <structfield>synced</structfield> field <literal>true</literal>).
+        Because such synced slots are typically considered not active
+        (for them to be later considered as inactive) as they don't perform
+        logical decoding to produce the changes.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 634a4c0fab..f230e6e572 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2618,6 +2618,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>inactive_timeout</literal> means that the slot has been
+          inactive for longer than the duration specified by the
+          <xref linkend="guc-replication-slot-inactive-timeout"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 51072297fd..25c6a68b54 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -448,7 +448,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -667,7 +667,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0a03776156..26448ecbfd 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -131,6 +132,12 @@ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
 #define SLOT_VERSION	5		/* version for new files */
 
+#define IsInactiveTimeoutSlotInvalidationApplicable(s) \
+	(replication_slot_inactive_timeout > 0 && \
+	 s->inactive_since > 0 && \
+	 !RecoveryInProgress() && \
+	 !s->data.synced)
+
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 
@@ -140,6 +147,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+int			replication_slot_inactive_timeout = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -159,6 +167,13 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
+static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+										   ReplicationSlot *s,
+										   XLogRecPtr oldestLSN,
+										   Oid dboid,
+										   TransactionId snapshotConflictHorizon,
+										   bool *invalidated);
+
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
@@ -535,9 +550,13 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if check_for_invalidation is true and the slot has been
+ * invalidated previously.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait,
+					   bool check_for_invalidation)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -615,6 +634,25 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * Error out if the slot has been invalidated previously, because there's
+	 * no use in acquiring an invalidated slot.
+	 */
+	if (check_for_invalidation &&
+		s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT)
+	{
+		Assert(s->inactive_since > 0);
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("can no longer get changes from replication slot \"%s\"",
+						NameStr(s->data.name)),
+				 errdetail("The slot became invalid because it was inactive since %s, which is more than %d seconds ago.",
+						   timestamptz_to_str(s->inactive_since),
+						   replication_slot_inactive_timeout),
+				 errhint("You might need to increase \"%s\".",
+						 "replication_slot_inactive_timeout")));
+	}
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +823,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +850,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1501,10 +1539,11 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   NameData slotname,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
-					   TransactionId snapshotConflictHorizon)
+					   TransactionId snapshotConflictHorizon,
+					   TimestampTz inactive_since)
 {
 	StringInfoData err_detail;
-	bool		hint = false;
+	StringInfo	err_hint = NULL;
 
 	initStringInfo(&err_detail);
 
@@ -1514,13 +1553,16 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 			{
 				unsigned long long ex = oldestLSN - restart_lsn;
 
-				hint = true;
 				appendStringInfo(&err_detail,
 								 ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
 										  "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
 										  ex),
 								 LSN_FORMAT_ARGS(restart_lsn),
 								 ex);
+
+				err_hint = makeStringInfo();
+				appendStringInfo(err_hint,
+								 _("You might need to increase \"%s\"."), "max_slot_wal_keep_size");
 				break;
 			}
 		case RS_INVAL_HORIZON:
@@ -1531,6 +1573,17 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			Assert(inactive_since > 0);
+			appendStringInfo(&err_detail,
+							 _("The slot became inactive since %s, which is more than %d seconds ago."),
+							 timestamptz_to_str(inactive_since),
+							 replication_slot_inactive_timeout);
+
+			err_hint = makeStringInfo();
+			appendStringInfo(err_hint,
+							 _("You might need to increase \"%s\"."), "replication_slot_inactive_timeout");
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1542,9 +1595,12 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 			errmsg("invalidating obsolete replication slot \"%s\"",
 				   NameStr(slotname)),
 			errdetail_internal("%s", err_detail.data),
-			hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0);
+			(err_hint != NULL) ? errhint("%s", err_hint->data) : 0);
 
 	pfree(err_detail.data);
+
+	if (err_hint != NULL)
+		destroyStringInfo(err_hint);
 }
 
 /*
@@ -1574,6 +1630,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
+	TimestampTz inactive_since = 0;
 
 	for (;;)
 	{
@@ -1581,6 +1638,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		NameData	slotname;
 		int			active_pid = 0;
 		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+		TimestampTz now = 0;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1591,6 +1649,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			break;
 		}
 
+		if (cause == RS_INVAL_INACTIVE_TIMEOUT &&
+			IsInactiveTimeoutSlotInvalidationApplicable(s))
+		{
+			/*
+			 * We get the current time beforehand to avoid system call while
+			 * holding the spinlock.
+			 */
+			now = GetCurrentTimestamp();
+		}
+
 		/*
 		 * Check if the slot needs to be invalidated. If it needs to be
 		 * invalidated, and is not currently acquired, acquire it and mark it
@@ -1644,6 +1712,41 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+
+					/*
+					 * Quick exit if inactive timeout invalidation mechanism
+					 * is disabled or slot is currently being used or the
+					 * server is in recovery mode or the slot on standby is
+					 * currently being synced from the primary.
+					 *
+					 * Note that the inactive timeout invalidation mechanism
+					 * is not applicable for slots on the standby server that
+					 * are being synced from primary server. Because such
+					 * synced slots are typically considered not active (for
+					 * them to be later considered as inactive) as they don't
+					 * perform logical decoding to produce the changes.
+					 */
+					if (!IsInactiveTimeoutSlotInvalidationApplicable(s))
+						break;
+
+					/*
+					 * Check if the slot needs to be invalidated due to
+					 * replication_slot_inactive_timeout GUC.
+					 */
+					if (TimestampDifferenceExceeds(s->inactive_since, now,
+												   replication_slot_inactive_timeout * 1000))
+					{
+						invalidation_cause = cause;
+						inactive_since = s->inactive_since;
+
+						/*
+						 * Invalidation due to inactive timeout implies that
+						 * no one is using the slot.
+						 */
+						Assert(s->active_pid == 0);
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1669,11 +1772,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		active_pid = s->active_pid;
 
 		/*
-		 * If the slot can be acquired, do so and mark it invalidated
-		 * immediately.  Otherwise we'll signal the owning process, below, and
-		 * retry.
+		 * If the slot can be acquired, do so and mark it as invalidated. If
+		 * the slot is already ours, mark it as invalidated. Otherwise, we'll
+		 * signal the owning process below and retry.
 		 */
-		if (active_pid == 0)
+		if (active_pid == 0 ||
+			(MyReplicationSlot == s &&
+			 active_pid == MyProcPid))
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
@@ -1728,7 +1833,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			{
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
-									   oldestLSN, snapshotConflictHorizon);
+									   oldestLSN, snapshotConflictHorizon,
+									   inactive_since);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SendProcSignal(active_pid,
@@ -1774,7 +1880,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
-								   oldestLSN, snapshotConflictHorizon);
+								   oldestLSN, snapshotConflictHorizon,
+								   inactive_since);
 
 			/* done with this slot for now */
 			break;
@@ -1797,6 +1904,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1849,7 +1957,8 @@ restart:
 }
 
 /*
- * Flush all replication slots to disk.
+ * Flush all replication slots to disk. Also, invalidate obsolete slots during
+ * non-shutdown checkpoint.
  *
  * It is convenient to flush dirty replication slots at the time of checkpoint.
  * Additionally, in case of a shutdown checkpoint, we also identify the slots
@@ -1907,6 +2016,38 @@ CheckPointReplicationSlots(bool is_shutdown)
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	if (!is_shutdown)
+	{
+		elog(DEBUG1, "performing replication slot invalidation checks");
+
+		/*
+		 * NB: We will make another pass over replication slots for
+		 * invalidation checks to keep the code simple. Testing shows that
+		 * there is no noticeable overhead (when compared with wal_removed
+		 * invalidation) even if we were to do inactive_timeout invalidation
+		 * of thousands of replication slots here. If it is ever proven that
+		 * this assumption is wrong, we will have to perform the invalidation
+		 * checks in the above for loop with the following changes:
+		 *
+		 * - Acquire ControlLock lock once before the loop.
+		 *
+		 * - Call InvalidatePossiblyObsoleteSlot for each slot.
+		 *
+		 * - Handle the cases in which ControlLock gets released just like
+		 * InvalidateObsoleteReplicationSlots does.
+		 *
+		 * - Avoid saving slot info to disk two times for each invalidated
+		 * slot.
+		 *
+		 * XXX: Should we move inactive_timeout invalidation check closer to
+		 * wal_removed in CreateCheckPoint and CreateRestartPoint?
+		 */
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT,
+										   0,
+										   InvalidOid,
+										   InvalidTransactionId);
+	}
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index c7bfbb15e0..b1b7b075bd 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -540,7 +540,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c5f1009f37..61a0e38715 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -844,7 +844,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1462,7 +1462,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index c54b08fe18..82956d58d3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -299,7 +299,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, false);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 521ec5591c..675eb115ac 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3028,6 +3028,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"replication_slot_inactive_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time a replication slot can remain inactive before "
+						 "it will be invalidated."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&replication_slot_inactive_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 667e0dc40a..deca3a4aeb 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -335,6 +335,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#replication_slot_inactive_timeout = 0	# in seconds; 0 disables
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 45582cf9d8..431cc08c99 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -56,6 +56,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -233,6 +235,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
+extern PGDLLIMPORT int replication_slot_inactive_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -249,7 +252,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool check_for_invalidation);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 712924c2fa..301be0f6c1 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -10,6 +10,7 @@ tests += {
        'enable_injection_points': get_option('injection_points') ? 'yes' : 'no',
     },
     'tests': [
+      't/050_invalidate_slots.pl',
       't/001_stream_rep.pl',
       't/002_archiving.pl',
       't/003_recovery_targets.pl',
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..fa6a12a12d
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,283 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# =============================================================================
+# Testcase start
+#
+# Invalidate streaming standby's slot as well as logical
+# failover slot on primary due to replication_slot_inactive_timeout. Also,
+# check the logical failover slot synced on to the standby doesn't invalidate
+# the slot on its own, but gets the invalidated state from the remote slot on
+# the primary.
+
+# Initialize primary
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Avoid unpredictability
+$primary->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create standby
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+my $connstr_1 = $primary->connstr;
+$standby1->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+# Create sync slot on the primary
+$primary->psql('postgres',
+	q{SELECT pg_create_logical_replication_slot('lsub1_sync_slot', 'test_decoding', false, false, true);}
+);
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb1_slot', immediately_reserve := true);
+]);
+
+$standby1->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# Synchronize the primary slots to the standby
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slot is created on the standby and is
+# flagged as 'synced'.
+is( $standby1->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'lsub1_sync_slot' AND synced AND NOT temporary;}
+	),
+	"t",
+	'logical slot lsub1_sync_slot has synced as true on standby');
+
+my $standby1_logstart = -s $standby1->logfile;
+my $logstart = -s $primary->logfile;
+my $inactive_timeout = 2;
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$primary->reload;
+
+# Wait for the logical failover slot to become inactive on the primary. Note
+# that nobody has acquired that slot yet, so due to
+# replication_slot_inactive_timeout setting above it must get invalidated.
+wait_for_slot_invalidation($primary, 'lsub1_sync_slot', $logstart,
+	$inactive_timeout);
+
+# Set timeout on the standby also to check the synced slots don't get
+# invalidated due to timeout on the standby.
+$standby1->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$standby1->reload;
+
+# Now, sync the logical failover slot from the remote slot on the primary.
+# Note that the remote slot has already been invalidated due to inactive
+# timeout. Now, the standby must also see it as invalidated.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Wait for the inactive replication slot to be invalidated.
+$standby1->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'lsub1_sync_slot' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for lsub1_sync_slot invalidation to be synced on standby";
+
+# Synced slot mustn't get invalidated on the standby even after a checkpoint,
+# it must sync invalidation from the primary. So, we must not see the slot's
+# invalidation message in server log.
+$standby1->safe_psql('postgres', "CHECKPOINT");
+ok( !$standby1->log_contains(
+		"invalidating obsolete replication slot \"lsub1_sync_slot\"",
+		$standby1_logstart),
+	'check that synced lsub1_sync_slot has not been invalidated on the standby'
+);
+
+# Stop standby to make the standby's replication slot on the primary inactive
+$standby1->stop;
+
+# Wait for the standby's slot to become inactive and then be invalidated
+wait_for_slot_invalidation($primary, 'sb1_slot', $logstart,
+	$inactive_timeout);
+
+# Testcase end
+# =============================================================================
+
+# =============================================================================
+# Testcase start
+# Invalidate logical subscriber's slot due to replication_slot_inactive_timeout.
+
+my $publisher = $primary;
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '0';
+]);
+$publisher->reload;
+
+# Create subscriber
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$publisher->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_logical_replication_slot(slot_name := 'lsub1_slot', plugin := 'pgoutput');
+]);
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot', create_slot = false)"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+my $result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+$publisher->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '${inactive_timeout}s';
+]);
+$publisher->reload;
+
+$logstart = -s $publisher->logfile;
+
+# Stop subscriber to make the replication slot on publisher inactive
+$subscriber->stop;
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# timeout.
+wait_for_slot_invalidation($publisher, 'lsub1_slot', $logstart,
+	$inactive_timeout);
+
+# Testcase end: Invalidate logical subscriber's slot due to
+# replication_slot_inactive_timeout.
+# =============================================================================
+
+sub wait_for_slot_invalidation
+{
+	my ($node, $slot_name, $offset, $inactive_timeout) = @_;
+	my $name = $node->name;
+
+	# Wait for the replication slot to become inactive
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND active = 'f';
+	])
+	  or die
+	  "Timed out while waiting for slot $slot_name to become inactive on node $name";
+
+	# Wait for the replication slot info to be updated
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE inactive_since IS NOT NULL
+				AND slot_name = '$slot_name' AND active = 'f';
+	])
+	  or die
+	  "Timed out while waiting for info of slot $slot_name to be updated on node $name";
+
+	# Sleep for at least $inactive_timeout seconds so that a single checkpoint
+	# is enough to invalidate the slot.
+	sleep($inactive_timeout);
+
+	check_for_slot_invalidation_in_server_log($node, $slot_name, $offset);
+
+	# Wait for the inactive replication slot to be invalidated
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND
+			invalidation_reason = 'inactive_timeout';
+	])
+	  or die
+	  "Timed out while waiting for inactive slot $slot_name to be invalidated on node $name";
+
+	# Check that the invalidated slot cannot be acquired
+	my ($result, $stdout, $stderr);
+
+	($result, $stdout, $stderr) = $node->psql(
+		'postgres', qq[
+			SELECT pg_replication_slot_advance('$slot_name', '0/1');
+	]);
+
+	ok( $stderr =~
+		  /can no longer get changes from replication slot "$slot_name"/,
+		"detected error upon trying to acquire invalidated slot $slot_name on node $name"
+	  )
+	  or die
+	  "could not detect error upon trying to acquire invalidated slot $slot_name on node $name";
+}
+
+# Check for invalidation of slot in server log
+sub check_for_slot_invalidation_in_server_log
+{
+	my ($node, $slot_name, $offset) = @_;
+	my $name = $node->name;
+	my $invalidated = 0;
+
+	for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+	{
+		$node->safe_psql('postgres', "CHECKPOINT");
+		if ($node->log_contains(
+				"invalidating obsolete replication slot \"$slot_name\"",
+				$offset))
+		{
+			$invalidated = 1;
+			last;
+		}
+		usleep(100_000);
+	}
+	ok($invalidated,
+		"check that slot $slot_name invalidation has been logged on node $name"
+	);
+}
+
+done_testing();
-- 
2.43.0
