Hello, Amit!

> I think it is rather less likely or not possible in a parallel apply
> case because such conflicting updates (updates on the same tuple)
> should be serialized at the publisher itself. So one of the updates
> will be after the commit that has the second update.

Glad to hear! But anyway, such logic looks very fragile to me.

> I haven't tried the test based on your description of the general
> problem with DirtySnapshot scan. In case of logical replication, we
> will LOG update_missing type of conflict and the user may need to take
> some manual action based on that.

Currently it is just DEBUG1, so it will probably be missed by the user.

> * XXX should this be promoted to ereport(LOG) perhaps?
> */
> elog(DEBUG1,
> "logical replication did not find row to be updated "
> "in replication target relation \"%s\"",
> RelationGetRelationName(localrel));
> }

> I have not tried a test so I could
> be wrong as well. I am not sure we can do anything specific to logical
> replication for this but feel free to suggest if you have ideas to
> solve this problem in general or specific to logical replication.

I've implemented a solution to address the problem more generally; the patch
is attached (see also the link [1]).

Here's a summary of the changes:

* For each tuple skipped because it was deleted, we now accumulate the
maximum xmax.
* Before the scan begins, we store the value of the latest completed
transaction.
* If no tuples are found in the index, we check the max(xmax) value. If
this value is newer than the latest completed transaction stored before the
scan, it indicates that a tuple was deleted by another transaction after
the scan started. To ensure all tuples are correctly processed, we then
rescan the index.


I also added a test case to cover this scenario using the new injection
point mechanism, and updated the B-tree index documentation to include a
description of this case.

I'll add this into the next commitfest.

Best regards,
Mikhail.

[1]:
https://github.com/postgres/postgres/compare/master...michail-nikolaev:postgres:concurrent_unique
From b639379c393c70ca322bab57222513e71c96ad78 Mon Sep 17 00:00:00 2001
From: nkey <nkey@toloka.ai>
Date: Fri, 2 Aug 2024 16:20:32 +0200
Subject: [PATCH v1] fix for lost record in case of DirtySnapshot index scans +
 docs + test

---
 contrib/pgstattuple/pgstattuple.c             |  2 +-
 src/backend/access/heap/heapam_handler.c      |  2 +-
 src/backend/access/heap/heapam_visibility.c   | 20 +++++++-
 src/backend/access/nbtree/README              | 10 ++++
 src/backend/access/nbtree/nbtinsert.c         |  2 +-
 src/backend/access/transam/varsup.c           | 11 +++++
 src/backend/executor/execIndexing.c           | 29 +++++++++++-
 src/backend/executor/execReplication.c        | 26 +++++++++-
 src/backend/replication/logical/origin.c      |  2 +-
 src/include/access/transam.h                  | 16 +++++++
 src/include/utils/snapmgr.h                   |  8 +++-
 src/include/utils/snapshot.h                  |  5 +-
 .../test_misc/t/006_dirty_index_scan.pl       | 47 +++++++++++++++++++
 13 files changed, 168 insertions(+), 12 deletions(-)
 create mode 100644 src/test/modules/test_misc/t/006_dirty_index_scan.pl

diff --git a/contrib/pgstattuple/pgstattuple.c b/contrib/pgstattuple/pgstattuple.c
index 3bd8b96197..4d1b469d8e 100644
--- a/contrib/pgstattuple/pgstattuple.c
+++ b/contrib/pgstattuple/pgstattuple.c
@@ -332,7 +332,7 @@ pgstat_heap(Relation rel, FunctionCallInfo fcinfo)
 	scan = table_beginscan_strat(rel, SnapshotAny, 0, NULL, true, false);
 	hscan = (HeapScanDesc) scan;
 
-	InitDirtySnapshot(SnapshotDirty);
+	InitDirtySnapshot(SnapshotDirty, NULL);
 
 	nblocks = hscan->rs_nblocks;	/* # blocks to be scanned */
 
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6f8b1b7929..7ea4b205d5 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -404,7 +404,7 @@ tuple_lock_retry:
 			 *
 			 * Loop here to deal with updated or busy tuples
 			 */
