Hi hackers,

Please find attached a patch to $SUBJECT.

In rare circumstances (and on slow machines) it is possible that an
xl_running_xacts record is emitted and that the catalog_xmin of a logical slot
on the standby advances past the conflict point. In that case, no conflict is
reported and the test fails. This has been observed several times; the latest
discussion can be found in [1].

To avoid this race condition, this commit adds an injection point that
prevents the catalog_xmin of a logical slot from advancing past the conflict
point.

While working on this patch, some adjustments were needed for injection
points (they are proposed in 0001):

- Add the ability to wake up and detach an injection point while ensuring that
no process can start waiting in between. This is done through a new
injection_points_wakeup_detach() function that holds the spinlock for the
whole duration.

- If the walsender is waiting on the injection point and the logical slot is
conflicting, then the walsender process is killed and is therefore not able to
"empty" its injection slot. So the next injection_wait() should reuse this slot
(instead of using an empty one). injection_wait() has been modified that way
in 0001.

With 0001 in place, we can use an injection point in
LogicalConfirmReceivedLocation() and update 035_standby_logical_decoding.pl to
prevent the catalog_xmin of a logical slot from advancing past the conflict
point.

Remarks:

R1. The issue still remains in v16, though, as injection points are only
available since v17.
R2. 0001 should probably bump the injection_points extension to 1.1, but
shouldn't that already have been the case in d28cd3e7b21c?

[1]: 
https://www.postgresql.org/message-id/flat/386386.1737736935%40sss.pgh.pa.us

Looking forward to your feedback,

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
>From 5008207f28c68360ec3d466852697797994bb330 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 10 Feb 2025 13:19:54 +0000
Subject: [PATCH v1 1/2] Add injection_points_wakeup_detach() and modify
 injection_wait()

This commit adds:

- injection_points_wakeup_detach(), to be able to wake up and detach an
injection point while ensuring that no process can start waiting in between
(the spinlock is held for the whole duration).
- A check in injection_wait() for an existing injection slot with the same
name (if one exists, it is reused).
---
 .../injection_points--1.0.sql                 | 10 +++
 .../injection_points/injection_points.c       | 65 ++++++++++++++++---
 2 files changed, 65 insertions(+), 10 deletions(-)
 100.0% src/test/modules/injection_points/

diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql
index 5d83f08811b..b4ae67fd97b 100644
--- a/src/test/modules/injection_points/injection_points--1.0.sql
+++ b/src/test/modules/injection_points/injection_points--1.0.sql
@@ -75,6 +75,16 @@ RETURNS void
 AS 'MODULE_PATHNAME', 'injection_points_detach'
 LANGUAGE C STRICT PARALLEL UNSAFE;
 
+--
+-- injection_points_wakeup_detach()
+--
+-- Wakes up any process waiting on the given injection point (not an error if none is) and detaches the point.
+--
+CREATE FUNCTION injection_points_wakeup_detach(IN point_name TEXT)
+RETURNS void
+AS 'MODULE_PATHNAME', 'injection_points_wakeup_detach'
+LANGUAGE C STRICT PARALLEL UNSAFE;
+
 --
 -- injection_points_stats_numcalls()
 --
diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c
index ad528d77752..97397449dfb 100644
--- a/src/test/modules/injection_points/injection_points.c
+++ b/src/test/modules/injection_points/injection_points.c
@@ -93,6 +93,9 @@ typedef struct InjectionPointSharedState
 /* Pointer to shared-memory state. */
 static InjectionPointSharedState *inj_state = NULL;
 
