From 961251fd3fe5e462c4eccda56ad54842ea300956 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Tue, 7 Jan 2025 00:00:32 +0100
Subject: [PATCH v4 3/3] RFC: Extend buffer pinning for SP-GIST IOS

This should fix issues with incorrect results when a SP-GIST
IOS encounters tuples removed from pages by a concurrent vacuum
operation.
---
 src/include/access/spgist_private.h   |   3 +
 src/backend/access/spgist/spgscan.c   | 112 ++++++++++++++++++++++----
 src/backend/access/spgist/spgvacuum.c |   2 +-
 3 files changed, 100 insertions(+), 17 deletions(-)

diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index e7cbe10a89b..c29d1d58c47 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -175,6 +175,8 @@ typedef struct SpGistSearchItem
 	bool		isLeaf;			/* SearchItem is heap item */
 	bool		recheck;		/* qual recheck is needed */
 	bool		recheckDistances;	/* distance recheck is needed */
+	Buffer		buffer;			/* buffer pinned for this leaf tuple
+								 * (IOS-only) */
 
 	/* array with numberOfOrderBys entries */
 	double		distances[FLEXIBLE_ARRAY_MEMBER];
@@ -226,6 +228,7 @@ typedef struct SpGistScanOpaqueData
 	TupleDesc	reconTupDesc;	/* if so, descriptor for reconstructed tuples */
 	int			nPtrs;			/* number of TIDs found on current page */
 	int			iPtr;			/* index for scanning through same */
+	Buffer		pagePin;		/* output tuple's pinned buffer, if IOS */
 	ItemPointerData heapPtrs[MaxIndexTuplesPerPage];	/* TIDs from cur page */
 	bool		recheck[MaxIndexTuplesPerPage]; /* their recheck flags */
 	bool		recheckDistances[MaxIndexTuplesPerPage];	/* distance recheck
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 3017861859f..ea344740f49 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -30,7 +30,8 @@
 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
 							   Datum leafValue, bool isNull,
 							   SpGistLeafTuple leafTuple, bool recheck,
-							   bool recheckDistances, double *distances);
+							   bool recheckDistances, double *distances,
+							   Buffer pin);
 
 /*
  * Pairing heap comparison function for the SpGistSearchItem queue.
@@ -300,6 +301,38 @@ spgPrepareScanKeys(IndexScanDesc scan)
 	}
 }
 
+/*
+ * Note: This removes all items from the pairingheap.
+ */
+static void
+spgScanEndDropAllPagePins(IndexScanDesc scan, SpGistScanOpaque so)
+{
+	/* Guaranteed no pinned pages */
+	if (so->scanQueue == NULL || !scan->xs_want_itup)
+		return;
+
+	if (so->nPtrs > 0)
+	{
+		Assert(BufferIsValid(so->pagePin));
+		ReleaseBuffer(so->pagePin);
+		so->pagePin = InvalidBuffer;
+	}
+
+	while (!pairingheap_is_empty(so->scanQueue))
+	{
+		pairingheap_node *node;
+		SpGistSearchItem *item;
+
+		node = pairingheap_remove_first(so->scanQueue);
+		item = pairingheap_container(SpGistSearchItem, phNode, node);
+		if (!item->isLeaf)
+			continue;
+
+		Assert(BufferIsValid(item->buffer));
+		ReleaseBuffer(item->buffer);
+	}
+}
+
 IndexScanDesc
 spgbeginscan(Relation rel, int keysz, int orderbysz)
 {
@@ -416,6 +449,9 @@ spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 	/* preprocess scankeys, set up the representation in *so */
 	spgPrepareScanKeys(scan);
 
+	/* release any pinned buffers from earlier rescans */
+	spgScanEndDropAllPagePins(scan, so);
+
 	/* set up starting queue entries */
 	resetSpGistScanOpaque(so);
 
@@ -428,6 +464,12 @@ spgendscan(IndexScanDesc scan)
 {
 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
 
+	/*
+	 * release any pinned buffers from earlier rescans, before we drop their
+	 * data by dropping the memory contexts.
+	 */
+	spgScanEndDropAllPagePins(scan, so);
+
 	MemoryContextDelete(so->tempCxt);
 	MemoryContextDelete(so->traversalCxt);
 
@@ -460,7 +502,7 @@ spgendscan(IndexScanDesc scan)
 static SpGistSearchItem *
 spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
 			   Datum leafValue, bool recheck, bool recheckDistances,
-			   bool isnull, double *distances)
+			   bool isnull, double *distances, Buffer addPin)
 {
 	SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
 
@@ -479,6 +521,10 @@ spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
 			datumCopy(leafValue, so->state.attType.attbyval,
 					  so->state.attType.attlen);
 
+		Assert(BufferIsValid(addPin));
+		IncrBufferRefCount(addPin);
+		item->buffer = addPin;
+
 		/*
 		 * If we're going to need to reconstruct INCLUDE attributes, store the
 		 * whole leaf tuple so we can get the INCLUDE attributes out of it.
@@ -495,6 +541,7 @@ spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
 	{
 		item->value = (Datum) 0;
 		item->leafTuple = NULL;
+		item->buffer = InvalidBuffer;
 	}
 	item->traversalValue = NULL;
 	item->isLeaf = true;
@@ -513,7 +560,7 @@ spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
 static bool
 spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 			SpGistLeafTuple leafTuple, bool isnull,
-			bool *reportedSome, storeRes_func storeRes)
+			bool *reportedSome, storeRes_func storeRes, Buffer buffer)
 {
 	Datum		leafValue;
 	double	   *distances;
@@ -580,7 +627,8 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 														recheck,
 														recheckDistances,
 														isnull,
-														distances);
+														distances,
+														buffer);
 
 			spgAddSearchItemToQueue(so, heapItem);
 
@@ -591,7 +639,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 			/* non-ordered scan, so report the item right away */
 			Assert(!recheckDistances);
 			storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
-					 leafTuple, recheck, false, NULL);
+					 leafTuple, recheck, false, NULL, InvalidBuffer);
 			*reportedSome = true;
 		}
 	}
