From 483824a8b3248fe08b6bdf22c68bada4f0549212 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sat, 16 Mar 2024 03:39:35 +0000
Subject: [PATCH v11 1/4] Track invalidation_reason in pg_replication_slots

Up until now, the reason for a replication slot's invalidation has
not been tracked in pg_replication_slots. A recent commit 007693f2a
added conflict_reason to show the reasons for slot invalidation, but
only for logical slots.

This commit adds a new column to show invalidation reasons for
both physical and logical slots. It also turns the conflict_reason
text column into a conflicting boolean column (effectively
reverting commit 007693f2a). One can now look at the new
invalidation_reason column to find the reason for a logical slot's
conflict with recovery.
---
 doc/src/sgml/ref/pgupgrade.sgml               |  4 +-
 doc/src/sgml/system-views.sgml                | 63 +++++++++++--------
 src/backend/catalog/system_views.sql          |  5 +-
 src/backend/replication/logical/slotsync.c    |  2 +-
 src/backend/replication/slot.c                |  8 +--
 src/backend/replication/slotfuncs.c           | 25 +++++---
 src/bin/pg_upgrade/info.c                     |  4 +-
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/replication/slot.h                |  2 +-
 .../t/035_standby_logical_decoding.pl         | 39 ++++++------
 .../t/040_standby_failover_slots_sync.pl      |  4 +-
 src/test/regress/expected/rules.out           |  7 ++-
 12 files changed, 97 insertions(+), 72 deletions(-)

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 58c6c2df8b..8de52bf752 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -453,8 +453,8 @@ make prefix=/usr/local/pgsql.new install
       <para>
        All slots on the old cluster must be usable, i.e., there are no slots
        whose
-       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>conflict_reason</structfield>
-       is not <literal>NULL</literal>.
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>invalidation_reason</structfield>
+       is not <literal>NULL</literal>.
       </para>
      </listitem>
      <listitem>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index be90edd0e2..e685921847 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2525,34 +2525,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>conflict_reason</structfield> <type>text</type>
+       <structfield>conflicting</structfield> <type>bool</type>
       </para>
       <para>
-       The reason for the logical slot's conflict with recovery. It is always
-       NULL for physical slots, as well as for logical slots which are not
-       invalidated. The non-NULL values indicate that the slot is marked
-       as invalidated. Possible values are:
-       <itemizedlist spacing="compact">
-        <listitem>
-         <para>
-          <literal>wal_removed</literal> means that the required WAL has been
-          removed.
-         </para>
-        </listitem>
-        <listitem>
-         <para>
-          <literal>rows_removed</literal> means that the required rows have
-          been removed.
-         </para>
-        </listitem>
-        <listitem>
-         <para>
-          <literal>wal_level_insufficient</literal> means that the
-          primary doesn't have a <xref linkend="guc-wal-level"/> sufficient to
-          perform logical decoding.
-         </para>
-        </listitem>
-       </itemizedlist>
+       True if this logical slot conflicted with recovery (and so is now
+       invalidated); always <literal>NULL</literal> for physical slots. When
+       this column is true, check the <structfield>invalidation_reason</structfield>
+       column for the conflict reason.
       </para></entry>
      </row>
 
@@ -2581,6 +2560,38 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>invalidation_reason</structfield> <type>text</type>
+      </para>
+      <para>
+       The reason for the slot's invalidation. It is set for both logical and
+       physical slots. <literal>NULL</literal> if the slot is not invalidated.
+       Possible values are:
+       <itemizedlist spacing="compact">
+        <listitem>
+         <para>
+          <literal>wal_removed</literal> means that the required WAL has been
+          removed.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>rows_removed</literal> means that the required rows have
+          been removed.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>wal_level_insufficient</literal> means that the
+          primary doesn't have a <xref linkend="guc-wal-level"/> sufficient to
+          perform logical decoding.
+         </para>
+        </listitem>
+       </itemizedlist>
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 04227a72d1..cd22dad959 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,9 +1023,10 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
-            L.conflict_reason,
+            L.conflicting,
             L.failover,
