From bb28e7ba7ba783b0c908412774b1d6ea4cca6dc5 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 29 Aug 2024 05:11:31 +0000
Subject: [PATCH v43 2/2] Add XID age based replication slot invalidation

Till now, postgres has been able to invalidate replication slots
based on the amount of WAL (set via the max_slot_wal_keep_size GUC)
that would be needed for the slots in case they become active.
However, choosing a default value for max_slot_wal_keep_size is
tricky: the amount of WAL a deployment generates and its allocated
storage vary greatly in production, making it difficult to pin down
a one-size-fits-all value. It is often easier for operators to set
an XID age (the age of a slot's xmin or catalog_xmin) of, say, 1 or
1.5 billion, after which the slots get invalidated.

To achieve the above, this commit introduces a new GUC,
replication_slot_xid_age, that lets users set a slot XID age.
Replication slots whose xmin or catalog_xmin has reached the age
specified by this setting get invalidated.

The invalidation check happens at multiple locations so that slots
are invalidated as early as possible; these locations include the
following:
- Whenever the slot is acquired, and the slot acquisition errors
out if the slot is invalidated
- During checkpoint
- During vacuum (both command-based and autovacuum)
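
As a rough usage sketch (not part of this patch), an operator might
enable the feature and then look for invalidated slots like this:

    -- invalidate slots whose xmin or catalog_xmin is older than
    -- 1.5 billion XIDs; 0 (the default) disables the feature
    ALTER SYSTEM SET replication_slot_xid_age = 1500000000;
    SELECT pg_reload_conf();

    -- after a checkpoint, a vacuum, or an attempt to acquire the slot,
    -- aged slots are reported as invalidated
    SELECT slot_name, xmin, catalog_xmin, invalidation_reason
    FROM pg_replication_slots
    WHERE invalidation_reason = 'xid_aged';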

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Discussion: https://www.postgresql.org/message-id/20240327150557.GA3994937%40nathanxps13
Discussion: https://www.postgresql.org/message-id/CA%2BTgmoaRECcnyqxAxUhP5dk2S4HX%3DpGh-p-PkA3uc%2BjG_9hiMw%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  26 ++
 doc/src/sgml/system-views.sgml                |   8 +
 src/backend/commands/vacuum.c                 |  66 ++++
 src/backend/replication/slot.c                | 157 ++++++++-
 src/backend/utils/misc/guc_tables.c           |  10 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |   3 +
 src/test/recovery/t/050_invalidate_slots.pl   | 321 +++++++++++++++++-
 8 files changed, 572 insertions(+), 20 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 113303a501..a65c49d9fa 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4589,6 +4589,32 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replication-slot-xid-age" xreflabel="replication_slot_xid_age">
+      <term><varname>replication_slot_xid_age</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_slot_xid_age</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots whose <literal>xmin</literal> (the oldest
+        transaction that this slot needs the database to retain) or
+        <literal>catalog_xmin</literal> (the oldest transaction affecting the
+        system catalogs that this slot needs the database to retain) has reached
+        the age specified by this setting. A value of zero (the default)
+        disables this feature. Valid values range from zero to
+        two billion. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+       </para>
+
+       <para>
+        This invalidation check happens when the slot is acquired for use,
+        during vacuum, and during checkpoint.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 9e00f7d184..a4f1ab5275 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2625,6 +2625,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           <xref linkend="guc-replication-slot-inactive-timeout"/> parameter.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>xid_aged</literal> means that the slot's
+          <literal>xmin</literal> or <literal>catalog_xmin</literal>
+          has reached the age specified by
+          <xref linkend="guc-replication-slot-xid-age"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 7d8e9d2045..c909c0d001 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -47,6 +47,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/interrupt.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/pmsignal.h"
@@ -116,6 +117,7 @@ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 static double compute_parallel_delay(void);
 static VacOptValue get_vacoptval_from_boolean(DefElem *def);
 static bool vac_tid_reaped(ItemPointer itemptr, void *state);
+static void try_replication_slot_invalidation(void);
 
 /*
  * GUC check function to ensure GUC value specified is within the allowable
@@ -452,6 +454,61 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
 	MemoryContextDelete(vac_context);
 }
 
+/*
+ * Try invalidating replication slots based on current replication slot xmin
+ * limits once every vacuum cycle.
+ */
+static void
+try_replication_slot_invalidation(void)
+{
+	TransactionId min_slot_xmin;
+	TransactionId min_slot_catalog_xmin;
+	bool		can_invalidate = false;
+	TransactionId cutoff;
+	TransactionId curr;
+
+	curr = ReadNextTransactionId();
+
+	/*
+	 * The cutoff is the oldest transaction ID that is still within the
+	 * configured age.  If the minimum xmin or catalog_xmin across all
+	 * replication slots precedes the cutoff, at least one slot is a
+	 * candidate for invalidation.
+	 */
+	cutoff = curr - replication_slot_xid_age;
+
+	if (!TransactionIdIsNormal(cutoff))
+		cutoff = FirstNormalTransactionId;
+
+	ProcArrayGetReplicationSlotXmin(&min_slot_xmin, &min_slot_catalog_xmin);
+
+	/*
+	 * Current replication slot xmin limits can never be larger than the
+	 * current transaction id even in the case of transaction ID wraparound.
+	 */
+	Assert(min_slot_xmin <= curr);
+	Assert(min_slot_catalog_xmin <= curr);
+
+	if (TransactionIdIsNormal(min_slot_xmin) &&
+		TransactionIdPrecedesOrEquals(min_slot_xmin, cutoff))
+		can_invalidate = true;
+	else if (TransactionIdIsNormal(min_slot_catalog_xmin) &&
+			 TransactionIdPrecedesOrEquals(min_slot_catalog_xmin, cutoff))
+		can_invalidate = true;
+
+	if (can_invalidate)
+	{
+		/*
+		 * Note that InvalidateObsoleteReplicationSlots is also called as part
+		 * of CHECKPOINT, and emitting ERRORs from within is avoided already.
+		 * Therefore, there is no concern here that any ERROR from
+		 * invalidating replication slots blocks VACUUM.
+		 */
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+	}
+}
+
 /*
  * Internal entry point for autovacuum and the VACUUM / ANALYZE commands.
  *
@@ -483,6 +540,7 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
 	const char *stmttype;
 	volatile bool in_outer_xact,
 				use_own_xacts;
+	static bool first_time = true;
 
 	Assert(params != NULL);
 
@@ -594,6 +652,14 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
 		CommitTransactionCommand();
 	}
 
+	if (params->options & VACOPT_VACUUM &&
+		first_time &&
+		replication_slot_xid_age > 0)
+	{
+		try_replication_slot_invalidation();
+		first_time = false;
+	}
+
 	/* Turn vacuum cost accounting on or off, and set/clear in_vacuum */
 	PG_TRY();
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 70093500fa..530c121c2f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -108,10 +108,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
 	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
+	[RS_INVAL_XID_AGE] = "xid_aged",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -142,6 +143,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 int			replication_slot_inactive_timeout = 0;
+int			replication_slot_xid_age = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -160,6 +162,9 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static bool ReplicationSlotIsXIDAged(ReplicationSlot *slot,
+									 TransactionId *xmin,
+									 TransactionId *catalog_xmin);
 
 static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 										   ReplicationSlot *s,
@@ -636,8 +641,8 @@ retry:
 	 * gets invalidated now or has been invalidated previously, because
 	 * there's no use in acquiring the invalidated slot.
 	 *
-	 * XXX: Currently we check for inactive_timeout invalidation here. We
-	 * might need to check for other invalidations too.
+	 * XXX: Currently we check for inactive_timeout and xid_aged invalidations
+	 * here. We might need to check for other invalidations too.
 	 */
 	if (check_for_invalidation)
 	{
@@ -648,6 +653,22 @@ retry:
 													   InvalidTransactionId,
 													   &invalidated);
 
+		if (!invalidated && released_lock)
+		{
+			/* The slot is still ours */
+			Assert(s->active_pid == MyProcPid);
+
+			/* Reacquire the ControlLock */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+			released_lock = false;
+		}
+
+		if (!invalidated)
+			released_lock = InvalidatePossiblyObsoleteSlot(RS_INVAL_XID_AGE,
+														   s, 0, InvalidOid,
+														   InvalidTransactionId,
+														   &invalidated);
+
 		/*
 		 * If the slot has been invalidated, recalculate the resource limits.
 		 */
@@ -657,7 +678,8 @@ retry:
 			ReplicationSlotsComputeRequiredLSN();
 		}
 
-		if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT)
+		if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT ||
+			s->data.invalidated == RS_INVAL_XID_AGE)
 		{
 			/*
 			 * Release the lock if it hasn't been already, to ensure smooth
@@ -665,7 +687,10 @@ retry:
 			 */
 			if (!released_lock)
 				LWLockRelease(ReplicationSlotControlLock);
+		}
 
+		if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT)
+		{
 			Assert(s->inactive_since > 0);
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -675,6 +700,20 @@ retry:
 							   timestamptz_to_str(s->inactive_since),
 							   replication_slot_inactive_timeout)));
 		}
+
+		if (s->data.invalidated == RS_INVAL_XID_AGE)
+		{
+			Assert(TransactionIdIsValid(s->data.xmin) ||
+				   TransactionIdIsValid(s->data.catalog_xmin));
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("can no longer get changes from replication slot \"%s\"",
+							NameStr(s->data.name)),
+					 errdetail("The slot's xmin %u or catalog_xmin %u has reached the age %d specified by \"replication_slot_xid_age\".",
+							   s->data.xmin,
+							   s->data.catalog_xmin,
+							   replication_slot_xid_age)));
+		}
 	}
 
 	if (!released_lock)
@@ -1567,7 +1606,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
 					   TransactionId snapshotConflictHorizon,
-					   TimestampTz inactive_since)
+					   TimestampTz inactive_since,
+					   TransactionId xmin,
+					   TransactionId catalog_xmin)
 {
 	StringInfoData err_detail;
 	bool		hint = false;
@@ -1604,6 +1645,20 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 							 timestamptz_to_str(inactive_since),
 							 replication_slot_inactive_timeout);
 			break;
+		case RS_INVAL_XID_AGE:
+			Assert(TransactionIdIsValid(xmin) ||
+				   TransactionIdIsValid(catalog_xmin));
+
+			if (TransactionIdIsValid(xmin))
+				appendStringInfo(&err_detail, _("The slot's xmin %u has reached the age %d specified by \"replication_slot_xid_age\"."),
+								 xmin,
+								 replication_slot_xid_age);
+			else if (TransactionIdIsValid(catalog_xmin))
+				appendStringInfo(&err_detail, _("The slot's catalog_xmin %u has reached the age %d specified by \"replication_slot_xid_age\"."),
+								 catalog_xmin,
+								 replication_slot_xid_age);
+
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1648,6 +1703,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
 	TimestampTz inactive_since = 0;
+	TransactionId aged_xmin = InvalidTransactionId;
+	TransactionId aged_catalog_xmin = InvalidTransactionId;
 
 	for (;;)
 	{
@@ -1764,6 +1821,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 						Assert(s->active_pid == 0);
 					}
 					break;
+				case RS_INVAL_XID_AGE:
+					if (ReplicationSlotIsXIDAged(s, &aged_xmin, &aged_catalog_xmin))
+					{
+						Assert(TransactionIdIsValid(aged_xmin) ||
+							   TransactionIdIsValid(aged_catalog_xmin));
+
+						invalidation_cause = cause;
+						break;
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1852,7 +1919,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
 									   oldestLSN, snapshotConflictHorizon,
-									   inactive_since);
+									   inactive_since, aged_xmin,
+									   aged_catalog_xmin);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SendProcSignal(active_pid,
@@ -1899,7 +1967,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
 								   oldestLSN, snapshotConflictHorizon,
-								   inactive_since);
+								   inactive_since, aged_xmin,
+								   aged_catalog_xmin);
 
 			/* done with this slot for now */
 			break;
@@ -1923,6 +1992,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
  * - RS_INVAL_INACTIVE_TIMEOUT: inactive timeout occurs
+ * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -2043,10 +2113,11 @@ CheckPointReplicationSlots(bool is_shutdown)
 		 * NB: We will make another pass over replication slots for
 		 * invalidation checks to keep the code simple. Testing shows that
 		 * there is no noticeable overhead (when compared with wal_removed
-		 * invalidation) even if we were to do inactive_timeout invalidation
-		 * of thousands of replication slots here. If it is ever proven that
-		 * this assumption is wrong, we will have to perform the invalidation
-		 * checks in the above for loop with the following changes:
+		 * invalidation) even if we were to do inactive_timeout/xid_aged
+		 * invalidation of thousands of replication slots here. If it is ever
+		 * proven that this assumption is wrong, we will have to perform the
+		 * invalidation checks in the above for loop with the following
+		 * changes:
 		 *
 		 * - Acquire ControlLock lock once before the loop.
 		 *
@@ -2058,16 +2129,78 @@ CheckPointReplicationSlots(bool is_shutdown)
 		 * - Avoid saving slot info to disk two times for each invalidated
 		 * slot.
 		 *
-		 * XXX: Should we move inactive_timeout inavalidation check closer to
+		 * XXX: Should we move these invalidation checks closer to
 		 * wal_removed in CreateCheckPoint and CreateRestartPoint?
 		 */
 		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT,
 										   0,
 										   InvalidOid,
 										   InvalidTransactionId);
+
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE,
+										   0,
+										   InvalidOid,
+										   InvalidTransactionId);
 	}
 }
 
+/*
+ * Returns true if the given replication slot's xmin or catalog_xmin age is
+ * more than replication_slot_xid_age.
+ *
+ * Note that the caller must hold the replication slot's spinlock to avoid
+ * race conditions while this function reads xmin and catalog_xmin.
+ */
+static bool
+ReplicationSlotIsXIDAged(ReplicationSlot *slot, TransactionId *xmin,
+						 TransactionId *catalog_xmin)
+{
+	TransactionId cutoff;
+	TransactionId curr;
+
+	if (replication_slot_xid_age == 0)
+		return false;
+
+	curr = ReadNextTransactionId();
+
+	/*
+	 * Replication slot's xmin and catalog_xmin can never be larger than the
+	 * current transaction id even in the case of transaction ID wraparound.
+	 */
+	Assert(slot->data.xmin <= curr);
+	Assert(slot->data.catalog_xmin <= curr);
+
+	/*
+	 * The cutoff is the oldest transaction ID that is still within the
+	 * configured age.  If the slot's xmin or catalog_xmin precedes the
+	 * cutoff, it has aged beyond the limit and we return true; otherwise
+	 * we return false.
+	 */
+	cutoff = curr - replication_slot_xid_age;
+
+	if (!TransactionIdIsNormal(cutoff))
+		cutoff = FirstNormalTransactionId;
+
+	*xmin = InvalidTransactionId;
+	*catalog_xmin = InvalidTransactionId;
+
+	if (TransactionIdIsNormal(slot->data.xmin) &&
+		TransactionIdPrecedesOrEquals(slot->data.xmin, cutoff))
+	{
+		*xmin = slot->data.xmin;
+		return true;
+	}
+
+	if (TransactionIdIsNormal(slot->data.catalog_xmin) &&
+		TransactionIdPrecedesOrEquals(slot->data.catalog_xmin, cutoff))
+	{
+		*catalog_xmin = slot->data.catalog_xmin;
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * Load all replication slots from disk into memory at server startup. This
  * needs to be run before we start crash recovery.
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 861692c683..5d10dd1c8a 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3040,6 +3040,16 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"replication_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."),
+			gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.")
+		},
+		&replication_slot_xid_age,
+		0, 0, 2000000000,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index deca3a4aeb..3fb6813195 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -336,6 +336,7 @@
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
 #replication_slot_inactive_timeout = 0	# in seconds; 0 disables
+#replication_slot_xid_age = 0	# 0 disables
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index dd56a77547..5ea73f9331 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -55,6 +55,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL,
 	/* inactive slot timeout has occurred */
 	RS_INVAL_INACTIVE_TIMEOUT,
+	/* slot's xmin or catalog_xmin has reached the age */
+	RS_INVAL_XID_AGE,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -233,6 +235,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
 extern PGDLLIMPORT int replication_slot_inactive_timeout;
+extern PGDLLIMPORT int replication_slot_xid_age;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
index 4663019c16..18300cfeca 100644
--- a/src/test/recovery/t/050_invalidate_slots.pl
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -89,7 +89,7 @@ $primary->reload;
 # that nobody has acquired that slot yet, so due to
 # replication_slot_inactive_timeout setting above it must get invalidated.
 wait_for_slot_invalidation($primary, 'lsub1_sync_slot', $logstart,
-	$inactive_timeout);
+	$inactive_timeout, 'inactive_timeout');
 
 # Set timeout on the standby also to check the synced slots don't get
 # invalidated due to timeout on the standby.
@@ -129,7 +129,7 @@ $standby1->stop;
 
 # Wait for the standby's replication slot to become inactive
 wait_for_slot_invalidation($primary, 'sb1_slot', $logstart,
-	$inactive_timeout);
+	$inactive_timeout, 'inactive_timeout');
 
 # Testcase end: Invalidate streaming standby's slot as well as logical failover
 # slot on primary due to replication_slot_inactive_timeout. Also, check the
@@ -197,15 +197,280 @@ $subscriber->stop;
 # Wait for the replication slot to become inactive and then invalidated due to
 # timeout.
 wait_for_slot_invalidation($publisher, 'lsub1_slot', $logstart,
-	$inactive_timeout);
+	$inactive_timeout, 'inactive_timeout');
 
 # Testcase end: Invalidate logical subscriber's slot due to
 # replication_slot_inactive_timeout.
 # =============================================================================
 
+# =============================================================================
+# Testcase start: Invalidate streaming standby's slot due to replication_slot_xid_age
+# GUC.
+
+# Prepare for the next test
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET replication_slot_inactive_timeout TO '0';
+]);
+$primary->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+# Enable hs_feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$standby2->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb2_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb2_slot', immediately_reserve := true);
+]);
+
+$standby2->start;
+
+# Create some content on primary to move xmin
+$primary->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a");
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby2);
+
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NOT NULL AND catalog_xmin IS NULL
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = 'sb2_slot';
+]) or die "Timed out waiting for slot sb2_slot xmin to advance";
+
+$primary->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET replication_slot_xid_age = 500;
+]);
+$primary->reload;
+
+# Stop the standby so that the replication slot's xmin on the primary ages
+$standby2->stop;
+
+$logstart = -s $primary->logfile;
+
+# Do some work to advance xids on primary
+advance_xids($primary, 'tab_int');
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# XID age.
+wait_for_slot_invalidation($primary, 'sb2_slot', $logstart, 0, 'xid_aged');
+
+# Testcase end: Invalidate streaming standby's slot due to replication_slot_xid_age
+# GUC.
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical subscriber's slot due to
+# replication_slot_xid_age GUC.
+
+$publisher = $primary;
+$publisher->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET replication_slot_xid_age = 500;
+]);
+$publisher->reload;
+
+$subscriber->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+));
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl2 (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl2 (id int)");
+
+# Insert some data
+$publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl2 VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+$publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub2 FOR TABLE test_tbl2");
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (slot_name = 'lsub2_slot')"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub2');
+
+$result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl2");
+
+is($result, qq(5), "check initial copy was done");
+
+$publisher->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NULL AND catalog_xmin IS NOT NULL
+	FROM pg_catalog.pg_replication_slots
+	WHERE slot_name = 'lsub2_slot';
+]) or die "Timed out waiting for slot lsub2_slot catalog_xmin to advance";
+
+$logstart = -s $publisher->logfile;
+
+# Stop subscriber to make the replication slot on publisher inactive
+$subscriber->stop;
+
+# Do some work to advance xids on publisher
+advance_xids($publisher, 'test_tbl2');
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# XID age.
+wait_for_slot_invalidation($publisher, 'lsub2_slot', $logstart, 0,
+	'xid_aged');
+
+# Testcase end: Invalidate logical subscriber's slot due to
+# replication_slot_xid_age GUC.
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical slot on standby that's being synced from
+# the primary due to replication_slot_xid_age GUC.
+
+$publisher = $primary;
+
+# Prepare for the next test
+$publisher->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET replication_slot_xid_age = 0;
+]);
+$publisher->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby3 = PostgreSQL::Test::Cluster->new('standby3');
+$standby3->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+$standby3->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb3_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb3_slot', immediately_reserve := true);
+]);
+
+$standby3->start;
+
+my $standby3_logstart = -s $standby3->logfile;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby3);
+
+$subscriber->append_conf(
+	'postgresql.conf', qq(
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+));
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl3 (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl3 (id int)");
+
+# Insert some data
+$publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl3 VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+$publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub3 FOR TABLE test_tbl3");
+
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (slot_name = 'lsub3_sync_slot', failover = true)"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub3');
+
+$result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl3");
+
+is($result, qq(5), "check initial copy was done");
+
+$publisher->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NULL AND catalog_xmin IS NOT NULL
+	FROM pg_catalog.pg_replication_slots
+	WHERE slot_name = 'lsub3_sync_slot';
+])
+  or die "Timed out waiting for slot lsub3_sync_slot catalog_xmin to advance";
+
+# Synchronize the primary server slots to the standby
+$standby3->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slot is created on the standby, is
+# flagged as 'synced', and has received catalog_xmin from the primary.
+is( $standby3->safe_psql(
+		'postgres',
+		q{SELECT count(*) = 1 FROM pg_replication_slots
+		  WHERE slot_name = 'lsub3_sync_slot' AND synced AND NOT temporary AND
+			xmin IS NULL AND catalog_xmin IS NOT NULL;}
+	),
+	"t",
+	'logical slot has synced as true on standby');
+
+my $primary_catalog_xmin = $primary->safe_psql('postgres',
+	"SELECT catalog_xmin FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND catalog_xmin IS NOT NULL;"
+);
+
+my $standby3_catalog_xmin = $standby3->safe_psql('postgres',
+	"SELECT catalog_xmin FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND catalog_xmin IS NOT NULL;"
+);
+
+is($primary_catalog_xmin, $standby3_catalog_xmin,
+	"check catalog_xmin is the same for the primary slot and the synced slot");
+
+# Enable XID age based invalidation on the standby. Note that we disabled it
+# on the primary to check that the invalidation occurs for the synced slot on
+# the standby.
+$standby3->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET replication_slot_xid_age = 500;
+]);
+$standby3->reload;
+
+$logstart = -s $standby3->logfile;
+
+# Do some work to advance xids on primary
+advance_xids($primary, 'test_tbl3');
+
+# Wait for standby to catch up with the above work
+$primary->wait_for_catchup($standby3);
+
+# Wait for the replication slot to become inactive and then invalidated due to
+# XID age.
+wait_for_slot_invalidation($standby3, 'lsub3_sync_slot', $logstart, 0,
+	'xid_aged');
+
+# Note that the replication slot on the primary has not been invalidated
+$result = $primary->safe_psql('postgres',
+	"SELECT COUNT(slot_name) = 1 FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND invalidation_reason IS NULL;"
+);
+
+is($result, 't', "check lsub3_sync_slot is still active on primary");
+
+# Testcase end: Invalidate logical slot on standby that's being synced from
+# the primary due to replication_slot_xid_age GUC.
+# =============================================================================
+
 sub wait_for_slot_invalidation
 {
-	my ($node, $slot_name, $offset, $inactive_timeout) = @_;
+	my ($node, $slot_name, $offset, $inactive_timeout, $reason) = @_;
 	my $name = $node->name;
 
 	# Wait for the replication slot to become inactive
@@ -231,14 +496,15 @@ sub wait_for_slot_invalidation
 	# for the slot to get invalidated.
 	sleep($inactive_timeout);
 
-	check_for_slot_invalidation_in_server_log($node, $slot_name, $offset);
+	check_for_slot_invalidation_in_server_log($node, $slot_name, $offset,
+		$reason);
 
 	# Wait for the inactive replication slot to be invalidated
 	$node->poll_query_until(
 		'postgres', qq[
 		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
 			WHERE slot_name = '$slot_name' AND
-			invalidation_reason = 'inactive_timeout';
+			invalidation_reason = '$reason';
 	])
 	  or die
 	  "Timed out while waiting for inactive slot $slot_name to be invalidated on node $name";
@@ -262,15 +528,33 @@ sub wait_for_slot_invalidation
 # Check for invalidation of slot in server log
 sub check_for_slot_invalidation_in_server_log
 {
-	my ($node, $slot_name, $offset) = @_;
+	my ($node, $slot_name, $offset, $reason) = @_;
 	my $name = $node->name;
 	my $invalidated = 0;
+	my $isrecovery =
+	  $node->safe_psql('postgres', "SELECT pg_is_in_recovery()");
+
+	chomp($isrecovery);
 
 	for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 	{
-		$node->safe_psql('postgres', "CHECKPOINT");
+		if ($reason eq 'xid_aged' && $isrecovery eq 'f')
+		{
+			$node->safe_psql('postgres', "VACUUM");
+		}
+		else
+		{
+			$node->safe_psql('postgres', "CHECKPOINT");
+		}
+
 		if ($node->log_contains(
 				"invalidating obsolete replication slot \"$slot_name\"",
+				$offset)
+			|| $node->log_contains(
+				"The slot's xmin .* has reached the age .* specified by \"replication_slot_xid_age\".",
+				$offset)
+			|| $node->log_contains(
+				"The slot's catalog_xmin .* has reached the age .* specified by \"replication_slot_xid_age\".",
 				$offset))
 		{
 			$invalidated = 1;
@@ -283,4 +567,25 @@ sub check_for_slot_invalidation_in_server_log
 	);
 }
 
+# Do some work to advance xids on a given node
+sub advance_xids
+{
+	my ($node, $table_name) = @_;
+
+	$node->safe_psql(
+		'postgres', qq[
+		do \$\$
+		begin
+		for i in 10000..11000 loop
+			-- use an exception block so that each iteration eats an XID
+			begin
+			insert into $table_name values (i);
+			exception
+			when division_by_zero then null;
+			end;
+		end loop;
+		end\$\$;
+	]);
+}
+
 done_testing();
-- 
2.43.0