+static Datum injection_points_wakeup_internal(FunctionCallInfo fcinfo, bool lock,
+											  bool have_to_wait);
+
 extern PGDLLEXPORT void injection_error(const char *name,
 										const void *private_data);
 extern PGDLLEXPORT void injection_notice(const char *name,
@@ -294,19 +297,37 @@ injection_wait(const char *name, const void *private_data)
 	SpinLockAcquire(&inj_state->lock);
 	for (int i = 0; i < INJ_MAX_WAIT; i++)
 	{
-		if (inj_state->name[i][0] == '\0')
+		/*
+		 * A previous waiter may have been terminated before it could reset
+		 * inj_state->name[i][0] to '\0', so prefer a slot that already
+		 * carries this name; otherwise remember the first free slot.
+		 *
+		 * XXX: concurrent waiters on the same point now share one slot (and
+		 * its wait counter); confirm this is acceptable.
+		 */
+		if (strcmp(name, inj_state->name[i]) == 0)
 		{
 			index = i;
-			strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
-			old_wait_counts = inj_state->wait_counts[i];
 			break;
 		}
+		else if (inj_state->name[i][0] == '\0' && index < 0)
+			index = i;
 	}
+
+	/*
+	 * Claim the slot before releasing the spinlock, so that two concurrent
+	 * waiters cannot pick the same free slot.
+	 */
+	if (index >= 0)
+	{
+		strlcpy(inj_state->name[index], name, INJ_NAME_MAXLEN);
+		old_wait_counts = inj_state->wait_counts[index];
+	}
 	SpinLockRelease(&inj_state->lock);
 
 	if (index < 0)
 		elog(ERROR, "could not find free slot for wait of injection point %s ",
 			 name);
 
 	/* And sleep.. */
 	ConditionVariablePrepareToSleep(&inj_state->wait_point);
@@ -427,10 +441,12 @@ injection_points_cached(PG_FUNCTION_ARGS)
 
 /*
  * SQL function for waking up an injection point waiting in injection_wait().
+ * If "lock" is true, inj_state->lock is taken here; otherwise the caller must
+ * hold it (XXX: palloc, elog and the CV broadcast then run under a spinlock).
+ * If "have_to_wait" is true, error out when no process is waiting.
  */
-PG_FUNCTION_INFO_V1(injection_points_wakeup);
-Datum
-injection_points_wakeup(PG_FUNCTION_ARGS)
+static Datum
+injection_points_wakeup_internal(FunctionCallInfo fcinfo, bool lock, bool have_to_wait)
 {
 	char	   *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
 	int			index = -1;
@@ -439,7 +455,8 @@ injection_points_wakeup(PG_FUNCTION_ARGS)
 		injection_init_shmem();
 
 	/* First bump the wait counter for the injection point to wake up */
-	SpinLockAcquire(&inj_state->lock);
+	if (lock)
+		SpinLockAcquire(&inj_state->lock);
 	for (int i = 0; i < INJ_MAX_WAIT; i++)
 	{
 		if (strcmp(name, inj_state->name[i]) == 0)
@@ -450,17 +467,29 @@ injection_points_wakeup(PG_FUNCTION_ARGS)
 	}
 	if (index < 0)
 	{
-		SpinLockRelease(&inj_state->lock);
-		elog(ERROR, "could not find injection point %s to wake up", name);
+		if (lock)
+			SpinLockRelease(&inj_state->lock);
+		if (have_to_wait)
+			elog(ERROR, "could not find injection point %s to wake up", name);
+		else
+			PG_RETURN_VOID();
 	}
 	inj_state->wait_counts[index]++;
-	SpinLockRelease(&inj_state->lock);
+	if (lock)
+		SpinLockRelease(&inj_state->lock);
 
 	/* And broadcast the change to the waiters */
 	ConditionVariableBroadcast(&inj_state->wait_point);
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(injection_points_wakeup);
+Datum
+injection_points_wakeup(PG_FUNCTION_ARGS)
+{
+	return injection_points_wakeup_internal(fcinfo, true, true);
+}
+
 /*
  * injection_points_set_local
  *
@@ -516,6 +545,22 @@ injection_points_detach(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/*
+ * SQL function for detaching an injection point and then waking up the
+ * process waiting on it, if any.  Detaching first prevents the woken-up
+ * process from reaching the point again and starting a new wait.
+ */
+PG_FUNCTION_INFO_V1(injection_points_wakeup_detach);
+Datum
+injection_points_wakeup_detach(PG_FUNCTION_ARGS)
+{
+	/* Not under inj_state->lock: these calls palloc() and may elog(ERROR). */
+	injection_points_detach(fcinfo);
+
+	/* have_to_wait = false: not an error if nobody is waiting. */
+	injection_points_wakeup_internal(fcinfo, true, false);
+	PG_RETURN_VOID();
+}
 
 void
 _PG_init(void)
-- 
2.34.1

>From df29ebe3121e3b924f9e0fe40b05e55dad2bd4c8 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Mon, 10 Feb 2025 13:36:48 +0000
Subject: [PATCH v1 2/2] Fix race conditions in 035_standby_logical_decoding.pl

In rare circumstances (and on slow machines) it is possible that an
xl_running_xacts record is emitted and that the catalog_xmin of a logical slot
advances past the conflict point. In that case no conflict is reported and the
test fails.

This commit adds a new injection point to prevent the catalog_xmin from
advancing past the conflict point.
---
 src/backend/replication/logical/logical.c     |  3 +++
 .../t/035_standby_logical_decoding.pl         | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+)
  11.7% src/backend/replication/logical/
  88.2% src/test/recovery/t/

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8ea846bfc3b..578837bfc1c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -41,6 +41,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
 
@@ -1826,6 +1827,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 		bool		updated_xmin = false;
 		bool		updated_restart = false;
 
+		INJECTION_POINT("before-confirm-xmin-location");
+
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
 		MyReplicationSlot->data.confirmed_flush = lsn;
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 505e85d1eb6..d6b8d28a7e0 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -10,6 +10,11 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
 my ($stdout, $stderr, $cascading_stdout, $cascading_stderr, $handle);
 
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
@@ -256,6 +261,10 @@ sub wait_until_vacuum_can_remove
 	my $xid_horizon = $node_primary->safe_psql('testdb',
 		qq[select pg_snapshot_xmin(pg_current_snapshot());]);
 
+	# Block catalog_xmin updates on the standby so it stays behind the conflict point
+	$node_standby->safe_psql('testdb',
+		"SELECT injection_points_attach('before-confirm-xmin-location', 'wait');");
+
 	# Launch our sql.
 	$node_primary->safe_psql('testdb', qq[$sql]);
 
@@ -269,6 +278,10 @@ sub wait_until_vacuum_can_remove
 	$node_primary->safe_psql(
 		'testdb', qq[VACUUM $vac_option verbose $to_vac;
 										  INSERT INTO flush_wal DEFAULT VALUES;]);
+
+	# Release any process waiting on the injection point, and detach the point
+	$node_standby->safe_psql('testdb',
+		"SELECT injection_points_wakeup_detach('before-confirm-xmin-location');");
 }
 
 ########################
@@ -490,6 +503,12 @@ is($result, qq(10), 'check replicated inserts after subscription on standby');
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 $node_subscriber->stop;
 
+# Create the injection_points extension on the primary, for use on the standby
+$node_primary->safe_psql('testdb', 'CREATE EXTENSION injection_points;');
+
+# Wait until the extension has been replicated to the standby
+$node_primary->wait_for_replay_catchup($node_standby);
+
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 1: hot_standby_feedback off and vacuum FULL
-- 
2.34.1

Reply via email to