Hello!

> Sorry for being noisy, just for the case, want to notice that [1] needs
to be addressed before any real usage of conflict resolution.
> [1]:
https://www.postgresql.org/message-id/flat/OS0PR01MB5716E30952F542E256DD72E294802%40OS0PR01MB5716.jpnprd01.prod.outlook.com#8aa2083efa76e6a65f51b8a7fd579a23

I created two tests to reproduce the issue using the new conflict
resolution mechanism (based on the v16 patch).
One test detects an invalid delete_missing conflict instead of the
expected delete_origin_differs. The other detects an invalid
update_missing conflict instead of the expected update_origin_differs.

You can change `my $simulate_race_condition = 1;` to 0 in either test to disable the simulated race and make it pass.

[2] contains an explanation of the issue, and a possible fix can be found
in [3].

Best regards,
Mikhail

[2]:
https://www.postgresql.org/message-id/flat/CANtu0ohUB9ky45iiMAYN1fGyt82%2Bcg%3D%2BUYBom%3DP7drb%2B%3D97G9w%40mail.gmail.com#05a8d2ff9712f477cb92a9dfdab1aab6
[3]:
https://www.postgresql.org/message-id/flat/CANtu0oiziTBM8%2BWDtkktMZv0rhGBroYGWwqSQW%2BMzOWpmk-XEw%40mail.gmail.com#74f5f05594bb6f10b1d882a1ebce377c
From 39497157ac7a5572a698b500001b9b6e2fbf69b4 Mon Sep 17 00:00:00 2001
From: nkey <n...@toloka.ai>
Date: Mon, 21 Oct 2024 02:05:33 +0200
Subject: [PATCH v16] TAP tests to reproduce issue with DirtySnapshot scan [1]
 with new conflict resolution system.

[1]: https://www.postgresql.org/message-id/flat/CANtu0oiziTBM8%2BWDtkktMZv0rhGBroYGWwqSQW%2BMzOWpmk-XEw%40mail.gmail.com#74f5f05594bb6f10b1d882a1ebce377c
---
 src/backend/access/index/indexam.c            |  11 ++
 src/test/subscription/meson.build             |   7 +-
 .../subscription/t/035_delete_missing_race.pl | 138 +++++++++++++++++
 .../subscription/t/036_update_missing_race.pl | 139 ++++++++++++++++++
 4 files changed, 294 insertions(+), 1 deletion(-)
 create mode 100644 src/test/subscription/t/035_delete_missing_race.pl
 create mode 100644 src/test/subscription/t/036_update_missing_race.pl

diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 1859be614c..ff7cad47c7 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -57,6 +57,8 @@
 #include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#include "utils/injection_point.h"
+#include "replication/worker_internal.h"
 
 
 /* ----------------------------------------------------------------
@@ -696,6 +698,15 @@ index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *
 		 * the index.
 		 */
 		Assert(ItemPointerIsValid(&scan->xs_heaptid));