-			InitDirtySnapshot(SnapshotDirty);
+			InitDirtySnapshot(SnapshotDirty, NULL);
 			for (;;)
 			{
 				if (ItemPointerIndicatesMovedPartitions(tid))
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index 9243feed01..91de5dcea1 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -719,6 +719,12 @@ HeapTupleSatisfiesUpdate(HeapTuple htup, CommandId curcid,
 		return TM_Deleted;		/* deleted by other */
 }
 
+inline static void UpdateDirtyMaxXmax(Snapshot snapshot, TransactionId xmax)
+{
+	if (snapshot->xip != NULL)
+		snapshot->xip[0] = TransactionIdNewer(xmax, snapshot->xip[0]);
+}
+
 /*
  * HeapTupleSatisfiesDirty
  *		True iff heap tuple is valid including effects of open transactions.
@@ -737,7 +743,9 @@ HeapTupleSatisfiesUpdate(HeapTuple htup, CommandId curcid,
  * Similarly for snapshot->xmax and the tuple's xmax.  If the tuple was
  * inserted speculatively, meaning that the inserter might still back down
  * on the insertion without aborting the whole transaction, the associated
- * token is also returned in snapshot->speculativeToken.
+ * token is also returned in snapshot->speculativeToken. If xip is non-NULL,
+ * xip[0] may be set to the xid of the deleting transaction if it is newer
+ * than the previously stored value.
  */
 static bool
 HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
@@ -750,6 +758,10 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
 
 	snapshot->xmin = snapshot->xmax = InvalidTransactionId;
 	snapshot->speculativeToken = 0;
+	/*
+	 * We intentionally keep snapshot->xip values unchanged, as they should
+	 * be reset by logic outside of a single heap fetch.
+	 */
 
 	if (!HeapTupleHeaderXminCommitted(tuple))
 	{
@@ -870,6 +882,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
 	{
 		if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))
 			return true;
+		UpdateDirtyMaxXmax(snapshot, HeapTupleHeaderGetRawXmax(tuple));
 		return false;			/* updated by other */
 	}
 
@@ -893,7 +906,10 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
 			return true;
 		}
 		if (TransactionIdDidCommit(xmax))
+		{
+			UpdateDirtyMaxXmax(snapshot, xmax);
 			return false;
+		}
 		/* it must have aborted or crashed */
 		return true;
 	}
@@ -902,6 +918,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
 	{
 		if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))
 			return true;
+		UpdateDirtyMaxXmax(snapshot, HeapTupleHeaderGetRawXmax(tuple));
 		return false;
 	}
 
@@ -931,6 +948,7 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
 
 	SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
 				HeapTupleHeaderGetRawXmax(tuple));
+	UpdateDirtyMaxXmax(snapshot, HeapTupleHeaderGetRawXmax(tuple));
 	return false;				/* updated by other */
 }
 
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 52e646c7f7..6de72a29ca 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -489,6 +489,16 @@ on the leaf page at all when the page's LSN has changed.  (That won't work
 with an unlogged index, so for now we don't ever apply the "don't hold
 onto pin" optimization there.)
 
+Despite the locking protocol in place, it is still possible to receive an
+incorrect result during non-MVCC scans. This issue can occur if a concurrent
+transaction deletes a tuple and inserts a new tuple with a new TID in the
+same page. If the scan has already visited the page and cached its content
+in the buffer cache, it might skip the old tuple due to deletion and miss
+the new tuple because of the cache. This is a known limitation of the
+SnapshotDirty and SnapshotAny non-MVCC scans. However, for SnapshotDirty,
+it is possible to work around this limitation by comparing the returned
+max(xmax) with the latest committed transaction before the scan started.
+
 Fastpath For Index Insertion
 ----------------------------
 
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 7e8902e48c..943aee087a 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -427,7 +427,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 	/* Assume unique until we find a duplicate */
 	*is_unique = true;
 
-	InitDirtySnapshot(SnapshotDirty);
+	InitDirtySnapshot(SnapshotDirty, NULL);
 
 	page = BufferGetPage(insertstate->buf);
 	opaque = BTPageGetOpaque(page);
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index fb6a86afcb..52109635b4 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -296,6 +296,17 @@ ReadNextFullTransactionId(void)
 	return fullXid;
 }
 
+FullTransactionId ReadLastCompletedFullTransactionId(void)
+{
+	FullTransactionId fullXid;
+
+	LWLockAcquire(XidGenLock, LW_SHARED);
+	fullXid = TransamVariables->latestCompletedXid;
+	LWLockRelease(XidGenLock);
+
+	return fullXid;
+}
+
 /*
  * Advance nextXid to the value after a given xid.  The epoch is inferred.
  * This must only be called during recovery or from two-phase start-up code.
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 9f05b3654c..45767b4e20 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -115,6 +115,7 @@
 #include "nodes/nodeFuncs.h"
 #include "storage/lmgr.h"
 #include "utils/snapmgr.h"
+#include "utils/injection_point.h"
 
 /* waitMode argument to check_exclusion_or_unique_constraint() */
 typedef enum
