From 00f6a761a0a704e6e3e256bb54232e6be57ec39e Mon Sep 17 00:00:00 2001
From: Amit Kapila <amit.kapila@enterprisedb.com>
Date: Thu, 16 Feb 2017 17:54:51 +0530
Subject: [PATCH 1/5] Expose a new API _hash_pgaddmultitup.

The purpose of this API is to add multiple tuples on a page
at one shot.  This helps in making the free overflow operation
atomic which is a prerequisite for WAL logging.
---
 src/backend/access/hash/hashinsert.c |  41 ++++++++
 src/backend/access/hash/hashovfl.c   | 196 +++++++++++++++++++++++------------
 src/backend/access/hash/hashpage.c   |   1 -
 src/backend/storage/page/bufpage.c   |  27 +++++
 src/include/access/hash.h            |   7 +-
 src/include/storage/bufpage.h        |   1 +
 6 files changed, 203 insertions(+), 70 deletions(-)

diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index dc63063..354e733 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -228,3 +228,44 @@ _hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
 
 	return itup_off;
 }
+
+/*
+ *	_hash_pgaddmultitup() -- add a tuple vector to a particular page in the
+ *							 index.
+ *
+ * This routine has same requirements for locking and tuple ordering as
+ * _hash_pgaddtup().
+ *
+ * Returns the offset number array at which the tuples were inserted.
+ */
+void
+_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
+					OffsetNumber *itup_offsets, uint16 nitups)
+{
+	OffsetNumber itup_off;
+	Page		page;
+	uint32		hashkey;
+	int			i;
+
+	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+	page = BufferGetPage(buf);
+
+	for (i = 0; i < nitups; i++)
+	{
+		Size		itemsize;
+
+		itemsize = IndexTupleDSize(*itups[i]);
+		itemsize = MAXALIGN(itemsize);
+
+		/* Find where to insert the tuple (preserving page's hashkey ordering) */
+		hashkey = _hash_get_indextuple_hashkey(itups[i]);
+		itup_off = _hash_binsearch(page, hashkey);
+
+		itup_offsets[i] = itup_off;
+
+		if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
+			== InvalidOffsetNumber)
+			elog(ERROR, "failed to add index item to \"%s\"",
+				 RelationGetRelationName(rel));
+	}
+}
diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c
index 3334089..52491e0 100644
--- a/src/backend/access/hash/hashovfl.c
+++ b/src/backend/access/hash/hashovfl.c
@@ -391,6 +391,8 @@ _hash_firstfreebit(uint32 map)
  *	Remove this overflow page from its bucket's chain, and mark the page as
  *	free.  On entry, ovflbuf is write-locked; it is released before exiting.
  *
+ *	Add the tuples (itups) to wbuf.
+ *
  *	Since this function is invoked in VACUUM, we provide an access strategy
  *	parameter that controls fetches of the bucket pages.
  *
@@ -403,13 +405,16 @@ _hash_firstfreebit(uint32 map)
  *	has a lock on same.
  */
 BlockNumber
-_hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
+_hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
+				   Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
+				   Size *tups_size, uint16 nitups,
 				   BufferAccessStrategy bstrategy)
 {
 	HashMetaPage metap;
 	Buffer		metabuf;
 	Buffer		mapbuf;
 	Buffer		prevbuf = InvalidBuffer;
+	Buffer		nextbuf = InvalidBuffer;
 	BlockNumber ovflblkno;
 	BlockNumber prevblkno;
 	BlockNumber blkno;
@@ -435,15 +440,6 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
 	bucket = ovflopaque->hasho_bucket;
 
 	/*
-	 * Zero the page for debugging's sake; then write and release it. (Note:
-	 * if we failed to zero the page here, we'd have problems with the Assert
-	 * in _hash_pageinit() when the page is reused.)
-	 */
-	MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
-	MarkBufferDirty(ovflbuf);
-	_hash_relbuf(rel, ovflbuf);
-
-	/*
 	 * Fix up the bucket chain.  this is a doubly-linked list, so we must fix
 	 * up the bucket chain members behind and ahead of the overflow page being
 	 * deleted.  Concurrency issues are avoided by using lock chaining as
@@ -451,8 +447,6 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
 	 */
 	if (BlockNumberIsValid(prevblkno))
 	{
-		Page		prevpage;
-		HashPageOpaque prevopaque;
 
 		if (prevblkno == writeblkno)
 			prevbuf = wbuf;
@@ -462,32 +456,13 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
 												 HASH_WRITE,
 										   LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
 												 bstrategy);
-
-		prevpage = BufferGetPage(prevbuf);
-		prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
-
-		Assert(prevopaque->hasho_bucket == bucket);
-		prevopaque->hasho_nextblkno = nextblkno;
-
-		MarkBufferDirty(prevbuf);
-		if (prevblkno != writeblkno)
-			_hash_relbuf(rel, prevbuf);
 	}
 	if (BlockNumberIsValid(nextblkno))