+#ifdef USE_INJECTION_POINTS
+		if (!IsCatalogRelationOid(scan->indexRelation->rd_id)
+				&& scan->xs_snapshot->snapshot_type == SNAPSHOT_DIRTY
+				&& MySubscription)
+		{
+			INJECTION_POINT("index_getnext_slot_before_fetch_apply");
+		}
+#endif
+
 		if (index_fetch_heap(scan, slot))
 			return true;
 	}
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 00ade29b02..aa7e2bd406 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -5,7 +5,10 @@ tests += {
   'sd': meson.current_source_dir(),
   'bd': meson.current_build_dir(),
   'tap': {
-    'env': {'with_icu': icu.found() ? 'yes' : 'no'},
+    'env': {
+      'with_icu': icu.found() ? 'yes' : 'no',
+      'enable_injection_points': get_option('injection_points') ? 'yes' : 'no'
+    },
     'tests': [
       't/001_rep_changes.pl',
       't/002_types.pl',
@@ -41,6 +44,8 @@ tests += {
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
       't/034_conflict_resolver.pl',
+      't/035_delete_missing_race.pl',
+      't/036_update_missing_race.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/035_delete_missing_race.pl b/src/test/subscription/t/035_delete_missing_race.pl
new file mode 100644
index 0000000000..d7cba0fa0c
--- /dev/null
+++ b/src/test/subscription/t/035_delete_missing_race.pl
@@ -0,0 +1,138 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Reproduce a DirtySnapshot race in logical replication conflict detection:
+# a row updated on the subscriber while the apply worker searches for it can
+# be reported as delete_missing instead of delete_origin_differs.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Treat an unset variable as "no" instead of dying under FATAL warnings.
+if (($ENV{enable_injection_points} // '') ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set it to 0 to make the test succeed
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default. Only the subscriber uses it.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher with additional index to disable HOT updates
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);
+	 CREATE INDEX data_index ON conf_tab(data);");
+
+# Create similar table on subscriber
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);
+	 CREATE INDEX data_index ON conf_tab(data);");
+
+# Set up extension to simulate race condition in the apply worker
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be deleted later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub
+	 WITH (disable_on_error = true);"); # mark subscription as disable_on_error to keep test simple
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+# Setup CONFLICT RESOLVER
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (delete_missing = 'error', delete_origin_differs = 'apply_remote');"
+);
+
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE a=1;");
+
+# Pause the apply worker in the middle of its DirtySnapshot index scan.
+if ($simulate_race_condition)
+{
+	$node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply');
+}
+
+# Concurrently update the row on the subscriber. safe_psql waits for the
+# UPDATE to commit, so the new row version exists before the worker resumes.
+$node_subscriber->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'fromsubnew' WHERE (a=1);");
+
+# Detach before waking up: the scan may reach the injection point again, and
+# detaching does not release a process that is already waiting on it.
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres',"
+		SELECT injection_points_detach('index_getnext_slot_before_fetch_apply');
+		SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply');
+		");
+}
+
+# Wait until the apply worker reports a conflict for the replicated DELETE.
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+ok(!$node_subscriber->log_contains(
+		qr/ERROR:  conflict detected on relation \"public.conf_tab\": conflict=delete_missing, resolution=error/,
+		$log_offset), 'no invalid delete_missing conflict detected');
+
+ok($node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=delete_origin_differs, resolution=apply_remote/,
+		$log_offset), 'correct delete_origin_differs conflict detected');
+
+ok(!$node_subscriber->log_contains(
+		qr/LOG:  subscription \"tap_sub\" has been disabled because of an error/,
+		$log_offset), 'subscription is not disabled');
+
+done_testing();
diff --git a/src/test/subscription/t/036_update_missing_race.pl b/src/test/subscription/t/036_update_missing_race.pl
new file mode 100644
index 0000000000..6316c23a78
--- /dev/null
+++ b/src/test/subscription/t/036_update_missing_race.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Reproduce a DirtySnapshot race in logical replication conflict detection:
+# a row updated on the subscriber while the apply worker searches for it can
+# be reported as update_missing instead of update_origin_differs.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Treat an unset variable as "no" instead of dying under FATAL warnings.
+if (($ENV{enable_injection_points} // '') ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set it to 0 to make the test succeed
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default. Only the subscriber uses it.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher with additional index to disable HOT updates
+$node_publisher->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);
+	 CREATE INDEX data_index ON conf_tab(data);");
+
+# Create similar table on subscriber
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE TABLE conf_tab(a int PRIMARY key, data text);
+	 CREATE INDEX data_index ON conf_tab(data);");
+
+# Set up extension to simulate race condition in the apply worker
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+	'postgres',
+	"CREATE SUBSCRIPTION tap_sub
+	 CONNECTION '$publisher_connstr application_name=$appname'
+	 PUBLICATION tap_pub
+	 WITH (disable_on_error = true);"); # mark subscription as disable_on_error to keep test simple
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+# Setup CONFLICT RESOLVER
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub CONFLICT RESOLVER (update_origin_differs = 'apply_remote', update_missing = 'error');"
+);
+
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+# Pause the apply worker in the middle of its DirtySnapshot index scan.
+if ($simulate_race_condition)
+{
+	$node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply');
+}
+
+# Concurrently update the row on the subscriber. safe_psql waits for the
+# UPDATE to commit, so the new row version exists before the worker resumes.
+$node_subscriber->safe_psql('postgres',
+	"UPDATE conf_tab SET data = 'fromsubnew' WHERE (a=1);");
+
+# Detach before waking up: the scan may reach the injection point again, and
+# detaching does not release a process that is already waiting on it.
+if ($simulate_race_condition)
+{
+	$node_subscriber->safe_psql('postgres',"
+		SELECT injection_points_detach('index_getnext_slot_before_fetch_apply');
+		SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply');
+		");
+}
+
+# Wait until the apply worker reports a conflict for the replicated UPDATE.
+$node_subscriber->wait_for_log(
+	qr/conflict detected on relation \"public.conf_tab\"/,
+	$log_offset);
+
+ok(!$node_subscriber->log_contains(
+		qr/ERROR:  conflict detected on relation \"public.conf_tab\": conflict=update_missing, resolution=error/,
+		$log_offset), 'no invalid update_missing conflict detected');
+
+ok($node_subscriber->log_contains(
+		qr/LOG:  conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs, resolution=apply_remote/,
+		$log_offset), 'correct update_origin_differs conflict detected');
+
+ok(!$node_subscriber->log_contains(
+		qr/LOG:  subscription \"tap_sub\" has been disabled because of an error/,
+		$log_offset), 'subscription is not disabled');
+
+done_testing();
-- 
2.43.0

Reply via email to