Fixed a race condition in the tests that caused
https://cirrus-ci.com/task/5815107659235328?logs=test_world#L324 to
fail.
From 2a6a121c7cfe4823db0f8aec931c5dbcab672616 Mon Sep 17 00:00:00 2001
From: nkey <[email protected]>
Date: Wed, 3 Sep 2025 19:08:55 +0200
Subject: [PATCH v14 2/2] Fix logical replication conflict detection during
tuple lookup
SNAPSHOT_DIRTY scans could miss conflicts with concurrent transactions during logical replication.
Replace the SNAPSHOT_DIRTY scan with GetLatestSnapshot in RelationFindReplTupleByIndex and RelationFindReplTupleSeq.
---
src/backend/executor/execReplication.c | 63 ++++++++------------------
1 file changed, 18 insertions(+), 45 deletions(-)
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b409d4ecbf5..0de40aec733 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -186,8 +186,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
ScanKeyData skey[INDEX_MAX_KEYS];
int skey_attoff;
IndexScanDesc scan;
- SnapshotData snap;
- TransactionId xwait;
Relation idxrel;
bool found;
TypeCacheEntry **eq = NULL;
@@ -198,17 +196,17 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
- InitDirtySnapshot(snap);
-
/* Build scan key. */
skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
- /* Start an index scan. */
- scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+ /* Start an index scan. SnapshotAny will be replaced below. */
+ scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
retry:
found = false;
-
+ PushActiveSnapshot(GetLatestSnapshot());
+ /* Update the actual scan snapshot each retry */
+ scan->xs_snapshot = GetActiveSnapshot();
index_rescan(scan, skey, skey_attoff, NULL, 0);
/* Try to find the tuple */
@@ -229,19 +227,6 @@ retry:
ExecMaterializeSlot(outslot);
- xwait = TransactionIdIsValid(snap.xmin) ?
- snap.xmin : snap.xmax;
-
- /*
- * If the tuple is locked, wait for locking transaction to finish and
- * retry.
- */
- if (TransactionIdIsValid(xwait))
- {
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
- }
-
/* Found our tuple and it's not locked */
found = true;
break;
@@ -253,8 +238,6 @@ retry:
TM_FailureData tmfd;
TM_Result res;
- PushActiveSnapshot(GetLatestSnapshot());
-
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
@@ -263,13 +246,15 @@ retry:
0 /* don't follow updates */ ,
&tmfd);
- PopActiveSnapshot();
-
if (should_refetch_tuple(res, &tmfd))
+ {
+ PopActiveSnapshot();
goto retry;
+ }
}
index_endscan(scan);
+ PopActiveSnapshot();
/* Don't release lock until commit. */
index_close(idxrel, NoLock);
@@ -370,9 +355,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
{
TupleTableSlot *scanslot;
TableScanDesc scan;
- SnapshotData snap;
TypeCacheEntry **eq;
- TransactionId xwait;
bool found;
TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
@@ -380,13 +363,15 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
- /* Start a heap scan. */
- InitDirtySnapshot(snap);
- scan = table_beginscan(rel, &snap, 0, NULL);
+ /* Start a heap scan. SnapshotAny will be replaced below. */
+ scan = table_beginscan(rel, SnapshotAny, 0, NULL);
scanslot = table_slot_create(rel, NULL);
retry:
found = false;
+ PushActiveSnapshot(GetLatestSnapshot());
+ /* Update the actual scan snapshot each retry */
+ scan->rs_snapshot = GetActiveSnapshot();
table_rescan(scan, NULL);
@@ -399,19 +384,6 @@ retry:
found = true;
ExecCopySlot(outslot, scanslot);
- xwait = TransactionIdIsValid(snap.xmin) ?
- snap.xmin : snap.xmax;
-
- /*
- * If the tuple is locked, wait for locking transaction to finish and
- * retry.
- */
- if (TransactionIdIsValid(xwait))
- {
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
- }
-
/* Found our tuple and it's not locked */
break;
}
@@ -422,8 +394,6 @@ retry:
TM_FailureData tmfd;
TM_Result res;
- PushActiveSnapshot(GetLatestSnapshot());
-
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
@@ -432,13 +402,16 @@ retry:
0 /* don't follow updates */ ,
&tmfd);
- PopActiveSnapshot();
if (should_refetch_tuple(res, &tmfd))
+ {
+ PopActiveSnapshot();
goto retry;
+ }
}
table_endscan(scan);
+ PopActiveSnapshot();
ExecDropSingleTupleTableSlot(scanslot);
return found;
--
2.48.1
From 66ede0f9001d9e841e9249091869dbde58df4d5c Mon Sep 17 00:00:00 2001
From: nkey <[email protected]>
Date: Sat, 23 Nov 2024 13:25:11 +0100
Subject: [PATCH v14 1/2] This patch introduces new injection points and TAP
tests to reproduce and verify conflict detection issues that arise during
SNAPSHOT_DIRTY index scans in logical replication.
---
src/backend/access/index/indexam.c | 9 ++
src/backend/access/nbtree/README | 9 ++
src/backend/executor/execIndexing.c | 7 +-
src/backend/replication/logical/worker.c | 4 +
src/include/utils/snapshot.h | 14 ++
src/test/subscription/meson.build | 4 +
.../subscription/t/036_delete_missing_race.pl | 139 +++++++++++++++++
.../subscription/t/037_update_missing_race.pl | 141 +++++++++++++++++
.../t/038_update_missing_with_retain.pl | 143 ++++++++++++++++++
.../t/039_update_missing_simulation.pl | 125 +++++++++++++++
10 files changed, 594 insertions(+), 1 deletion(-)
create mode 100644 src/test/subscription/t/036_delete_missing_race.pl
create mode 100644 src/test/subscription/t/037_update_missing_race.pl
create mode 100644 src/test/subscription/t/038_update_missing_with_retain.pl
create mode 100644 src/test/subscription/t/039_update_missing_simulation.pl
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 86d11f4ec79..a503fa02ac5 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -52,11 +52,13 @@
#include "catalog/pg_type.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
+#include "replication/logicalworker.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/ruleutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
+#include "utils/injection_point.h"
/* ----------------------------------------------------------------
@@ -751,6 +753,13 @@ index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *
* the index.
*/
Assert(ItemPointerIsValid(&scan->xs_heaptid));
+#ifdef USE_INJECTION_POINTS
+ if (!IsCatalogRelation(scan->heapRelation) && IsLogicalWorker())
+ {
+ INJECTION_POINT("index_getnext_slot_before_fetch_apply_dirty", NULL);
+ }
+#endif
+
if (index_fetch_heap(scan, slot))
return true;
}
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 53d4a61dc3f..634a3d10bb1 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -103,6 +103,15 @@ We also remember the left-link, and follow it when the scan moves backwards
(though this requires extra handling to account for concurrent splits of
the left sibling; see detailed move-left algorithm below).
+Despite the mechanics described above, inconsistent results may still occur
+during non-MVCC scans (SnapshotDirty and SnapshotSelf). This issue can arise if a
+concurrent transaction deletes a tuple and inserts a new tuple with a new TID in the
+same page, or to the left/right (depending on scan direction) of the current scan position.
+If the scan has already visited the page and cached its content in
+backend-local storage, it might skip the old tuple due to the deletion and miss the new
+tuple because the scan does not re-read the page. Note that this affects not only btree
+scans but also heap scans.
+
In most cases we release our lock and pin on a page before attempting
to acquire pin and lock on the page we are moving to. In a few places
it is necessary to lock the next page before releasing the current one.
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index ca33a854278..61a5097f789 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -117,6 +117,7 @@
#include "utils/multirangetypes.h"
#include "utils/rangetypes.h"
#include "utils/snapmgr.h"
+#include "utils/injection_point.h"
/* waitMode argument to check_exclusion_or_unique_constraint() */
typedef enum
@@ -780,7 +781,9 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
/*
* Search the tuples that are in the index for any violations, including
* tuples that aren't visible yet.
- */
+ * A dirty snapshot may miss some tuples in the case of parallel updates,
+ * but that is acceptable here.
+ */
InitDirtySnapshot(DirtySnapshot);
for (i = 0; i < indnkeyatts; i++)
@@ -943,6 +946,8 @@ retry:
ExecDropSingleTupleTableSlot(existing_slot);
+ if (!conflict)
+ INJECTION_POINT("check_exclusion_or_unique_constraint_no_conflict", NULL);
return !conflict;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ee6ac22329f..cccbaeedfd7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -277,6 +277,7 @@
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/guc.h"
+#include "utils/injection_point.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -2946,7 +2947,10 @@ apply_handle_update_internal(ApplyExecutionData *edata,
conflicttuple.origin != replorigin_session_origin)
type = CT_UPDATE_DELETED;
else
+ {
+ INJECTION_POINT("apply_handle_update_internal_update_missing", NULL);
type = CT_UPDATE_MISSING;
+ }
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, relmapentry, newtup);
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 0e546ec1497..189dfd71103 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -53,6 +53,13 @@ typedef enum SnapshotType
* - previous commands of this transaction
* - changes made by the current command
*
+ * Note: such a snapshot may miss an existing logical tuple in the case of
+ * a parallel update.
+ * If a new version of a tuple is inserted into an already-processed page
+ * but the old one is marked with a committed xmax, the snapshot will skip
+ * the old one and never encounter the new one during that scan - resulting
+ * in the tuple being skipped entirely.
+ *
* Does _not_ include:
* - in-progress transactions (as of the current instant)
* -------------------------------------------------------------------------
@@ -82,6 +89,13 @@ typedef enum SnapshotType
* transaction and committed/aborted xacts are concerned. However, it
* also includes the effects of other xacts still in progress.
*
+ * Note: such a snapshot may miss an existing logical tuple in the case of
+ * a parallel update.
+ * If a new version of a tuple is inserted into an already-processed page but
+ * the old one is marked with a committed/in-progress xmax, the snapshot will
+ * skip the old one and never encounter the new one during that scan -
+ * resulting in the tuple being skipped entirely.
+ *
* A special hack is that when a snapshot of this type is used to
* determine tuple visibility, the passed-in snapshot struct is used as an
* output argument to return the xids of concurrent xacts that affected
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 20b4e523d93..4f9a5c9209d 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -45,6 +45,10 @@ tests += {
't/033_run_as_table_owner.pl',
't/034_temporal.pl',
't/035_conflicts.pl',
+ 't/036_delete_missing_race.pl',
+ 't/037_update_missing_race.pl',
+ 't/038_update_missing_with_retain.pl',
+ 't/039_update_missing_simulation.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/036_delete_missing_race.pl b/src/test/subscription/t/036_delete_missing_race.pl
new file mode 100644
index 00000000000..51dd351dc10
--- /dev/null
+++ b/src/test/subscription/t/036_delete_missing_race.pl
@@ -0,0 +1,139 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection and resolution in logical replication
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+ plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set it to 0 to make the test succeed; TODO: delete this before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+ plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+ 'postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create similar table on subscriber with additional index to disable HOT updates
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, data text);
+ CREATE INDEX data_index ON conf_tab(data);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+ $node_subscriber->safe_psql('postgres',
+ "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Delete tuple on publisher
+$node_publisher->safe_psql('postgres', "DELETE FROM conf_tab WHERE a=1;");
+
+if ($simulate_race_condition)
+{
+ # Wait for the apply worker to start the search for the tuple using the index
+ $node_subscriber->wait_for_event('logical replication apply worker',
+ 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update tuple on subscriber
+$psql_session_subscriber->query_until(
+ qr/start/, qq[
+ \\echo start
+ UPDATE conf_tab SET data = 'fromsubnew' WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+ # Wake up apply worker
+ $node_subscriber->safe_psql('postgres',"
+ SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+ SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+ ");
+}
+
+# Tuple was updated - so, we have conflict
+$node_subscriber->wait_for_log(
+ qr/conflict detected on relation \"public.conf_tab\"/,
+ $log_offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# But the tuple should be deleted on the subscriber anyway
+is($node_subscriber->safe_psql('postgres', 'SELECT count(*) from conf_tab'), 0, 'record deleted on subscriber');
+
+ok(!$node_subscriber->log_contains(
+ qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=delete_missing/,
+ $log_offset), 'invalid conflict detected');
+
+ok($node_subscriber->log_contains(
+ qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=delete_origin_differs/,
+ $log_offset), 'correct conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/037_update_missing_race.pl b/src/test/subscription/t/037_update_missing_race.pl
new file mode 100644
index 00000000000..1e120f74bbd
--- /dev/null
+++ b/src/test/subscription/t/037_update_missing_race.pl
@@ -0,0 +1,141 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection and resolution in logical replication
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+ plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set it to 0 to make the test succeed; TODO: delete this before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+ plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+ 'postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create similar table on subscriber with additional index to disable HOT updates and additional column
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, data text, i int DEFAULT 0);
+ CREATE INDEX i_index ON conf_tab(i);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+ $node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Update tuple on publisher
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+
+if ($simulate_race_condition)
+{
+ # Wait for the apply worker to start the search for the tuple using the index
+ $node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update additional(!) column on the subscriber
+$psql_session_subscriber->query_until(
+ qr/start/, qq[
+ \\echo start
+ UPDATE conf_tab SET i = 1 WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+ # Wake up apply worker
+ $node_subscriber->safe_psql('postgres',"
+ SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+ SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+ ");
+}
+
+# Tuple was updated - so, we have conflict
+$node_subscriber->wait_for_log(
+ qr/conflict detected on relation \"public.conf_tab\"/,
+ $log_offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# We need new column value be synced with subscriber
+is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber');
+# And additional column maintain updated value
+is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'column record updated on subscriber');
+
+ok(!$node_subscriber->log_contains(
+ qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=update_missing/,
+ $log_offset), 'invalid conflict detected');
+
+ok($node_subscriber->log_contains(
+ qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/,
+ $log_offset), 'correct conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/038_update_missing_with_retain.pl b/src/test/subscription/t/038_update_missing_with_retain.pl
new file mode 100644
index 00000000000..7b225d45f7f
--- /dev/null
+++ b/src/test/subscription/t/038_update_missing_with_retain.pl
@@ -0,0 +1,143 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection and resolution in logical replication
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+ plan skip_all => 'Injection points not supported by this build';
+}
+
+############################## Set it to 0 to make the test succeed; TODO: delete this before commit
+my $simulate_race_condition = 1;
+##############################
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_subscriber->append_conf('postgresql.conf',
+ qq(wal_level = 'replica'));
+$node_subscriber->start;
+
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+ plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+ 'postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, data text);");
+
+# Create similar table on subscriber with additional index to disable HOT updates and additional column
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE TABLE conf_tab(a int PRIMARY key, data text, i int DEFAULT 0);
+ CREATE INDEX i_index ON conf_tab(i);");
+
+# Set up extension to simulate race condition
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE conf_tab");
+
+# Insert row to be updated later
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO conf_tab(a, data) VALUES (1,'frompub')");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub WITH (retain_dead_tuples = true)");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+############################################
+# Race condition because of DirtySnapshot
+############################################
+
+my $psql_session_subscriber = $node_subscriber->background_psql('postgres');
+if ($simulate_race_condition)
+{
+ $node_subscriber->safe_psql('postgres', "SELECT injection_points_attach('index_getnext_slot_before_fetch_apply_dirty', 'wait')");
+}
+
+my $log_offset = -s $node_subscriber->logfile;
+
+# Update tuple on publisher
+$node_publisher->safe_psql('postgres',
+ "UPDATE conf_tab SET data = 'frompubnew' WHERE (a=1);");
+
+
+if ($simulate_race_condition)
+{
+ # Wait for the apply worker to start the search for the tuple using the index
+ $node_subscriber->wait_for_event('logical replication apply worker', 'index_getnext_slot_before_fetch_apply_dirty');
+}
+
+# Update additional(!) column on the subscriber
+$psql_session_subscriber->query_until(
+ qr/start/, qq[
+ \\echo start
+ UPDATE conf_tab SET i = 1 WHERE (a=1);
+]);
+
+
+if ($simulate_race_condition)
+{
+ # Wake up apply worker
+ $node_subscriber->safe_psql('postgres',"
+ SELECT injection_points_detach('index_getnext_slot_before_fetch_apply_dirty');
+ SELECT injection_points_wakeup('index_getnext_slot_before_fetch_apply_dirty');
+ ");
+}
+
+# Tuple was updated - so, we have conflict
+$node_subscriber->wait_for_log(
+ qr/conflict detected on relation \"public.conf_tab\"/,
+ $log_offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# We need new column value be synced with subscriber
+is($node_subscriber->safe_psql('postgres', 'SELECT data from conf_tab WHERE a = 1'), 'frompubnew', 'record updated on subscriber');
+# And additional column maintain updated value
+is($node_subscriber->safe_psql('postgres', 'SELECT i from conf_tab WHERE a = 1'), 1, 'column record updated on subscriber');
+
+ok(!$node_subscriber->log_contains(
+ qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=update_deleted/,
+ $log_offset), 'invalid conflict detected');
+
+ok($node_subscriber->log_contains(
+ qr/LOG: conflict detected on relation \"public.conf_tab\": conflict=update_origin_differs/,
+ $log_offset), 'correct conflict detected');
+
+done_testing();
diff --git a/src/test/subscription/t/039_update_missing_simulation.pl b/src/test/subscription/t/039_update_missing_simulation.pl
new file mode 100644
index 00000000000..21fcd1ceb53
--- /dev/null
+++ b/src/test/subscription/t/039_update_missing_simulation.pl
@@ -0,0 +1,125 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection and resolution in logical replication
+# Not intended to be committed because it is quite heavy
+# Here to demonstrate reproducibility with pgbench
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use IPC::Run qw(start finish);
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+ plan skip_all => 'Injection points not supported by this build';
+}
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_publisher->start;
+
+# Create subscriber node with track_commit_timestamp enabled
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+ qq(track_commit_timestamp = on));
+$node_subscriber->start;
+
+# Check if the extension injection_points is available, as it may be
+# possible that this script is run with installcheck, where the module
+# would not be installed by default.
+if (!$node_subscriber->check_extension('injection_points'))
+{
+ plan skip_all => 'Extension injection_points not installed';
+}
+
+# Create table on publisher
+$node_publisher->safe_psql(
+ 'postgres',
+ "CREATE TABLE tbl(a int PRIMARY key, data_pub int);");
+
+# Create similar table on subscriber with additional index to disable HOT updates
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE TABLE tbl(a int PRIMARY key, data_pub int, data_sub int default 0);
+ CREATE INDEX data_index ON tbl(data_pub);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE tbl");
+
+# Create the subscription
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+ 'postgres',
+ "CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub");
+
+my $num_rows = 10;
+my $num_updates = 10000;
+my $num_clients = 10;
+$node_publisher->safe_psql('postgres', "INSERT INTO tbl SELECT i, i * i FROM generate_series(1,$num_rows) i");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Prepare small pgbench scripts as files
+my $sub_sql = $node_subscriber->basedir . '/sub_update.sql';
+my $pub_sql = $node_publisher->basedir . '/pub_delete.sql';
+
+open my $fh1, '>', $sub_sql or die $!;
+print $fh1 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_sub = data_sub + 1 WHERE a = :num;\n";
+close $fh1;
+
+open my $fh2, '>', $pub_sql or die $!;
+print $fh2 "\\set num random(1,$num_rows)\nUPDATE tbl SET data_pub = data_pub + 1 WHERE a = :num;\n";
+close $fh2;
+
+my @sub_cmd = (
+ 'pgbench',
+ '--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates",
+ '-p', $node_subscriber->port, '-h', $node_subscriber->host, '-f', $sub_sql, 'postgres'
+);
+
+my @pub_cmd = (
+ 'pgbench',
+ '--no-vacuum', "--client=$num_clients", '--jobs=4', '--exit-on-abort', "--transactions=$num_updates",
+ '-p', $node_publisher->port, '-h', $node_publisher->host, '-f', $pub_sql, 'postgres'
+);
+
+$node_subscriber->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+# This should never happen
+$node_subscriber->safe_psql('postgres',
+ "SELECT injection_points_attach('apply_handle_update_internal_update_missing', 'error')");
+my $log_offset = -s $node_subscriber->logfile;
+
+# Start both concurrently
+my ($sub_out, $sub_err, $pub_out, $pub_err) = ('', '', '', '');
+my $sub_h = start \@sub_cmd, '>', \$sub_out, '2>', \$sub_err;
+my $pub_h = start \@pub_cmd, '>', \$pub_out, '2>', \$pub_err;
+
+# Wait for completion
+finish $sub_h;
+finish $pub_h;
+
+like($sub_out, qr/actually processed/, 'subscriber pgbench completed');
+like($pub_out, qr/actually processed/, 'publisher pgbench completed');
+
+# Let subscription catch up, then check expectations
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+ok(!$node_subscriber->log_contains(
+ qr/ERROR: error triggered for injection point apply_handle_update_internal_update_missing/,
+ $log_offset), 'invalid conflict detected');
+
+done_testing();
--
2.48.1