@@ -760,7 +808,7 @@ enum SpGistSpecialOffsetNumbers
 static OffsetNumber
 spgTestLeafTuple(SpGistScanOpaque so,
 				 SpGistSearchItem *item,
-				 Page page, OffsetNumber offset,
+				 Page page, OffsetNumber offset, Buffer buffer,
 				 bool isnull, bool isroot,
 				 bool *reportedSome,
 				 storeRes_func storeRes)
@@ -799,7 +847,8 @@ spgTestLeafTuple(SpGistScanOpaque so,
 
 	Assert(ItemPointerIsValid(&leafTuple->heapPtr));
 
-	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
+	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes,
+				buffer);
 
 	return SGLT_GET_NEXTOFFSET(leafTuple);
 }
@@ -835,7 +884,8 @@ redirect:
 			Assert(so->numberOfNonNullOrderBys > 0);
 			storeRes(so, &item->heapPtr, item->value, item->isNull,
 					 item->leafTuple, item->recheck,
-					 item->recheckDistances, item->distances);
+					 item->recheckDistances, item->distances,
+					 item->buffer);
 			reportedSome = true;
 		}
 		else
@@ -873,7 +923,7 @@ redirect:
 					/* When root is a leaf, examine all its tuples */
 					for (offset = FirstOffsetNumber; offset <= max; offset++)
 						(void) spgTestLeafTuple(so, item, page, offset,
-												isnull, true,
+												buffer, isnull, true,
 												&reportedSome, storeRes);
 				}
 				else
@@ -883,10 +933,24 @@ redirect:
 					{
 						Assert(offset >= FirstOffsetNumber && offset <= max);
 						offset = spgTestLeafTuple(so, item, page, offset,
-												  isnull, false,
+												  buffer, isnull, false,
 												  &reportedSome, storeRes);
 						if (offset == SpGistRedirectOffsetNumber)
+						{
+							Assert(so->nPtrs == 0);
 							goto redirect;
+						}
+					}
+
+					/*
+					 * IOS: Make sure we have one additional pin on the buffer,
+					 * so that vacuum won't remove any deleted TIDs and mark
+					 * their pages ALL_VISIBLE while we still have a copy.
+					 */
+					if (so->want_itup && reportedSome)
+					{
+						IncrBufferRefCount(buffer);
+						so->pagePin = buffer;
 					}
 				}
 			}
@@ -929,9 +993,10 @@ static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
 			Datum leafValue, bool isnull,
 			SpGistLeafTuple leafTuple, bool recheck,
-			bool recheckDistances, double *distances)
+			bool recheckDistances, double *distances,
+			Buffer pin)
 {
-	Assert(!recheckDistances && !distances);
+	Assert(!recheckDistances && !distances && !BufferIsValid(pin));
 	tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
 	so->ntids++;
 }
@@ -954,10 +1019,9 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 
 /* storeRes subroutine for gettuple case */
 static void
-storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
-			  Datum leafValue, bool isnull,
-			  SpGistLeafTuple leafTuple, bool recheck,
-			  bool recheckDistances, double *nonNullDistances)
+storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, Datum leafValue,
+			  bool isnull, SpGistLeafTuple leafTuple, bool recheck,
+			  bool recheckDistances, double *nonNullDistances, Buffer pin)
 {
 	Assert(so->nPtrs < MaxIndexTuplesPerPage);
 	so->heapPtrs[so->nPtrs] = *heapPtr;
@@ -1016,6 +1080,10 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
 		so->reconTups[so->nPtrs] = heap_form_tuple(so->reconTupDesc,
 												   leafDatums,
 												   leafIsnulls);
+
+		/* move the buffer pin, if required */
+		if (BufferIsValid(pin))
+			so->pagePin = pin;
 	}
 	so->nPtrs++;
 }
@@ -1065,7 +1133,19 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 
 			for (i = 0; i < so->nPtrs; i++)
 				pfree(so->reconTups[i]);
+
+			if (so->nPtrs > 0)
+			{
+				Assert(BufferIsValid(so->pagePin));
+				ReleaseBuffer(so->pagePin);
+				so->pagePin = InvalidBuffer;
+			}
 		}
+		else
+		{
+			Assert(!BufferIsValid(so->pagePin));
+		}
+
 		so->iPtr = so->nPtrs = 0;
 
 		spgWalk(scan->indexRelation, so, false, storeGettuple);
diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c
index 0da069fd4d7..d0680a5073e 100644
--- a/src/backend/access/spgist/spgvacuum.c
+++ b/src/backend/access/spgist/spgvacuum.c
@@ -629,7 +629,7 @@ spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
 
 	buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
 								RBM_NORMAL, bds->info->strategy);
-	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+	LockBufferForCleanup(buffer);
 	page = (Page) BufferGetPage(buffer);
 
 	if (PageIsNew(page))
-- 
2.45.2

