Hi,

On 2018-12-13 19:32:19 -0500, Robert Haas wrote:
> On Wed, Dec 12, 2018 at 3:41 PM Andres Freund <and...@anarazel.de> wrote:
> > I don't like the approach of managing the catalog horizon via those
> > periodically logged catalog xmin announcements.  I think we instead
> > should build ontop of the records we already have and use to compute
> > snapshot conflicts.  As of HEAD we don't know whether such tables are
> > catalog tables, but that's just a bool that we need to include in the
> > records, a basically immeasurable overhead given the size of those
> > records.
> 
> To me, this paragraph appears to say that you don't like Craig's
> approach without quite explaining why you don't like it.  Could you be
> a bit more explicit about that?

I think the conflict system introduced in Craig's patch is quite
complicated: it relies on logging new WAL records on a regular basis, and
it needs to be more conservative about the xmin horizon, which is
obviously not great for performance.

If you look at Craig's patch, it currently relies on blocking out
concurrent checkpoints:
	/*
	 * We must prevent a concurrent checkpoint, otherwise the catalog xmin
	 * advance xlog record with the new value might be written before the
	 * checkpoint but the checkpoint may still see the old
	 * oldestCatalogXmin value.
	 */
	if (!LWLockConditionalAcquire(CheckpointLock, LW_SHARED))
		/* Couldn't get checkpointer lock; will retry later */
		return;
which on its own seems unacceptable, given that CheckpointLock can be
held by the checkpointer for a very long time. While that's ongoing, the
catalog xmin horizon doesn't advance.

Looking at the code, it seems hard to me to make that approach work
nicely. But I might just be tired.


> > I'm wondering if it's time to move the latestRemovedXid computation for
> > this type of record to the primary - it's likely to be cheaper there and
> > avoids this kind of complication. Secondarily, it'd have the advantage
> > of making pluggable storage integration easier - there we have the
> > problem that we don't know which type of relation we're dealing with
> > during recovery, so such lookups make pluggability harder (zheap just
> > adds extra flags to signal that, but that's not extensible).
> 
> That doesn't look trivial.  It seems like _bt_delitems_delete() would
> need to get an array of XIDs, but that gets called from
> _bt_vacuum_one_page(), which doesn't have that information available.
> It doesn't look like there is a particularly cheap way of getting it,
> either.  What do you have in mind?

I've attached a prototype, but let's discuss the details in a separate
thread. This also needs to be changed for pluggable storage: we don't
know about table access methods in the startup process, so we can't
determine which AM the heap belongs to during
btree_xlog_delete_get_latestRemovedXid() (and sibling routines).
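
To summarize the shape of the prototype (see the attached diff for the
real thing): the horizon is computed on the primary and carried in the
WAL record, via a new helper that index AMs call before emitting their
deletion record:

	extern TransactionId index_compute_xid_horizon_for_tuples(Relation irel,
															   Relation hrel,
															   Buffer ibuf,
															   OffsetNumber *itemnos,
															   int nitems);

The redo routines then just use the latestRemovedXid stored in the
record, instead of re-reading heap pages during recovery.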

Writing that message right now.


> > - enforce hot-standby to be enabled on the standby when logical slots
> >   are created, and at startup if a logical slot exists
> 
> Why do we need this?

Currently the conflict routines are only called when hot standby is
on. There's also no way to use logical decoding (including just advancing
the slot) without hot standby being enabled, so I think that'd be a
pretty harmless restriction.
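
For illustration, the rough shape of the check I have in mind (just a
sketch, not part of the attached patch; the exact wording and where it
lives, i.e. at slot creation and during startup, are up for discussion):

	if (RecoveryInProgress() && !EnableHotStandby)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("logical decoding on a standby requires hot_standby to be enabled")));

Both RecoveryInProgress() and EnableHotStandby are already available
where such a check would need to go.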


> > - Have a nicer conflict handling than what I implemented here.  Craig's
> >   approach deleted the slots, but I'm not sure I like that.  Blocking
> >   seems more appropriately here, after all it's likely that the
> >   replication topology would be broken afterwards.
> 
> I guess the viable options are approximately --

> (1) drop the slot

Doable.


> (2)  advance the slot

That's not realistically possible, I think. We'd need to be able to use
most of the logical decoding infrastructure in that context, and we don't
have that available. It's also possible to deadlock: advancing the slot's
xmin horizon might need further WAL, but WAL replay is blocked on
advancing the slot.


> (3) mark the slot as "failed" but leave it in existence as a tombstone

We currently don't have that, but it'd be doable, I think.
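
To sketch what I mean (purely hypothetical, nothing like this is in the
attached patch, and the flag is made up): after acquiring the conflicting
slot, the conflict handling could do something like

	SpinLockAcquire(&slot->mutex);
	slot->data.conflict_failed = true;	/* hypothetical new flag */
	SpinLockRelease(&slot->mutex);
	ReplicationSlotMarkDirty();
	ReplicationSlotSave();

and ReplicationSlotAcquire() would then refuse to hand out such a slot.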

> (4) wait until something changes.
> (4) seems pretty unfortunate unless there's some other system for
> having the slot advance automatically.  Seems like a way for
> replication to hang indefinitely without anybody understanding why
> it's happened (or, maybe, noticing).

On the other hand, it would often allow whatever is using the slot to
continue doing so until the conflict is "resolved". To me, physical
replication being blocked seems about as easy to debug as the slot being
magically deleted or marked as invalid.


Thanks for looking,

Andres Freund
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index ab5aaff1566..2f13a0fd2ad 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -969,155 +969,6 @@ hash_xlog_update_meta_page(XLogReaderState *record)
 		UnlockReleaseBuffer(metabuf);
 }
 
-/*
- * Get the latestRemovedXid from the heap pages pointed at by the index
- * tuples being deleted. See also btree_xlog_delete_get_latestRemovedXid,
- * on which this function is based.
- */
-static TransactionId
-hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
-{
-	xl_hash_vacuum_one_page *xlrec;
-	OffsetNumber *unused;
-	Buffer		ibuffer,
-				hbuffer;
-	Page		ipage,
-				hpage;
-	RelFileNode rnode;
-	BlockNumber blkno;
-	ItemId		iitemid,
-				hitemid;
-	IndexTuple	itup;
-	HeapTupleHeader htuphdr;
-	BlockNumber hblkno;
-	OffsetNumber hoffnum;
-	TransactionId latestRemovedXid = InvalidTransactionId;
-	int			i;
-
-	xlrec = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
-
-	/*
-	 * If there's nothing running on the standby we don't need to derive a
-	 * full latestRemovedXid value, so use a fast path out of here.  This
-	 * returns InvalidTransactionId, and so will conflict with all HS
-	 * transactions; but since we just worked out that that's zero people,
-	 * it's OK.
-	 *
-	 * XXX There is a race condition here, which is that a new backend might
-	 * start just after we look.  If so, it cannot need to conflict, but this
-	 * coding will result in throwing a conflict anyway.
-	 */
-	if (CountDBBackends(InvalidOid) == 0)
-		return latestRemovedXid;
-
-	/*
-	 * Check if WAL replay has reached a consistent database state. If not, we
-	 * must PANIC. See the definition of
-	 * btree_xlog_delete_get_latestRemovedXid for more details.
-	 */
-	if (!reachedConsistency)
-		elog(PANIC, "hash_xlog_vacuum_get_latestRemovedXid: cannot operate with inconsistent data");
-
-	/*
-	 * Get index page.  If the DB is consistent, this should not fail, nor
-	 * should any of the heap page fetches below.  If one does, we return
-	 * InvalidTransactionId to cancel all HS transactions.  That's probably
-	 * overkill, but it's safe, and certainly better than panicking here.
-	 */
-	XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
-	ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
-
-	if (!BufferIsValid(ibuffer))
-		return InvalidTransactionId;
-	LockBuffer(ibuffer, HASH_READ);
-	ipage = (Page) BufferGetPage(ibuffer);
-
-	/*
-	 * Loop through the deleted index items to obtain the TransactionId from
-	 * the heap items they point to.
-	 */
-	unused = (OffsetNumber *) ((char *) xlrec + SizeOfHashVacuumOnePage);
-
-	for (i = 0; i < xlrec->ntuples; i++)
-	{
-		/*
-		 * Identify the index tuple about to be deleted.
-		 */
-		iitemid = PageGetItemId(ipage, unused[i]);
-		itup = (IndexTuple) PageGetItem(ipage, iitemid);
-
-		/*
-		 * Locate the heap page that the index tuple points at
-		 */
-		hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
-		hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
-										 hblkno, RBM_NORMAL);
-
-		if (!BufferIsValid(hbuffer))
-		{
-			UnlockReleaseBuffer(ibuffer);
-			return InvalidTransactionId;
-		}
-		LockBuffer(hbuffer, HASH_READ);
-		hpage = (Page) BufferGetPage(hbuffer);
-
-		/*
-		 * Look up the heap tuple header that the index tuple points at by
-		 * using the heap node supplied with the xlrec. We can't use
-		 * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
-		 * Note that we are not looking at tuple data here, just headers.
-		 */
-		hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
-		hitemid = PageGetItemId(hpage, hoffnum);
-
-		/*
-		 * Follow any redirections until we find something useful.
-		 */
-		while (ItemIdIsRedirected(hitemid))
-		{
-			hoffnum = ItemIdGetRedirect(hitemid);
-			hitemid = PageGetItemId(hpage, hoffnum);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/*
-		 * If the heap item has storage, then read the header and use that to
-		 * set latestRemovedXid.
-		 *
-		 * Some LP_DEAD items may not be accessible, so we ignore them.
-		 */
-		if (ItemIdHasStorage(hitemid))
-		{
-			htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
-			HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
-		}
-		else if (ItemIdIsDead(hitemid))
-		{
-			/*
-			 * Conjecture: if hitemid is dead then it had xids before the xids
-			 * marked on LP_NORMAL items. So we just ignore this item and move
-			 * onto the next, for the purposes of calculating
-			 * latestRemovedxids.
-			 */
-		}
-		else
-			Assert(!ItemIdIsUsed(hitemid));
-
-		UnlockReleaseBuffer(hbuffer);
-	}
-
-	UnlockReleaseBuffer(ibuffer);
-
-	/*
-	 * If all heap tuples were LP_DEAD then we will be returning
-	 * InvalidTransactionId here, which avoids conflicts. This matches
-	 * existing logic which assumes that LP_DEAD tuples must already be older
-	 * than the latestRemovedXid on the cleanup record that set them as
-	 * LP_DEAD, hence must already have generated a conflict.
-	 */
-	return latestRemovedXid;
-}
-
 /*
  * replay delete operation in hash index to remove
  * tuples marked as DEAD during index tuple insertion.
@@ -1149,12 +1000,10 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 	{
-		TransactionId latestRemovedXid =
-		hash_xlog_vacuum_get_latestRemovedXid(record);
 		RelFileNode rnode;
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
 	}
 
 	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index 3eb722ce266..f9a261a713f 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -24,8 +24,8 @@
 #include "storage/buf_internals.h"
 #include "storage/predicate.h"
 
-static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
-					  RelFileNode hnode);
+static void _hash_vacuum_one_page(Relation rel, Relation hrel,
+								  Buffer metabuf, Buffer buf);
 
 /*
  *	_hash_doinsert() -- Handle insertion of a single index tuple.
@@ -138,7 +138,7 @@ restart_insert:
 
 			if (IsBufferCleanupOK(buf))
 			{
-				_hash_vacuum_one_page(rel, metabuf, buf, heapRel->rd_node);
+				_hash_vacuum_one_page(rel, heapRel, metabuf, buf);
 
 				if (PageGetFreeSpace(page) >= itemsz)
 					break;		/* OK, now we have enough space */
@@ -336,8 +336,7 @@ _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
  */
 
 static void
-_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
-					  RelFileNode hnode)
+_hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
 {
 	OffsetNumber deletable[MaxOffsetNumber];
 	int			ndeletable = 0;
@@ -361,6 +360,10 @@ _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
 
 	if (ndeletable > 0)
 	{
+		TransactionId latestRemovedXid;
+
+		latestRemovedXid = index_compute_xid_horizon_for_tuples(rel, hrel, buf, deletable, ndeletable);
+
 		/*
 		 * Write-lock the meta page so that we can decrement tuple count.
 		 */
@@ -394,7 +397,8 @@ _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
 			xl_hash_vacuum_one_page xlrec;
 			XLogRecPtr	recptr;
 
-			xlrec.hnode = hnode;
+			xlrec.latestRemovedXid = latestRemovedXid;
+			xlrec.hnode = hrel->rd_node;
 			xlrec.ntuples = ndeletable;
 
 			XLogBeginInsert();
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 96501456422..049a8498e8f 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7558,6 +7558,135 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
 	/* *latestRemovedXid may still be invalid at end */
 }
 
+/*
+ * Get the latestRemovedXid from the heap pages pointed at by the index
+ * tuples being deleted.
+ *
+ * This puts the work for calculating latestRemovedXid into the recovery path
+ * rather than the primary path.
+ *
+ * It's possible that this generates a fair amount of I/O, since an index
+ * block may have hundreds of tuples being deleted. Repeat accesses to the
+ * same heap blocks are common, though are not yet optimised.
+ *
+ * XXX optimise later with something like XLogPrefetchBuffer()
+ */
+TransactionId
+heap_compute_xid_horizon_for_tuples(Relation rel,
+									ItemPointerData *tids,
+									int nitems)
+{
+	TransactionId latestRemovedXid = InvalidTransactionId;
+	BlockNumber hblkno;
+	Buffer		buf = InvalidBuffer;
+	Page		hpage;
+
+	/*
+	 * Sort to avoid repeated lookups for the same page, and to make it more
+	 * likely to access items in an efficient order. In particular this
+	 * ensures that if there are multiple pointers to the same page, they all
+	 * get processed looking up and locking the page just once.
+	 */
+	qsort((void *) tids, nitems, sizeof(ItemPointerData),
+		  (int (*) (const void *, const void *)) ItemPointerCompare);
+
+	/* prefetch all pages */
+#ifdef USE_PREFETCH
+	hblkno = InvalidBlockNumber;
+	for (int i = 0; i < nitems; i++)
+	{
+		ItemPointer htid = &tids[i];
+
+		if (hblkno == InvalidBlockNumber ||
+			ItemPointerGetBlockNumber(htid) != hblkno)
+		{
+			hblkno = ItemPointerGetBlockNumber(htid);
+
+			PrefetchBuffer(rel, MAIN_FORKNUM, hblkno);
+		}
+	}
+#endif
+
+	/* Iterate over all tids, and check their horizon */
+	hblkno = InvalidBlockNumber;
+	for (int i = 0; i < nitems; i++)
+	{
+		ItemPointer htid = &tids[i];
+		ItemId hitemid;
+		OffsetNumber hoffnum;
+
+		/*
+		 * Read heap buffer, but avoid refetching if it's the same block as
+		 * required for the last tid.
+		 */
+		if (hblkno == InvalidBlockNumber ||
+			ItemPointerGetBlockNumber(htid) != hblkno)
+		{
+			/* release old buffer */
+			if (BufferIsValid(buf))
+			{
+				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+				ReleaseBuffer(buf);
+			}
+
+			hblkno = ItemPointerGetBlockNumber(htid);
+
+			buf = ReadBuffer(rel, hblkno);
+			hpage = BufferGetPage(buf);
+
+			LockBuffer(buf, BUFFER_LOCK_SHARE);
+		}
+
+		hoffnum = ItemPointerGetOffsetNumber(htid);
+		hitemid = PageGetItemId(hpage, hoffnum);
+
+		/*
+		 * Follow any redirections until we find something useful.
+		 */
+		while (ItemIdIsRedirected(hitemid))
+		{
+			hoffnum = ItemIdGetRedirect(hitemid);
+			hitemid = PageGetItemId(hpage, hoffnum);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/*
+		 * If the heap item has storage, then read the header and use that to
+		 * set latestRemovedXid.
+		 *
+		 * Some LP_DEAD items may not be accessible, so we ignore them.
+		 */
+		if (ItemIdHasStorage(hitemid))
+		{
+			HeapTupleHeader htuphdr;
+
+			htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
+
+			HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
+		}
+		else if (ItemIdIsDead(hitemid))
+		{
+			/*
+			 * Conjecture: if hitemid is dead then it had xids before the xids
+			 * marked on LP_NORMAL items. So we just ignore this item and move
+			 * onto the next, for the purposes of calculating
+			 * latestRemovedxids.
+			 */
+		}
+		else
+			Assert(!ItemIdIsUsed(hitemid));
+
+	}
+
+	if (BufferIsValid(buf))
+	{
+		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+		ReleaseBuffer(buf);
+	}
+
+	return latestRemovedXid;
+}
+
 /*
  * Perform XLogInsert to register a heap cleanup info message. These
  * messages are sent once per VACUUM and are required because
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 9d087756879..c4064b7c02e 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -275,6 +275,42 @@ BuildIndexValueDescription(Relation indexRelation,
 	return buf.data;
 }
 
+/*
+ * Get the latestRemovedXid from the heap pages pointed at by the index
+ * tuples being deleted.
+ */
+TransactionId
+index_compute_xid_horizon_for_tuples(Relation irel,
+									 Relation hrel,
+									 Buffer ibuf,
+									 OffsetNumber *itemnos,
+									 int nitems)
+{
+	ItemPointerData *htids = (ItemPointerData *) palloc(sizeof(ItemPointerData) * nitems);
+	TransactionId latestRemovedXid = InvalidTransactionId;
+	Page		ipage = BufferGetPage(ibuf);
+	IndexTuple	itup;
+
+	/* identify what the index tuples about to be deleted point to */
+	for (int i = 0; i < nitems; i++)
+	{
+		ItemId iitemid;
+
+		iitemid = PageGetItemId(ipage, itemnos[i]);
+		itup = (IndexTuple) PageGetItem(ipage, iitemid);
+
+		ItemPointerCopy(&itup->t_tid, &htids[i]);
+	}
+
+	/* determine the actual xid horizon */
+	latestRemovedXid =
+		heap_compute_xid_horizon_for_tuples(hrel, htids, nitems);
+
+	pfree(htids);
+
+	return latestRemovedXid;
+}
+
 
 /* ----------------------------------------------------------------
  *		heap-or-index-scan access to system catalogs
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 4082103fe2d..7228c012ad5 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1032,10 +1032,16 @@ _bt_delitems_delete(Relation rel, Buffer buf,
 {
 	Page		page = BufferGetPage(buf);
 	BTPageOpaque opaque;
+	TransactionId latestRemovedXid = InvalidTransactionId;
 
 	/* Shouldn't be called unless there's something to do */
 	Assert(nitems > 0);
 
+	if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
+		latestRemovedXid =
+			index_compute_xid_horizon_for_tuples(rel, heapRel, buf,
+												 itemnos, nitems);
+
 	/* No ereport(ERROR) until changes are logged */
 	START_CRIT_SECTION();
 
@@ -1065,6 +1071,7 @@ _bt_delitems_delete(Relation rel, Buffer buf,
 		XLogRecPtr	recptr;
 		xl_btree_delete xlrec_delete;
 
+		xlrec_delete.latestRemovedXid = latestRemovedXid;
 		xlrec_delete.hnode = heapRel->rd_node;
 		xlrec_delete.nitems = nitems;
 
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 67a94cb80a2..052de4b2f3d 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -518,159 +518,6 @@ btree_xlog_vacuum(XLogReaderState *record)
 		UnlockReleaseBuffer(buffer);
 }
 
-/*
- * Get the latestRemovedXid from the heap pages pointed at by the index
- * tuples being deleted. This puts the work for calculating latestRemovedXid
- * into the recovery path rather than the primary path.
- *
- * It's possible that this generates a fair amount of I/O, since an index
- * block may have hundreds of tuples being deleted. Repeat accesses to the
- * same heap blocks are common, though are not yet optimised.
- *
- * XXX optimise later with something like XLogPrefetchBuffer()
- */
-static TransactionId
-btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record)
-{
-	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
-	OffsetNumber *unused;
-	Buffer		ibuffer,
-				hbuffer;
-	Page		ipage,
-				hpage;
-	RelFileNode rnode;
-	BlockNumber blkno;
-	ItemId		iitemid,
-				hitemid;
-	IndexTuple	itup;
-	HeapTupleHeader htuphdr;
-	BlockNumber hblkno;
-	OffsetNumber hoffnum;
-	TransactionId latestRemovedXid = InvalidTransactionId;
-	int			i;
-
-	/*
-	 * If there's nothing running on the standby we don't need to derive a
-	 * full latestRemovedXid value, so use a fast path out of here.  This
-	 * returns InvalidTransactionId, and so will conflict with all HS
-	 * transactions; but since we just worked out that that's zero people,
-	 * it's OK.
-	 *
-	 * XXX There is a race condition here, which is that a new backend might
-	 * start just after we look.  If so, it cannot need to conflict, but this
-	 * coding will result in throwing a conflict anyway.
-	 */
-	if (CountDBBackends(InvalidOid) == 0)
-		return latestRemovedXid;
-
-	/*
-	 * In what follows, we have to examine the previous state of the index
-	 * page, as well as the heap page(s) it points to.  This is only valid if
-	 * WAL replay has reached a consistent database state; which means that
-	 * the preceding check is not just an optimization, but is *necessary*. We
-	 * won't have let in any user sessions before we reach consistency.
-	 */
-	if (!reachedConsistency)
-		elog(PANIC, "btree_xlog_delete_get_latestRemovedXid: cannot operate with inconsistent data");
-
-	/*
-	 * Get index page.  If the DB is consistent, this should not fail, nor
-	 * should any of the heap page fetches below.  If one does, we return
-	 * InvalidTransactionId to cancel all HS transactions.  That's probably
-	 * overkill, but it's safe, and certainly better than panicking here.
-	 */
-	XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
-	ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
-	if (!BufferIsValid(ibuffer))
-		return InvalidTransactionId;
-	LockBuffer(ibuffer, BT_READ);
-	ipage = (Page) BufferGetPage(ibuffer);
-
-	/*
-	 * Loop through the deleted index items to obtain the TransactionId from
-	 * the heap items they point to.
-	 */
-	unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
-
-	for (i = 0; i < xlrec->nitems; i++)
-	{
-		/*
-		 * Identify the index tuple about to be deleted
-		 */
-		iitemid = PageGetItemId(ipage, unused[i]);
-		itup = (IndexTuple) PageGetItem(ipage, iitemid);
-
-		/*
-		 * Locate the heap page that the index tuple points at
-		 */
-		hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
-		hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM, hblkno, RBM_NORMAL);
-		if (!BufferIsValid(hbuffer))
-		{
-			UnlockReleaseBuffer(ibuffer);
-			return InvalidTransactionId;
-		}
-		LockBuffer(hbuffer, BT_READ);
-		hpage = (Page) BufferGetPage(hbuffer);
-
-		/*
-		 * Look up the heap tuple header that the index tuple points at by
-		 * using the heap node supplied with the xlrec. We can't use
-		 * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
-		 * Note that we are not looking at tuple data here, just headers.
-		 */
-		hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
-		hitemid = PageGetItemId(hpage, hoffnum);
-
-		/*
-		 * Follow any redirections until we find something useful.
-		 */
-		while (ItemIdIsRedirected(hitemid))
-		{
-			hoffnum = ItemIdGetRedirect(hitemid);
-			hitemid = PageGetItemId(hpage, hoffnum);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/*
-		 * If the heap item has storage, then read the header and use that to
-		 * set latestRemovedXid.
-		 *
-		 * Some LP_DEAD items may not be accessible, so we ignore them.
-		 */
-		if (ItemIdHasStorage(hitemid))
-		{
-			htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
-
-			HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
-		}
-		else if (ItemIdIsDead(hitemid))
-		{
-			/*
-			 * Conjecture: if hitemid is dead then it had xids before the xids
-			 * marked on LP_NORMAL items. So we just ignore this item and move
-			 * onto the next, for the purposes of calculating
-			 * latestRemovedxids.
-			 */
-		}
-		else
-			Assert(!ItemIdIsUsed(hitemid));
-
-		UnlockReleaseBuffer(hbuffer);
-	}
-
-	UnlockReleaseBuffer(ibuffer);
-
-	/*
-	 * If all heap tuples were LP_DEAD then we will be returning
-	 * InvalidTransactionId here, which avoids conflicts. This matches
-	 * existing logic which assumes that LP_DEAD tuples must already be older
-	 * than the latestRemovedXid on the cleanup record that set them as
-	 * LP_DEAD, hence must already have generated a conflict.
-	 */
-	return latestRemovedXid;
-}
-
 static void
 btree_xlog_delete(XLogReaderState *record)
 {
@@ -693,12 +540,11 @@ btree_xlog_delete(XLogReaderState *record)
 	 */
 	if (InHotStandby)
 	{
-		TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(record);
 		RelFileNode rnode;
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
 	}
 
 	/*
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 534fac7bf2f..0318da88bc2 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -186,6 +186,11 @@ extern IndexScanDesc RelationGetIndexScan(Relation indexRelation,
 extern void IndexScanEnd(IndexScanDesc scan);
 extern char *BuildIndexValueDescription(Relation indexRelation,
 						   Datum *values, bool *isnull);
+extern TransactionId index_compute_xid_horizon_for_tuples(Relation irel,
+														  Relation hrel,
+														  Buffer ibuf,
+														  OffsetNumber *itemnos,
+														  int nitems);
 
 /*
  * heap-or-index access to system catalogs (in genam.c)
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 527138440b3..d46dc1a85b3 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -263,6 +263,7 @@ typedef struct xl_hash_init_bitmap_page
  */
 typedef struct xl_hash_vacuum_one_page
 {
+	TransactionId latestRemovedXid;
 	RelFileNode hnode;
 	int			ntuples;
 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 64cfdbd2f06..af8612e625b 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -184,6 +184,10 @@ extern void simple_heap_update(Relation relation, ItemPointer otid,
 extern void heap_sync(Relation relation);
 extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
 
+extern TransactionId heap_compute_xid_horizon_for_tuples(Relation rel,
+														 ItemPointerData *items,
+														 int nitems);
+
 /* in heap/pruneheap.c */
 extern void heap_page_prune_opt(Relation relation, Buffer buffer);
 extern int heap_page_prune(Relation relation, Buffer buffer,
diff --git a/src/include/access/nbtxlog.h b/src/include/access/nbtxlog.h
index 819373031cd..ca2a729169a 100644
--- a/src/include/access/nbtxlog.h
+++ b/src/include/access/nbtxlog.h
@@ -123,6 +123,7 @@ typedef struct xl_btree_split
  */
 typedef struct xl_btree_delete
 {
+	TransactionId latestRemovedXid;
 	RelFileNode hnode;			/* RelFileNode of the heap the index currently
 								 * points at */
 	int			nitems;