@@ -702,6 +703,8 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	IndexScanDesc index_scan;
 	ScanKeyData scankeys[INDEX_MAX_KEYS];
 	SnapshotData DirtySnapshot;
+	TransactionId maxXmax,
+				  latestCompletedXid;
 	int			i;
 	bool		conflict;
 	bool		found_self;
@@ -738,9 +741,10 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 
 	/*
 	 * Search the tuples that are in the index for any violations, including
-	 * tuples that aren't visible yet.
+	 * tuples that aren't visible yet. Also, detect cases where the index scan
+	 * skips a tuple due to a concurrent update after the index page was cached.
 	 */
-	InitDirtySnapshot(DirtySnapshot);
+	InitDirtySnapshot(DirtySnapshot, &maxXmax);
 
 	for (i = 0; i < indnkeyatts; i++)
 	{
@@ -774,6 +778,12 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 retry:
 	conflict = false;
 	found_self = false;
+	/*
+	 * Each time we retry, remember the last completed transaction before the
+	 * start of the scan. Also reset maxXmax.
+	 */
+	latestCompletedXid = XidFromFullTransactionId(ReadLastCompletedFullTransactionId());
+	maxXmax = InvalidTransactionId;
 	index_scan = index_beginscan(heap, index, &DirtySnapshot, indnkeyatts, 0);
 	index_rescan(index_scan, scankeys, indnkeyatts, NULL, 0);
 
@@ -889,6 +899,19 @@ retry:
 	}
 
 	index_endscan(index_scan);
+	/*
+	 * Check for the case when the index scan fetched records before some
+	 * other transaction deleted a tuple and inserted a new one.
+	 */
+	if (!conflict && TransactionIdIsValid(maxXmax) && !TransactionIdIsCurrentTransactionId(maxXmax))
+	{
+		/*
+		 * If we have skipped some tuple because it was deleted, but the
+		 * deletion happened after the start of the index scan, retry to be sure.
+		 */
+		if (TransactionIdPrecedes(latestCompletedXid, maxXmax))
+			goto retry;
+	}
 
 	/*
 	 * Ordinarily, at this point the search should have found the originally
@@ -902,6 +925,8 @@ retry:
 
 	ExecDropSingleTupleTableSlot(existing_slot);
 
+	if (!conflict)
+		INJECTION_POINT("check_exclusion_or_unique_constraint_no_conflict");
 	return !conflict;
 }
 
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd577..fbddb6442b 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -183,6 +183,8 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 	IndexScanDesc scan;
 	SnapshotData snap;
 	TransactionId xwait;
+	TransactionId maxXmax,
+				  latestCompletedXid;
 	Relation	idxrel;
 	bool		found;
 	TypeCacheEntry **eq = NULL;
@@ -193,7 +195,7 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 
 	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
 
-	InitDirtySnapshot(snap);
+	InitDirtySnapshot(snap, &maxXmax);
 
 	/* Build scan key. */
 	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
@@ -203,6 +205,12 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 
 retry:
 	found = false;
+	/*
+	 * Each time we retry, remember the last completed transaction before the
+	 * start of the scan. Also reset maxXmax.
+	 */
+	maxXmax = InvalidTransactionId;
+	latestCompletedXid = XidFromFullTransactionId(ReadLastCompletedFullTransactionId());
 
 	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
@@ -242,6 +250,20 @@ retry:
 		break;
 	}
 
+	/*
+	 * Check for the case when the index scan fetched records before some
+	 * other transaction deleted a tuple and inserted a new one.
+	 */
+	if (!found && TransactionIdIsValid(maxXmax) && !TransactionIdIsCurrentTransactionId(maxXmax))
+	{
+		/*
+		 * If we have skipped some tuple because it was deleted, but the
+		 * deletion happened after the start of the index scan, retry to be sure.
+		 */
+		if (TransactionIdPrecedes(latestCompletedXid, maxXmax))
+			goto retry;
+	}
+
 	/* Found tuple, try to lock it in the lockmode. */
 	if (found)
 	{
@@ -391,7 +413,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 	eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 
 	/* Start a heap scan. */
-	InitDirtySnapshot(snap);
+	InitDirtySnapshot(snap, NULL);
 	scan = table_beginscan(rel, &snap, 0, NULL);
 	scanslot = table_slot_create(rel, NULL);
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 419e4814f0..04ba0d9ba1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -278,7 +278,7 @@ replorigin_create(const char *roname)
 	 * to the exclusive lock there's no danger that new rows can appear while
 	 * we're checking.
 	 */
-	InitDirtySnapshot(SnapshotDirty);
+	InitDirtySnapshot(SnapshotDirty, NULL);
 
 	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
 
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 28a2d287fd..aae0ad90c2 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -288,6 +288,7 @@ extern void VarsupShmemInit(void);
 extern FullTransactionId GetNewTransactionId(bool isSubXact);
 extern void AdvanceNextFullTransactionIdPastXid(TransactionId xid);
 extern FullTransactionId ReadNextFullTransactionId(void);
+extern FullTransactionId ReadLastCompletedFullTransactionId(void);
 extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
 								  Oid oldest_datoid);
 extern void AdvanceOldestClogXid(TransactionId oldest_datfrozenxid);
