From 8fa4be92fcc91c40c45d1f6de9dfa70ccdd62f8f Mon Sep 17 00:00:00 2001
From: Amit Kapila <amit.kapila@enterprisedb.com>
Date: Fri, 17 Feb 2017 11:08:29 +0530
Subject: [PATCH] Enable WAL for Hash Indexes.

This makes hash indexes durable, which means that data in
hash indexes can be recovered after a crash.  It also allows
hash indexes to be replicated.
---
 contrib/pageinspect/expected/hash.out          |   1 -
 contrib/pgstattuple/expected/pgstattuple.out   |   1 -
 doc/src/sgml/backup.sgml                       |  13 -
 doc/src/sgml/config.sgml                       |   7 +-
 doc/src/sgml/high-availability.sgml            |   6 -
 doc/src/sgml/indices.sgml                      |  12 -
 doc/src/sgml/ref/create_index.sgml             |  13 -
 src/backend/access/hash/Makefile               |   2 +-
 src/backend/access/hash/README                 | 144 +++-
 src/backend/access/hash/hash.c                 |  81 +-
 src/backend/access/hash/hash_xlog.c            | 974 +++++++++++++++++++++++++
 src/backend/access/hash/hashinsert.c           |  59 +-
 src/backend/access/hash/hashovfl.c             | 243 +++++-
 src/backend/access/hash/hashpage.c             | 219 +++++-
 src/backend/access/hash/hashsearch.c           |   5 +
 src/backend/access/rmgrdesc/hashdesc.c         | 134 +++-
 src/backend/commands/indexcmds.c               |   5 -
 src/backend/utils/cache/relcache.c             |  12 +-
 src/include/access/hash_xlog.h                 | 232 ++++++
 src/test/regress/expected/create_index.out     |   5 -
 src/test/regress/expected/enum.out             |   1 -
 src/test/regress/expected/hash_index.out       |   2 -
 src/test/regress/expected/macaddr.out          |   1 -
 src/test/regress/expected/replica_identity.out |   1 -
 src/test/regress/expected/uuid.out             |   1 -
 25 files changed, 2046 insertions(+), 128 deletions(-)
 create mode 100644 src/backend/access/hash/hash_xlog.c

diff --git a/contrib/pageinspect/expected/hash.out b/contrib/pageinspect/expected/hash.out
index 8ed60bc..3ba01f6 100644
--- a/contrib/pageinspect/expected/hash.out
+++ b/contrib/pageinspect/expected/hash.out
@@ -1,7 +1,6 @@
 CREATE TABLE test_hash (a int, b text);
 INSERT INTO test_hash VALUES (1, 'one');
 CREATE INDEX test_hash_a_idx ON test_hash USING hash (a);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 \x
 SELECT hash_page_type(get_raw_page('test_hash_a_idx', 0));
 -[ RECORD 1 ]--+---------
diff --git a/contrib/pgstattuple/expected/pgstattuple.out b/contrib/pgstattuple/expected/pgstattuple.out
index 169d193..7d7ade8 100644
--- a/contrib/pgstattuple/expected/pgstattuple.out
+++ b/contrib/pgstattuple/expected/pgstattuple.out
@@ -131,7 +131,6 @@ select * from pgstatginindex('test_ginidx');
 (1 row)
 
 create index test_hashidx on test using hash (b);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 select * from pgstathashindex('test_hashidx');
  version | bucket_pages | overflow_pages | bitmap_pages | zero_pages | live_items | dead_items | free_percent 
 ---------+--------------+----------------+--------------+------------+------------+------------+--------------
diff --git a/doc/src/sgml/backup.sgml b/doc/src/sgml/backup.sgml
index 5f009ee..5a7c730 100644
--- a/doc/src/sgml/backup.sgml
+++ b/doc/src/sgml/backup.sgml
@@ -1537,19 +1537,6 @@ archive_command = 'local_backup_script.sh "%p" "%f"'
   <itemizedlist>
    <listitem>
     <para>
-     Operations on hash indexes are not presently WAL-logged, so
-     replay will not update these indexes.  This will mean that any new inserts
-     will be ignored by the index, updated rows will apparently disappear and
-     deleted rows will still retain pointers. In other words, if you modify a
-     table with a hash index on it then you will get incorrect query results
-     on a standby server.  When recovery completes it is recommended that you
-     manually <xref linkend="sql-reindex">
-     each such index after completing a recovery operation.
-    </para>
-   </listitem>
-
-   <listitem>
-    <para>
      If a <xref linkend="sql-createdatabase">
      command is executed while a base backup is being taken, and then
      the template database that the <command>CREATE DATABASE</> copied
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 95afc2c..e970671 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2152,10 +2152,9 @@ include_dir 'conf.d'
          has materialized a result set, no error will be generated even if the
          underlying rows in the referenced table have been vacuumed away.
          Some tables cannot safely be vacuumed early, and so will not be
-         affected by this setting.  Examples include system catalogs and any
-         table which has a hash index.  For such tables this setting will
-         neither reduce bloat nor create a possibility of a <literal>snapshot
-         too old</> error on scanning.
+         affected by this setting.  Examples include system catalogs.  For
+         such tables this setting will neither reduce bloat nor create a
+         possibility of a <literal>snapshot too old</> error on scanning.
         </para>
        </listitem>
       </varlistentry>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 48de2ce..0e61991 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -2353,12 +2353,6 @@ LOG:  database system is ready to accept read only connections
   <itemizedlist>
    <listitem>
     <para>
-     Operations on hash indexes are not presently WAL-logged, so
-     replay will not update these indexes.
-    </para>
-   </listitem>
-   <listitem>
-    <para>
      Full knowledge of running transactions is required before snapshots
      can be taken. Transactions that use large numbers of subtransactions
      (currently greater than 64) will delay the start of read only
diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml
index 271c135..e40750e 100644
--- a/doc/src/sgml/indices.sgml
+++ b/doc/src/sgml/indices.sgml
@@ -193,18 +193,6 @@ CREATE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable>
 </synopsis>
   </para>
 
-  <caution>
-   <para>
-    Hash index operations are not presently WAL-logged,
-    so hash indexes might need to be rebuilt with <command>REINDEX</>
-    after a database crash if there were unwritten changes.
-    Also, changes to hash indexes are not replicated over streaming or
-    file-based replication after the initial base backup, so they
-    give wrong answers to queries that subsequently use them.
-    For these reasons, hash index use is presently discouraged.
-   </para>
-  </caution>
-
   <para>
    <indexterm>
     <primary>index</primary>
diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml
index fcb7a60..7163b03 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -510,19 +510,6 @@ Indexes:
    they can be useful.
   </para>
 
-  <caution>
-   <para>
-    Hash index operations are not presently WAL-logged,
-    so hash indexes might need to be rebuilt with <command>REINDEX</>
-    after a database crash if there were unwritten changes.
-    Also, changes to hash indexes are not replicated over streaming or
-    file-based replication after the initial base backup, so they
-    give wrong answers to queries that subsequently use them.
-    Hash indexes are also not properly restored during point-in-time
-    recovery.  For these reasons, hash index use is presently discouraged.
-   </para>
-  </caution>
-
   <para>
    Currently, only the B-tree, GiST, GIN, and BRIN index methods support
    multicolumn indexes. Up to 32 fields can be specified by default.
diff --git a/src/backend/access/hash/Makefile b/src/backend/access/hash/Makefile
index e2e7e91..b154569 100644
--- a/src/backend/access/hash/Makefile
+++ b/src/backend/access/hash/Makefile
@@ -13,6 +13,6 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = hash.o hashfunc.o hashinsert.o hashovfl.o hashpage.o hashsearch.o \
-       hashsort.o hashutil.o hashvalidate.o
+       hashsort.o hashutil.o hashvalidate.o hash_xlog.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/hash/README b/src/backend/access/hash/README
index 703ae98..1c1f0db 100644
--- a/src/backend/access/hash/README
+++ b/src/backend/access/hash/README
@@ -287,13 +287,17 @@ The insertion algorithm is rather similar:
 	if current page is full, release lock but not pin, read/exclusive-lock
      next page; repeat as needed
 	>> see below if no space in any page of bucket
+	take buffer content lock in exclusive mode on metapage
 	insert tuple at appropriate place in page
-	mark current page dirty and release buffer content lock and pin
-	if the current page is not a bucket page, release the pin on bucket page
-	pin meta page and take buffer content lock in exclusive mode
+	mark current page dirty
 	increment tuple count, decide if split needed
-	mark meta page dirty and release buffer content lock and pin
-	done if no split needed, else enter Split algorithm below
+	mark meta page dirty
+	write WAL for insertion of tuple
+	release the buffer content lock on metapage
+	release buffer content lock on current page
+	if current page is not a bucket page, release the pin on bucket page
+	if split is needed, enter Split algorithm below
+	release the pin on metapage
 
 To speed searches, the index entries within any individual index page are
 kept sorted by hash code; the insertion code must take care to insert new
@@ -328,12 +332,17 @@ existing bucket in two, thereby lowering the fill ratio:
        try to finish the split and the cleanup work
        if that succeeds, start over; if it fails, give up
 	mark the old and new buckets indicating split is in progress
+	mark both old and new buckets as dirty
+	write WAL for allocation of new page for split
 	copy the tuples that belongs to new bucket from old bucket, marking
      them as moved-by-split
+	write WAL record for moving tuples to new page once the new page is full
+	 or all the pages of the old bucket have been processed
 	release lock but not pin for primary bucket page of old bucket,
 	 read/shared-lock next page; repeat as needed
 	clear the bucket-being-split and bucket-being-populated flags
 	mark the old bucket indicating split-cleanup
+	write WAL for changing the flags on both old and new buckets
 
 The split operation's attempt to acquire cleanup-lock on the old bucket number
 could fail if another process holds any lock or pin on it.  We do not want to
@@ -369,6 +378,8 @@ The fourth operation is garbage collection (bulk deletion):
 		acquire cleanup lock on primary bucket page
 		loop:
 			scan and remove tuples
+			mark the target page dirty
+			write WAL for deleting tuples from target page
 			if this is the last bucket page, break out of loop
 			pin and x-lock next page
 			release prior lock and pin (except keep pin on primary bucket page)
@@ -383,7 +394,8 @@ The fourth operation is garbage collection (bulk deletion):
 	check if number of buckets changed
 	if so, release content lock and pin and return to for-each-bucket loop
 	else update metapage tuple count
-	mark meta page dirty and release buffer content lock and pin
+	 mark meta page dirty and write WAL for update of metapage
+	 release buffer content lock and pin
 
 Note that this is designed to allow concurrent splits and scans.  If a split
 occurs, tuples relocated into the new bucket will be visited twice by the
@@ -425,18 +437,16 @@ Obtaining an overflow page:
 	search for a free page (zero bit in bitmap)
 	if found:
 		set bit in bitmap
-		mark bitmap page dirty and release content lock
+		mark bitmap page dirty
 		take metapage buffer content lock in exclusive mode
 		if first-free-bit value did not change,
 			update it and mark meta page dirty
-		release meta page buffer content lock
-		return page number
 	else (not found):
 	release bitmap page buffer content lock
 	loop back to try next bitmap page, if any
 -- here when we have checked all bitmap pages; we hold meta excl. lock
 	extend index to add another overflow page; update meta information
-	mark meta page dirty and release buffer content lock
+	mark meta page dirty
 	return page number
 
 It is slightly annoying to release and reacquire the metapage lock
@@ -456,12 +466,15 @@ like this:
 
 	-- having determined that no space is free in the target bucket:
 	remember last page of bucket, drop write lock on it
-	call free-page-acquire routine
 	re-write-lock last page of bucket
 	if it is not last anymore, step to the last page
-	update (former) last page to point to new page
+	execute free-page-acquire (Obtaining an overflow page) mechanism described above
+	update (former) last page to point to the new page and mark the buffer dirty
 	write-lock and initialize new page, with back link to former last page
-	write and release former last page
+	write WAL for addition of overflow page
+	release the locks on meta page and bitmap page acquired in free-page-acquire algorithm
+	release the lock on former last page
+	release the lock on new overflow page
 	insert tuple into new page
 	-- etc.
 
@@ -488,12 +501,14 @@ accessors of pages in the bucket.  The algorithm is:
 	determine which bitmap page contains the free space bit for page
 	release meta page buffer content lock
 	pin bitmap page and take buffer content lock in exclusive mode
-	update bitmap bit
-	mark bitmap page dirty and release buffer content lock and pin
-	if page number is less than what we saw as first-free-bit in meta:
 	retake meta page buffer content lock in exclusive mode
+	move (insert) tuples that belong to the overflow page being freed
+	update bitmap bit
+	mark bitmap page dirty
 	if page number is still less than first-free-bit,
 		update first-free-bit field and mark meta page dirty
+	write WAL for delinking overflow page operation
+	release buffer content lock and pin
 	release meta page buffer content lock and pin
 
 We have to do it this way because we must clear the bitmap bit before
@@ -504,8 +519,101 @@ page acquirer will scan more bitmap bits than he needs to.  What must be
 avoided is having first-free-bit greater than the actual first free bit,
 because then that free page would never be found by searchers.
 
-All the freespace operations should be called while holding no buffer
-locks.  Since they need no lmgr locks, deadlock is not possible.
+The reason for moving tuples from the overflow page while delinking it is to
+make the two changes a single atomic operation.  Not doing so could lead to
+spurious reads on a standby; basically, the user might see the same tuple
+twice.
+
+
+WAL Considerations
+------------------
+
+Hash index operations such as create index, insert, delete, bucket split,
+overflow page allocation, and squeeze (the operation in which overflow pages
+are freed) do not by themselves guarantee hash index consistency after a
+crash.  To provide robustness, we write WAL for each of these operations,
+which is replayed during crash recovery.
+
+Multiple WAL records are written for the create index operation: first one for
+initializing the metapage, followed by one for each new bucket created during
+the operation, followed by one for initializing the bitmap page.  If the
+system crashes partway through, the whole operation is rolled back.  We
+considered writing a single WAL record for the whole operation, but that
+would require limiting the number of initial buckets that can be created,
+because the current XLog machinery can log only a fixed number of pages,
+XLR_MAX_BLOCK_ID (32), per record; so it is better to write multiple WAL
+records for this operation.  The downside of restricting the number of
+buckets is that we would need to perform split operations whenever the number
+of tuples exceeds what can be accommodated in the initial set of buckets, and
+it is not unusual to have a large number of tuples during the create index
+operation.
+
+Ordinary item insertions (that don't force a page split or need a new overflow
+page) are single WAL entries.  They touch a single bucket page and the
+metapage; the metapage is updated during replay just as it is updated during
+the original operation.
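+
+As a rough sketch of the record construction (the authoritative code is
+_hash_doinsert in hashinsert.c, and the variable names below are taken from
+it), the entry for an ordinary insert is built like this, with the tuple
+itself attached to the bucket page's block reference:
+
+	xl_hash_insert xlrec;
+	XLogRecPtr	recptr;
+
+	xlrec.offnum = itup_off;	/* offset at which the tuple was placed */
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfHashInsert);
+	XLogRegisterBuffer(0, buf, REGBUF_STANDARD);	/* bucket/overflow page */
+	XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
+	XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);	/* metapage */
+	recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);
+	PageSetLSN(BufferGetPage(buf), recptr);
+	PageSetLSN(BufferGetPage(metabuf), recptr);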
+
+An insertion that requires adding an overflow page is logged as a single WAL
+entry for the insert, preceded by a WAL entry for the new overflow page
+needed to hold the tuple.  There is a corner case where, by the time we try
+to use the newly allocated overflow page, it has already been filled by
+concurrent insertions; in that case another overflow page is allocated and a
+separate WAL entry is written for it.
+
+An insertion that causes a bucket split is logged as a single WAL entry,
+followed by a WAL entry for allocating a new bucket, followed by a WAL entry
+for each overflow bucket page in the new bucket to which the tuples are moved
+from the old bucket, followed by a WAL entry to indicate that the split is
+complete for both old and new buckets.
+
+A split operation that requires overflow pages to complete will write a
+WAL record for each allocation of a new overflow page.
+
+As splitting involves multiple atomic actions, it's possible for the system
+to crash while tuples are being moved from pages of the old bucket to the new
+bucket.  In such a case, after recovery, the old and new buckets will still
+be marked with the bucket-being-split and bucket-being-populated flags
+respectively, which indicates that a split is in progress for those buckets.
+The reader algorithm works correctly in this state, as it will scan both the
+old and new buckets while the split is in progress, as explained in the
+reader algorithm section above.
+
+We finish the split at the next insert or split operation on the old bucket,
+as explained in the insert and split algorithms above.  It could be done
+during searches, too, but it seems best not to put any extra updates in what
+would otherwise be a read-only operation (updating is not possible in hot
+standby mode anyway).  It would seem natural to complete the split in VACUUM,
+but since splitting a bucket might require allocating a new page, it might
+fail if you run out of disk space.  That would be bad during VACUUM - the
+reason for running VACUUM in the first place might be that you ran out of
+disk space, and now VACUUM won't finish because you're out of disk space.
+In contrast, an insertion can require enlarging the physical file anyway.
+
+Deletion of tuples from a bucket is performed for two reasons: to remove dead
+tuples, and to remove tuples that were moved by a split.  A WAL entry is made
+for each bucket page from which tuples are removed, followed by a WAL entry
+to clear the garbage flag if the tuples moved by the split are removed.  A
+separate WAL entry is made for updating the metapage if the deletion is
+performed by vacuum to remove dead tuples.
+
+As deletion involves multiple atomic operations, it is quite possible that
+the system crashes (a) after removing tuples from some of the bucket pages,
+(b) before clearing the garbage flag, or (c) before updating the metapage.
+If the system crashes before completing (b), the bucket will be cleaned again
+during the next vacuum or insert after recovery, which can have some
+performance impact, but it will work fine.  If the system crashes before
+completing (c), after recovery there could be some additional splits until
+the next vacuum updates the metapage, but the other operations like insert,
+delete and scan will work correctly.  We could fix this problem by updating
+the metapage based on the delete operation during replay, but it is not clear
+that it is worth the complication.
+
+The squeeze operation moves tuples from bucket pages later in the chain to
+bucket pages earlier in the chain, and writes a WAL record whenever either
+the page to which it is writing tuples becomes full or the page from which it
+is removing tuples becomes empty.
+
+As the squeeze operation involves multiple atomic operations, it is quite
+possible that the system crashes before the operation is complete for the
+entire bucket.  After recovery, the index will still work correctly, but it
+will remain bloated, which can impact the performance of read and insert
+operations until the next vacuum squeezes the bucket completely.
 
 
 Other Notes
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 1f8a7f6..7f030f8 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -28,6 +28,7 @@
 #include "utils/builtins.h"
 #include "utils/index_selfuncs.h"
 #include "utils/rel.h"
+#include "miscadmin.h"
 
 
 /* Working state for hashbuild and its callback */
@@ -303,6 +304,11 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
 		buf = so->hashso_curbuf;
 		Assert(BufferIsValid(buf));
 		page = BufferGetPage(buf);
+
+		/*
+		 * We don't need to test for old snapshots here, as the current
+		 * buffer is pinned, so vacuum can't clean the page.
+		 */
 		maxoffnum = PageGetMaxOffsetNumber(page);
 		for (offnum = ItemPointerGetOffsetNumber(current);
 			 offnum <= maxoffnum;
@@ -623,6 +629,7 @@ loop_top:
 	}
 
 	/* Okay, we're really done.  Update tuple count in metapage. */
+	START_CRIT_SECTION();
 
 	if (orig_maxbucket == metap->hashm_maxbucket &&
 		orig_ntuples == metap->hashm_ntuples)
@@ -649,6 +656,26 @@ loop_top:
 	}
 
 	MarkBufferDirty(metabuf);
+
+	/* XLOG stuff */
+	if (RelationNeedsWAL(rel))
+	{
+		xl_hash_update_meta_page xlrec;
+		XLogRecPtr	recptr;
+
+		xlrec.ntuples = metap->hashm_ntuples;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);
+
+		XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
+		PageSetLSN(BufferGetPage(metabuf), recptr);
+	}
+
+	END_CRIT_SECTION();
+
 	_hash_relbuf(rel, metabuf);
 
 	/* return statistics */
@@ -816,9 +843,40 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
 		 */
 		if (ndeletable > 0)
 		{
+			/* No ereport(ERROR) until changes are logged */
+			START_CRIT_SECTION();
+
 			PageIndexMultiDelete(page, deletable, ndeletable);
 			bucket_dirty = true;
 			MarkBufferDirty(buf);
+
+			/* XLOG stuff */
+			if (RelationNeedsWAL(rel))
+			{
+				xl_hash_delete xlrec;
+				XLogRecPtr	recptr;
+
+				xlrec.is_primary_bucket_page = (buf == bucket_buf) ? true : false;
+
+				XLogBeginInsert();
+				XLogRegisterData((char *) &xlrec, SizeOfHashDelete);
+
+				/*
+				 * bucket buffer needs to be registered to ensure that we can
+				 * acquire a cleanup lock on it during replay.
+				 */
+				if (!xlrec.is_primary_bucket_page)
+					XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);
+
+				XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
+				XLogRegisterBufData(1, (char *) deletable,
+									ndeletable * sizeof(OffsetNumber));
+
+				recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
+				PageSetLSN(BufferGetPage(buf), recptr);
+			}
+
+			END_CRIT_SECTION();
 		}
 
 		/* bail out if there are no more pages to scan. */
@@ -866,8 +924,25 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
 		page = BufferGetPage(bucket_buf);
 		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
 
+		/* No ereport(ERROR) until changes are logged */
+		START_CRIT_SECTION();
+
 		bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
 		MarkBufferDirty(bucket_buf);
+
+		/* XLOG stuff */
+		if (RelationNeedsWAL(rel))
+		{
+			XLogRecPtr	recptr;
+
+			XLogBeginInsert();
+			XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);
+
+			recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
+			PageSetLSN(page, recptr);
+		}
+
+		END_CRIT_SECTION();
 	}
 
 	/*
@@ -881,9 +956,3 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
 	else
 		LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
 }
-
-void
-hash_redo(XLogReaderState *record)
-{
-	elog(PANIC, "hash_redo: unimplemented");
-}
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
new file mode 100644
index 0000000..aff9b53
--- /dev/null
+++ b/src/backend/access/hash/hash_xlog.c
@@ -0,0 +1,974 @@
+/*-------------------------------------------------------------------------
+ *
+ * hash_xlog.c
+ *	  WAL replay logic for hash index.
+ *
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/hash/hash_xlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "access/xlogutils.h"
+
+/*
+ * replay a hash index meta page
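+ *
+ * Block 0 is the metapage, which is reinitialized from the parameters
+ * carried in the record (num_tuples, procid and ffactor).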
+ */
+static void
+hash_xlog_init_meta_page(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	Page		page;
+	Buffer		metabuf;
+
+	xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);
+
+	/* create the index' metapage */
+	metabuf = XLogInitBufferForRedo(record, 0);
+	Assert(BufferIsValid(metabuf));
+	_hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid,
+						  xlrec->ffactor, true);
+	page = (Page) BufferGetPage(metabuf);
+	PageSetLSN(page, lsn);
+	MarkBufferDirty(metabuf);
+	/* all done */
+	UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay a hash index bitmap page
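+ *
+ * Block 0 is the new bitmap page, which is initialized from scratch; block 1
+ * is the metapage, whose list of bitmap pages is extended.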
+ */
+static void
+hash_xlog_init_bitmap_page(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	Buffer		bitmapbuf;
+	Buffer		metabuf;
+	Page		page;
+	HashMetaPage metap;
+	uint32		num_buckets;
+
+	xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);
+
+	/*
+	 * Initialize bitmap page
+	 */
+	bitmapbuf = XLogInitBufferForRedo(record, 0);
+	_hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
+	PageSetLSN(BufferGetPage(bitmapbuf), lsn);
+	MarkBufferDirty(bitmapbuf);
+	UnlockReleaseBuffer(bitmapbuf);
+
+	/* add the new bitmap page to the metapage's list of bitmaps */
+	if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
+	{
+		/*
+		 * Note: in normal operation, we'd update the metapage while still
+		 * holding lock on the page we inserted into.  But during replay it's
+		 * not necessary to hold that lock, since no other index updates can
+		 * be happening concurrently.
+		 */
+		page = BufferGetPage(metabuf);
+		metap = HashPageGetMeta(page);
+
+		num_buckets = metap->hashm_maxbucket + 1;
+		metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
+		metap->hashm_nmaps++;
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(metabuf);
+	}
+	if (BufferIsValid(metabuf))
+		UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay a hash index insert without split
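+ *
+ * Block 0 is the bucket or overflow page that received the new tuple; its
+ * registered data is the tuple itself.  Block 1 is the metapage, whose tuple
+ * count is incremented (see the WAL logging in _hash_doinsert).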
+ */
+static void
+hash_xlog_insert(XLogReaderState *record)
+{
+	HashMetaPage metap;
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		Size		datalen;
+		char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);
+
+		page = BufferGetPage(buffer);
+
+		if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
+						false, false) == InvalidOffsetNumber)
+			elog(PANIC, "hash_xlog_insert: failed to add item");
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
+	{
+		/*
+		 * Note: in normal operation, we'd update the metapage while still
+		 * holding lock on the page we inserted into.  But during replay it's
+		 * not necessary to hold that lock, since no other index updates can
+		 * be happening concurrently.
+		 */
+		page = BufferGetPage(buffer);
+		metap = HashPageGetMeta(page);
+		metap->hashm_ntuples += 1;
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * replay addition of overflow page for hash index
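+ *
+ * Block 0 is the new overflow page (initialized here); block 1 is the
+ * previously-last page of the bucket chain, whose next pointer now points to
+ * the new page; block 2 is a bitmap page whose bit for the page is set (if
+ * any); block 3 is a newly added bitmap page (if any); and block 4 is the
+ * metapage (see the WAL logging in _hash_addovflpage).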
+ */
+static void
+hash_xlog_addovflpage(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_hash_addovflpage *xlrec = (xl_hash_addovflpage *) XLogRecGetData(record);
+	Buffer		leftbuf;
+	Buffer		ovflbuf;
+	Buffer		metabuf;
+	BlockNumber leftblk;
+	BlockNumber rightblk;
+	BlockNumber newmapblk = InvalidBlockNumber;
+	Page		ovflpage;
+	HashPageOpaque ovflopaque;
+	uint32	   *num_bucket;
+	char	   *data;
+	Size datalen PG_USED_FOR_ASSERTS_ONLY;
+	bool		new_bmpage = false;
+
+	XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
+	XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);
+
+	ovflbuf = XLogInitBufferForRedo(record, 0);
+	Assert(BufferIsValid(ovflbuf));
+
+	data = XLogRecGetBlockData(record, 0, &datalen);
+	num_bucket = (uint32 *) data;
+	Assert(datalen == sizeof(uint32));
+	_hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
+				  true);
+	/* update backlink */
+	ovflpage = BufferGetPage(ovflbuf);
+	ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
+	ovflopaque->hasho_prevblkno = leftblk;
+
+	PageSetLSN(ovflpage, lsn);
+	MarkBufferDirty(ovflbuf);
+
+	if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
+	{
+		Page		leftpage;
+		HashPageOpaque leftopaque;
+
+		leftpage = BufferGetPage(leftbuf);
+		leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
+		leftopaque->hasho_nextblkno = rightblk;
+
+		PageSetLSN(leftpage, lsn);
+		MarkBufferDirty(leftbuf);
+	}
+
+	/*
+	 * We need to release the locks once the prev pointer of the overflow page
+	 * and the next pointer of the left page are set; otherwise, a concurrent
+	 * reader might skip the page.
+	 */
+	if (BufferIsValid(leftbuf))
+		UnlockReleaseBuffer(leftbuf);
+	UnlockReleaseBuffer(ovflbuf);
+
+	/*
+	 * Note: in normal operation, we'd update the bitmap and meta page while
+	 * still holding lock on the overflow pages we updated.  But during replay
+	 * it's not necessary to hold those locks, since no other index updates
+	 * can be happening concurrently.
+	 */
+	if (XLogRecHasBlockRef(record, 2))
+	{
+		Buffer		mapbuffer;
+
+		if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO)
+		{
+			Page		mappage = (Page) BufferGetPage(mapbuffer);
+			uint32	   *freep = NULL;
+			char	   *data;
+			uint32	   *bitmap_page_bit;
+
+			freep = HashPageGetBitmap(mappage);
+
+			data = XLogRecGetBlockData(record, 2, &datalen);
+			bitmap_page_bit = (uint32 *) data;
+
+			SETBIT(freep, *bitmap_page_bit);
+
+			PageSetLSN(mappage, lsn);
+			MarkBufferDirty(mapbuffer);
+		}
+		if (BufferIsValid(mapbuffer))
+			UnlockReleaseBuffer(mapbuffer);
+	}
+
+	if (XLogRecHasBlockRef(record, 3))
+	{
+		Buffer		newmapbuf;
+
+		newmapbuf = XLogInitBufferForRedo(record, 3);
+
+		_hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);
+
+		new_bmpage = true;
+		newmapblk = BufferGetBlockNumber(newmapbuf);
+
+		MarkBufferDirty(newmapbuf);
+		PageSetLSN(BufferGetPage(newmapbuf), lsn);
+
+		UnlockReleaseBuffer(newmapbuf);
+	}
+
+	if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO)
+	{
+		HashMetaPage metap;
+		Page		page;
+		uint32	   *firstfree_ovflpage;
+
+		data = XLogRecGetBlockData(record, 4, &datalen);
+		firstfree_ovflpage = (uint32 *) data;
+
+		page = BufferGetPage(metabuf);
+		metap = HashPageGetMeta(page);
+		metap->hashm_firstfree = *firstfree_ovflpage;
+
+		if (!xlrec->bmpage_found)
+		{
+			metap->hashm_spares[metap->hashm_ovflpoint]++;
+
+			if (new_bmpage)
+			{
+				Assert(BlockNumberIsValid(newmapblk));
+
+				metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
+				metap->hashm_nmaps++;
+				metap->hashm_spares[metap->hashm_ovflpoint]++;
+			}
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(metabuf);
+	}
+	if (BufferIsValid(metabuf))
+		UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay allocation of page for split operation
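+ *
+ * Block 0 is the old bucket's primary page, whose flags and prev pointer are
+ * updated; block 1 is the new bucket's primary page, initialized from
+ * scratch; block 2 is the metapage, whose maxbucket and, optionally, masks
+ * and spares are updated.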
+ */
+static void
+hash_xlog_split_allocpage(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_hash_split_allocpage *xlrec = (xl_hash_split_allocpage *) XLogRecGetData(record);
+	Buffer		oldbuf;
+	Buffer		newbuf;
+	Buffer		metabuf;
+	Size datalen PG_USED_FOR_ASSERTS_ONLY;
+	char	   *data;
+	XLogRedoAction action;
+
+	/* replay the record for old bucket */
+	action = XLogReadBufferForRedo(record, 0, &oldbuf);
+
+	/*
+	 * Note that we still update the page even if it was restored from a full
+	 * page image, because the bucket flag is not included in the image.
+	 */
+	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+	{
+		Page		oldpage;
+		HashPageOpaque oldopaque;
+
+		oldpage = BufferGetPage(oldbuf);
+		oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
+
+		oldopaque->hasho_flag = xlrec->old_bucket_flag;
+		oldopaque->hasho_prevblkno = xlrec->new_bucket;
+
+		PageSetLSN(oldpage, lsn);
+		MarkBufferDirty(oldbuf);
+	}
+
+	/*
+	 * There is no harm in releasing the lock on old bucket before new bucket
+	 * during replay as no other bucket splits can be happening concurrently.
+	 */
+	if (BufferIsValid(oldbuf))
+		UnlockReleaseBuffer(oldbuf);
+
+	/* replay the record for new bucket */
+	newbuf = XLogInitBufferForRedo(record, 1);
+	_hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
+				  xlrec->new_bucket_flag, true);
+	MarkBufferDirty(newbuf);
+	PageSetLSN(BufferGetPage(newbuf), lsn);
+
+	UnlockReleaseBuffer(newbuf);
+
+	/*
+	 * Note: in normal operation, we'd update the meta page while still
+	 * holding lock on the old and new bucket pages.  But during replay it's
+	 * not necessary to hold those locks, since no other bucket splits can be
+	 * happening concurrently.
+	 */
+
+	/* replay the record for metapage changes */
+	if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
+	{
+		Page		page;
+		HashMetaPage metap;
+
+		page = BufferGetPage(metabuf);
+		metap = HashPageGetMeta(page);
+		metap->hashm_maxbucket = xlrec->new_bucket;
+
+		data = XLogRecGetBlockData(record, 2, &datalen);
+
+		if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
+		{
+			uint32		lowmask;
+			uint32	   *highmask;
+
+			/* extract low and high masks. */
+			memcpy(&lowmask, data, sizeof(uint32));
+			highmask = (uint32 *) ((char *) data + sizeof(uint32));
+
+			/* update metapage */
+			metap->hashm_lowmask = lowmask;
+			metap->hashm_highmask = *highmask;
+
+			data += sizeof(uint32) * 2;
+		}
+
+		if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
+		{
+			uint32		ovflpoint;
+			uint32	   *ovflpages;
+
+			/* extract information of overflow pages. */
+			memcpy(&ovflpoint, data, sizeof(uint32));
+			ovflpages = (uint32 *) ((char *) data + sizeof(uint32));
+
+			/* update metapage */
+			metap->hashm_spares[ovflpoint] = *ovflpages;
+			metap->hashm_ovflpoint = ovflpoint;
+		}
+
+		MarkBufferDirty(metabuf);
+		PageSetLSN(BufferGetPage(metabuf), lsn);
+	}
+
+	if (BufferIsValid(metabuf))
+		UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay of split operation
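+ *
+ * The record carries a full-page image of a page in the new bucket to which
+ * tuples were moved during the split, so replay just restores that image.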
+ */
+static void
+hash_xlog_split_page(XLogReaderState *record)
+{
+	Buffer		buf;
+
+	if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED)
+		elog(ERROR, "Hash split record did not contain a full-page image");
+
+	UnlockReleaseBuffer(buf);
+}
+
+/*
+ * replay completion of split operation
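+ *
+ * Block 0 is the old bucket's primary page and block 1 the new bucket's
+ * primary page; replay updates the bucket flags on both, even when the pages
+ * were restored from full-page images.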
+ */
+static void
+hash_xlog_split_complete(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
+	Buffer		oldbuf;
+	Buffer		newbuf;
+	XLogRedoAction action;
+
+	/* replay the record for old bucket */
+	action = XLogReadBufferForRedo(record, 0, &oldbuf);
+
+	/*
+	 * Note that we still update the page even if it was restored from a full
+	 * page image, because the bucket flag is not included in the image.
+	 */
+	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+	{
+		Page		oldpage;
+		HashPageOpaque oldopaque;
+
+		oldpage = BufferGetPage(oldbuf);
+		oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
+
+		oldopaque->hasho_flag = xlrec->old_bucket_flag;
+
+		PageSetLSN(oldpage, lsn);
+		MarkBufferDirty(oldbuf);
+	}
+	if (BufferIsValid(oldbuf))
+		UnlockReleaseBuffer(oldbuf);
+
+	/* replay the record for new bucket */
+	action = XLogReadBufferForRedo(record, 1, &newbuf);
+
+	/*
+	 * Note that we still update the page even if it was restored from a full
+	 * page image, because the bucket flag is not included in the image.
+	 */
+	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+	{
+		Page		newpage;
+		HashPageOpaque nopaque;
+
+		newpage = BufferGetPage(newbuf);
+		nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage);
+
+		nopaque->hasho_flag = xlrec->new_bucket_flag;
+
+		PageSetLSN(newpage, lsn);
+		MarkBufferDirty(newbuf);
+	}
+	if (BufferIsValid(newbuf))
+		UnlockReleaseBuffer(newbuf);
+}
+
+/*
+ * replay move of page contents for squeeze operation of hash index
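+ *
+ * Block 0 is the bucket's primary page (used only to take a cleanup lock
+ * when it is not the page being written to), block 1 the page receiving the
+ * moved tuples, and block 2 the overflow page from which the tuples are
+ * deleted.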
+ */
+static void
+hash_xlog_move_page_contents(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
+	Buffer		bucketbuf = InvalidBuffer;
+	Buffer		writebuf = InvalidBuffer;
+	Buffer		deletebuf = InvalidBuffer;
+	XLogRedoAction action;
+
+	/*
+	 * Ensure we have a cleanup lock on the primary bucket page before we
+	 * start the actual replay operation.  This guarantees that no scan can
+	 * start, and that no scan is already in progress, during the replay of
+	 * this operation.  If scans were allowed during this operation, they
+	 * could miss some records or see the same record multiple times.
+	 */
+	if (xldata->is_prim_bucket_same_wrt)
+		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
+	else
+	{
+		RelFileNode rnode;
+		BlockNumber blkno;
+
+		XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+
+		bucketbuf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+										   RBM_NORMAL);
+		if (BufferIsValid(bucketbuf))
+			LockBufferForCleanup(bucketbuf);
+
+		action = XLogReadBufferForRedo(record, 1, &writebuf);
+	}
+
+	/* replay the record for adding entries in overflow buffer */
+	if (action == BLK_NEEDS_REDO)
+	{
+		Page		writepage;
+		char	   *begin;
+		char	   *data;
+		Size		datalen;
+		uint16		ninserted = 0;
+
+		data = begin = XLogRecGetBlockData(record, 1, &datalen);
+
+		writepage = (Page) BufferGetPage(writebuf);
+
+		if (xldata->ntups > 0)
+		{
+			OffsetNumber *towrite = (OffsetNumber *) data;
+
+			data += sizeof(OffsetNumber) * xldata->ntups;
+
+			while (data - begin < datalen)
+			{
+				IndexTuple	itup = (IndexTuple) data;
+				Size		itemsz;
+				OffsetNumber l;
+
+				itemsz = IndexTupleDSize(*itup);
+				itemsz = MAXALIGN(itemsz);
+
+				data += itemsz;
+
+				l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
+				if (l == InvalidOffsetNumber)
+					elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
+						 (int) itemsz);
+
+				ninserted++;
+			}
+		}
+
+		/*
+		 * The number of tuples inserted must be the same as requested in the
+		 * REDO record.
+		 */
+		Assert(ninserted == xldata->ntups);
+
+		PageSetLSN(writepage, lsn);
+		MarkBufferDirty(writebuf);
+	}
+
+	/* replay the record for deleting entries from overflow buffer */
+	if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO)
+	{
+		Page		page;
+		char	   *ptr;
+		Size		len;
+
+		ptr = XLogRecGetBlockData(record, 2, &len);
+
+		page = (Page) BufferGetPage(deletebuf);
+
+		if (len > 0)
+		{
+			OffsetNumber *unused;
+			OffsetNumber *unend;
+
+			unused = (OffsetNumber *) ptr;
+			unend = (OffsetNumber *) ((char *) ptr + len);
+
+			if ((unend - unused) > 0)
+				PageIndexMultiDelete(page, unused, unend - unused);
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(deletebuf);
+	}
+
+	/*
+	 * Replay is complete, so now we can release the buffers.  We release the
+	 * locks at the end of the replay operation to ensure that we hold the
+	 * lock on the primary bucket page until the end of the operation.  We
+	 * could release the lock on the write buffer as soon as we are done with
+	 * it, when it is not the primary bucket page, but that doesn't seem worth
+	 * complicating the code.
+	 */
+	if (BufferIsValid(deletebuf))
+		UnlockReleaseBuffer(deletebuf);
+
+	if (BufferIsValid(writebuf))
+		UnlockReleaseBuffer(writebuf);
+
+	if (BufferIsValid(bucketbuf))
+		UnlockReleaseBuffer(bucketbuf);
+}
+
+/*
+ * replay squeeze page operation of hash index
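+ *
+ * Block 0 is the bucket's primary page (used only to take a cleanup lock
+ * when it is not the write page), block 1 the page receiving the tuples,
+ * block 2 the freed overflow page, block 3 the page previous to the freed
+ * page (if different from the write page), block 4 the page next to the
+ * freed page (if any), block 5 the bitmap page, and block 6 the metapage
+ * (if the first-free-bit needs updating).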
+ */
+static void
+hash_xlog_squeeze_page(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
+	Buffer		bucketbuf = InvalidBuffer;
+	Buffer		writebuf;
+	Buffer		ovflbuf;
+	Buffer		prevbuf = InvalidBuffer;
+	Buffer		mapbuf;
+	XLogRedoAction action;
+
+	/*
+	 * Ensure we have a cleanup lock on the primary bucket page before we
+	 * start the actual replay operation.  This guarantees that no scan can
+	 * start, and that no scan is already in progress, during the replay of
+	 * this operation.  If scans were allowed during this operation, they
+	 * could miss some records or see the same record multiple times.
+	 */
+	if (xldata->is_prim_bucket_same_wrt)
+		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
+	else
+	{
+		RelFileNode rnode;
+		BlockNumber blkno;
+
+		XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+
+		bucketbuf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+										   RBM_NORMAL);
+		if (BufferIsValid(bucketbuf))
+			LockBufferForCleanup(bucketbuf);
+
+		action = XLogReadBufferForRedo(record, 1, &writebuf);
+	}
+
+	/* replay the record for adding entries in overflow buffer */
+	if (action == BLK_NEEDS_REDO)
+	{
+		Page		writepage;
+		char	   *begin;
+		char	   *data;
+		Size		datalen;
+		uint16		ninserted = 0;
+
+		data = begin = XLogRecGetBlockData(record, 1, &datalen);
+
+		writepage = (Page) BufferGetPage(writebuf);
+
+		if (xldata->ntups > 0)
+		{
+			OffsetNumber *towrite = (OffsetNumber *) data;
+
+			data += sizeof(OffsetNumber) * xldata->ntups;
+
+			while (data - begin < datalen)
+			{
+				IndexTuple	itup = (IndexTuple) data;
+				Size		itemsz;
+				OffsetNumber l;
+
+				itemsz = IndexTupleDSize(*itup);
+				itemsz = MAXALIGN(itemsz);
+
+				data += itemsz;
+
+				l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
+				if (l == InvalidOffsetNumber)
+					elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
+						 (int) itemsz);
+
+				ninserted++;
+			}
+		}
+
+		/*
+		 * The number of tuples inserted must be the same as requested in the
+		 * REDO record.
+		 */
+		Assert(ninserted == xldata->ntups);
+
+		/*
+		 * If the page to which we are adding tuples is the page previous to
+		 * the freed overflow page, then update its nextblkno.
+		 */
+		if (xldata->is_prev_bucket_same_wrt)
+		{
+			HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);
+
+			writeopaque->hasho_nextblkno = xldata->nextblkno;
+		}
+
+		PageSetLSN(writepage, lsn);
+		MarkBufferDirty(writebuf);
+	}
+
+	/* replay the record for initializing overflow buffer */
+	if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
+	{
+		Page		ovflpage;
+
+		ovflpage = BufferGetPage(ovflbuf);
+
+		_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
+
+		PageSetLSN(ovflpage, lsn);
+		MarkBufferDirty(ovflbuf);
+	}
+	if (BufferIsValid(ovflbuf))
+		UnlockReleaseBuffer(ovflbuf);
+
+	/* replay the record for page previous to the freed overflow page */
+	if (!xldata->is_prev_bucket_same_wrt &&
+		XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO)
+	{
+		Page		prevpage = BufferGetPage(prevbuf);
+		HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+
+		prevopaque->hasho_nextblkno = xldata->nextblkno;
+
+		PageSetLSN(prevpage, lsn);
+		MarkBufferDirty(prevbuf);
+	}
+	if (BufferIsValid(prevbuf))
+		UnlockReleaseBuffer(prevbuf);
+
+	/* replay the record for page next to the freed overflow page */
+	if (XLogRecHasBlockRef(record, 4))
+	{
+		Buffer		nextbuf;
+
+		if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO)
+		{
+			Page		nextpage = BufferGetPage(nextbuf);
+			HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
+
+			nextopaque->hasho_prevblkno = xldata->prevblkno;
+
+			PageSetLSN(nextpage, lsn);
+			MarkBufferDirty(nextbuf);
+		}
+		if (BufferIsValid(nextbuf))
+			UnlockReleaseBuffer(nextbuf);
+	}
+
+	/* replay the record for bitmap page */
+	if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO)
+	{
+		Page		mappage = (Page) BufferGetPage(mapbuf);
+		uint32	   *freep = NULL;
+		char	   *data;
+		uint32	   *bitmap_page_bit;
+		Size		datalen;
+
+		freep = HashPageGetBitmap(mappage);
+
+		data = XLogRecGetBlockData(record, 5, &datalen);
+		bitmap_page_bit = (uint32 *) data;
+
+		CLRBIT(freep, *bitmap_page_bit);
+
+		PageSetLSN(mappage, lsn);
+		MarkBufferDirty(mapbuf);
+	}
+	if (BufferIsValid(mapbuf))
+		UnlockReleaseBuffer(mapbuf);
+
+	/* replay the record for meta page */
+	if (XLogRecHasBlockRef(record, 6))
+	{
+		Buffer		metabuf;
+
+		if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO)
+		{
+			HashMetaPage metap;
+			Page		page;
+			char	   *data;
+			uint32	   *firstfree_ovflpage;
+			Size		datalen;
+
+			data = XLogRecGetBlockData(record, 6, &datalen);
+			firstfree_ovflpage = (uint32 *) data;
+
+			page = BufferGetPage(metabuf);
+			metap = HashPageGetMeta(page);
+			metap->hashm_firstfree = *firstfree_ovflpage;
+
+			PageSetLSN(page, lsn);
+			MarkBufferDirty(metabuf);
+		}
+		if (BufferIsValid(metabuf))
+			UnlockReleaseBuffer(metabuf);
+	}
+
+	/*
+	 * We release the locks on writebuf and bucketbuf at the end of the replay
+	 * operation to ensure that we hold the lock on the primary bucket page
+	 * until the end of the operation.  We could release the lock on the write
+	 * buffer as soon as we are done with it, when it is not the primary
+	 * bucket page, but that doesn't seem worth complicating the code.
+	 */
+	if (BufferIsValid(writebuf))
+		UnlockReleaseBuffer(writebuf);
+
+	if (BufferIsValid(bucketbuf))
+		UnlockReleaseBuffer(bucketbuf);
+}
+
+/*
+ * replay delete operation of hash index
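+ *
+ * Block 0 is the bucket's primary page (used only to take a cleanup lock
+ * when it is not the page being cleaned), and block 1 is the page from which
+ * the tuples are deleted.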
+ */
+static void
+hash_xlog_delete(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
+	Buffer		bucketbuf = InvalidBuffer;
+	Buffer		deletebuf;
+	Page		page;
+	XLogRedoAction action;
+
+	/*
+	 * Ensure we have a cleanup lock on the primary bucket page before we
+	 * start the actual replay operation.  This guarantees that no scan can
+	 * start, and that no scan is already in progress, during the replay of
+	 * this operation.  If scans were allowed during this operation, they
+	 * could miss some records or see the same record multiple times.
+	 */
+	if (xldata->is_primary_bucket_page)
+		action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
+	else
+	{
+		RelFileNode rnode;
+		BlockNumber blkno;
+
+		XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
+
+		bucketbuf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
+										   RBM_NORMAL);
+		if (BufferIsValid(bucketbuf))
+			LockBufferForCleanup(bucketbuf);
+
+		action = XLogReadBufferForRedo(record, 1, &deletebuf);
+	}
+
+	/* replay the record for deleting entries in bucket page */
+	if (action == BLK_NEEDS_REDO)
+	{
+		char	   *ptr;
+		Size		len;
+
+		ptr = XLogRecGetBlockData(record, 1, &len);
+
+		page = (Page) BufferGetPage(deletebuf);
+
+		if (len > 0)
+		{
+			OffsetNumber *unused;
+			OffsetNumber *unend;
+
+			unused = (OffsetNumber *) ptr;
+			unend = (OffsetNumber *) ((char *) ptr + len);
+
+			if ((unend - unused) > 0)
+				PageIndexMultiDelete(page, unused, unend - unused);
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(deletebuf);
+	}
+	if (BufferIsValid(deletebuf))
+		UnlockReleaseBuffer(deletebuf);
+
+	if (BufferIsValid(bucketbuf))
+		UnlockReleaseBuffer(bucketbuf);
+}
+
+/*
+ * replay split cleanup flag operation for primary bucket page.
+ */
+static void
+hash_xlog_split_cleanup(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	Buffer		buffer;
+	Page		page;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		HashPageOpaque bucket_opaque;
+
+		page = (Page) BufferGetPage(buffer);
+
+		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+		bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * replay update of the meta page
+ */
+static void
+hash_xlog_update_meta_page(XLogReaderState *record)
+{
+	HashMetaPage metap;
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
+	Buffer		metabuf;
+	Page		page;
+
+	if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(metabuf);
+		metap = HashPageGetMeta(page);
+
+		metap->hashm_ntuples = xldata->ntuples;
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(metabuf);
+	}
+	if (BufferIsValid(metabuf))
+		UnlockReleaseBuffer(metabuf);
+}
+
+void
+hash_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_HASH_INIT_META_PAGE:
+			hash_xlog_init_meta_page(record);
+			break;
+		case XLOG_HASH_INIT_BITMAP_PAGE:
+			hash_xlog_init_bitmap_page(record);
+			break;
+		case XLOG_HASH_INSERT:
+			hash_xlog_insert(record);
+			break;
+		case XLOG_HASH_ADD_OVFL_PAGE:
+			hash_xlog_addovflpage(record);
+			break;
+		case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
+			hash_xlog_split_allocpage(record);
+			break;
+		case XLOG_HASH_SPLIT_PAGE:
+			hash_xlog_split_page(record);
+			break;
+		case XLOG_HASH_SPLIT_COMPLETE:
+			hash_xlog_split_complete(record);
+			break;
+		case XLOG_HASH_MOVE_PAGE_CONTENTS:
+			hash_xlog_move_page_contents(record);
+			break;
+		case XLOG_HASH_SQUEEZE_PAGE:
+			hash_xlog_squeeze_page(record);
+			break;
+		case XLOG_HASH_DELETE:
+			hash_xlog_delete(record);
+			break;
+		case XLOG_HASH_SPLIT_CLEANUP:
+			hash_xlog_split_cleanup(record);
+			break;
+		case XLOG_HASH_UPDATE_META_PAGE:
+			hash_xlog_update_meta_page(record);
+			break;
+		default:
+			elog(PANIC, "hash_redo: unknown op code %u", info);
+	}
+}
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
index 354e733..241728f 100644
--- a/src/backend/access/hash/hashinsert.c
+++ b/src/backend/access/hash/hashinsert.c
@@ -16,6 +16,8 @@
 #include "postgres.h"
 
 #include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "miscadmin.h"
 #include "utils/rel.h"
 
 
@@ -40,6 +42,7 @@ _hash_doinsert(Relation rel, IndexTuple itup)
 	bool		do_expand;
 	uint32		hashkey;
 	Bucket		bucket;
+	OffsetNumber itup_off;
 
 	/*
 	 * Get the hash key for the item (it's stored in the index tuple itself).
@@ -158,25 +161,20 @@ restart_insert:
 		Assert(pageopaque->hasho_bucket == bucket);
 	}
 
-	/* found page with enough space, so add the item here */
-	(void) _hash_pgaddtup(rel, buf, itemsz, itup);
-
-	/*
-	 * dirty and release the modified page.  if the page we modified was an
-	 * overflow page, we also need to separately drop the pin we retained on
-	 * the primary bucket page.
-	 */
-	MarkBufferDirty(buf);
-	_hash_relbuf(rel, buf);
-	if (buf != bucket_buf)
-		_hash_dropbuf(rel, bucket_buf);
-
 	/*
 	 * Write-lock the metapage so we can increment the tuple count. After
 	 * incrementing it, check to see if it's time for a split.
 	 */
 	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
 
+	/* Do the update.  No ereport(ERROR) until changes are logged */
+	START_CRIT_SECTION();
+
+	/* found page with enough space, so add the item here */
+	itup_off = _hash_pgaddtup(rel, buf, itemsz, itup);
+	MarkBufferDirty(buf);
+
+	/* metapage operations */
 	metap = HashPageGetMeta(metapage);
 	metap->hashm_ntuples += 1;
 
@@ -184,10 +182,43 @@ restart_insert:
 	do_expand = metap->hashm_ntuples >
 		(double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
 
-	/* Write out the metapage and drop lock, but keep pin */
 	MarkBufferDirty(metabuf);
+
+	/* XLOG stuff */
+	if (RelationNeedsWAL(rel))
+	{
+		xl_hash_insert xlrec;
+		XLogRecPtr	recptr;
+
+		xlrec.offnum = itup_off;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfHashInsert);
+
+		XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+		XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);
+
+		PageSetLSN(BufferGetPage(buf), recptr);
+		PageSetLSN(BufferGetPage(metabuf), recptr);
+	}
+
+	END_CRIT_SECTION();
+
+	/* drop lock on metapage, but keep pin */
 	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
 
+	/*
+	 * Release the modified page; if it is not the primary bucket page, also
+	 * release the pin we retained on that page.
+	 */
+	_hash_relbuf(rel, buf);
+	if (buf != bucket_buf)
+		_hash_dropbuf(rel, bucket_buf);
+
 	/* Attempt to split if a split is needed */
 	if (do_expand)
 		_hash_expandtable(rel, metabuf);
diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c
index d35089c..12aeac3 100644
--- a/src/backend/access/hash/hashovfl.c
+++ b/src/backend/access/hash/hashovfl.c
@@ -18,6 +18,8 @@
 #include "postgres.h"
 
 #include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "miscadmin.h"
 #include "utils/rel.h"
 
 
@@ -94,7 +96,9 @@ _hash_ovflblkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
  *	dropped before exiting (we assume the caller is not interested in 'buf'
  *	anymore) if not asked to retain.  The pin will be retained only for the
  *	primary bucket.  The returned overflow page will be pinned and
- *	write-locked; it is guaranteed to be empty.
+ *	write-locked; it is not guaranteed to be empty, since we release and
+ *	reacquire the lock on it, so the caller must check that the page has
+ *	the required free space.
  *
  *	The caller must hold a pin, but no lock, on the metapage buffer.
  *	That buffer is returned in the same state.
@@ -136,6 +140,13 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin)
 	 * page is released, then finally acquire the lock on new overflow buffer.
 	 * We need this locking order to avoid deadlock with backends that are
 	 * doing inserts.
+	 *
+	 * Note: We could have avoided locking many buffers here if we made two
+	 * WAL records for acquiring an overflow page (one to allocate an overflow
+	 * page and another to add it to the overflow bucket chain).  However,
+	 * doing so can leak an overflow page if the system crashes after the
+	 * allocation.  Needless to say, a single record is also better from a
+	 * performance point of view.
 	 */
 	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
@@ -303,8 +314,12 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin)
 found:
 
 	/*
-	 * Do the update.
+	 * Do the update.  No ereport(ERROR) until changes are logged.  We log
+	 * the changes for the bitmap page and the overflow page together so
+	 * that the newly added page is not lost if the system crashes.
 	 */
+	START_CRIT_SECTION();
+
 	if (page_found)
 	{
 		Assert(BufferIsValid(mapbuf));
@@ -361,6 +376,61 @@ found:
 
 	MarkBufferDirty(buf);
 
+	/* XLOG stuff */
+	if (RelationNeedsWAL(rel))
+	{
+		XLogRecPtr	recptr;
+		xl_hash_addovflpage xlrec;
+
+		xlrec.bmpage_found = page_found;
+		xlrec.bmsize = metap->hashm_bmsize;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfHashAddOvflPage);
+
+		XLogRegisterBuffer(0, ovflbuf, REGBUF_WILL_INIT);
+		XLogRegisterBufData(0, (char *) &pageopaque->hasho_bucket, sizeof(Bucket));
+
+		XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
+
+		if (BufferIsValid(mapbuf))
+		{
+			/*
+			 * Since the bitmap page doesn't have a standard page layout,
+			 * register the changed bit as data so it can be reapplied
+			 * during replay.
+			 */
+			XLogRegisterBuffer(2, mapbuf, REGBUF_STANDARD);
+			XLogRegisterBufData(2, (char *) &bitmap_page_bit, sizeof(uint32));
+		}
+
+		if (BufferIsValid(newmapbuf))
+			XLogRegisterBuffer(3, newmapbuf, REGBUF_WILL_INIT);
+
+		/*
+		 * To replay the metapage changes, we could log the entire metapage,
+		 * which doesn't seem advisable given its size, or we could log the
+		 * individual updated values, which seems doable; but we prefer to
+		 * perform the same operations on the metapage during replay as are
+		 * done during the actual operation.  That is straightforward and has
+		 * the advantage of using less space in WAL.
+		 */
+		XLogRegisterBuffer(4, metabuf, REGBUF_STANDARD);
+		XLogRegisterBufData(4, (char *) &metap->hashm_firstfree, sizeof(uint32));
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_ADD_OVFL_PAGE);
+
+		PageSetLSN(BufferGetPage(ovflbuf), recptr);
+		PageSetLSN(BufferGetPage(buf), recptr);
+
+		if (BufferIsValid(mapbuf))
+			PageSetLSN(BufferGetPage(mapbuf), recptr);
+
+		if (BufferIsValid(newmapbuf))
+			PageSetLSN(BufferGetPage(newmapbuf), recptr);
+	}
+
+	END_CRIT_SECTION();
+
 	if (retain_pin)
 		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 	else
@@ -374,6 +444,13 @@ found:
 	if (BufferIsValid(newmapbuf))
 		_hash_relbuf(rel, newmapbuf);
 
+	/*
+	 * We need to release and reacquire the lock on the overflow buffer so
+	 * that a standby doesn't see an intermediate state of it.
+	 */
+	LockBuffer(ovflbuf, BUFFER_LOCK_UNLOCK);
+	LockBuffer(ovflbuf, BUFFER_LOCK_EXCLUSIVE);
+
 	return ovflbuf;
 }
 
@@ -407,7 +484,11 @@ _hash_firstfreebit(uint32 map)
  *	Remove this overflow page from its bucket's chain, and mark the page as
  *	free.  On entry, ovflbuf is write-locked; it is released before exiting.
  *
- *	Add the tuples (itups) to wbuf.
+ *	Add the tuples (itups) to wbuf in this function; we could do that in the
+ *	caller as well, but doing it here lets us easily write the WAL record for
+ *	the XLOG_HASH_SQUEEZE_PAGE operation.  The addition of tuples and the
+ *	removal of the overflow page have to be done atomically, otherwise users
+ *	might see duplicate records during replay on a standby.
  *
  *	Since this function is invoked in VACUUM, we provide an access strategy
  *	parameter that controls fetches of the bucket pages.
@@ -429,8 +510,6 @@ _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
 	HashMetaPage metap;
 	Buffer		metabuf;
 	Buffer		mapbuf;
-	Buffer		prevbuf = InvalidBuffer;
-	Buffer		nextbuf = InvalidBuffer;
 	BlockNumber ovflblkno;
 	BlockNumber prevblkno;
 	BlockNumber blkno;
@@ -444,6 +523,9 @@ _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
 	int32		bitmappage,
 				bitmapbit;
 	Bucket bucket PG_USED_FOR_ASSERTS_ONLY;
+	Buffer		prevbuf = InvalidBuffer;
+	Buffer		nextbuf = InvalidBuffer;
+	bool		update_metap = false;
 
 	/* Get information from the doomed page */
 	_hash_checkpage(rel, ovflbuf, LH_OVERFLOW_PAGE);
@@ -508,6 +590,12 @@ _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
 	/* Get write-lock on metapage to update firstfree */
 	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
 
+	/* This operation needs to log multiple tuples, prepare WAL for that */
+	if (RelationNeedsWAL(rel))
+		XLogEnsureRecordSpace(HASH_XLOG_FREE_OVFL_BUFS, XLR_NORMAL_RDATAS + nitups);
+
+	START_CRIT_SECTION();
+
 	/*
 	 * we have to insert tuples on the "write" page, being careful to preserve
 	 * hashkey ordering.  (If we insert many tuples into the same "write" page
@@ -519,7 +607,11 @@ _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
 		MarkBufferDirty(wbuf);
 	}
 
-	/* Initialise the freed overflow page. */
+	/*
+	 * Initialise the freed overflow page.  We can't completely zero the page
+	 * here, as WAL replay routines expect pages to be initialized.  See the
+	 * explanation of RBM_NORMAL mode atop XLogReadBufferExtended.
+	 */
 	_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
 	MarkBufferDirty(ovflbuf);
 
@@ -550,9 +642,83 @@ _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
 	if (ovflbitno < metap->hashm_firstfree)
 	{
 		metap->hashm_firstfree = ovflbitno;
+		update_metap = true;
 		MarkBufferDirty(metabuf);
 	}
 
+	/* XLOG stuff */
+	if (RelationNeedsWAL(rel))
+	{
+		xl_hash_squeeze_page xlrec;
+		XLogRecPtr	recptr;
+		int			i;
+
+		xlrec.prevblkno = prevblkno;
+		xlrec.nextblkno = nextblkno;
+		xlrec.ntups = nitups;
+		xlrec.is_prim_bucket_same_wrt = (wbuf == bucketbuf) ? true : false;
+		xlrec.is_prev_bucket_same_wrt = (wbuf == prevbuf) ? true : false;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfHashSqueezePage);
+
+		/*
+		 * The bucket buffer needs to be registered to ensure that we can
+		 * acquire a cleanup lock on it during replay.
+		 */
+		if (!xlrec.is_prim_bucket_same_wrt)
+			XLogRegisterBuffer(0, bucketbuf, REGBUF_STANDARD);
+
+		XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
+		if (xlrec.ntups > 0)
+		{
+			XLogRegisterBufData(1, (char *) itup_offsets,
+								nitups * sizeof(OffsetNumber));
+			for (i = 0; i < nitups; i++)
+				XLogRegisterBufData(1, (char *) itups[i], tups_size[i]);
+		}
+
+		XLogRegisterBuffer(2, ovflbuf, REGBUF_STANDARD);
+
+		/*
+		 * If prevpage and the writepage (the block to which we are moving
+		 * tuples from the overflow page) are the same, there is no need to
+		 * register prevpage separately.  During replay, we can directly
+		 * update the nextblock in writepage.
+		 */
+		if (BufferIsValid(prevbuf) && !xlrec.is_prev_bucket_same_wrt)
+			XLogRegisterBuffer(3, prevbuf, REGBUF_STANDARD);
+
+		if (BufferIsValid(nextbuf))
+			XLogRegisterBuffer(4, nextbuf, REGBUF_STANDARD);
+
+		XLogRegisterBuffer(5, mapbuf, REGBUF_STANDARD);
+		XLogRegisterBufData(5, (char *) &bitmapbit, sizeof(uint32));
+
+		if (update_metap)
+		{
+			XLogRegisterBuffer(6, metabuf, REGBUF_STANDARD);
+			XLogRegisterBufData(6, (char *) &metap->hashm_firstfree, sizeof(uint32));
+		}
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SQUEEZE_PAGE);
+
+		PageSetLSN(BufferGetPage(wbuf), recptr);
+		PageSetLSN(BufferGetPage(ovflbuf), recptr);
+
+		if (BufferIsValid(prevbuf) && !xlrec.is_prev_bucket_same_wrt)
+			PageSetLSN(BufferGetPage(prevbuf), recptr);
+		if (BufferIsValid(nextbuf))
+			PageSetLSN(BufferGetPage(nextbuf), recptr);
+
+		PageSetLSN(BufferGetPage(mapbuf), recptr);
+
+		if (update_metap)
+			PageSetLSN(BufferGetPage(metabuf), recptr);
+	}
+
+	END_CRIT_SECTION();
+
 	/* release previous bucket if it is not same as write bucket */
 	if (BufferIsValid(prevbuf) && prevblkno != writeblkno)
 		_hash_relbuf(rel, prevbuf);
@@ -604,7 +770,11 @@ _hash_initbitmapbuffer(Buffer buf, uint16 bmsize, bool initpage)
 	freep = HashPageGetBitmap(pg);
 	MemSet(freep, 0xFF, bmsize);
 
-	/* Set pd_lower just past the end of the bitmap page data. */
+	/*
+	 * Set pd_lower just past the end of the bitmap page data.  We could even
+	 * set pd_lower equal to pd_upper, but this is more precise and makes the
+	 * page look compressible to xlog.c.
+	 */
 	((PageHeader) pg)->pd_lower = ((char *) freep + bmsize) - (char *) pg;
 }
 
@@ -766,6 +936,15 @@ readpage:
 					Assert(nitups == ndeletable);
 
 					/*
+					 * This operation needs to log multiple tuples, prepare
+					 * WAL for that.
+					 */
+					if (RelationNeedsWAL(rel))
+						XLogEnsureRecordSpace(0, XLR_NORMAL_RDATAS + nitups);
+
+					START_CRIT_SECTION();
+
+					/*
 					 * we have to insert tuples on the "write" page, being
 					 * careful to preserve hashkey ordering.  (If we insert
 					 * many tuples into the same "write" page it would be
@@ -778,6 +957,43 @@ readpage:
 					PageIndexMultiDelete(rpage, deletable, ndeletable);
 					MarkBufferDirty(rbuf);
 
+					/* XLOG stuff */
+					if (RelationNeedsWAL(rel))
+					{
+						XLogRecPtr	recptr;
+						xl_hash_move_page_contents xlrec;
+
+						xlrec.ntups = nitups;
+						xlrec.is_prim_bucket_same_wrt = (wbuf == bucket_buf) ? true : false;
+
+						XLogBeginInsert();
+						XLogRegisterData((char *) &xlrec, SizeOfHashMovePageContents);
+
+						/*
+						 * The bucket buffer needs to be registered so that we
+						 * can acquire a cleanup lock on it during replay.
+						 */
+						if (!xlrec.is_prim_bucket_same_wrt)
+							XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);
+
+						XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
+						XLogRegisterBufData(1, (char *) itup_offsets,
+											nitups * sizeof(OffsetNumber));
+						for (i = 0; i < nitups; i++)
+							XLogRegisterBufData(1, (char *) itups[i], tups_size[i]);
+
+						XLogRegisterBuffer(2, rbuf, REGBUF_STANDARD);
+						XLogRegisterBufData(2, (char *) deletable,
+										  ndeletable * sizeof(OffsetNumber));
+
+						recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_MOVE_PAGE_CONTENTS);
+
+						PageSetLSN(BufferGetPage(wbuf), recptr);
+						PageSetLSN(BufferGetPage(rbuf), recptr);
+					}
+
+					END_CRIT_SECTION();
+
 					tups_moved = true;
 				}
 
@@ -790,13 +1006,24 @@ readpage:
 				else
 					_hash_relbuf(rel, wbuf);
 
+				/*
+				 * We need to release, and if required reacquire, the lock on
+				 * rbuf so that a standby doesn't see an intermediate state of
+				 * it.  If we didn't release the lock, users on a standby
+				 * could see the results of a partial deletion on rblkno after
+				 * replay of XLOG_HASH_MOVE_PAGE_CONTENTS.
+				 */
+				LockBuffer(rbuf, BUFFER_LOCK_UNLOCK);
+
 				/* nothing more to do if we reached the read page */
 				if (rblkno == wblkno)
 				{
-					_hash_relbuf(rel, rbuf);
+					_hash_dropbuf(rel, rbuf);
 					return;
 				}
 
+				LockBuffer(rbuf, BUFFER_LOCK_EXCLUSIVE);
+
 				wbuf = next_wbuf;
 				wpage = BufferGetPage(wbuf);
 				wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
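
The WAL-insertion pattern used throughout hashovfl.c above (and in the rest of
this patch) can be condensed into the following sketch.  It is illustrative
only and is not code from the patch: the helper name hash_wal_pattern_sketch
and its arguments are assumed, and xl_hash_insert merely serves as an example
record type.

	#include "postgres.h"

	#include "access/hash.h"
	#include "access/hash_xlog.h"
	#include "access/xloginsert.h"
	#include "miscadmin.h"
	#include "storage/bufmgr.h"
	#include "utils/rel.h"

	/*
	 * Illustrative only: the shape shared by every WAL-logged change in this
	 * patch.  'rel', 'buf' and 'offnum' stand in for whatever the real
	 * operation touches.
	 */
	static void
	hash_wal_pattern_sketch(Relation rel, Buffer buf, OffsetNumber offnum)
	{
		/* No ereport(ERROR) between modifying the buffer and logging it */
		START_CRIT_SECTION();

		/* ... the actual page modification goes here ... */
		MarkBufferDirty(buf);

		if (RelationNeedsWAL(rel))
		{
			xl_hash_insert xlrec;
			XLogRecPtr	recptr;

			xlrec.offnum = offnum;

			XLogBeginInsert();
			XLogRegisterData((char *) &xlrec, SizeOfHashInsert);
			XLogRegisterBuffer(0, buf, REGBUF_STANDARD);

			recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);
			PageSetLSN(BufferGetPage(buf), recptr);
		}

		END_CRIT_SECTION();
	}
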
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 07cb4f2..bac030a 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -29,6 +29,7 @@
 #include "postgres.h"
 
 #include "access/hash.h"
+#include "access/hash_xlog.h"
 #include "miscadmin.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
@@ -43,6 +44,8 @@ static void _hash_splitbucket(Relation rel, Buffer metabuf,
 				  HTAB *htab,
 				  uint32 maxbucket,
 				  uint32 highmask, uint32 lowmask);
+static void
+			log_split_page(Relation rel, Buffer buf);
 
 
 /*
@@ -381,6 +384,31 @@ _hash_init(Relation rel, double num_tuples, ForkNumber forkNum)
 	pg = BufferGetPage(metabuf);
 	metap = HashPageGetMeta(pg);
 
+	/* XLOG stuff */
+	if (RelationNeedsWAL(rel))
+	{
+		xl_hash_init_meta_page xlrec;
+		XLogRecPtr	recptr;
+
+		/*
+		 * Here, we could have copied the entire metapage into the WAL record
+		 * and restored it as is during replay.  However, the metapage is not
+		 * small (more than 600 bytes), so we record just the information
+		 * required to reconstruct it.
+		 */
+		xlrec.num_tuples = num_tuples;
+		xlrec.procid = metap->hashm_procid;
+		xlrec.ffactor = metap->hashm_ffactor;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfHashInitMetaPage);
+		XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT);
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_META_PAGE);
+
+		PageSetLSN(BufferGetPage(metabuf), recptr);
+	}
+
 	num_buckets = metap->hashm_maxbucket + 1;
 
 	/*
@@ -405,6 +433,12 @@ _hash_init(Relation rel, double num_tuples, ForkNumber forkNum)
 		buf = _hash_getnewbuf(rel, blkno, forkNum);
 		_hash_initbuf(buf, metap->hashm_maxbucket, i, LH_BUCKET_PAGE, false);
 		MarkBufferDirty(buf);
+
+		/* WAL-log the freshly initialized bucket page */
+		if (RelationNeedsWAL(rel))
+			log_newpage(&rel->rd_node, forkNum,
+						blkno, BufferGetPage(buf),
+						true);
 		_hash_relbuf(rel, buf);
 	}
 
@@ -431,6 +465,32 @@ _hash_init(Relation rel, double num_tuples, ForkNumber forkNum)
 	metap->hashm_nmaps++;
 	MarkBufferDirty(metabuf);
 
+	/* XLOG stuff */
+	if (RelationNeedsWAL(rel))
+	{
+		xl_hash_init_bitmap_page xlrec;
+		XLogRecPtr	recptr;
+
+		xlrec.bmsize = metap->hashm_bmsize;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfHashInitBitmapPage);
+		XLogRegisterBuffer(0, bitmapbuf, REGBUF_WILL_INIT);
+
+		/*
+		 * We could log the exact changes made to the metapage; however, as
+		 * no concurrent operation can see the index during the replay of
+		 * this record, we simply repeat during replay the operations done
+		 * here.
+		 */
+		XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_BITMAP_PAGE);
+
+		PageSetLSN(BufferGetPage(bitmapbuf), recptr);
+		PageSetLSN(BufferGetPage(metabuf), recptr);
+	}
+
 	/* all done */
 	_hash_relbuf(rel, bitmapbuf);
 	_hash_relbuf(rel, metabuf);
@@ -525,7 +585,10 @@ _hash_init_metabuffer(Buffer buf, double num_tuples, RegProcedure procid,
 	metap->hashm_ovflpoint = log2_num_buckets;
 	metap->hashm_firstfree = 0;
 
-	/* Set pd_lower just past the end of the metadata. */
+	/*
+	 * Set pd_lower just past the end of the metadata.  This lets xloginsert.c
+	 * omit the unused space when logging a full-page image of the metapage.
+	 */
 	((PageHeader) page)->pd_lower =
 		((char *) metap + sizeof(HashMetaPageData)) - (char *) page;
 }
@@ -569,6 +632,8 @@ _hash_expandtable(Relation rel, Buffer metabuf)
 	uint32		maxbucket;
 	uint32		highmask;
 	uint32		lowmask;
+	bool		metap_update_masks = false;
+	bool		metap_update_splitpoint = false;
 
 restart_expand:
 
@@ -728,7 +793,11 @@ restart_expand:
 		 * The number of buckets in the new splitpoint is equal to the total
 		 * number already in existence, i.e. new_bucket.  Currently this maps
 		 * one-to-one to blocks required, but someday we may need a more
-		 * complicated calculation here.
+		 * complicated calculation here.  We treat the allocation of buckets
+		 * as a separate WAL action.  Even if we fail after this operation,
+		 * it won't be a leak on the standby, as the next split will consume
+		 * this space.  In any case, even without a failure we don't use all
+		 * the space in one split operation.
 		 */
 		if (!_hash_alloc_buckets(rel, start_nblkno, new_bucket))
 		{
@@ -757,8 +826,7 @@ restart_expand:
 	 * Since we are scribbling on the pages in the shared buffers, establish a
 	 * critical section.  Any failure in this next code leaves us with a big
 	 * problem: the metapage is effectively corrupt but could get written back
-	 * to disk.  We don't really expect any failure, but just to be sure,
-	 * establish a critical section.
+	 * to disk.
 	 */
 	START_CRIT_SECTION();
 
@@ -772,6 +840,7 @@ restart_expand:
 		/* Starting a new doubling */
 		metap->hashm_lowmask = metap->hashm_highmask;
 		metap->hashm_highmask = new_bucket | metap->hashm_lowmask;
+		metap_update_masks = true;
 	}
 
 	/*
@@ -784,6 +853,7 @@ restart_expand:
 	{
 		metap->hashm_spares[spare_ndx] = metap->hashm_spares[metap->hashm_ovflpoint];
 		metap->hashm_ovflpoint = spare_ndx;
+		metap_update_splitpoint = true;
 	}
 
 	MarkBufferDirty(metabuf);
@@ -829,11 +899,61 @@ restart_expand:
 
 	MarkBufferDirty(buf_nblkno);
 
+	/* XLOG stuff */
+	if (RelationNeedsWAL(rel))
+	{
+		xl_hash_split_allocpage xlrec;
+		XLogRecPtr	recptr;
+
+		xlrec.new_bucket = maxbucket;
+		xlrec.old_bucket_flag = oopaque->hasho_flag;
+		xlrec.new_bucket_flag = nopaque->hasho_flag;
+		xlrec.flags = 0;
+
+		XLogBeginInsert();
+
+		XLogRegisterBuffer(0, buf_oblkno, REGBUF_STANDARD);
+		XLogRegisterBuffer(1, buf_nblkno, REGBUF_WILL_INIT);
+		XLogRegisterBuffer(2, metabuf, REGBUF_STANDARD);
+
+		if (metap_update_masks)
+		{
+			xlrec.flags |= XLH_SPLIT_META_UPDATE_MASKS;
+			XLogRegisterBufData(2, (char *) &metap->hashm_lowmask, sizeof(uint32));
+			XLogRegisterBufData(2, (char *) &metap->hashm_highmask, sizeof(uint32));
+		}
+
+		if (metap_update_splitpoint)
+		{
+			xlrec.flags |= XLH_SPLIT_META_UPDATE_SPLITPOINT;
+			XLogRegisterBufData(2, (char *) &metap->hashm_ovflpoint,
+								sizeof(uint32));
+			XLogRegisterBufData(2,
+					   (char *) &metap->hashm_spares[metap->hashm_ovflpoint],
+								sizeof(uint32));
+		}
+
+		XLogRegisterData((char *) &xlrec, SizeOfHashSplitAllocPage);
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_ALLOCATE_PAGE);
+
+		PageSetLSN(BufferGetPage(buf_oblkno), recptr);
+		PageSetLSN(BufferGetPage(buf_nblkno), recptr);
+		PageSetLSN(BufferGetPage(metabuf), recptr);
+	}
+
 	END_CRIT_SECTION();
 
 	/* drop lock, but keep pin */
 	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
 
+	/*
+	 * We need to release and reacquire the lock on the new bucket buffer so
+	 * that a standby doesn't see an intermediate state of it.
+	 */
+	LockBuffer(buf_nblkno, BUFFER_LOCK_UNLOCK);
+	LockBuffer(buf_nblkno, BUFFER_LOCK_EXCLUSIVE);
+
 	/* Relocate records to the new bucket */
 	_hash_splitbucket(rel, metabuf,
 					  old_bucket, new_bucket,
@@ -883,6 +1003,7 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 {
 	BlockNumber lastblock;
 	char		zerobuf[BLCKSZ];
+	Page		page;
 
 	lastblock = firstblock + nblocks - 1;
 
@@ -893,7 +1014,21 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 	if (lastblock < firstblock || lastblock == InvalidBlockNumber)
 		return false;
 
-	MemSet(zerobuf, 0, sizeof(zerobuf));
+	page = (Page) zerobuf;
+
+	/*
+	 * Initialise the new bucket page.  We can't leave the page completely
+	 * zeroed, as WAL replay routines expect pages to be initialized.  See
+	 * the explanation of RBM_NORMAL mode atop XLogReadBufferExtended.
+	 */
+	_hash_pageinit(page, BLCKSZ);
+
+	if (RelationNeedsWAL(rel))
+		log_newpage(&rel->rd_node,
+					MAIN_FORKNUM,
+					lastblock,
+					zerobuf,
+					true);
 
 	RelationOpenSmgr(rel);
 	smgrextend(rel->rd_smgr, MAIN_FORKNUM, lastblock, zerobuf, false);
@@ -1029,8 +1164,11 @@ _hash_splitbucket(Relation rel,
 				itemsz = IndexTupleDSize(*new_itup);
 				itemsz = MAXALIGN(itemsz);
 
-				if (PageGetFreeSpace(npage) < itemsz)
+				while (PageGetFreeSpace(npage) < itemsz)
 				{
+					/* log the split operation before releasing the lock */
+					log_split_page(rel, nbuf);
+
 					/* drop lock, but keep pin */
 					LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
 
@@ -1041,6 +1179,13 @@ _hash_splitbucket(Relation rel,
 				}
 
 				/*
+				 * Change the shared buffer state within a critical section;
+				 * otherwise, an error could leave the page in a state that
+				 * crash recovery cannot reproduce.
+				 */
+				START_CRIT_SECTION();
+
+				/*
 				 * Insert tuple on new page, using _hash_pgaddtup to ensure
 				 * correct ordering by hashkey.  This is a tad inefficient
 				 * since we may have to shuffle itempointers repeatedly.
@@ -1049,6 +1194,8 @@ _hash_splitbucket(Relation rel,
 				 */
 				(void) _hash_pgaddtup(rel, nbuf, itemsz, new_itup);
 
+				END_CRIT_SECTION();
+
 				/* be tidy */
 				pfree(new_itup);
 			}
@@ -1072,6 +1219,9 @@ _hash_splitbucket(Relation rel,
 		/* Exit loop if no more overflow pages in old bucket */
 		if (!BlockNumberIsValid(oblkno))
 		{
+			/* log the split operation before releasing the lock */
+			log_split_page(rel, nbuf);
+
 			if (nbuf == bucket_nbuf)
 				LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
 			else
@@ -1101,6 +1251,8 @@ _hash_splitbucket(Relation rel,
 	npage = BufferGetPage(bucket_nbuf);
 	nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
 
+	START_CRIT_SECTION();
+
 	oopaque->hasho_flag &= ~LH_BUCKET_BEING_SPLIT;
 	nopaque->hasho_flag &= ~LH_BUCKET_BEING_POPULATED;
 
@@ -1117,6 +1269,29 @@ _hash_splitbucket(Relation rel,
 	 */
 	MarkBufferDirty(bucket_obuf);
 	MarkBufferDirty(bucket_nbuf);
+
+	if (RelationNeedsWAL(rel))
+	{
+		XLogRecPtr	recptr;
+		xl_hash_split_complete xlrec;
+
+		xlrec.old_bucket_flag = oopaque->hasho_flag;
+		xlrec.new_bucket_flag = nopaque->hasho_flag;
+
+		XLogBeginInsert();
+
+		XLogRegisterData((char *) &xlrec, SizeOfHashSplitComplete);
+
+		XLogRegisterBuffer(0, bucket_obuf, REGBUF_STANDARD);
+		XLogRegisterBuffer(1, bucket_nbuf, REGBUF_STANDARD);
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_COMPLETE);
+
+		PageSetLSN(BufferGetPage(bucket_obuf), recptr);
+		PageSetLSN(BufferGetPage(bucket_nbuf), recptr);
+	}
+
+	END_CRIT_SECTION();
 }
 
 /*
@@ -1243,6 +1418,38 @@ _hash_finish_split(Relation rel, Buffer metabuf, Buffer obuf, Bucket obucket,
 }
 
 /*
+ *	log_split_page() -- Log the split operation
+ *
+ *	We log the split operation when the new page in the new bucket gets
+ *	full; at that point the entire page is logged.
+ *
+ *	'buf' must be locked by the caller, which is also responsible for
+ *	unlocking it.
+ */
+static void
+log_split_page(Relation rel, Buffer buf)
+{
+	START_CRIT_SECTION();
+
+	MarkBufferDirty(buf);
+
+	if (RelationNeedsWAL(rel))
+	{
+		XLogRecPtr	recptr;
+
+		XLogBeginInsert();
+
+		XLogRegisterBuffer(0, buf, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+
+		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_PAGE);
+
+		PageSetLSN(BufferGetPage(buf), recptr);
+	}
+
+	END_CRIT_SECTION();
+}
+
+/*
  *	_hash_getcachedmetap() -- Returns cached metapage data.
  *
  *	If metabuf is not InvalidBuffer, caller must hold a pin, but no lock, on
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index 9e5d7e4..d733770 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -123,6 +123,7 @@ _hash_readnext(IndexScanDesc scan,
 	if (block_found)
 	{
 		*pagep = BufferGetPage(*bufp);
+		TestForOldSnapshot(scan->xs_snapshot, rel, *pagep);
 		*opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
 	}
 }
@@ -168,6 +169,7 @@ _hash_readprev(IndexScanDesc scan,
 		*bufp = _hash_getbuf(rel, blkno, HASH_READ,
 							 LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
 		*pagep = BufferGetPage(*bufp);
+		TestForOldSnapshot(scan->xs_snapshot, rel, *pagep);
 		*opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
 
 		/*
@@ -283,6 +285,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 
 	buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_READ, NULL);
 	page = BufferGetPage(buf);
+	TestForOldSnapshot(scan->xs_snapshot, rel, page);
 	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
 	bucket = opaque->hasho_bucket;
 
@@ -318,6 +321,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
 		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
 		old_buf = _hash_getbuf(rel, old_blkno, HASH_READ, LH_BUCKET_PAGE);
+		TestForOldSnapshot(scan->xs_snapshot, rel, BufferGetPage(old_buf));
 
 		/*
 		 * remember the split bucket buffer so as to use it later for
@@ -520,6 +524,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 					_hash_readprev(scan, &buf, &page, &opaque);
 					if (BufferIsValid(buf))
 					{
+						TestForOldSnapshot(scan->xs_snapshot, rel, page);
 						maxoff = PageGetMaxOffsetNumber(page);
 						offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
 					}
diff --git a/src/backend/access/rmgrdesc/hashdesc.c b/src/backend/access/rmgrdesc/hashdesc.c
index 7eac819..f00ec2d 100644
--- a/src/backend/access/rmgrdesc/hashdesc.c
+++ b/src/backend/access/rmgrdesc/hashdesc.c
@@ -19,10 +19,142 @@
 void
 hash_desc(StringInfo buf, XLogReaderState *record)
 {
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_HASH_INIT_META_PAGE:
+			{
+				xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) rec;
+
+				appendStringInfo(buf, "num_tuples %g, fillfactor %d",
+								 xlrec->num_tuples, xlrec->ffactor);
+				break;
+			}
+		case XLOG_HASH_INIT_BITMAP_PAGE:
+			{
+				xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) rec;
+
+				appendStringInfo(buf, "bmsize %d", xlrec->bmsize);
+				break;
+			}
+		case XLOG_HASH_INSERT:
+			{
+				xl_hash_insert *xlrec = (xl_hash_insert *) rec;
+
+				appendStringInfo(buf, "off %u", xlrec->offnum);
+				break;
+			}
+		case XLOG_HASH_ADD_OVFL_PAGE:
+			{
+				xl_hash_addovflpage *xlrec = (xl_hash_addovflpage *) rec;
+
+				appendStringInfo(buf, "bmsize %d, bmpage_found %c",
+						   xlrec->bmsize, (xlrec->bmpage_found) ? 'T' : 'F');
+				break;
+			}
+		case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
+			{
+				xl_hash_split_allocpage *xlrec = (xl_hash_split_allocpage *) rec;
+
+				appendStringInfo(buf, "new_bucket %u, meta_page_masks_updated %c, issplitpoint_changed %c",
+								 xlrec->new_bucket,
+					(xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS) ? 'T' : 'F',
+								 (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT) ? 'T' : 'F');
+				break;
+			}
+		case XLOG_HASH_SPLIT_COMPLETE:
+			{
+				xl_hash_split_complete *xlrec = (xl_hash_split_complete *) rec;
+
+				appendStringInfo(buf, "old_bucket_flag %u, new_bucket_flag %u",
+								 xlrec->old_bucket_flag, xlrec->new_bucket_flag);
+				break;
+			}
+		case XLOG_HASH_MOVE_PAGE_CONTENTS:
+			{
+				xl_hash_move_page_contents *xlrec = (xl_hash_move_page_contents *) rec;
+
+				appendStringInfo(buf, "ntups %d, is_primary %c",
+								 xlrec->ntups,
+								 xlrec->is_prim_bucket_same_wrt ? 'T' : 'F');
+				break;
+			}
+		case XLOG_HASH_SQUEEZE_PAGE:
+			{
+				xl_hash_squeeze_page *xlrec = (xl_hash_squeeze_page *) rec;
+
+				appendStringInfo(buf, "prevblkno %u, nextblkno %u, ntups %d, is_primary %c",
+								 xlrec->prevblkno,
+								 xlrec->nextblkno,
+								 xlrec->ntups,
+								 xlrec->is_prim_bucket_same_wrt ? 'T' : 'F');
+				break;
+			}
+		case XLOG_HASH_DELETE:
+			{
+				xl_hash_delete *xlrec = (xl_hash_delete *) rec;
+
+				appendStringInfo(buf, "is_primary %c",
+								 xlrec->is_primary_bucket_page ? 'T' : 'F');
+				break;
+			}
+		case XLOG_HASH_UPDATE_META_PAGE:
+			{
+				xl_hash_update_meta_page *xlrec = (xl_hash_update_meta_page *) rec;
+
+				appendStringInfo(buf, "ntuples %g",
+								 xlrec->ntuples);
+				break;
+			}
+	}
 }
 
 const char *
 hash_identify(uint8 info)
 {
-	return NULL;
+	const char *id = NULL;
+
+	switch (info & ~XLR_INFO_MASK)
+	{
+		case XLOG_HASH_INIT_META_PAGE:
+			id = "INIT_META_PAGE";
+			break;
+		case XLOG_HASH_INIT_BITMAP_PAGE:
+			id = "INIT_BITMAP_PAGE";
+			break;
+		case XLOG_HASH_INSERT:
+			id = "INSERT";
+			break;
+		case XLOG_HASH_ADD_OVFL_PAGE:
+			id = "ADD_OVFL_PAGE";
+			break;
+		case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
+			id = "SPLIT_ALLOCATE_PAGE";
+			break;
+		case XLOG_HASH_SPLIT_PAGE:
+			id = "SPLIT_PAGE";
+			break;
+		case XLOG_HASH_SPLIT_COMPLETE:
+			id = "SPLIT_COMPLETE";
+			break;
+		case XLOG_HASH_MOVE_PAGE_CONTENTS:
+			id = "MOVE_PAGE_CONTENTS";
+			break;
+		case XLOG_HASH_SQUEEZE_PAGE:
+			id = "SQUEEZE_PAGE";
+			break;
+		case XLOG_HASH_DELETE:
+			id = "DELETE";
+			break;
+		case XLOG_HASH_SPLIT_CLEANUP:
+			id = "SPLIT_CLEANUP";
+			break;
+		case XLOG_HASH_UPDATE_META_PAGE:
+			id = "UPDATE_META_PAGE";
+			break;
+	}
+
+	return id;
 }
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 72bb06c..9618032 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -506,11 +506,6 @@ DefineIndex(Oid relationId,
 	accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
 	amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
 
-	if (strcmp(accessMethodName, "hash") == 0 &&
-		RelationNeedsWAL(rel))
-		ereport(WARNING,
-				(errmsg("hash indexes are not WAL-logged and their use is discouraged")));
-
 	if (stmt->unique && !amRoutine->amcanunique)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 9001e20..ce55fc5 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5880,13 +5880,10 @@ RelationIdIsInInitFile(Oid relationId)
 /*
  * Tells whether any index for the relation is unlogged.
  *
- * Any index using the hash AM is implicitly unlogged.
- *
  * Note: There doesn't seem to be any way to have an unlogged index attached
- * to a permanent table except to create a hash index, but it seems best to
- * keep this general so that it returns sensible results even when they seem
- * obvious (like for an unlogged table) and to handle possible future unlogged
- * indexes on permanent tables.
+ * to a permanent table, but it seems best to keep this general so that it
+ * returns sensible results even when they seem obvious (like for an unlogged
+ * table) and to handle possible future unlogged indexes on permanent tables.
  */
 bool
 RelationHasUnloggedIndex(Relation rel)
@@ -5908,8 +5905,7 @@ RelationHasUnloggedIndex(Relation rel)
 			elog(ERROR, "cache lookup failed for relation %u", indexoid);
 		reltup = (Form_pg_class) GETSTRUCT(tp);
 
-		if (reltup->relpersistence == RELPERSISTENCE_UNLOGGED
-			|| reltup->relam == HASH_AM_OID)
+		if (reltup->relpersistence == RELPERSISTENCE_UNLOGGED)
 			result = true;
 
 		ReleaseSysCache(tp);
diff --git a/src/include/access/hash_xlog.h b/src/include/access/hash_xlog.h
index cc23163..8400022 100644
--- a/src/include/access/hash_xlog.h
+++ b/src/include/access/hash_xlog.h
@@ -16,7 +16,239 @@
 
 #include "access/xlogreader.h"
 #include "lib/stringinfo.h"
+#include "storage/off.h"
 
+/* Number of buffers required for XLOG_HASH_SQUEEZE_PAGE operation */
+#define HASH_XLOG_FREE_OVFL_BUFS	6
+
+/*
+ * XLOG records for hash operations
+ */
+#define XLOG_HASH_INIT_META_PAGE	0x00		/* initialize the meta page */
+#define XLOG_HASH_INIT_BITMAP_PAGE	0x10		/* initialize the bitmap page */
+#define XLOG_HASH_INSERT		0x20	/* add index tuple without split */
+#define XLOG_HASH_ADD_OVFL_PAGE 0x30	/* add overflow page */
+#define XLOG_HASH_SPLIT_ALLOCATE_PAGE	0x40	/* allocate new page for split */
+#define XLOG_HASH_SPLIT_PAGE	0x50	/* split page */
+#define XLOG_HASH_SPLIT_COMPLETE	0x60		/* completion of split
+												 * operation */
+#define XLOG_HASH_MOVE_PAGE_CONTENTS	0x70	/* remove tuples from one page
+												 * and add to another page */
+#define XLOG_HASH_SQUEEZE_PAGE	0x80	/* add tuples to one of the previous
+										 * pages in chain and free the ovfl
+										 * page */
+#define XLOG_HASH_DELETE		0x90	/* delete index tuples from a page */
+#define XLOG_HASH_SPLIT_CLEANUP 0xA0	/* clear split-cleanup flag in primary
+										 * bucket page after deleting tuples
+										 * that are moved due to split	*/
+#define XLOG_HASH_UPDATE_META_PAGE	0xB0		/* update meta page after
+												 * vacuum */
+
+
+/*
+ * xl_hash_split_allocpage flag values, 8 bits are available.
+ */
+#define XLH_SPLIT_META_UPDATE_MASKS		(1<<0)
+#define XLH_SPLIT_META_UPDATE_SPLITPOINT		(1<<1)
+
+/*
+ * This is what we need to know about a HASH index create.
+ *
+ * Backup block 0: metapage
+ */
+typedef struct xl_hash_createidx
+{
+	double		num_tuples;
+	RegProcedure procid;
+	uint16		ffactor;
+}	xl_hash_createidx;
+#define SizeOfHashCreateIdx (offsetof(xl_hash_createidx, ffactor) + sizeof(uint16))
+
+/*
+ * This is what we need to know about a simple (without split) insert.
+ *
+ * This data record is used for XLOG_HASH_INSERT
+ *
+ * Backup Blk 0: original page (data contains the inserted tuple)
+ * Backup Blk 1: metapage (HashMetaPageData)
+ */
+typedef struct xl_hash_insert
+{
+	OffsetNumber offnum;
+}	xl_hash_insert;
+
+#define SizeOfHashInsert	(offsetof(xl_hash_insert, offnum) + sizeof(OffsetNumber))
+
+/*
+ * This is what we need to know about the addition of an overflow page.
+ *
+ * This data record is used for XLOG_HASH_ADD_OVFL_PAGE
+ *
+ * Backup Blk 0: newly allocated overflow page
+ * Backup Blk 1: page before new overflow page in the bucket chain
+ * Backup Blk 2: bitmap page
+ * Backup Blk 3: new bitmap page
+ * Backup Blk 4: metapage
+ */
+typedef struct xl_hash_addovflpage
+{
+	uint16		bmsize;
+	bool		bmpage_found;
+}	xl_hash_addovflpage;
+
+#define SizeOfHashAddOvflPage	\
+	(offsetof(xl_hash_addovflpage, bmpage_found) + sizeof(bool))
+
+/*
+ * This is what we need to know about allocating a page for split.
+ *
+ * This data record is used for XLOG_HASH_SPLIT_ALLOCATE_PAGE
+ *
+ * Backup Blk 0: page for old bucket
+ * Backup Blk 1: page for new bucket
+ * Backup Blk 2: metapage
+ */
+typedef struct xl_hash_split_allocpage
+{
+	uint32		new_bucket;
+	uint16		old_bucket_flag;
+	uint16		new_bucket_flag;
+	uint8		flags;
+}	xl_hash_split_allocpage;
+
+#define SizeOfHashSplitAllocPage	\
+	(offsetof(xl_hash_split_allocpage, flags) + sizeof(uint8))
+
+/*
+ * This is what we need to know about completing the split operation.
+ *
+ * This data record is used for XLOG_HASH_SPLIT_COMPLETE
+ *
+ * Backup Blk 0: page for old bucket
+ * Backup Blk 1: page for new bucket
+ */
+typedef struct xl_hash_split_complete
+{
+	uint16		old_bucket_flag;
+	uint16		new_bucket_flag;
+}	xl_hash_split_complete;
+
+#define SizeOfHashSplitComplete \
+	(offsetof(xl_hash_split_complete, new_bucket_flag) + sizeof(uint16))
+
+/*
+ * This is what we need to know about moving page contents, as required
+ * during a squeeze operation.
+ *
+ * This data record is used for XLOG_HASH_MOVE_PAGE_CONTENTS
+ *
+ * Backup Blk 0: bucket page
+ * Backup Blk 1: page containing moved tuples
+ * Backup Blk 2: page from which tuples will be removed
+ */
+typedef struct xl_hash_move_page_contents
+{
+	uint16		ntups;
+	bool		is_prim_bucket_same_wrt;		/* TRUE if the page to which
+												 * tuples are moved is same as
+												 * primary bucket page */
+}	xl_hash_move_page_contents;
+
+#define SizeOfHashMovePageContents	\
+	(offsetof(xl_hash_move_page_contents, is_prim_bucket_same_wrt) + sizeof(bool))
+
+/*
+ * This is what we need to know about the squeeze page operation.
+ *
+ * This data record is used for XLOG_HASH_SQUEEZE_PAGE
+ *
+ * Backup Blk 0: page containing tuples moved from freed overflow page
+ * Backup Blk 1: freed overflow page
+ * Backup Blk 2: page previous to the freed overflow page
+ * Backup Blk 3: page next to the freed overflow page
+ * Backup Blk 4: bitmap page containing info of freed overflow page
+ * Backup Blk 5: meta page
+ */
+typedef struct xl_hash_squeeze_page
+{
+	BlockNumber prevblkno;
+	BlockNumber nextblkno;
+	uint16		ntups;
+	bool		is_prim_bucket_same_wrt;		/* TRUE if the page to which
+												 * tuples are moved is same as
+												 * primary bucket page */
+	bool		is_prev_bucket_same_wrt;		/* TRUE if the page to which
+												 * tuples are moved is the
+												 * page previous to the freed
+												 * overflow page */
+}	xl_hash_squeeze_page;
+
+#define SizeOfHashSqueezePage	\
+	(offsetof(xl_hash_squeeze_page, is_prev_bucket_same_wrt) + sizeof(bool))
+
+/*
+ * This is what we need to know about the deletion of index tuples from a page.
+ *
+ * This data record is used for XLOG_HASH_DELETE
+ *
+ * Backup Blk 0: primary bucket page
+ * Backup Blk 1: page from which tuples are deleted
+ */
+typedef struct xl_hash_delete
+{
+	bool		is_primary_bucket_page; /* TRUE if the operation is for
+										 * primary bucket page */
+}	xl_hash_delete;
+
+#define SizeOfHashDelete	(offsetof(xl_hash_delete, is_primary_bucket_page) + sizeof(bool))
+
+/*
+ * This is what we need for metapage update operation.
+ *
+ * This data record is used for XLOG_HASH_UPDATE_META_PAGE
+ *
+ * Backup Blk 0: meta page
+ */
+typedef struct xl_hash_update_meta_page
+{
+	double		ntuples;
+}	xl_hash_update_meta_page;
+
+#define SizeOfHashUpdateMetaPage	\
+	(offsetof(xl_hash_update_meta_page, ntuples) + sizeof(double))
+
+/*
+ * This is what we need to initialize metapage.
+ *
+ * This data record is used for XLOG_HASH_INIT_META_PAGE
+ *
+ * Backup Blk 0: meta page
+ */
+typedef struct xl_hash_init_meta_page
+{
+	double		num_tuples;
+	RegProcedure procid;
+	uint16		ffactor;
+}	xl_hash_init_meta_page;
+
+#define SizeOfHashInitMetaPage		\
+	(offsetof(xl_hash_init_meta_page, ffactor) + sizeof(uint16))
+
+/*
+ * This is what we need to initialize bitmap page.
+ *
+ * This data record is used for XLOG_HASH_INIT_BITMAP_PAGE
+ *
+ * Backup Blk 0: bitmap page
+ * Backup Blk 1: meta page
+ */
+typedef struct xl_hash_init_bitmap_page
+{
+	uint16		bmsize;
+}	xl_hash_init_bitmap_page;
+
+#define SizeOfHashInitBitmapPage	\
+	(offsetof(xl_hash_init_bitmap_page, bmsize) + sizeof(uint16))
 
 extern void hash_redo(XLogReaderState *record);
 extern void hash_desc(StringInfo buf, XLogReaderState *record);
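
The redo side of these record types lives in the new hash_xlog.c, which is not
reproduced in this excerpt.  A minimal sketch of how a redo dispatcher could
consume the constants and structs above follows; the helper name
hash_xlog_insert_sketch and the exact replay steps are illustrative
assumptions, not the patch's actual implementation.

	#include "postgres.h"

	#include "access/hash_xlog.h"
	#include "access/xlogutils.h"
	#include "storage/bufmgr.h"

	/*
	 * Illustrative only: replay of XLOG_HASH_INSERT, restoring the tuple at
	 * xlrec->offnum on the registered data page.  The real handlers may
	 * differ in detail (e.g. they also adjust the metapage).
	 */
	static void
	hash_xlog_insert_sketch(XLogReaderState *record)
	{
		xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
		Buffer		buffer;

		if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
		{
			Size		datalen;
			char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);
			Page		page = BufferGetPage(buffer);

			if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
							false, false) == InvalidOffsetNumber)
				elog(PANIC, "hash_xlog_insert_sketch: failed to add item");

			PageSetLSN(page, record->EndRecPtr);
			MarkBufferDirty(buffer);
		}
		if (BufferIsValid(buffer))
			UnlockReleaseBuffer(buffer);
	}

	/* Illustrative dispatcher over the record types defined above */
	void
	hash_redo(XLogReaderState *record)
	{
		uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

		switch (info)
		{
			case XLOG_HASH_INSERT:
				hash_xlog_insert_sketch(record);
				break;
				/* ... one case per XLOG_HASH_* record type ... */
			default:
				elog(PANIC, "hash_redo: unknown op code %u", info);
		}
	}
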
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index e519fdb..26cd059 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -2335,13 +2335,9 @@ Options: fastupdate=on, gin_pending_list_limit=128
 -- HASH
 --
 CREATE INDEX hash_i4_index ON hash_i4_heap USING hash (random int4_ops);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 CREATE INDEX hash_name_index ON hash_name_heap USING hash (random name_ops);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 CREATE INDEX hash_txt_index ON hash_txt_heap USING hash (random text_ops);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 CREATE INDEX hash_f8_index ON hash_f8_heap USING hash (random float8_ops);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 CREATE UNLOGGED TABLE unlogged_hash_table (id int4);
 CREATE INDEX unlogged_hash_index ON unlogged_hash_table USING hash (id int4_ops);
 DROP TABLE unlogged_hash_table;
@@ -2350,7 +2346,6 @@ DROP TABLE unlogged_hash_table;
 -- maintenance_work_mem setting and fillfactor:
 SET maintenance_work_mem = '1MB';
 CREATE INDEX hash_tuplesort_idx ON tenk1 USING hash (stringu1 name_ops) WITH (fillfactor = 10);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM tenk1 WHERE stringu1 = 'TVAAAA';
                       QUERY PLAN                       
diff --git a/src/test/regress/expected/enum.out b/src/test/regress/expected/enum.out
index 514d1d0..0e60304 100644
--- a/src/test/regress/expected/enum.out
+++ b/src/test/regress/expected/enum.out
@@ -383,7 +383,6 @@ DROP INDEX enumtest_btree;
 -- Hash index / opclass with the = operator
 --
 CREATE INDEX enumtest_hash ON enumtest USING hash (col);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 SELECT * FROM enumtest WHERE col = 'orange';
   col   
 --------
diff --git a/src/test/regress/expected/hash_index.out b/src/test/regress/expected/hash_index.out
index f8b9f02..0a18efa 100644
--- a/src/test/regress/expected/hash_index.out
+++ b/src/test/regress/expected/hash_index.out
@@ -201,7 +201,6 @@ SELECT h.seqno AS f20000
 --
 CREATE TABLE hash_split_heap (keycol INT);
 CREATE INDEX hash_split_index on hash_split_heap USING HASH (keycol);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 INSERT INTO hash_split_heap SELECT 1 FROM generate_series(1, 70000) a;
 VACUUM FULL hash_split_heap;
 -- Let's do a backward scan.
@@ -230,5 +229,4 @@ DROP TABLE hash_temp_heap CASCADE;
 CREATE TABLE hash_heap_float4 (x float4, y int);
 INSERT INTO hash_heap_float4 VALUES (1.1,1);
 CREATE INDEX hash_idx ON hash_heap_float4 USING hash (x);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 DROP TABLE hash_heap_float4 CASCADE;
diff --git a/src/test/regress/expected/macaddr.out b/src/test/regress/expected/macaddr.out
index e84ff5f..151f9ce 100644
--- a/src/test/regress/expected/macaddr.out
+++ b/src/test/regress/expected/macaddr.out
@@ -41,7 +41,6 @@ SELECT * FROM macaddr_data;
 
 CREATE INDEX macaddr_data_btree ON macaddr_data USING btree (b);
 CREATE INDEX macaddr_data_hash ON macaddr_data USING hash (b);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 SELECT a, b, trunc(b) FROM macaddr_data ORDER BY 2, 1;
  a  |         b         |       trunc       
 ----+-------------------+-------------------
diff --git a/src/test/regress/expected/replica_identity.out b/src/test/regress/expected/replica_identity.out
index fa63235..67c34a9 100644
--- a/src/test/regress/expected/replica_identity.out
+++ b/src/test/regress/expected/replica_identity.out
@@ -12,7 +12,6 @@ CREATE UNIQUE INDEX test_replica_identity_keyab_key ON test_replica_identity (ke
 CREATE UNIQUE INDEX test_replica_identity_oid_idx ON test_replica_identity (oid);
 CREATE UNIQUE INDEX test_replica_identity_nonkey ON test_replica_identity (keya, nonkey);
 CREATE INDEX test_replica_identity_hash ON test_replica_identity USING hash (nonkey);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 CREATE UNIQUE INDEX test_replica_identity_expr ON test_replica_identity (keya, keyb, (3));
 CREATE UNIQUE INDEX test_replica_identity_partial ON test_replica_identity (keya, keyb) WHERE keyb != '3';
 -- default is 'd'/DEFAULT for user created tables
diff --git a/src/test/regress/expected/uuid.out b/src/test/regress/expected/uuid.out
index 423f277..db66dc7 100644
--- a/src/test/regress/expected/uuid.out
+++ b/src/test/regress/expected/uuid.out
@@ -114,7 +114,6 @@ SELECT COUNT(*) FROM guid1 WHERE guid_field >= '22222222-2222-2222-2222-22222222
 -- btree and hash index creation test
 CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
 CREATE INDEX guid1_hash  ON guid1 USING HASH  (guid_field);
-WARNING:  hash indexes are not WAL-logged and their use is discouraged
 -- unique index test
 CREATE UNIQUE INDEX guid1_unique_BTREE ON guid1 USING BTREE (guid_field);
 -- should fail
-- 
1.8.4.msysgit.0