-	{
-		Buffer		nextbuf = _hash_getbuf_with_strategy(rel,
-														 nextblkno,
-														 HASH_WRITE,
-														 LH_OVERFLOW_PAGE,
-														 bstrategy);
-		Page		nextpage = BufferGetPage(nextbuf);
-		HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
-
-		Assert(nextopaque->hasho_bucket == bucket);
-		nextopaque->hasho_prevblkno = prevblkno;
-		MarkBufferDirty(nextbuf);
-		_hash_relbuf(rel, nextbuf);
-	}
+		nextbuf = _hash_getbuf_with_strategy(rel,
+											 nextblkno,
+											 HASH_WRITE,
+											 LH_OVERFLOW_PAGE,
+											 bstrategy);
 
 	/* Note: bstrategy is intentionally not used for metapage and bitmap */
 
@@ -508,25 +483,75 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
 	/* Release metapage lock while we access the bitmap page */
 	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
 
-	/* Clear the bitmap bit to indicate that this overflow page is free */
+	/* read the bitmap page to clear the bitmap bit */
 	mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BITMAP_PAGE);
 	mappage = BufferGetPage(mapbuf);
 	freep = HashPageGetBitmap(mappage);
 	Assert(ISSET(freep, bitmapbit));
-	CLRBIT(freep, bitmapbit);
-	MarkBufferDirty(mapbuf);
-	_hash_relbuf(rel, mapbuf);
 
 	/* Get write-lock on metapage to update firstfree */
 	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
 
+	/*
+	 * we have to insert tuples on the "write" page, being careful to preserve
+	 * hashkey ordering.  (If we insert many tuples into the same "write" page
+	 * it would be worth qsort'ing them).
+	 */
+	if (nitups > 0)
+	{
+		_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
+		MarkBufferDirty(wbuf);
+	}
+
+	/* Initialise the freed overflow page. */
+	_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
+	MarkBufferDirty(ovflbuf);
+
+	if (BufferIsValid(prevbuf))
+	{
+		Page		prevpage = BufferGetPage(prevbuf);
+		HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+
+		Assert(prevopaque->hasho_bucket == bucket);
+		prevopaque->hasho_nextblkno = nextblkno;
+		MarkBufferDirty(prevbuf);
+	}
+	if (BufferIsValid(nextbuf))
+	{
+		Page		nextpage = BufferGetPage(nextbuf);
+		HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
+
+		Assert(nextopaque->hasho_bucket == bucket);
+		nextopaque->hasho_prevblkno = prevblkno;
+		MarkBufferDirty(nextbuf);
+	}
+
+	/* Clear the bitmap bit to indicate that this overflow page is free */
+	CLRBIT(freep, bitmapbit);
+	MarkBufferDirty(mapbuf);
+
 	/* if this is now the first free page, update hashm_firstfree */
 	if (ovflbitno < metap->hashm_firstfree)
 	{
 		metap->hashm_firstfree = ovflbitno;
 		MarkBufferDirty(metabuf);
 	}
-	_hash_relbuf(rel, metabuf);
+
+	/* release previous bucket if it is not same as write bucket */
+	if (BufferIsValid(prevbuf) && prevblkno != writeblkno)
+		_hash_relbuf(rel, prevbuf);
+
+	if (BufferIsValid(ovflbuf))
+		_hash_relbuf(rel, ovflbuf);
+
+	if (BufferIsValid(nextbuf))
+		_hash_relbuf(rel, nextbuf);
+
+	if (BufferIsValid(mapbuf))
+		_hash_relbuf(rel, mapbuf);
+
+	if (BufferIsValid(metabuf))
+		_hash_relbuf(rel, metabuf);
 
 	return nextblkno;
 }
