From 2d98e0f46e502f530bdf644c23f8fa2c2983ca12 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 6 Jan 2024 14:45:13 +0000
Subject: [PATCH v1] Add XID based replication slot invalidation

Currently postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky. Because the amount of WAL a
customer generates, and their allocated storage will vary greatly
in production, making it difficult to pin down a one-size-fits-all
value. It is often easy for developers to set an XID age (age of
slot's xmin or catalog_xmin) of say 1 or 1.5 billion, after which
the slots get invalidated.

To achieve the above, postgres uses replication slot xmin (the
oldest transaction that this slot needs the database to retain) or
catalog_xmin (the oldest transaction affecting the system catalogs
that this slot needs the database to retain), and a new GUC
max_slot_xid_age. The checkpointer then looks at all replication
slots invalidating the slots based on the age set.
---
 doc/src/sgml/config.sgml                      | 21 +++++
 src/backend/access/transam/xlog.c             | 10 +++
 src/backend/replication/slot.c                | 41 ++++++++++
 src/backend/replication/slotfuncs.c           |  3 +
 src/backend/utils/misc/guc_tables.c           | 10 +++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/replication/slot.h                |  3 +
 src/test/recovery/t/050_invalidate_slots.pl   | 81 +++++++++++++++++++
 8 files changed, 170 insertions(+)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 4293b3c182..f0b3a3bf2b 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4422,6 +4422,27 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-slot-xid-age" xreflabel="max_slot_xid_age">
+      <term><varname>max_slot_xid_age</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_slot_xid_age</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots whose <literal>xmin</literal> (the oldest
+        transaction that this slot needs the database to retain) or
+        <literal>catalog_xmin</literal> (the oldest transaction affecting the
+        system catalogs that this slot needs the database to retain) has reached
+        the age specified by this setting. A value of zero (which is default)
+        disables this feature. Users can set this value anywhere from zero to
+        two billion. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f7ce2cbbb4..a69099247a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7056,6 +7056,11 @@ CreateCheckPoint(int flags)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
 										   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate replication slots based on xmin or catalog_xmin age */
+	if (max_slot_xid_age > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7505,6 +7510,11 @@ CreateRestartPoint(int flags)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
 										   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate replication slots based on xmin or catalog_xmin age */
+	if (max_slot_xid_age > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d921ac051f..cffd84c23b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -101,6 +101,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10;
 int			inactive_replication_slot_timeout = 0;
+int			max_slot_xid_age = 0;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropAcquired(void);
@@ -1349,6 +1350,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_INACTIVE_TIMEOUT:
 			appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by inactive_replication_slot_timeout."));
 			break;
+		case RS_INVAL_XID_AGE:
+			appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1461,6 +1465,42 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 							conflict = cause;
 					}
 					break;
+				case RS_INVAL_XID_AGE:
+					{
+						TransactionId xid_cur = ReadNextTransactionId();
+						TransactionId xid_limit;
+						TransactionId xid_slot;
+
+						if (TransactionIdIsNormal(s->data.xmin))
+						{
+							xid_slot = s->data.xmin;
+
+							xid_limit = xid_slot + max_slot_xid_age;
+							if (xid_limit < FirstNormalTransactionId)
+								xid_limit += FirstNormalTransactionId;
+
+							if (TransactionIdFollowsOrEquals(xid_cur, xid_limit))
+							{
+								conflict = cause;
+								break;
+							}
+						}
+						if (TransactionIdIsNormal(s->data.catalog_xmin))
+						{
+							xid_slot = s->data.catalog_xmin;
+
+							xid_limit = xid_slot + max_slot_xid_age;
+							if (xid_limit < FirstNormalTransactionId)
+								xid_limit += FirstNormalTransactionId;
+
+							if (TransactionIdFollowsOrEquals(xid_cur, xid_limit))
+							{
+								conflict = cause;
+								break;
+							}
+						}
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1607,6 +1647,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
  * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs
+ * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index e094225764..4b56f11b57 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -423,6 +423,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			case RS_INVAL_INACTIVE_TIMEOUT:
 				values[i++] = CStringGetTextDatum("inactive_timeout");
 				break;
+			case RS_INVAL_XID_AGE:
+				values[i++] = CStringGetTextDatum("xid_aged");
+				break;
 		}
 
 		if (slot_contents.data.inactive_at > 0)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c7fa14ed6b..ce79436b4d 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2904,6 +2904,16 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."),
+			gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.")
+		},
+		&max_slot_xid_age,
+		0, 0, 2000000000,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 7984873533..2f3b777b5c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -326,6 +326,7 @@
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
 #inactive_replication_slot_timeout = 0	# in seconds; 0 disables
+#max_slot_xid_age = 0
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index ace946de62..ad7e32678b 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -52,6 +52,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL,
 	/* inactive slot timeout has occurred */
 	RS_INVAL_INACTIVE_TIMEOUT,
+	/* slot's xmin or catalog_xmin has reached the age */
+	RS_INVAL_XID_AGE,
 } ReplicationSlotInvalidationCause;
 
 /*
@@ -219,6 +221,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT int inactive_replication_slot_timeout;
+extern PGDLLIMPORT int max_slot_xid_age;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
index bf1cd4bbcc..e7da98412c 100644
--- a/src/test/recovery/t/050_invalidate_slots.pl
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -84,4 +84,85 @@ $primary->poll_query_until('postgres', qq[
 		invalidation_reason = 'inactive_timeout';
 ]) or die "Timed out while waiting for inactive replication slot sb1_slot to be invalidated";
 
+$primary->safe_psql('postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb2_slot');
+]);
+
+$primary->safe_psql('postgres', qq[
+    ALTER SYSTEM SET inactive_replication_slot_timeout TO 0;
+]);
+$primary->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, $backup_name,
+	has_streaming => 1);
+
+# Enable hs_feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$standby2->append_conf('postgresql.conf', q{
+primary_slot_name = 'sb2_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+$standby2->start;
+
+# Create some content on primary to move xmin
+$primary->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a");
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby2);
+
+$primary->poll_query_until('postgres', qq[
+	SELECT xmin IS NOT NULL
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = 'sb2_slot';
+]) or die "Timed out waiting for slot xmin to advance";
+
+$primary->safe_psql('postgres', qq[
+	ALTER SYSTEM SET max_slot_xid_age = 500;
+]);
+$primary->reload;
+
+# Stop standby to make the replication slot's xmin on primary to age
+$standby2->stop;
+
+# Do some work to advance xmin
+$primary->safe_psql(
+	'postgres', q{
+do $$
+begin
+  for i in 10000..11000 loop
+    -- use an exception block so that each iteration eats an XID
+    begin
+      insert into tab_int values (i);
+    exception
+      when division_by_zero then null;
+    end;
+  end loop;
+end$$;
+});
+
+$invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb2_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb2_slot invalidation has been logged');
+
+# Wait for the inactive replication slots to be invalidated.
+$primary->poll_query_until('postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb2_slot' AND
+		invalidation_reason = 'xid_aged';
+]) or die "Timed out while waiting for replication slot sb2_slot to be invalidated";
+
 done_testing();
-- 
2.34.1

