Rebased again.
From 69c2d56899f8729dc1d1476f10da50d879f177d9 Mon Sep 17 00:00:00 2001
From: nkey <[email protected]>
Date: Sat, 23 Nov 2024 13:25:11 +0100
Subject: [PATCH v13 1/2] Add injection points and TAP tests to reproduce
 and verify conflict detection issues that arise during SNAPSHOT_DIRTY
 index scans in logical replication
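
The failure mode these tests exercise, taking the DELETE case from the
037 test (a schematic timeline, not literal code):

    apply worker (subscriber)             concurrent local backend
    -----------------------------------   -----------------------------------
    remote DELETE arrives; starts a
    dirty-snapshot index scan for the
    target tuple
                                          UPDATE moves the tuple to a new
                                          TID; the old TID gets a committed
                                          xmax
    fetches the old TID, finds it dead;
    the new TID is never returned from
    the already-cached scan position
    => delete_missing is reported where
       delete_origin_differs is correct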

---
 src/backend/access/index/indexam.c            |   9 ++
 src/backend/access/nbtree/README              |   9 ++
 src/backend/executor/execIndexing.c           |   7 +-
 src/backend/replication/logical/worker.c      |   4 +
 src/include/utils/snapshot.h                  |  14 ++
 src/test/subscription/meson.build             |   4 +
 .../subscription/t/037_delete_missing_race.pl | 137 +++++++++++++++++
 .../subscription/t/038_update_missing_race.pl | 139 +++++++++++++++++
 .../t/039_update_missing_with_retain.pl       | 141 ++++++++++++++++++
 .../t/040_update_missing_simulation.pl        | 123 +++++++++++++++
 10 files changed, 586 insertions(+), 1 deletion(-)
 create mode 100644 src/test/subscription/t/037_delete_missing_race.pl
 create mode 100644 src/test/subscription/t/038_update_missing_race.pl
 create mode 100644 src/test/subscription/t/039_update_missing_with_retain.pl
 create mode 100644 src/test/subscription/t/040_update_missing_simulation.pl

diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 0492d92d23b..5987d90ee08 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -52,11 +52,13 @@
 #include "catalog/pg_type.h"
 #include "nodes/execnodes.h"
 #include "pgstat.h"
+#include "replication/logicalworker.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#include "utils/injection_point.h"
 
 
 /* ----------------------------------------------------------------
@@ -751,6 +753,13 @@ index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *
 		 * the index.
 		 */
 		Assert(ItemPointerIsValid(&scan->xs_heaptid));
+#ifdef USE_INJECTION_POINTS
+		if (!IsCatalogRelation(scan->heapRelation) && IsLogicalWorker())
+		{
+			INJECTION_POINT("index_getnext_slot_before_fetch_apply_dirty", NULL);
+		}
+#endif
+
 		if (index_fetch_heap(scan, slot))
 			return true;
 	}
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 53d4a61dc3f..634a3d10bb1 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -103,6 +103,26 @@ We also remember the left-link, and follow it when the scan moves backwards
 (though this requires extra handling to account for concurrent splits of
 the left sibling; see detailed move-left algorithm below).
 
+Despite these mechanics, inconsistent results can still occur during non-MVCC
+scans (SnapshotDirty and SnapshotSelf).  Such a scan can miss a tuple when a
+concurrent transaction deletes it and inserts its new version, with a new
+TID, on the same page or to the left/right (depending on scan direction) of
+the current scan position.  If the scan has already visited that page and
+cached its contents in backend-local storage, it skips the old tuple because
+it is deleted and misses the new one because the cached page is not re-read.
+Note that this affects not only btree scans but heap scans as well.
+
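+For example, with an index scan under SnapshotDirty:
+
+1. The scan reads an index page and caches its items, among them the entry
+   pointing at tuple version V1.
+2. A concurrent transaction updates the tuple: V1 gets a committed xmax and
+   the new version V2 goes to a new TID on that same page.
+3. The scan fetches V1, sees that it is dead, and moves on; V2 is never
+   returned because the cached page is not re-read.
+
+The scan therefore returns no version of the logical tuple at all.
+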
 In most cases we release our lock and pin on a page before attempting
 to acquire pin and lock on the page we are moving to.  In a few places
 it is necessary to lock the next page before releasing the current one.
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index ca33a854278..61a5097f789 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -117,6 +117,7 @@
 #include "utils/multirangetypes.h"
 #include "utils/rangetypes.h"
 #include "utils/snapmgr.h"
+#include "utils/injection_point.h"
 
 /* waitMode argument to check_exclusion_or_unique_constraint() */
 typedef enum
@@ -780,7 +781,9 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	/*
 	 * Search the tuples that are in the index for any violations, including
 	 * tuples that aren't visible yet.
-	 */
+	 * A dirty snapshot may miss some tuples in the case of concurrent
+	 * updates, but that is acceptable here.
+	 */
 	InitDirtySnapshot(DirtySnapshot);
 
 	for (i = 0; i < indnkeyatts; i++)
@@ -943,6 +946,8 @@ retry:
 
 	ExecDropSingleTupleTableSlot(existing_slot);
 
+	if (!conflict)
+		INJECTION_POINT("check_exclusion_or_unique_constraint_no_conflict", NULL);
 	return !conflict;
 }
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5df5a4612b6..4f6976b4af4 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -286,6 +286,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/guc.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -2961,7 +2962,10 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 			conflicttuple.origin != replorigin_session_origin)
 			type = CT_UPDATE_DELETED;
 		else
+		{
+			INJECTION_POINT("apply_handle_update_internal_update_missing", NULL);
 			type = CT_UPDATE_MISSING;
+		}
 
 		/* Store the new tuple for conflict reporting */
 		slot_store_data(newslot, relmapentry, newtup);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 0e546ec1497..189dfd71103 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -53,6 +53,13 @@ typedef enum SnapshotType
 	 * - previous commands of this transaction
 	 * - changes made by the current command
 	 *
+	 * Note: such a snapshot may miss an existing logical tuple during a
+	 * concurrent update.  If the new version of a tuple is inserted into a
+	 * page the scan has already processed, while the old version is marked
+	 * with a committed xmax, the scan skips the old version and never
+	 * encounters the new one, so the logical tuple is missed entirely
+	 * during that scan.
+	 *
 	 * Does _not_ include:
 	 * - in-progress transactions (as of the current instant)
 	 * -------------------------------------------------------------------------
@@ -82,6 +89,13 @@ typedef enum SnapshotType
 	 * transaction and committed/aborted xacts are concerned.  However, it
 	 * also includes the effects of other xacts still in progress.
 	 *
+	 * Note: such a snapshot may miss an existing logical tuple during a
+	 * concurrent update.  If the new version of a tuple is inserted into a
+	 * page the scan has already processed, while the old version is marked
+	 * with a committed or in-progress xmax, the scan skips the old version
+	 * and never encounters the new one, so the logical tuple is missed
+	 * entirely during that scan.
+	 *
 	 * A special hack is that when a snapshot of this type is used to
 	 * determine tuple visibility, the passed-in snapshot struct is used as an
 	 * output argument to return the xids of concurrent xacts that affected
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 85d10a89994..7f29647b538 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -46,6 +46,10 @@ tests += {
       't/034_temporal.pl',
       't/035_conflicts.pl',
       't/036_sequences.pl',
+      't/037_delete_missing_race.pl',
+      't/038_update_missing_race.pl',
+      't/039_update_missing_with_retain.pl',
+      't/040_update_missing_simulation.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/037_delete_missing_race.pl b/src/test/subscription/t/037_delete_missing_race.pl
new file mode 100644
index 00000000000..a319513fd60
--- /dev/null
+++ b/src/test/subscription/t/037_delete_missing_race.pl
@@ -0,0 +1,137 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection when a replicated DELETE races with a local UPDATE
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation; TODO: remove before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create a similar table on the subscriber, with an additional index to prevent HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);
+	 CREATE INDEX data_index ON conf_tab(data);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Delete tuple on publisher
+$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE a=1;");
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to reach the injection point in its index scan
+	$node_subscriber->wait_for_event('logical replication apply worker',
+		'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update the tuple on the subscriber
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET data = 'fromsubnew' WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up the apply worker
+	$node_subscriber->safe_psql('postgres',"
+		SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+		SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+		");
+}
+
+# The tuple was updated locally, so a conflict must be reported
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+# But the tuple should still be deleted on the subscriber
+is($node_subscriber->safe_psql('postgres', 'SELECT count(*) from conf_tab'), 0, 'record deleted on subscriber');
+
+ok(!$node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=delete_missing/,
+		$log_offset), 'delete_missing conflict not reported');
+
+ok($node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=delete_origin_differs/,
+		$log_offset), 'delete_origin_differs conflict reported');
+
+done_testing();
diff --git a/src/test/subscription/t/038_update_missing_race.pl b/src/test/subscription/t/038_update_missing_race.pl
new file mode 100644
index 00000000000..b71fdc0c136
--- /dev/null
+++ b/src/test/subscription/t/038_update_missing_race.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection when a replicated UPDATE races with a local UPDATE
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation; TODO: remove before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create a similar table on the subscriber, with an extra column and an additional index to prevent HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text, i int DEFAULT 0);
+	 CREATE INDEX i_index ON conf_tab(i);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Update tuple on publisher
+$node_publisher->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to reach the injection point in its index scan
+	$node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update the additional (subscriber-only) column
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET i = 1 WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up the apply worker
+	$node_subscriber->safe_psql('postgres',"
+		SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+		SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+		");
+}
+
+# The tuple was updated locally as well, so a conflict must be reported
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+# The new value from the publisher must be applied on the subscriber
+is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber');
+# And the subscriber-only column must keep its locally updated value
+is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'local column value retained on subscriber');
+
+ok(!$node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_missing/,
+		$log_offset), 'update_missing conflict not reported');
+
+ok($node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/,
+		$log_offset), 'update_origin_differs conflict reported');
+
+done_testing();
diff --git a/src/test/subscription/t/039_update_missing_with_retain.pl b/src/test/subscription/t/039_update_missing_with_retain.pl
new file mode 100644
index 00000000000..6f7dfd28d37
--- /dev/null
+++ b/src/test/subscription/t/039_update_missing_with_retain.pl
@@ -0,0 +1,141 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test conflict detection under a dirty-snapshot race with retain_dead_tuples enabled
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set to 0 to skip the race simulation; TODO: remove before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->append_conf('postgresql.conf',
+	qq(wal_level = 'replica'));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create a similar table on the subscriber, with an extra column and an additional index to prevent HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text, i int DEFAULT 0);
+	 CREATE INDEX i_index ON conf_tab(i);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub WITH (retain_dead_tuples = true)");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Update tuple on publisher
+$node_publisher->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+
+if ($simulate_race_condition)
+{
+	# Wait for the apply worker to reach the injection point in its index scan
+	$node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update the additional (subscriber-only) column
+$psql_session_subscriber->query_until(
+	qr/start/, qq[
+	\\echo start
+	UPDATE conf_tab SET i = 1 WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+	# Wake up the apply worker
+	$node_subscriber->safe_psql('postgres',"
+		SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+		SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+		");
+}
+
+# The tuple was updated locally as well, so a conflict must be reported
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+# The new value from the publisher must be applied on the subscriber
+is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber');
+# And the subscriber-only column must keep its locally updated value
+is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'local column value retained on subscriber');
+
+ok(!$node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_deleted/,
+		$log_offset), 'update_deleted conflict not reported');
+
+ok($node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/,
+		$log_offset), 'update_origin_differs conflict reported');
+
+done_testing();
diff --git a/src/test/subscription/t/040_update_missing_simulation.pl b/src/test/subscription/t/040_update_missing_simulation.pl
new file mode 100644
index 00000000000..322e931c171
--- /dev/null
+++ b/src/test/subscription/t/040_update_missing_simulation.pl
@@ -0,0 +1,123 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Stress-test conflict detection with concurrent updates on publisher and subscriber
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use IPC::Run qw(start finish);
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE tbl(a int PRIMARY key, data_pub int);");
+
+# Create a similar table on the subscriber, with an extra column and an additional index to prevent HOT updates
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE tbl(a int PRIMARY key, data_pub int, data_sub int default 0);
+	 CREATE INDEX data_index ON tbl(data_pub);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE tbl");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub");
+
+my $num_rows = 10;
+my $num_updates = 10000;
+my $num_clients = 10;
+$node_publisher->safe_psql('postgres', "INSERT INTO tbl SELECT i, i * i FROM generate_series(1,$num_rows) i");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Prepare small pgbench scripts as files
+my $sub_sql = $node_subscriber->basedir . '/sub_update.sql';
+my $pub_sql = $node_publisher->basedir . '/pub_delete.sql';
+
+open my $fh1, '>', $sub_sql or die $!;
+print $fh1 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_sub = data_sub + 1 WHERE a = :num;\n";
+close $fh1;
+
+open my $fh2, '>', $pub_sql or die $!;
+print $fh2 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_pub = data_pub + 1 WHERE a = :num;\n";
+close $fh2;
+
+my @sub_cmd = (
+	'pgbench',
+	'--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates",
+	'-p', $node_subscriber->port, '-h', $node_subscriber->host, '-f', $sub_sql, 'postgres'
+);
+
+my @pub_cmd = (
+	'pgbench',
+	'--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates",
+	'-p', $node_publisher->port, '-h', $node_publisher->host, '-f', $pub_sql, 'postgres'
+);
+
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+# The update_missing conflict should never be reported; error out if it ever is
+$node_subscriber->safe_psql('postgres',
+		"SELECT injection_points_attach('apply_handle_update_internal_update_missing', 'error')");
+my $log_offset = -s $node_subscriber->logfile;
+
+# Start both concurrently
+my ($sub_out, $sub_err, $pub_out, $pub_err) = ('', '', '', '');
+my $sub_h = start \@sub_cmd, '>', \$sub_out, '2>', \$sub_err;
+my $pub_h = start \@pub_cmd, '>', \$pub_out, '2>', \$pub_err;
+
+# Wait for completion
+finish $sub_h;
+finish $pub_h;
+
+like($sub_out, qr/actually processed/, 'subscriber pgbench completed');
+like($pub_out, qr/actually processed/, 'publisher pgbench completed');
+
+# Let subscription catch up, then check expectations
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+ok(!$node_subscriber->log_contains(
+		qr/ERROR:  error triggered for injection point apply_handle_update_internal_update_missing/,
+		$log_offset), 'update_missing conflict never reported');
+
+done_testing();
-- 
2.43.0

From 26baa8be7cfaebef1af04b25a4ea7ca1b1e6d4eb Mon Sep 17 00:00:00 2001
From: nkey <[email protected]>
Date: Wed, 3 Sep 2025 19:08:55 +0200
Subject: [PATCH v13 2/2] Fix logical replication conflict detection during
 tuple lookup

A scan using SnapshotDirty could miss a concurrently updated tuple and
thus report the wrong conflict (e.g. update_missing instead of
update_origin_differs).  Replace the SnapshotDirty scans in
RelationFindReplTupleByIndex() and RelationFindReplTupleSeq() with scans
under a fresh MVCC snapshot obtained from GetLatestSnapshot(), retaken
on each retry.
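
Condensed shape of the resulting lookup loop in both functions (a sketch
of the diff below, not compilable as-is; the tuple-matching logic and
error handling are omitted):

	scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);

retry:
	found = false;

	/* Take a fresh MVCC snapshot for every (re)scan attempt. */
	PushActiveSnapshot(GetLatestSnapshot());
	scan->xs_snapshot = GetActiveSnapshot();
	index_rescan(scan, skey, skey_attoff, NULL, 0);

	/* ... scan for a matching tuple, setting found ... */

	if (found)
	{
		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
							   outslot, GetCurrentCommandId(false),
							   lockmode, LockWaitBlock, 0, &tmfd);

		if (should_refetch_tuple(res, &tmfd))
		{
			PopActiveSnapshot();	/* drop the stale snapshot, then retry */
			goto retry;
		}
	}

	index_endscan(scan);
	PopActiveSnapshot();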
---
 src/backend/executor/execReplication.c | 63 ++++++++------------------
 1 file changed, 18 insertions(+), 45 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index def32774c90..1e434ab697a 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -186,8 +186,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 	ScanKeyData skey[INDEX_MAX_KEYS];
 	int			skey_attoff;
 	IndexScanDesc scan;
-	SnapshotData snap;
-	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
 	TypeCacheEntry **eq = NULL;
@@ -198,17 +196,17 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 
 	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
 
-	InitDirtySnapshot(snap);
-
 	/* Build scan key. */
 	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
 
-	/* Start an index scan. */
-	scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+	/* Start an index scan.  SnapshotAny is replaced before each (re)scan. */
+	scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
 
 retry:
 	found = false;
-
+	/* Take a fresh snapshot, and update the scan snapshot, on each retry */
+	PushActiveSnapshot(GetLatestSnapshot());
+	scan->xs_snapshot = GetActiveSnapshot();
 	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
@@ -229,19 +227,6 @@ retry:
 
 		ExecMaterializeSlot(outslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
-		/* Found our tuple and it's not locked */
+		/* Found our tuple */
 		found = true;
 		break;
@@ -253,8 +238,6 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
@@ -263,13 +246,15 @@ retry:
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
-		PopActiveSnapshot();
-
 		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
 			goto retry;
+		}
 	}
 
 	index_endscan(scan);
+	PopActiveSnapshot();
 
 	/* Don't release lock until commit. */
 	index_close(idxrel, NoLock);
@@ -370,9 +355,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 {
 	TupleTableSlot *scanslot;
 	TableScanDesc scan;
-	SnapshotData snap;
 	TypeCacheEntry **eq;
-	TransactionId xwait;
 	bool		found;
 	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
 
@@ -380,13 +363,15 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 	eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 
-	/* Start a heap scan. */
-	InitDirtySnapshot(snap);
-	scan = table_beginscan(rel, &snap, 0, NULL);
+	/* Start a heap scan.  SnapshotAny is replaced before each (re)scan. */
+	scan = table_beginscan(rel, SnapshotAny, 0, NULL);
 	scanslot = table_slot_create(rel, NULL);
 
 retry:
 	found = false;
+	/* Take a fresh snapshot, and update the scan snapshot, on each retry */
+	PushActiveSnapshot(GetLatestSnapshot());
+	scan->rs_snapshot = GetActiveSnapshot();
 
 	table_rescan(scan, NULL);
 
@@ -399,19 +384,6 @@ retry:
 		found = true;
 		ExecCopySlot(outslot, scanslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
-		/* Found our tuple and it's not locked */
+		/* Found our tuple */
 		break;
 	}
@@ -422,8 +394,6 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
@@ -432,13 +402,15 @@
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
-		PopActiveSnapshot();
-
 		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
 			goto retry;
+		}
 	}
 
 	table_endscan(scan);
+	PopActiveSnapshot();
 	ExecDropSingleTupleTableSlot(scanslot);
 
 	return found;
-- 
2.43.0