-            L.synced
+            L.synced,
+            L.invalidation_reason
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 5074c8409f..260632cfdd 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -668,7 +668,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, failover,"
-		" database, conflict_reason"
+		" database, invalidation_reason"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 91ca397857..4f1a17f6ce 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -2356,21 +2356,21 @@ RestoreSlotFromDisk(const char *name)
 }
 
 /*
- * Maps a conflict reason for a replication slot to
+ * Maps an invalidation reason for a replication slot to
  * ReplicationSlotInvalidationCause.
  */
 ReplicationSlotInvalidationCause
-GetSlotInvalidationCause(const char *conflict_reason)
+GetSlotInvalidationCause(const char *invalidation_reason)
 {
 	ReplicationSlotInvalidationCause cause;
 	ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
 	bool		found PG_USED_FOR_ASSERTS_ONLY = false;
 
-	Assert(conflict_reason);
+	Assert(invalidation_reason);
 
 	for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
 	{
-		if (strcmp(SlotInvalidationCauses[cause], conflict_reason) == 0)
+		if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
 		{
 			found = true;
 			result = cause;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ad79e1fccd..b5a638edea 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 17
+#define PG_GET_REPLICATION_SLOTS_COLS 18
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -263,6 +263,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
 		int			i;
+		ReplicationSlotInvalidationCause cause;
 
 		if (!slot->in_use)
 			continue;
@@ -409,22 +410,32 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
-		if (slot_contents.data.database == InvalidOid)
+		cause = slot_contents.data.invalidated;
+
+		if (SlotIsPhysical(&slot_contents))
 			nulls[i++] = true;
 		else
 		{
-			ReplicationSlotInvalidationCause cause = slot_contents.data.invalidated;
-
-			if (cause == RS_INVAL_NONE)
-				nulls[i++] = true;
+			/*
+			 * rows_removed and wal_level_insufficient are the only two
+			 * reasons for a logical slot's conflict with recovery.
+			 */
+			if (cause == RS_INVAL_HORIZON ||
+				cause == RS_INVAL_WAL_LEVEL)
+				values[i++] = BoolGetDatum(true);
 			else
-				values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
+				values[i++] = BoolGetDatum(false);
 		}
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
 
 		values[i++] = BoolGetDatum(slot_contents.data.synced);
 
+		if (cause == RS_INVAL_NONE)
+			nulls[i++] = true;
+		else
+			values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index b5b8d11602..34a157f792 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -676,13 +676,13 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	 * removed.
 	 */
 	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-							"%s as caught_up, conflict_reason IS NOT NULL as invalid "
+							"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
 							"FROM pg_catalog.pg_replication_slots "
 							"WHERE slot_type = 'logical' AND "
 							"database = current_database() AND "
 							"temporary IS FALSE;",
 							live_check ? "FALSE" :
-							"(CASE WHEN conflict_reason IS NOT NULL THEN FALSE "
+							"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
 							"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
 							"END)");
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 700f7daf7b..63fd0b4cd7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11123,9 +11123,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 425effad21..7f25a083ee 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -273,7 +273,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
 extern ReplicationSlotInvalidationCause
-			GetSlotInvalidationCause(const char *conflict_reason);
+			GetSlotInvalidationCause(const char *invalidation_reason);
 
 extern bool SlotExistsInStandbySlotNames(const char *slot_name);
 extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 88b03048c4..2203841ca1 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -168,7 +168,7 @@ sub change_hot_standby_feedback_and_wait_for_xmins
 	}
 }
 
-# Check conflict_reason in pg_replication_slots.
+# Check reason for conflict in pg_replication_slots.
 sub check_slots_conflict_reason
 {
 	my ($slot_prefix, $reason) = @_;
@@ -178,15 +178,15 @@ sub check_slots_conflict_reason
 
 	$res = $node_standby->safe_psql(
 		'postgres', qq(
-			 select conflict_reason from pg_replication_slots where slot_name = '$active_slot';));
+			 select invalidation_reason from pg_replication_slots where slot_name = '$active_slot' and conflicting;));
 
-	is($res, "$reason", "$active_slot conflict_reason is $reason");
+	is($res, "$reason", "$active_slot reason for conflict is $reason");
 
 	$res = $node_standby->safe_psql(
 		'postgres', qq(
-			 select conflict_reason from pg_replication_slots where slot_name = '$inactive_slot';));
+			 select invalidation_reason from pg_replication_slots where slot_name = '$inactive_slot' and conflicting;));
 
