Hi,

> While replaying the delete/vacuum record on standby, it can conflict
> with some already running queries.  Basically the replay can remove
> some row which can be visible on standby.  You need to resolve
> conflicts similar to what we do in btree delete records (refer
> btree_xlog_delete).

Agreed. Thanks for pointing this out. I have taken care of it in the
attached v2 patch.

> + /*
> + * Write-lock the meta page so that we can decrement
> + * tuple count.
> + */
> + _hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
> +
> + _hash_vacuum_one_page(rel, metabuf, buf, bucket_buf,
> +  (buf == bucket_buf) ? true : false);
> +
> + _hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
>
> It seems here meta page lock is acquired for duration more than
> required and also it is not required when there are no deletable items
> on page. You can take the metapage lock before decrementing the count.

Ok. Corrected. Please refer to the attached v2 patch.


> Spurious space.  There are some other similar spurious white space
> changes in patch, remove them as well.

Corrected. Please refer to the attached v2 patch.
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index db73f05..4a4d614 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -157,7 +157,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	if (buildstate.spool)
 	{
 		/* sort the tuples and insert them into the index */
-		_h_indexbuild(buildstate.spool);
+		_h_indexbuild(buildstate.spool, heap->rd_node);
 		_h_spooldestroy(buildstate.spool);
 	}
 
@@ -196,6 +196,8 @@ hashbuildCallback(Relation index,
 	Datum		index_values[1];
 	bool		index_isnull[1];
 	IndexTuple	itup;
+	Relation	rel;
+	RelFileNode	rnode;
 
 	/* convert data to a hash key; on failure, do not insert anything */
 	if (!_hash_convert_tuple(index,
@@ -212,8 +214,12 @@ hashbuildCallback(Relation index,
 		/* form an index tuple and point it at the heap tuple */
 		itup = index_form_tuple(RelationGetDescr(index),
 								index_values, index_isnull);
+		/* Get RelfileNode from relation OID */
+		rel = relation_open(htup->t_tableOid, NoLock);
+		rnode = rel->rd_node;
+		relation_close(rel, NoLock);
 		itup->t_tid = htup->t_self;
-		_hash_doinsert(index, itup);
+		_hash_doinsert(index, itup, rnode);
 		pfree(itup);
 	}
 
@@ -245,7 +251,7 @@ hashinsert(Relation rel, Datum *values, bool *isnull,
 	itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
 	itup->t_tid = *ht_ctid;
 
-	_hash_doinsert(rel, itup);
+	_hash_doinsert(rel, itup, heapRel->rd_node);
 
 	pfree(itup);
 
@@ -325,14 +331,21 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
 		if (scan->kill_prior_tuple)
 		{
 			/*
-			 * Yes, so mark it by setting the LP_DEAD state in the item flags.
+			 * Yes, so remember it for later. (We'll deal with all such
+			 * tuples at once right after leaving the index page or at
+			 * end of scan.)
 			 */
-			ItemIdMarkDead(PageGetItemId(page, offnum));
+			if (so->killedItems == NULL)
+				so->killedItems = palloc(MaxIndexTuplesPerPage *
+										 sizeof(HashScanPosItem));
 
-			/*
-			 * Since this can be redone later if needed, mark as a hint.
-			 */
-			MarkBufferDirtyHint(buf, true);
+			if (so->numKilled < MaxIndexTuplesPerPage)
+			{
+				so->killedItems[so->numKilled].heapTid = so->hashso_heappos;
+				so->killedItems[so->numKilled].indexOffset =
+							ItemPointerGetOffsetNumber(&(so->hashso_curpos));
+				so->numKilled++;
+			}
 		}
 
 		/*
@@ -439,6 +452,9 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
 
 	so->hashso_skip_moved_tuples = false;
 
+	so->killedItems = NULL;
+	so->numKilled = 0;
+
 	scan->opaque = so;
 
 	return scan;
@@ -454,6 +470,10 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 	HashScanOpaque so = (HashScanOpaque) scan->opaque;
 	Relation	rel = scan->indexRelation;
 
+	/* Before leaving current page, deal with any killed items */
+	if (so->numKilled > 0)
+		hashkillitems(scan);
+
 	_hash_dropscanbuf(rel, so);
 
 	/* set position invalid (this will cause _hash_first call) */
@@ -480,6 +500,10 @@ hashendscan(IndexScanDesc scan)
 	HashScanOpaque so = (HashScanOpaque) scan->opaque;
 	Relation	rel = scan->indexRelation;
 
+	/* Before leaving current page, deal with any killed items */
+	if (so->numKilled > 0)
+		hashkillitems(scan);
+
 	_hash_dropscanbuf(rel, so);
 
 	pfree(so);
@@ -809,6 +833,15 @@ hashbucketcleanup(Relation rel, Buffer bucket_buf,
 			PageIndexMultiDelete(page, deletable, ndeletable);
 			bucket_dirty = true;
 
+			/*
+			 * Let us mark the page as clean if vacuum removes the DEAD tuples
+			 * from an index page. We do this by clearing LH_PAGE_HAS_DEAD_TUPLES
+			 * flag.
+			 */
+			if (tuples_removed && *tuples_removed > 0 &&
+				opaque->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
+				opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
 			MarkBufferDirty(buf);
 
 			/* XLOG stuff */
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index d030a8d..c6dc20b 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -14,8 +14,13 @@
  */
 #include "postgres.h"
 
+#include "access/heapam_xlog.h"
 #include "access/hash_xlog.h"
 #include "access/xlogutils.h"
+#include "access/xlog.h"
+#include "access/transam.h"
+#include "storage/procarray.h"
+#include "miscadmin.h"
 
 /*
  * replay a hash index meta page
@@ -921,6 +926,247 @@ hash_xlog_update_meta_page(XLogReaderState *record)
 		UnlockReleaseBuffer(metabuf);
 }
 
+/*
+ * Get the latestRemovedXid from the heap pages pointed at by the index
+ * tuples being deleted. This puts the work for calculating latestRemovedXid
+ * into the recovery path rather than the primary path.  The result stays
+ * InvalidTransactionId if no backends are running or a page can't be read.
+ * It's possible that this generates a fair amount of I/O, since an index
+ * block may have hundreds of tuples being deleted. Repeat accesses to the
+ * same heap blocks are common, though they are not yet optimised.
+ */
+static TransactionId
+hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
+{
+	xl_hash_vacuum	*xlrec = (xl_hash_vacuum *) XLogRecGetData(record);
+	OffsetNumber	*unused;
+	Buffer		ibuffer,
+				hbuffer;
+	Page		ipage,
+				hpage;
+	RelFileNode	rnode;
+	BlockNumber	blkno;
+	ItemId		iitemid,
+				hitemid;
+	IndexTuple	itup;
+	HeapTupleHeader	htuphdr;
+	BlockNumber	hblkno;
+	OffsetNumber	hoffnum;
+	TransactionId	latestRemovedXid = InvalidTransactionId;
+	int		i;
+	char *ptr;
+	Size len;
+
+	/*
+	 * If there's nothing running on the standby we don't need to derive a
+	 * full latestRemovedXid value, so use a fast path out of here.  This
+	 * returns InvalidTransactionId, and so will conflict with all HS
+	 * transactions; but since we just worked out that that's zero people,
+	 * it's OK.
+	 */
+	if (CountDBBackends(InvalidOid) == 0)
+		return latestRemovedXid;
+
+	/*
+	 * Get index page.  If the DB is consistent, this should not fail, nor
+	 * should any of the heap page fetches below.  If one does, we return
+	 * InvalidTransactionId to cancel all HS transactions.  That's probably
+	 * overkill, but it's safe, and certainly better than panicking here.
+	 */
+	XLogRecGetBlockTag(record, 1, &rnode, NULL, &blkno);
+	ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
+
+	if (!BufferIsValid(ibuffer))
+		return InvalidTransactionId;
+	LockBuffer(ibuffer, HASH_READ);
+	ipage = (Page) BufferGetPage(ibuffer);
+
+	/*
+	 * Loop through the deleted index items to obtain the TransactionId from
+	 * the heap items they point to.
+	 */
+	ptr = XLogRecGetBlockData(record, 1, &len);
+
+	unused = (OffsetNumber *) ptr;
+
+	for (i = 0; i < xlrec->ntuples; i++)
+	{
+		/*
+		 * Identify the index tuple about to be deleted.
+		 */
+		iitemid = PageGetItemId(ipage, unused[i]);
+		itup = (IndexTuple) PageGetItem(ipage, iitemid);
+
+		/*
+		 * Locate the heap page that the index tuple points at
+		 */
+		hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
+		hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
+										 hblkno, RBM_NORMAL);
+
+		if (!BufferIsValid(hbuffer))
+		{
+			UnlockReleaseBuffer(ibuffer);
+			return InvalidTransactionId;
+		}
+		LockBuffer(hbuffer, HASH_READ);
+		hpage = (Page) BufferGetPage(hbuffer);
+
+		/*
+		 * Look up the heap tuple header that the index tuple points at by
+		 * using the heap node supplied with the xlrec. We can't use
+		 * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
+		 * Note that we are not looking at tuple data here, just headers.
+		 */
+		hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
+		hitemid = PageGetItemId(hpage, hoffnum);
+
+		/*
+		 * Follow any redirections until we find something useful.
+		 */
+		while (ItemIdIsRedirected(hitemid))
+		{
+			hoffnum = ItemIdGetRedirect(hitemid);
+			hitemid = PageGetItemId(hpage, hoffnum);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/*
+		 * If the heap item has storage, then read the header and use that to
+		 * set latestRemovedXid.
+		 *
+		 * Some LP_DEAD items may not be accessible, so we ignore them.
+		 */
+		if (ItemIdHasStorage(hitemid))
+		{
+			htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
+			HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
+		}
+		else if (ItemIdIsDead(hitemid))
+		{
+			/*
+			 * Conjecture: if hitemid is dead then it had xids before the xids
+			 * marked on LP_NORMAL items. So we just ignore this item and move
+			 * on to the next, for the purposes of calculating
+			 * latestRemovedXid.
+			 */
+		}
+		else
+			Assert(!ItemIdIsUsed(hitemid));
+
+		UnlockReleaseBuffer(hbuffer);
+	}
+
+	UnlockReleaseBuffer(ibuffer);
+
+	/*
+	 * If all heap tuples were LP_DEAD then we will be returning
+	 * InvalidTransactionId here, which avoids conflicts. This matches
+	 * existing logic which assumes that LP_DEAD tuples must already be older
+	 * than the latestRemovedXid on the cleanup record that set them as
+	 * LP_DEAD, hence must already have generated a conflict.
+	 */
+	return latestRemovedXid;
+}
+
+/*
+ * Replay XLOG_HASH_VACUUM_ONE_PAGE: delete from a hash index page the
+ * tuples that were found marked DEAD during index tuple insertion.
+ */
+static void
+hash_xlog_vacuum_one_page(XLogReaderState *record)
+{
+	XLogRecPtr lsn = record->EndRecPtr;
+	xl_hash_vacuum *xldata = (xl_hash_vacuum *) XLogRecGetData(record);
+	Buffer bucketbuf = InvalidBuffer;
+	Buffer buffer;
+	Buffer metabuf;
+	Page page;
+	XLogRedoAction action;
+
+	/*
+	 * If we have any conflict processing to do, it must happen before we
+	 * update the page.
+	 *
+	 * Hash Index delete records can conflict with standby queries. You might
+	 * think that vacuum records would conflict as well, but we've handled
+	 * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
+	 * cleaned by the vacuum of the heap and so we can resolve any conflicts
+	 * just once when that arrives.  After that we know that no conflicts
+	 * exist from individual hash index vacuum records on that index.
+	 */
+	if (InHotStandby)
+	{
+		TransactionId latestRemovedXid =
+					hash_xlog_vacuum_get_latestRemovedXid(record);
+		RelFileNode rnode;
+
+		XLogRecGetBlockTag(record, 1, &rnode, NULL, NULL);
+		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+	}
+
+	/* As on the primary, take a cleanup lock on the primary bucket page. */
+	if (xldata->is_primary_bucket_page)
+		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL,
+											   true, &buffer);
+	else
+	{
+		RelFileNode rnode;
+		BlockNumber blkno;
+
+		XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+		bucketbuf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+										   RBM_NORMAL);
+
+		if (BufferIsValid(bucketbuf))
+			LockBufferForCleanup(bucketbuf);
+
+		action = XLogReadBufferForRedo(record, 1, &buffer);
+	}
+
+	if (action == BLK_NEEDS_REDO)
+	{
+		char *ptr;
+		Size len;
+
+		ptr = XLogRecGetBlockData(record, 1, &len);
+
+		page = (Page) BufferGetPage(buffer);
+
+		if (len > 0)
+		{
+			OffsetNumber *unused = (OffsetNumber *) ptr;
+			OffsetNumber *unend = (OffsetNumber *) ((char *) ptr + len);
+
+			if ((unend - unused) > 0)
+				PageIndexMultiDelete(page, unused, unend - unused);
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/* Drop the cleanup lock and pin on the primary bucket page, if taken. */
+	if (BufferIsValid(bucketbuf))
+		UnlockReleaseBuffer(bucketbuf);
+
+	/* Subtract the deleted tuples from the meta page's tuple count. */
+	if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
+	{
+		Page metapage = BufferGetPage(metabuf);
+		HashMetaPage metap = HashPageGetMeta(metapage);
+
+		metap->hashm_ntuples -= xldata->ntuples;
+
+		PageSetLSN(metapage, lsn);
+		MarkBufferDirty(metabuf);
+	}
+	if (BufferIsValid(metabuf))
+		UnlockReleaseBuffer(metabuf);
+}
+
 void
 hash_redo(XLogReaderState *record)
 {
@@ -964,6 +1210,9 @@ hash_redo(XLogReaderState *record)
 		case XLOG_HASH_UPDATE_META_PAGE:
 			hash_xlog_update_meta_page(record);
 			break;
+		case XLOG_HASH_VACUUM_ONE_PAGE:
+			hash_xlog_vacuum_one_page(record);
+			break;
 		default:
 			elog(PANIC, "hash_redo: unknown op code %u", info);
 	}
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index 3514138..7435db0 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -19,7 +19,12 @@
 #include "access/hash_xlog.h"
 #include "miscadmin.h"
 #include "utils/rel.h"
+#include "storage/lwlock.h"
+#include "storage/buf_internals.h"
 
+static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
+								  Buffer bucket_buf, bool is_primary_bucket_page,
+								  RelFileNode hnode);
 
 /*
  *	_hash_doinsert() -- Handle insertion of a single index tuple.
@@ -28,7 +33,7 @@
  *		and hashinsert.  By here, itup is completely filled in.
  */
 void
-_hash_doinsert(Relation rel, IndexTuple itup)
+_hash_doinsert(Relation rel, IndexTuple itup, RelFileNode hnode)
 {
 	Buffer		buf = InvalidBuffer;
 	Buffer		bucket_buf;
@@ -206,6 +211,22 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 	while (PageGetFreeSpace(page) < itemsz)
 	{
 		/*
+		 * Check if current page has any DEAD tuples. If yes,
+		 * delete these tuples and see if we can get a space for
+		 * the new item to be inserted before moving to the next
+		 * page in the bucket chain.
+		 */
+		if (H_HAS_DEAD_TUPLES(pageopaque) && CheckBufferForCleanup(bucket_buf))
+		{
+			_hash_vacuum_one_page(rel, metabuf, buf, bucket_buf,
+								  (buf == bucket_buf) ? true : false,
+								  hnode);
+
+			if (PageGetFreeSpace(page) >= itemsz)
+				break;				/* OK, now we have enough space */
+		}
+
+		/*
 		 * no space on this page; check for an overflow page
 		 */
 		BlockNumber nextblkno = pageopaque->hasho_nextblkno;
@@ -247,7 +268,8 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 			Assert(PageGetFreeSpace(page) >= itemsz);
 		}
 		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
-		Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE);
+		Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE ||
+			   pageopaque->hasho_flag == (LH_OVERFLOW_PAGE | LH_PAGE_HAS_DEAD_TUPLES));
 		Assert(pageopaque->hasho_bucket == bucket);
 	}
 
@@ -390,3 +412,98 @@ _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
 				 RelationGetRelationName(rel));
 	}
 }
+
+/*
+ * _hash_vacuum_one_page - remove LP_DEAD items from a single index page
+ * (buf), decrement the tuple count on the meta page (metabuf) and WAL-log
+ * the change, passing the heap's RelFileNode (hnode) along for replay.  The
+ * caller must hold a cleanup lock on the primary bucket page (bucket_buf).
+ */
+
+static void
+_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
+					  Buffer bucket_buf, bool is_primary_bucket_page,
+					  RelFileNode hnode)
+{
+	OffsetNumber	deletable[MaxOffsetNumber];
+	int ndeletable = 0;
+	OffsetNumber offnum,
+				 maxoff;
+	Page	page = BufferGetPage(buf);
+	HashPageOpaque	pageopaque;
+	HashMetaPage	metap;
+	double tuples_removed = 0;
+
+	/* Scan each tuple in page to see if it is marked as LP_DEAD */
+	maxoff = PageGetMaxOffsetNumber(page);
+	for (offnum = FirstOffsetNumber;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId	itemId = PageGetItemId(page, offnum);
+
+		if (ItemIdIsDead(itemId))
+		{
+			deletable[ndeletable++] = offnum;
+			tuples_removed += 1;
+		}
+	}
+
+	if (ndeletable > 0)
+	{
+		/* No ereport(ERROR) until changes are logged */
+		START_CRIT_SECTION();
+
+		PageIndexMultiDelete(page, deletable, ndeletable);
+
+		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+		pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
+		/*
+		 * Write-lock the meta page so that we can decrement
+		 * tuple count.
+		 */
+		_hash_chgbufaccess(rel, metabuf, HASH_NOLOCK, HASH_WRITE);
+
+		metap = HashPageGetMeta(BufferGetPage(metabuf));
+		metap->hashm_ntuples -= tuples_removed;
+
+		MarkBufferDirty(buf);
+		MarkBufferDirty(metabuf);
+
+		/* XLOG stuff */
+		if (RelationNeedsWAL(rel))
+		{
+			xl_hash_vacuum	xlrec;
+			XLogRecPtr	recptr;
+
+			xlrec.hnode = hnode;
+			xlrec.is_primary_bucket_page = is_primary_bucket_page;
+			xlrec.ntuples = tuples_removed;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, SizeOfHashVacuum);
+
+			/*
+			 * The primary bucket buffer needs to be registered to ensure
+			 * that we acquire a cleanup lock on it during replay.
+			 */
+			if (!xlrec.is_primary_bucket_page)
+				XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);
+
+			XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
+			XLogRegisterBufData(1, (char *) deletable,
+						ndeletable * sizeof(OffsetNumber));
+
+			XLogRegisterBuffer(2, metabuf, REGBUF_STANDARD);
+
+			recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);
+
+			PageSetLSN(BufferGetPage(buf), recptr);
+			PageSetLSN(BufferGetPage(metabuf), recptr);
+		}
+
+		END_CRIT_SECTION();
+		_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_NOLOCK);
+	}
+}
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index 0df64a8..316f891 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -473,6 +473,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 							break;		/* yes, so exit for-loop */
 					}
 
+					/* Before leaving current page, deal with any killed items */
+					if (so->numKilled > 0)
+						hashkillitems(scan);
+
 					/*
 					 * ran off the end of this page, try the next
 					 */
@@ -562,6 +566,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 							break;		/* yes, so exit for-loop */
 					}
 
+					/* Before leaving current page, deal with any killed items */
+					if (so->numKilled > 0)
+						hashkillitems(scan);
+
 					/*
 					 * ran off the end of this page, try the next
 					 */
diff --git a/src/backend/access/hash/hashsort.c b/src/backend/access/hash/hashsort.c
index 8938ab5..aa4c7b7 100644
--- a/src/backend/access/hash/hashsort.c
+++ b/src/backend/access/hash/hashsort.c
@@ -101,7 +101,7 @@ _h_spool(HSpool *hspool, ItemPointer self, Datum *values, bool *isnull)
  * create an entire index.
  */
 void
-_h_indexbuild(HSpool *hspool)
+_h_indexbuild(HSpool *hspool, RelFileNode rnode)
 {
 	IndexTuple	itup;
 	bool		should_free;
@@ -128,7 +128,7 @@ _h_indexbuild(HSpool *hspool)
 		Assert(hashkey >= lasthashkey);
 #endif
 
-		_hash_doinsert(hspool->index, itup);
+		_hash_doinsert(hspool->index, itup, rnode);
 		if (should_free)
 			pfree(itup);
 	}
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index b5164d7..4350e32 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -19,6 +19,7 @@
 #include "access/relscan.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
+#include "storage/buf_internals.h"
 
 
 /*
@@ -489,3 +490,72 @@ _hash_get_newbucket(Relation rel, Bucket curr_bucket,
 
 	return new_bucket;
 }
+
+/*
+ * hashkillitems - set LP_DEAD state for items an indexscan caller has
+ * told us were killed, and flag the page as LH_PAGE_HAS_DEAD_TUPLES.
+ *
+ * scan->opaque, referenced locally through so, holds the current buffer
+ * (hashso_curbuf) and the killed tuples remembered for it; so->numKilled
+ * must be > 0 on entry and is reset to 0 here.
+ *
+ * Each item is searched for from its remembered offset onwards and matched
+ * by heap TID before being marked dead.  If an item has moved off the page
+ * due to a split, we'll fail to find it and do nothing (this is not an error
+ * case --- we assume the item will get marked in a future indexscan).
+ */
+void
+hashkillitems(IndexScanDesc scan)
+{
+	HashScanOpaque	so = (HashScanOpaque) scan->opaque;
+	Page	page;
+	HashPageOpaque	opaque;
+	OffsetNumber	offnum, maxoff;
+	int	numKilled = so->numKilled;
+	int		i;
+	bool	killedsomething = false;
+
+	Assert(so->numKilled > 0);
+	Assert(so->killedItems != NULL);
+
+	/*
+	 * Always reset the scan state, so we don't look for the same
+	 * items on other pages.
+	 */
+	so->numKilled = 0;
+
+	page = BufferGetPage(so->hashso_curbuf);
+	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+	maxoff = PageGetMaxOffsetNumber(page);
+
+	for (i = 0; i < numKilled; i++)
+	{
+		offnum = so->killedItems[i].indexOffset;
+
+		while (offnum <= maxoff)
+		{
+			ItemId	iid = PageGetItemId(page, offnum);
+			IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);
+
+			if (ItemPointerEquals(&ituple->t_tid, &so->killedItems[i].heapTid))
+			{
+				/* found the item */
+				ItemIdMarkDead(iid);
+				killedsomething = true;
+				break;		/* out of inner search loop */
+			}
+			offnum = OffsetNumberNext(offnum);
+		}
+	}
+
+	/*
+	 * Since this can be redone later if needed, mark the buffer dirty as a
+	 * hint only.  Whenever we mark anything LP_DEAD, we also set the page's
+	 * LH_PAGE_HAS_DEAD_TUPLES flag, which is likewise just a hint.
+	 */
+	if (killedsomething)
+	{
+		opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
+		MarkBufferDirtyHint(so->hashso_curbuf, true);
+	}
+}
diff --git a/src/backend/access/rmgrdesc/hashdesc.c b/src/backend/access/rmgrdesc/hashdesc.c
index 245ce97..7fc5721 100644
--- a/src/backend/access/rmgrdesc/hashdesc.c
+++ b/src/backend/access/rmgrdesc/hashdesc.c
@@ -155,6 +155,8 @@ hash_identify(uint8 info)
 		case XLOG_HASH_UPDATE_META_PAGE:
 			id = "UPDATE_META_PAGE";
 			break;
+		case XLOG_HASH_VACUUM_ONE_PAGE:
+			id = "VACUUM_ONE_PAGE";
 	}
 
 	return id;
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index c0434f5..6fc7cd0 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -57,6 +57,7 @@ typedef uint32 Bucket;
 #define LH_BUCKET_NEW_PAGE_SPLIT	(1 << 4)
 #define LH_BUCKET_OLD_PAGE_SPLIT	(1 << 5)
 #define LH_BUCKET_PAGE_HAS_GARBAGE	(1 << 6)
+#define LH_PAGE_HAS_DEAD_TUPLES	(1 << 7)
 
 typedef struct HashPageOpaqueData
 {
@@ -74,6 +75,7 @@ typedef HashPageOpaqueData *HashPageOpaque;
 #define H_NEW_INCOMPLETE_SPLIT(opaque)	((opaque)->hasho_flag & LH_BUCKET_NEW_PAGE_SPLIT)
 #define H_INCOMPLETE_SPLIT(opaque)		(((opaque)->hasho_flag & LH_BUCKET_NEW_PAGE_SPLIT) || \
 										 ((opaque)->hasho_flag & LH_BUCKET_OLD_PAGE_SPLIT))
+#define H_HAS_DEAD_TUPLES(opaque)		((opaque)->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
 
 /*
  * The page ID is for the convenience of pg_filedump and similar utilities,
@@ -83,6 +85,13 @@ typedef HashPageOpaqueData *HashPageOpaque;
  */
 #define HASHO_PAGE_ID		0xFF80
 
+typedef struct HashScanPosItem    /* what we remember about each match */
+{
+	ItemPointerData heapTid;	/* TID of referenced heap item */
+	OffsetNumber indexOffset;	/* index item's location within page */
+} HashScanPosItem;
+
+
 /*
  *	HashScanOpaqueData is private state for a hash index scan.
  */
@@ -116,6 +125,10 @@ typedef struct HashScanOpaqueData
 
 	/* Whether scan needs to skip tuples that are moved by split */
 	bool		hashso_skip_moved_tuples;
+
+	/* info about killed items if any (killedItems is NULL if never used) */
+	HashScanPosItem	*killedItems;	/* tids and offset numbers of killed items */
+	int			numKilled;		/* number of currently stored items */
 } HashScanOpaqueData;
 
 typedef HashScanOpaqueData *HashScanOpaque;
@@ -177,6 +190,7 @@ typedef struct HashMetaPageData
 
 typedef HashMetaPageData *HashMetaPage;
 
+
 /*
  * Maximum size of a hash index item (it's okay to have only one per page)
  */
@@ -303,7 +317,7 @@ extern Datum hash_uint32(uint32 k);
 /* private routines */
 
 /* hashinsert.c */
-extern void _hash_doinsert(Relation rel, IndexTuple itup);
+extern void _hash_doinsert(Relation rel, IndexTuple itup, RelFileNode hnode);
 extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
 			   Size itemsize, IndexTuple itup);
 extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
@@ -361,7 +375,7 @@ extern HSpool *_h_spoolinit(Relation heap, Relation index, uint32 num_buckets);
 extern void _h_spooldestroy(HSpool *hspool);
 extern void _h_spool(HSpool *hspool, ItemPointer self,
 		 Datum *values, bool *isnull);
-extern void _h_indexbuild(HSpool *hspool);
+extern void _h_indexbuild(HSpool *hspool, RelFileNode rnode);
 
 /* hashutil.c */
 extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
@@ -381,6 +395,7 @@ extern BlockNumber _hash_get_oldblk(Relation rel, HashPageOpaque opaque);
 extern BlockNumber _hash_get_newblk(Relation rel, HashPageOpaque opaque);
 extern Bucket _hash_get_newbucket(Relation rel, Bucket curr_bucket,
 					uint32 lowmask, uint32 maxbucket);
+extern void hashkillitems(IndexScanDesc scan);
 
 /* hash.c */
 extern void hashbucketcleanup(Relation rel, Buffer bucket_buf,
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index 30e16c0..b4d2bf2 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -43,6 +43,7 @@
 #define XLOG_HASH_UPDATE_META_PAGE	0xB0		/* update meta page after
 												 * vacuum */
 
+#define XLOG_HASH_VACUUM_ONE_PAGE	0xC0	/* remove dead tuples from index page */
 
 /*
  * xl_hash_split_allocpage flag values, 8 bits are available.
@@ -257,6 +258,25 @@ typedef struct xl_hash_init_bitmap_page
 #define SizeOfHashInitBitmapPage	\
 	(offsetof(xl_hash_init_bitmap_page, bmsize) + sizeof(uint16))
 
+/*
+ * This is what we need for index tuple deletion and to
+ * update the meta page.
+ *
+ * This data record is used for XLOG_HASH_VACUUM_ONE_PAGE
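+ * Backup Blk 0: primary bucket page (only if blk 1 is a different page)
+ * Backup Blk 1: bucket or overflow page the dead tuples are removed from
+ * Backup Blk 2: meta page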
+ */
+typedef struct xl_hash_vacuum
+{
+	RelFileNode	hnode;
+	double		ntuples;
+	bool		is_primary_bucket_page;
+}	xl_hash_vacuum;
+
+#define SizeOfHashVacuum	\
+	(offsetof(xl_hash_vacuum, is_primary_bucket_page) + sizeof(bool))
+
 extern void hash_redo(XLogReaderState *record);
 extern void hash_desc(StringInfo buf, XLogReaderState *record);
 extern const char *hash_identify(uint8 info);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to