@@ -640,7 +665,6 @@ _hash_squeezebucket(Relation rel,
 	Page		rpage;
 	HashPageOpaque wopaque;
 	HashPageOpaque ropaque;
-	bool		wbuf_dirty;
 
 	/*
 	 * start squeezing into the primary bucket page.
@@ -686,15 +710,23 @@ _hash_squeezebucket(Relation rel,
 	/*
 	 * squeeze the tuples.
 	 */
-	wbuf_dirty = false;
 	for (;;)
 	{
 		OffsetNumber roffnum;
 		OffsetNumber maxroffnum;
 		OffsetNumber deletable[MaxOffsetNumber];
-		int			ndeletable = 0;
+		IndexTuple	itups[MaxIndexTuplesPerPage];
+		Size		tups_size[MaxIndexTuplesPerPage];
+		OffsetNumber *itup_offsets;
+		uint16		ndeletable = 0;
+		uint16		nitups = 0;
+		Size		all_tups_size = 0;
+		int			i;
 		bool		retain_pin = false;
 
+		itup_offsets = (OffsetNumber *) palloc(MaxIndexTuplesPerPage * sizeof(OffsetNumber));
+
+readpage:
 		/* Scan each tuple in "read" page */
 		maxroffnum = PageGetMaxOffsetNumber(rpage);
 		for (roffnum = FirstOffsetNumber;
@@ -715,11 +747,13 @@ _hash_squeezebucket(Relation rel,
 
 			/*
 			 * Walk up the bucket chain, looking for a page big enough for
-			 * this item.  Exit if we reach the read page.
+			 * this item and all other accumulated items.  Exit if we reach
+			 * the read page.
 			 */
-			while (PageGetFreeSpace(wpage) < itemsz)
+			while (PageGetFreeSpaceForMultipleTuples(wpage, nitups + 1) < (all_tups_size + itemsz))
 			{
 				Buffer		next_wbuf = InvalidBuffer;
+				bool		tups_moved = false;
 
 				Assert(!PageIsEmpty(wpage));
 
@@ -737,12 +771,30 @@ _hash_squeezebucket(Relation rel,
 														   LH_OVERFLOW_PAGE,
 														   bstrategy);
 
+				if (nitups > 0)
+				{
+					Assert(nitups == ndeletable);
+
+					/*
+					 * we have to insert tuples on the "write" page, being
+					 * careful to preserve hashkey ordering.  (If we insert
+					 * many tuples into the same "write" page it would be
+					 * worth qsort'ing them).
+					 */
+					_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
+					MarkBufferDirty(wbuf);
+
+					/* Delete tuples we already moved off read page */
+					PageIndexMultiDelete(rpage, deletable, ndeletable);
+					MarkBufferDirty(rbuf);
+
+					tups_moved = true;
+				}
+
 				/*
 				 * release the lock on previous page after acquiring the lock
 				 * on next page
 				 */
-				if (wbuf_dirty)
-					MarkBufferDirty(wbuf);
 				if (retain_pin)
 					LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
 				else
@@ -751,12 +803,6 @@ _hash_squeezebucket(Relation rel,
 				/* nothing more to do if we reached the read page */
 				if (rblkno == wblkno)
 				{
-					if (ndeletable > 0)
-					{
-						/* Delete tuples we already moved off read page */
-						PageIndexMultiDelete(rpage, deletable, ndeletable);
-						MarkBufferDirty(rbuf);
-					}
 					_hash_relbuf(rel, rbuf);
 					return;
 				}
@@ -765,21 +811,33 @@ _hash_squeezebucket(Relation rel,
 				wpage = BufferGetPage(wbuf);
 				wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
 				Assert(wopaque->hasho_bucket == bucket);
-				wbuf_dirty = false;
 				retain_pin = false;
-			}
 
-			/*
-			 * we have found room so insert on the "write" page, being careful
-			 * to preserve hashkey ordering.  (If we insert many tuples into
-			 * the same "write" page it would be worth qsort'ing instead of
-			 * doing repeated _hash_pgaddtup.)
-			 */
-			(void) _hash_pgaddtup(rel, wbuf, itemsz, itup);
-			wbuf_dirty = true;
+				/* be tidy */
+				for (i = 0; i < nitups; i++)
+					pfree(itups[i]);
+				nitups = 0;
+				all_tups_size = 0;
+				ndeletable = 0;
 
+				/*
+				 * after moving the tuples, rpage would have been compacted,
+				 * so we need to rescan it.
+				 */
+				if (tups_moved)
+					goto readpage;
+			}
 			/* remember tuple for deletion from "read" page */
 			deletable[ndeletable++] = roffnum;
+
+			/*
+			 * we need a copy of index tuples as they can be freed as part of
+			 * overflow page, however we need them to write a WAL record in
+			 * _hash_freeovflpage.
+			 */
+			itups[nitups] = CopyIndexTuple(itup);
+			tups_size[nitups++] = itemsz;
+			all_tups_size += itemsz;
 		}
 
 		/*
@@ -797,10 +855,14 @@ _hash_squeezebucket(Relation rel,
 		Assert(BlockNumberIsValid(rblkno));
 
 		/* free this overflow page (releases rbuf) */
-		_hash_freeovflpage(rel, rbuf, wbuf, bstrategy);
+		_hash_freeovflpage(rel, bucket_buf, rbuf, wbuf, itups, itup_offsets,
+						   tups_size, nitups, bstrategy);
+
+		/* be tidy */
+		for (i = 0; i < nitups; i++)
+			pfree(itups[i]);
 
-		if (wbuf_dirty)
-			MarkBufferDirty(wbuf);
+		pfree(itup_offsets);
 
 		/* are we freeing the page adjacent to wbuf? */
 		if (rblkno == wblkno)
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 9485978..00f3ea8 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -470,7 +470,6 @@ _hash_metapinit(Relation rel, double num_tuples, ForkNumber forkNum)
 void
 _hash_pageinit(Page page, Size size)
 {
-	Assert(PageIsNew(page));
 	PageInit(page, size, sizeof(HashPageOpaqueData));
 }
 
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 6fc5fa4..fdf045a 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -598,6 +598,33 @@ PageGetFreeSpace(Page page)
 }
 
 /*
+ * PageGetFreeSpaceForMultipleTuples
+ *		Returns the size of the free (allocatable) space on a page,
+ *		reduced by the space needed for multiple new line pointers.
+ *
+ * Note: this should usually only be used on index pages.  Use
+ * PageGetHeapFreeSpace on heap pages.
+ */
+Size
+PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
+{
+	int			space;
+
+	/*
+	 * Use signed arithmetic here so that we behave sensibly if pd_lower >
+	 * pd_upper.
+	 */
+	space = (int) ((PageHeader) page)->pd_upper -
+		(int) ((PageHeader) page)->pd_lower;
+
+	if (space < (int) (ntups * sizeof(ItemIdData)))
+		return 0;
+	space -= ntups * sizeof(ItemIdData);
+
+	return (Size) space;
+}
+
+/*
  * PageGetExactFreeSpace
  *		Returns the size of the free (allocatable) space on a page,
  *		without any consideration for adding/removing line pointers.
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index 3bf587b..5767deb 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -303,11 +303,14 @@ extern Datum hash_uint32(uint32 k);
 extern void _hash_doinsert(Relation rel, IndexTuple itup);
 extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
 			   Size itemsize, IndexTuple itup);
+extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
+					OffsetNumber *itup_offsets, uint16 nitups);
 
 /* hashovfl.c */
 extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin);
-extern BlockNumber _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
-				   BufferAccessStrategy bstrategy);
+extern BlockNumber _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
+				   Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
+			 Size *tups_size, uint16 nitups, BufferAccessStrategy bstrategy);
 extern void _hash_initbitmap(Relation rel, HashMetaPage metap,
 				 BlockNumber blkno, ForkNumber forkNum);
 extern void _hash_squeezebucket(Relation rel,
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 294f9cb..e956dc3 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -425,6 +425,7 @@ extern Page PageGetTempPageCopySpecial(Page page);
 extern void PageRestoreTempPage(Page tempPage, Page oldPage);
 extern void PageRepairFragmentation(Page page);
 extern Size PageGetFreeSpace(Page page);
+extern Size PageGetFreeSpaceForMultipleTuples(Page page, int ntups);
 extern Size PageGetExactFreeSpace(Page page);
 extern Size PageGetHeapFreeSpace(Page page);
 extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
-- 
1.8.4.msysgit.0