-	is($res, "$reason", "$inactive_slot conflict_reason is $reason");
+	is($res, "$reason", "$inactive_slot reason for conflict is $reason");
 }
 
 # Drop the slots, re-create them, change hot_standby_feedback,
@@ -293,13 +293,13 @@ $node_primary->safe_psql('testdb',
 	qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]
 );
 
-# Check conflict_reason is NULL for physical slot
+# Check conflicting is NULL for physical slot
 $res = $node_primary->safe_psql(
 	'postgres', qq[
-		 SELECT conflict_reason is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
+		 SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
 );
 
-is($res, 't', "Physical slot reports conflict_reason as NULL");
+is($res, 't', "Physical slot reports conflicting as NULL");
 
 my $backup_name = 'b1';
 $node_primary->backup($backup_name);
@@ -524,7 +524,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify reason for conflict is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 # Ensure that replication slot stats are not removed after invalidation.
@@ -551,7 +551,7 @@ change_hot_standby_feedback_and_wait_for_xmins(1, 1);
 ##################################################
 $node_standby->restart;
 
-# Verify conflict_reason is retained across a restart.
+# Verify reason for conflict is retained across a restart.
 check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 ##################################################
@@ -560,7 +560,8 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 # Get the restart_lsn from an invalidated slot
 my $restart_lsn = $node_standby->safe_psql('postgres',
-	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflict_reason is not null;"
+	"SELECT restart_lsn FROM pg_replication_slots
+		WHERE slot_name = 'vacuum_full_activeslot' AND conflicting;"
 );
 
 chomp($restart_lsn);
@@ -611,7 +612,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify reason for conflict is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('row_removal_', 'rows_removed');
 
 $handle =
@@ -647,7 +648,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_for_invalidation('shared_row_removal_', $logstart,
 	'with vacuum on pg_authid');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify reason for conflict is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('shared_row_removal_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
@@ -696,14 +697,14 @@ ok( $node_standby->poll_query_until(
 	'confl_active_logicalslot not updated'
 ) or die "Timed out waiting confl_active_logicalslot to be updated";
 
-# Verify slots are reported as non conflicting in pg_replication_slots
+# Verify slots are reported as valid in pg_replication_slots
 is( $node_standby->safe_psql(
 		'postgres',
 		q[select bool_or(conflicting) from
-		  (select conflict_reason is not NULL as conflicting
-		   from pg_replication_slots WHERE slot_type = 'logical')]),
+		  (select conflicting from pg_replication_slots
+			where slot_type = 'logical')]),
 	'f',
-	'Logical slots are reported as non conflicting');
+	'Logical slots are reported as valid');
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 0);
@@ -739,7 +740,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify reason for conflict is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('pruning_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
@@ -783,7 +784,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
 
-# Verify conflict_reason is 'wal_level_insufficient' in pg_replication_slots
+# Verify reason for conflict is 'wal_level_insufficient' in pg_replication_slots
 check_slots_conflict_reason('wal_level_', 'wal_level_insufficient');
 
 $handle =
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 0ea1f3d323..f47bfd78eb 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -228,7 +228,7 @@ $standby1->safe_psql('postgres', "CHECKPOINT");
 # Check if the synced slot is invalidated
 is( $standby1->safe_psql(
 		'postgres',
-		q{SELECT conflict_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+		q{SELECT invalidation_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
 	),
 	"t",
 	'synchronized slot has been invalidated');
@@ -274,7 +274,7 @@ $standby1->wait_for_log(qr/dropped replication slot "lsub1_slot" of dbid [0-9]+/
 # flagged as 'synced'
 is( $standby1->safe_psql(
 		'postgres',
-		q{SELECT conflict_reason IS NULL AND synced AND NOT temporary FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+		q{SELECT invalidation_reason IS NULL AND synced AND NOT temporary FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
 	),
 	"t",
 	'logical slot is re-synced');
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 0cd2c64fca..055bec068d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,10 +1473,11 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
-    l.conflict_reason,
+    l.conflicting,
     l.failover,
-    l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced)
+    l.synced,
+    l.invalidation_reason
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