@@ -344,6 +345,21 @@ TransactionIdOlder(TransactionId a, TransactionId b)
 	return b;
 }
 
+/* return the newer of the two IDs */
+static inline TransactionId
+TransactionIdNewer(TransactionId a, TransactionId b)
+{
+	if (!TransactionIdIsValid(a))
+		return b;
+
+	if (!TransactionIdIsValid(b))
+		return a;
+
+	if (TransactionIdPrecedes(a, b))
+		return b;
+	return a;
+}
+
 /* return the older of the two IDs, assuming they're both normal */
 static inline TransactionId
 NormalTransactionIdOlder(TransactionId a, TransactionId b)
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 9398a84051..5e6f3a7e76 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -36,9 +36,13 @@ extern PGDLLIMPORT SnapshotData CatalogSnapshotData;
  * We don't provide a static SnapshotDirty variable because it would be
  * non-reentrant.  Instead, users of that snapshot type should declare a
  * local variable of type SnapshotData, and initialize it with this macro.
+ * pxid is optional and can be NULL. If it is not NULL, pxid[0] will be set
+ * to the transaction ID of the deleting transaction if the tuple is deleted
+ * and that ID is newer than pxid[0].
  */
-#define InitDirtySnapshot(snapshotdata)  \
-	((snapshotdata).snapshot_type = SNAPSHOT_DIRTY)
+#define InitDirtySnapshot(snapshotdata, pxid)  \
+	((snapshotdata).snapshot_type = SNAPSHOT_DIRTY, \
+	 (snapshotdata).xip = (pxid))
 
 /*
  * Similarly, some initialization is required for a NonVacuumable snapshot.
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 8d1e31e888..a68114e500 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -96,7 +96,10 @@ typedef enum SnapshotType
 	 * xmax.  If the tuple was inserted speculatively, meaning that the
 	 * inserter might still back down on the insertion without aborting the
 	 * whole transaction, the associated token is also returned in
-	 * snapshot->speculativeToken.  See also InitDirtySnapshot().
+	 * snapshot->speculativeToken. If xip is non-NULL, the xid of the
+	 * deleting transaction is stored into xip[0] if it is newer than the
+	 * existing xip[0] value.
+	 * See also InitDirtySnapshot().
 	 * -------------------------------------------------------------------------
 	 */
 	SNAPSHOT_DIRTY,
diff --git a/src/test/modules/test_misc/t/006_dirty_index_scan.pl b/src/test/modules/test_misc/t/006_dirty_index_scan.pl
new file mode 100644
index 0000000000..4d116e659e
--- /dev/null
+++ b/src/test/modules/test_misc/t/006_dirty_index_scan.pl
@@ -0,0 +1,47 @@
+
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test issue with lost tuple in case of DirtySnapshot index scans
+use strict;
+use warnings;
+
+use Config;
+use Errno;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+my ($node, $result);
+$node = PostgreSQL::Test::Cluster->new('DirtyScan_test');
+$node->init;
+$node->append_conf('postgresql.conf', 'fsync = off');
+$node->append_conf('postgresql.conf', 'autovacuum = off');
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION injection_points));
+$node->safe_psql('postgres', q(CREATE TABLE tbl(i int primary key, n int)));
+
+$node->safe_psql('postgres', q(INSERT INTO tbl VALUES(42,1)));
+$node->safe_psql('postgres', q(SELECT injection_points_attach('check_exclusion_or_unique_constraint_no_conflict', 'error')));
+
+$node->pgbench(
+	'--no-vacuum --client=40 --transactions=1000',
+	0,
+	[qr{actually processed}],
+	[qr{^$}],
+	'concurrent UPSERT',
+	{
+		'on_conflicts' => q(
+			INSERT INTO tbl VALUES(42,1) on conflict(i) do update set n = EXCLUDED.n + 1;
+		)
+	});
+
+$node->safe_psql('postgres', q(SELECT injection_points_detach('check_exclusion_or_unique_constraint_no_conflict')));
+
+$node->stop;
+done_testing();
\ No newline at end of file
-- 
2.34.1

Reply via email to