diff --git a/contrib/pg_xlogdump/pg_xlogdump.c b/contrib/pg_xlogdump/pg_xlogdump.c
index c1bfbc2..3ebaac6 100644
--- a/contrib/pg_xlogdump/pg_xlogdump.c
+++ b/contrib/pg_xlogdump/pg_xlogdump.c
@@ -363,14 +363,14 @@ XLogDumpCountRecord(XLogDumpConfig *config, XLogDumpStats *stats,
-	 * takes up BLCKSZ bytes, minus the "hole" length.
+	 * takes up BLCKSZ bytes, minus the "hole" length; or the length of the
+	 * compressed image if the full-page image is compressed.
 	 *
 	 * XXX: We peek into xlogreader's private decoded backup blocks for the
-	 * hole_length. It doesn't seem worth it to add an accessor macro for
+	 * block image length. It doesn't seem worth it to add an accessor macro for
 	 * this.
 	 */
 	fpi_len = 0;
 	for (block_id = 0; block_id <= record->max_block_id; block_id++)
 	{
 		if (XLogRecHasBlockImage(record, block_id))
-			fpi_len += BLCKSZ - record->blocks[block_id].hole_length;
+			fpi_len += record->blocks[block_id].bkp_len;
 	}
 
 	/* Update per-rmgr statistics */
@@ -465,9 +465,16 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
 				   blk);
 			if (XLogRecHasBlockImage(record, block_id))
 			{
-				printf(" (FPW); hole: offset: %u, length: %u\n",
-					   record->blocks[block_id].hole_offset,
-					   record->blocks[block_id].hole_length);
+				if (record->blocks[block_id].is_compressed)
+					printf(" (FPW compressed); hole offset: %u, "
+						   "compressed length: %u, original length: %u\n",
+						   record->blocks[block_id].hole_offset,
+						   record->blocks[block_id].bkp_len,
+						   record->blocks[block_id].bkp_uncompress_len);
+				else
+					printf(" (FPW); hole offset: %u, length: %u\n",
+						   record->blocks[block_id].hole_offset,
+						   record->blocks[block_id].bkp_len);
 			}
 			putchar('\n');
 		}
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 9261e7f..1e5d4ea 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2282,6 +2282,35 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-wal-compression" xreflabel="wal_compression">
+      <term><varname>wal_compression</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>wal_compression</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        When this parameter is <literal>on</>, the <productname>PostgreSQL</>
+        server compresses the content of full-page writes before inserting
+        them into WAL records, producing smaller records and reducing the
+        amount of WAL stored on disk.
+       </para>
+
+       <para>
+        Compression reduces the amount of disk I/O spent on WAL-logging, at
+        the cost of some extra CPU time to compress each block image.  At
+        WAL replay, compressed block images require extra CPU cycles for
+        decompression, but replay time can also be reduced in I/O-bound
+        environments.
+       </para>
+
+       <para>
+        The default value is <literal>off</>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-wal-buffers" xreflabel="wal_buffers">
       <term><varname>wal_buffers</varname> (<type>integer</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a28155f..b0d401d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -89,6 +89,7 @@ char	   *XLogArchiveCommand = NULL;
 bool		EnableHotStandby = false;
 bool		fullPageWrites = true;
 bool		wal_log_hints = false;
+bool		wal_compression = false;
 bool		log_checkpoints = false;
 int			sync_method = DEFAULT_SYNC_METHOD;
 int			wal_level = WAL_LEVEL_MINIMAL;
@@ -4671,7 +4672,7 @@ BootStrapXLOG(void)
 	record->xl_rmid = RM_XLOG_ID;
 	recptr += SizeOfXLogRecord;
 	/* fill the XLogRecordDataHeaderShort struct */
-	*(recptr++) = XLR_BLOCK_ID_DATA_SHORT;
+	*(recptr++) = XLR_CHUNK_ID_DATA_SHORT;
 	*(recptr++) = sizeof(checkPoint);
 	memcpy(recptr, &checkPoint, sizeof(checkPoint));
 	recptr += sizeof(checkPoint);
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index a1e2eb8..91830ac 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -24,12 +24,16 @@
 #include "access/xlog_internal.h"
 #include "access/xloginsert.h"
 #include "catalog/pg_control.h"
+#include "common/pg_lzcompress.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
 #include "utils/memutils.h"
 #include "pg_trace.h"
 
+/* maximum size for compression buffer of block image */
+#define PGLZ_MAX_BLCKSZ	PGLZ_MAX_OUTPUT(BLCKSZ)
+
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -50,6 +54,8 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
+	char		compressed_page[PGLZ_MAX_BLCKSZ];	/* buffer holding the
+													 * compressed page image */
 }	registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -81,6 +87,9 @@ static char *hdr_scratch = NULL;
 	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
 	 SizeOfXLogRecordDataHeaderLong)
 
+/* Scratch buffer holding the block image to be compressed */
+static char *compression_scratch = NULL;
+
 /*
  * An array of XLogRecData structs, to hold registered data.
  */
@@ -97,6 +106,9 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
 				   XLogRecPtr *fpw_lsn);
 
+static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
+									uint16 hole_length, char *dest, uint16 *len);
+
 /*
  * Begin constructing a WAL record. This must be called before the
  * XLogRegister* functions and XLogInsert().
@@ -482,7 +494,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
+		XLogRecordBlockImageCompressionInfo cbimg;
 		bool		samerel;
+		bool		is_compressed = false;
+		bool		with_hole = false;
 
 		if (!regbuf->in_use)
 			continue;
@@ -522,6 +537,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bkpb.id = block_id;
 		bkpb.fork_flags = regbuf->forkno;
 		bkpb.data_length = 0;
+		bkpb.chunk_id = 0;
+		bkpb.chunk_id |= XLR_CHUNK_BLOCK_REFERENCE;
 
 		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
 			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
@@ -529,9 +546,13 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		if (needs_backup)
 		{
 			Page		page = regbuf->page;
+			uint16		hole_length;
+			uint16		hole_offset;
+			uint16		compress_len = 0;
 
 			/*
-			 * The page needs to be backed up, so set up *bimg
+			 * The page needs to be backed up, so calculate its hole length
+			 * and offset.
 			 */
 			if (regbuf->flags & REGBUF_STANDARD)
 			{
@@ -543,49 +564,97 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper > lower &&
 					upper <= BLCKSZ)
 				{
-					bimg.hole_offset = lower;
-					bimg.hole_length = upper - lower;
+					hole_offset = lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to compress out */
-					bimg.hole_offset = 0;
-					bimg.hole_length = 0;
+					hole_offset = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
-				bimg.hole_offset = 0;
-				bimg.hole_length = 0;
+				hole_offset = 0;
+				hole_length = 0;
 			}
 
-			/* Fill in the remaining fields in the XLogRecordBlockData struct */
-			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
+			/*
+			 * If WAL compression is enabled, try to compress the block
+			 * image with its hole removed, which improves the compression
+			 * ratio.  If the block turns out not to be compressible, the
+			 * block header information is filled in below for an
+			 * uncompressed image.
+			 */
+			if (wal_compression)
+			{
 
-			total_len += BLCKSZ - bimg.hole_length;
+				/* Perform compression of the block image */
+				if (XLogCompressBackupBlock(page, hole_offset,
+											hole_length,
+											regbuf->compressed_page,
+											&compress_len))
+				{
+					/* compression succeeded, use the compressed image */
+					is_compressed = true;
+				}
+			}
 
 			/*
 			 * Construct XLogRecData entries for the page content.
 			 */
-			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
-			rdt_datas_last = rdt_datas_last->next;
-			if (bimg.hole_length == 0)
+			bkpb.chunk_id |= XLR_CHUNK_BLOCK_HAS_IMAGE;
+
+			if (is_compressed)
 			{
-				rdt_datas_last->data = page;
-				rdt_datas_last->len = BLCKSZ;
+				/* compressed block information */
+				bimg.length = compress_len;
+				bimg.hole_offset = hole_offset;
+				bkpb.chunk_id |= XLR_CHUNK_BKP_COMPRESSED;
+				if (hole_length != 0)
+				{
+					cbimg.raw_length = BLCKSZ - hole_length;
+					bkpb.chunk_id |= XLR_CHUNK_BKP_WITH_HOLE;
+					with_hole = true;
+				}
+
+				/* record entry for compressed block */
+				rdt_datas_last->next = &regbuf->bkp_rdatas[0];
+				rdt_datas_last = rdt_datas_last->next;
+				rdt_datas_last->data = regbuf->compressed_page;
+				rdt_datas_last->len = compress_len;
+				total_len += bimg.length;
 			}
 			else
 			{
-				/* must skip the hole */
-				rdt_datas_last->data = page;
-				rdt_datas_last->len = bimg.hole_offset;
+				/* uncompressed block information */
+				bimg.length = BLCKSZ - hole_length;
+				bimg.hole_offset = hole_offset;
+				total_len += bimg.length;
 
-				rdt_datas_last->next = &regbuf->bkp_rdatas[1];
+				/* record entries for uncompressed block */
+				rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 				rdt_datas_last = rdt_datas_last->next;
+				if (hole_length == 0)
+				{
+					rdt_datas_last->data = page;
+					rdt_datas_last->len = BLCKSZ;
+				}
+				else
+				{
+					/* must skip the hole */
+					rdt_datas_last->data = page;
+					rdt_datas_last->len = hole_offset;
 
-				rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length);
-				rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length);
+					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
+					rdt_datas_last = rdt_datas_last->next;
+
+					rdt_datas_last->data = page + (hole_offset + hole_length);
+					rdt_datas_last->len = BLCKSZ - (hole_offset + hole_length);
+					bkpb.chunk_id |= XLR_CHUNK_BKP_WITH_HOLE;
+					with_hole = true;
+				}
 			}
 		}
 
@@ -595,7 +664,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			 * Link the caller-supplied rdata chain for this buffer to the
 			 * overall list.
 			 */
-			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
+			bkpb.chunk_id |= XLR_CHUNK_BLOCK_HAS_DATA;
 			bkpb.data_length = regbuf->rdata_len;
 			total_len += regbuf->rdata_len;
 
@@ -619,6 +688,12 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
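+			/*
+			 * The extra compression information (raw length of the
+			 * uncompressed image) is stored only when the image is
+			 * compressed and the page contains a hole.
+			 */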
+			if (is_compressed && with_hole)
+			{
+				memcpy(scratch, &cbimg,
+					   SizeOfXLogRecordBlockImageCompressionInfo);
+				scratch += SizeOfXLogRecordBlockImageCompressionInfo;
+			}
 		}
 		if (!samerel)
 		{
@@ -634,13 +709,13 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	{
 		if (mainrdata_len > 255)
 		{
-			*(scratch++) = XLR_BLOCK_ID_DATA_LONG;
+			*(scratch++) = XLR_CHUNK_ID_DATA_LONG;
 			memcpy(scratch, &mainrdata_len, sizeof(uint32));
 			scratch += sizeof(uint32);
 		}
 		else
 		{
-			*(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
+			*(scratch++) = XLR_CHUNK_ID_DATA_SHORT;
 			*(scratch++) = (uint8) mainrdata_len;
 		}
 		rdt_datas_last->next = mainrdata_head;
@@ -681,6 +756,51 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 }
 
 /*
+ * Create a compressed version of a backup block image.  If successful,
+ * return true and set 'len' to the compressed length.  Return false if the
+ * block could not be usefully compressed.
+ */
+static bool
+XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
+						char *dest, uint16 *len)
+{
+	int orig_len = BLCKSZ - hole_length;
+	char *scratch_buf;
+	int32 compressed_len;
+
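+	/*
+	 * If the page has a hole, copy the data before and after it into the
+	 * scratch buffer so that a contiguous, hole-free image is compressed.
+	 */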
+	if (hole_length != 0)
+	{
+		scratch_buf = compression_scratch;
+		memcpy(scratch_buf, page, hole_offset);
+		memcpy(scratch_buf + hole_offset,
+				page + (hole_offset + hole_length),
+				BLCKSZ - (hole_length + hole_offset));
+	}
+	else
+		scratch_buf = page;
+
+	/* run compression */
+	compressed_len = pglz_compress(scratch_buf, orig_len, dest,
+								   PGLZ_strategy_default);
+
+	/* leave if data cannot be compressed */
+	if (compressed_len < 0)
+		return false;
+
+	/*
+	 * Even if pglz_compress() reports success, check that compression saved
+	 * at least the two bytes consumed by the raw_length field that is added
+	 * to the WAL record for a compressed block containing a hole; otherwise
+	 * compression is not worth it.
+	 */
+	*len = (uint16) compressed_len;
+	if ((hole_length != 0) &&
+		(*len >= orig_len - SizeOfXLogRecordBlockImageCompressionInfo))
+		return false;
+	return true;
+}
+
+/*
  * Determine whether the buffer referenced has to be backed up.
  *
  * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
@@ -893,4 +1013,9 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	/* allocate scratch buffer used for compression of block images */
+	if (compression_scratch == NULL)
+		compression_scratch = MemoryContextAllocZero(xloginsert_cxt,
+													 BLCKSZ);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 60470b5..62be704 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -20,6 +20,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
+#include "common/pg_lzcompress.h"
 
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 
@@ -74,13 +75,15 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
 	state->max_block_id = -1;
 
 	/*
-	 * Permanently allocate readBuf.  We do it this way, rather than just
-	 * making a static array, for two reasons: (1) no need to waste the
-	 * storage in most instantiations of the backend; (2) a static char array
-	 * isn't guaranteed to have any particular alignment, whereas palloc()
-	 * will provide MAXALIGN'd storage.
+	 * Permanently allocate readBuf and uncompressBuf.  We do it this way,
+	 * rather than just making a static array, for two reasons: (1) no need
+	 * to waste the storage in most instantiations of the backend; (2) a
+	 * static char array isn't guaranteed to have any particular alignment,
+	 * whereas palloc() will provide MAXALIGN'd storage.
 	 */
 	state->readBuf = (char *) palloc(XLOG_BLCKSZ);
+	state->uncompressBuf = (char *) palloc(BLCKSZ);
 
 	state->read_page = pagereadfunc;
 	/* system_identifier initialized to zeroes above */
@@ -98,6 +101,7 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
 	{
 		pfree(state->errormsg_buf);
 		pfree(state->readBuf);
+		pfree(state->uncompressBuf);
 		pfree(state);
 		return NULL;
 	}
@@ -125,6 +129,7 @@ XLogReaderFree(XLogReaderState *state)
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
 	pfree(state->readBuf);
+	pfree(state->uncompressBuf);
 	pfree(state);
 }
 
@@ -922,6 +927,8 @@ ResetDecoder(XLogReaderState *state)
 		state->blocks[block_id].in_use = false;
 		state->blocks[block_id].has_image = false;
 		state->blocks[block_id].has_data = false;
+		state->blocks[block_id].is_compressed = false;
+		state->blocks[block_id].with_hole = false;
 	}
 	state->max_block_id = -1;
 }
@@ -952,6 +959,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 	uint32		datatotal;
 	RelFileNode *rnode = NULL;
 	uint8		block_id;
+	uint8		chunk_id;
 
 	ResetDecoder(state);
 
@@ -965,9 +975,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 	datatotal = 0;
 	while (remaining > datatotal)
 	{
-		COPY_HEADER_FIELD(&block_id, sizeof(uint8));
+		COPY_HEADER_FIELD(&chunk_id, sizeof(uint8));
 
-		if (block_id == XLR_BLOCK_ID_DATA_SHORT)
+		if (chunk_id == XLR_CHUNK_ID_DATA_SHORT)
 		{
 			/* XLogRecordDataHeaderShort */
 			uint8		main_data_len;
@@ -979,7 +989,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 			break;				/* by convention, the main data fragment is
 								 * always last */
 		}
-		else if (block_id == XLR_BLOCK_ID_DATA_LONG)
+		else if (chunk_id == XLR_CHUNK_ID_DATA_LONG)
 		{
 			/* XLogRecordDataHeaderLong */
 			uint32		main_data_len;
@@ -990,81 +1000,148 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 			break;				/* by convention, the main data fragment is
 								 * always last */
 		}
-		else if (block_id <= XLR_MAX_BLOCK_ID)
+		else if ((chunk_id & XLR_CHUNK_BLOCK_REFERENCE) != 0)
 		{
-			/* XLogRecordBlockHeader */
-			DecodedBkpBlock *blk;
-			uint8		fork_flags;
-
-			if (block_id <= state->max_block_id)
+			COPY_HEADER_FIELD(&block_id, sizeof(uint8));
+			if (block_id <= XLR_MAX_BLOCK_ID)
 			{
-				report_invalid_record(state,
-									  "out-of-order block_id %u at %X/%X",
-									  block_id,
-									  (uint32) (state->ReadRecPtr >> 32),
-									  (uint32) state->ReadRecPtr);
-				goto err;
-			}
-			state->max_block_id = block_id;
+				/* XLogRecordBlockHeader */
+				DecodedBkpBlock *blk;
+				uint8		fork_flags;
 
-			blk = &state->blocks[block_id];
-			blk->in_use = true;
+				if (block_id <= state->max_block_id)
+				{
+					report_invalid_record(state,
+										"out-of-order block_id %u at %X/%X",
+										block_id,
+										(uint32) (state->ReadRecPtr >> 32),
+										(uint32) state->ReadRecPtr);
+					goto err;
+				}
+				state->max_block_id = block_id;
 
-			COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
-			blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
-			blk->flags = fork_flags;
-			blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
-			blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
+				blk = &state->blocks[block_id];
+				blk->in_use = true;
 
-			COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
-			/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
-			if (blk->has_data && blk->data_len == 0)
-				report_invalid_record(state,
-					  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
-									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
-			if (!blk->has_data && blk->data_len != 0)
-				report_invalid_record(state,
-				 "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
-									  (unsigned int) blk->data_len,
-									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
-			datatotal += blk->data_len;
+				COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
+				blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
+				blk->flags = fork_flags;
+				blk->has_image = ((chunk_id & XLR_CHUNK_BLOCK_HAS_IMAGE) != 0);
+				blk->has_data = ((chunk_id & XLR_CHUNK_BLOCK_HAS_DATA) != 0);
 
-			if (blk->has_image)
-			{
-				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
-				COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-				datatotal += BLCKSZ - blk->hole_length;
-			}
-			if (!(fork_flags & BKPBLOCK_SAME_REL))
-			{
-				COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
-				rnode = &blk->rnode;
+				COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
+				/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
+				if (blk->has_data && blk->data_len == 0)
+					report_invalid_record(state,
+						"XLR_CHUNK_BLOCK_HAS_DATA set, but no data included at %X/%X",
+						(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+				if (!blk->has_data && blk->data_len != 0)
+					report_invalid_record(state,
+						"XLR_CHUNK_BLOCK_HAS_DATA not set, but data length is %u at %X/%X",
+						(unsigned int) blk->data_len,
+						(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+				datatotal += blk->data_len;
+
+				if (blk->has_image)
+				{
+					XLogRecordBlockImageHeader bkp_info;
+					COPY_HEADER_FIELD(&bkp_info, sizeof(XLogRecordBlockImageHeader));
+					blk->bkp_len = bkp_info.length;
+					blk->hole_offset = bkp_info.hole_offset;
+
+					blk->is_compressed = ((chunk_id & XLR_CHUNK_BKP_COMPRESSED) != 0);
+					blk->with_hole = ((chunk_id & XLR_CHUNK_BKP_WITH_HOLE) != 0);
+
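+					/*
+					 * Cross-check the hole information: an image without a
+					 * hole must have hole_offset == 0, while an image with
+					 * a hole must have a non-zero offset.
+					 */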
+					if ((!blk->with_hole && blk->hole_offset != 0) ||
+						(blk->with_hole && blk->hole_offset == 0))
+					{
+						report_invalid_record(state,
+							"invalid hole offset in record %X/%X",
+							(uint32) (state->ReadRecPtr >> 32),
+							(uint32) state->ReadRecPtr);
+						goto err;
+					}
+
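+					/*
+					 * For a compressed image, the uncompressed length is
+					 * stored in the record only when the page contains a
+					 * hole; otherwise it is known to be BLCKSZ.
+					 */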
+					if (blk->is_compressed)
+					{
+						if (blk->with_hole)
+						{
+							COPY_HEADER_FIELD(&blk->bkp_uncompress_len, sizeof(uint16));
+							if (blk->bkp_uncompress_len >= BLCKSZ)
+							{
+								report_invalid_record(state,
+										"invalid uncompressed block length in record %X/%X",
+										(uint32) (state->ReadRecPtr >> 32),
+										(uint32) state->ReadRecPtr);
+								goto err;
+							}
+						}
+						else
+							blk->bkp_uncompress_len = BLCKSZ;
+					}
+					else
+					{
+						/*
+						 * The length of an uncompressed block image must be
+						 * less than BLCKSZ if the block has a hole, and
+						 * exactly BLCKSZ if it does not.
+						 */
+						if ((blk->with_hole && blk->bkp_len >= BLCKSZ) ||
+							(!blk->with_hole && blk->bkp_len != BLCKSZ))
+						{
+							report_invalid_record(state,
+									"invalid block length in record %X/%X",
+									(uint32) (state->ReadRecPtr >> 32),
+									(uint32) state->ReadRecPtr);
+							goto err;
+						}
+					}
+					datatotal += blk->bkp_len;
+				}
+				if (!(fork_flags & BKPBLOCK_SAME_REL))
+				{
+					COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
+					rnode = &blk->rnode;
+				}
+				else
+				{
+					if (rnode == NULL)
+					{
+						report_invalid_record(state,
+							"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
+							(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+						goto err;
+					}
+
+					blk->rnode = *rnode;
+				}
+				COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
 			}
 			else
 			{
-				if (rnode == NULL)
-				{
-					report_invalid_record(state,
-						"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
-										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
-					goto err;
-				}
-
-				blk->rnode = *rnode;
+				report_invalid_record(state,
+									"invalid block_id %u at %X/%X",
+									block_id,
+									(uint32) (state->ReadRecPtr >> 32),
+									(uint32) state->ReadRecPtr);
+				goto err;
 			}
-			COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
 		}
 		else
 		{
 			report_invalid_record(state,
-								  "invalid block_id %u at %X/%X",
-								  block_id,
+								  "invalid chunk_id %u at %X/%X",
+								  chunk_id,
 								  (uint32) (state->ReadRecPtr >> 32),
 								  (uint32) state->ReadRecPtr);
 			goto err;
 		}
 	}
-
 	if (remaining != datatotal)
 		goto shortdata_err;
 
@@ -1088,7 +1165,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 		if (blk->has_image)
 		{
 			blk->bkp_image = ptr;
-			ptr += BLCKSZ - blk->hole_length;
+			ptr += blk->bkp_len;
 		}
 		if (blk->has_data)
 		{
@@ -1194,6 +1271,8 @@ bool
 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
+	char   *block_image;
+	int		hole_length;
 
 	if (!record->blocks[block_id].in_use)
 		return false;
@@ -1201,19 +1280,43 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 		return false;
 
 	bkpb = &record->blocks[block_id];
+	block_image = bkpb->bkp_image;
+
+	/*
+	 * Fetch page data, with different processing depending on if the
+	 * page is compressed or not.
+	 */
+	if (bkpb->is_compressed)
+	{
+		if (pglz_decompress(block_image, bkpb->bkp_len, record->uncompressBuf,
+							bkpb->bkp_uncompress_len) < 0)
+		{
+			report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
+								  (uint32) (record->ReadRecPtr >> 32),
+								  (uint32) record->ReadRecPtr,
+								  block_id);
+			return false;
+		}
+
+		block_image = record->uncompressBuf;
+		hole_length = BLCKSZ - bkpb->bkp_uncompress_len;
+	}
+	else
+		hole_length = BLCKSZ - bkpb->bkp_len;
 
-	if (bkpb->hole_length == 0)
+	/* generate page, taking into account hole if necessary */
+	if (hole_length == 0)
 	{
-		memcpy(page, bkpb->bkp_image, BLCKSZ);
+		memcpy(page, block_image, BLCKSZ);
 	}
 	else
 	{
-		memcpy(page, bkpb->bkp_image, bkpb->hole_offset);
+		memcpy(page, block_image, bkpb->hole_offset);
 		/* must zero-fill the hole */
-		MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
-		memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
-			   bkpb->bkp_image + bkpb->hole_offset,
-			   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+		MemSet(page + bkpb->hole_offset, 0, hole_length);
+		memcpy(page + (bkpb->hole_offset + hole_length),
+			   block_image + bkpb->hole_offset,
+			   BLCKSZ - (bkpb->hole_offset + hole_length));
 	}
 
 	return true;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index d84dba7..130ae74 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -995,6 +995,15 @@ static struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"wal_compression", PGC_USERSET, WAL_SETTINGS,
+			gettext_noop("Compresses full-page writes written in WAL file."),
+			NULL
+		},
+		&wal_compression,
+		false,
+		NULL, NULL, NULL
+	},
 
 	{
 		{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index f8f9ce1..7590a6f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -186,6 +186,7 @@
 					#   fsync_writethrough
 					#   open_sync
 #full_page_writes = on			# recover from partial page writes
+#wal_compression = off			# enable compression of full-page writes
 #wal_log_hints = off			# also do full page writes of non-critical updates
 					# (change requires restart)
 #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index a16089f..54ab3c9 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -1089,7 +1089,7 @@ WriteEmptyXLOG(void)
 	record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
 	record->xl_rmid = RM_XLOG_ID;
 	recptr += SizeOfXLogRecord;
-	*(recptr++) = XLR_BLOCK_ID_DATA_SHORT;
+	*(recptr++) = XLR_CHUNK_ID_DATA_SHORT;
 	*(recptr++) = sizeof(CheckPoint);
 	memcpy(recptr, &ControlFile.checkPointCopy,
 		   sizeof(CheckPoint));
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0e8e587..2b1f423 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -100,6 +100,7 @@ extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool wal_log_hints;
+extern bool wal_compression;
 extern bool log_checkpoints;
 
 extern int	CheckPointSegments;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 74bec20..06c35ba 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -52,9 +52,12 @@ typedef struct
 
 	/* Information on full-page image, if any */
 	bool		has_image;
+	bool		is_compressed;	/* is block image compressed? */
+	bool		with_hole;		/* does block image have a "hole"? */
 	char	   *bkp_image;
+	uint16		bkp_len;		/* length of block image data in record */
+	uint16		bkp_uncompress_len;		/* uncompressed length of a
+										 * compressed block image */
 	uint16		hole_offset;
-	uint16		hole_length;
 
 	/* Buffer holding the rmgr-specific data associated with this block */
 	bool		has_data;
@@ -138,6 +141,9 @@ struct XLogReaderState
 	/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
 	char	   *readBuf;
 
+	/* Scratch buffer used for uncompressed pages */
+	char	   *uncompressBuf;
+
 	/* last read segment, segment offset, read length, TLI */
 	XLogSegNo	readSegNo;
 	uint32		readOff;
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 25a9265..de13618 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -35,8 +35,9 @@
  * the fields are not aligned.
  *
  * The XLogRecordBlockHeader, XLogRecordDataHeaderShort and
- * XLogRecordDataHeaderLong structs all begin with a single 'id' byte. It's
- * used to distinguish between block references, and the main data structs.
+ * XLogRecordDataHeaderLong structs all begin with a single 'chunk_id' byte.
+ * It's used to distinguish between different kinds of block references,
+ * and the main data structs.
  */
 typedef struct XLogRecord
 {
@@ -82,12 +83,13 @@ typedef struct XLogRecord
  */
 typedef struct XLogRecordBlockHeader
 {
+	uint8		chunk_id;		/* chunk type and flags, see XLR_CHUNK_* */
 	uint8		id;				/* block reference ID */
 	uint8		fork_flags;		/* fork within the relation, and flags */
 	uint16		data_length;	/* number of payload bytes (not including page
 								 * image) */
 
-	/* If BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows */
+	/* If XLR_CHUNK_BLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows */
 	/* If !BKPBLOCK_SAME_REL is not set, a RelFileNode follows */
 	/* BlockNumber follows */
 } XLogRecordBlockHeader;
@@ -96,30 +98,57 @@ typedef struct XLogRecordBlockHeader
 
 /*
  * Additional header information when a full-page image is included
- * (i.e. when BKPBLOCK_HAS_IMAGE is set).
+ * (i.e. when XLR_CHUNK_BLOCK_HAS_IMAGE is set).
  *
- * As a trivial form of data compression, the XLOG code is aware that
- * PG data pages usually contain an unused "hole" in the middle, which
- * contains only zero bytes.  If hole_length > 0 then we have removed
- * such a "hole" from the stored data (and it's not counted in the
- * XLOG record's CRC, either).  Hence, the amount of block data actually
- * present is BLCKSZ - hole_length bytes.
+ * Block images can be stored in two forms:
+ * - Uncompressed.  As a trivial form of compression, the XLOG code is aware
+ * that PG data pages usually contain an unused "hole" in the middle, which
+ * contains only zero bytes.  If length < BLCKSZ then we have removed such a
+ * "hole" from the stored data (and it is not counted in the XLOG record's
+ * CRC, either).  Hence, the amount of block data actually present is
+ * "length" bytes, and the position of the hole on the page is given by
+ * "hole_offset".
+ * - Compressed.  When wal_compression is enabled and the image turns out to
+ * be compressible, the block image is stored compressed, with its hole
+ * removed beforehand to improve the compression ratio.  In this case
+ * "length" is the length of the compressed data and "hole_offset" is still
+ * the hole offset of the page.  The length of the uncompressed data, hole
+ * excluded, is given by "raw_length" in XLogRecordBlockImageCompressionInfo,
+ * which is included only when the block contains a hole; if it does not,
+ * the uncompressed image is known to be exactly BLCKSZ bytes.
  */
 typedef struct XLogRecordBlockImageHeader
 {
-	uint16		hole_offset;	/* number of bytes before "hole" */
-	uint16		hole_length;	/* number of bytes in "hole" */
+	uint16		length;			/* length of block image data in record */
+	uint16		hole_offset;	/* number of bytes before "hole" */
+
+	/* Followed by compression information if the block image is compressed */
 } XLogRecordBlockImageHeader;
 
 #define SizeOfXLogRecordBlockImageHeader sizeof(XLogRecordBlockImageHeader)
 
 /*
+ * Extra header information used when a block image is compressed and has a
+ * hole, i.e. when both XLR_CHUNK_BKP_COMPRESSED and XLR_CHUNK_BKP_WITH_HOLE
+ * are set in chunk_id.
+ */
+typedef struct XLogRecordBlockImageCompressionInfo
+{
+	uint16		raw_length;		/* uncompressed length of the block image,
+								 * hole excluded */
+} XLogRecordBlockImageCompressionInfo;
+
+#define SizeOfXLogRecordBlockImageCompressionInfo \
+	sizeof(XLogRecordBlockImageCompressionInfo)
+
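+/*
+ * For illustration only (a sketch, not an additional format definition),
+ * the headers of a block reference carrying a compressed image of a page
+ * with a hole are laid out as follows:
+ *
+ *	XLogRecordBlockHeader					(chunk_id, id, fork_flags, data_length)
+ *	XLogRecordBlockImageHeader				(compressed "length", hole_offset)
+ *	XLogRecordBlockImageCompressionInfo		(raw_length of uncompressed data)
+ *	RelFileNode								(unless BKPBLOCK_SAME_REL is set)
+ *	BlockNumber
+ *
+ * The block image itself and any rmgr-specific block data follow after all
+ * record headers, in the data portion of the record.
+ */
+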
+/*
  * Maximum size of the header for a block reference. This is used to size a
  * temporary buffer for constructing the header.
  */
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
+	 SizeOfXLogRecordBlockImageCompressionInfo + \
 	 sizeof(RelFileNode) + \
 	 sizeof(BlockNumber))
 
@@ -129,8 +158,6 @@ typedef struct XLogRecordBlockImageHeader
  */
 #define BKPBLOCK_FORK_MASK	0x0F
 #define BKPBLOCK_FLAG_MASK	0xF0
-#define BKPBLOCK_HAS_IMAGE	0x10	/* block data is an XLogRecordBlockImage */
-#define BKPBLOCK_HAS_DATA	0x20
 #define BKPBLOCK_WILL_INIT	0x40	/* redo will re-init the page */
 #define BKPBLOCK_SAME_REL	0x80	/* RelFileNode omitted, same as previous */
 
@@ -145,7 +172,7 @@ typedef struct XLogRecordBlockImageHeader
  */
 typedef struct XLogRecordDataHeaderShort
 {
-	uint8		id;				/* XLR_BLOCK_ID_DATA_SHORT */
+	uint8		chunk_id;		/* XLR_CHUNK_ID_DATA_SHORT */
 	uint8		data_length;	/* number of payload bytes */
 } XLogRecordDataHeaderShort;
 
@@ -153,18 +180,17 @@ typedef struct XLogRecordDataHeaderShort
 
 typedef struct XLogRecordDataHeaderLong
 {
-	uint8		id;				/* XLR_BLOCK_ID_DATA_LONG */
+	uint8		chunk_id;		/* XLR_CHUNK_ID_DATA_LONG */
 	/* followed by uint32 data_length, unaligned */
 } XLogRecordDataHeaderLong;
 
 #define SizeOfXLogRecordDataHeaderLong (sizeof(uint8) + sizeof(uint32))
 
 /*
- * Block IDs used to distinguish different kinds of record fragments. Block
+ * Block IDs are used to distinguish between different block references.  Block
  * references are numbered from 0 to XLR_MAX_BLOCK_ID. A rmgr is free to use
  * any ID number in that range (although you should stick to small numbers,
- * because the WAL machinery is optimized for that case). A couple of ID
- * numbers are reserved to denote the "main" data portion of the record.
+ * because the WAL machinery is optimized for that case).
  *
  * The maximum is currently set at 32, quite arbitrarily. Most records only
  * need a handful of block references, but there are a few exceptions that
@@ -172,7 +198,25 @@ typedef struct XLogRecordDataHeaderLong
  */
 #define XLR_MAX_BLOCK_ID			32
 
-#define XLR_BLOCK_ID_DATA_SHORT		255
-#define XLR_BLOCK_ID_DATA_LONG		254
+/*
+ * Chunk IDs are used to distinguish between different kinds of xlog record
+ * fragments.  A couple of ID values are reserved to denote the "main" data
+ * portion of the record.
+ *
+ * For block references, the chunk ID is a combination of flag bits:
+ * XLR_CHUNK_BLOCK_REFERENCE marks the chunk as a block reference,
+ * XLR_CHUNK_BLOCK_HAS_IMAGE and XLR_CHUNK_BLOCK_HAS_DATA indicate the
+ * presence of a backup block image and of rmgr-specific block data
+ * respectively, XLR_CHUNK_BKP_COMPRESSED indicates that the block image is
+ * compressed, and XLR_CHUNK_BKP_WITH_HOLE that the image was taken from a
+ * page containing a "hole".
+ */
+
+#define XLR_CHUNK_ID_DATA_SHORT		255
+#define XLR_CHUNK_ID_DATA_LONG		254
+
+#define XLR_CHUNK_BKP_COMPRESSED	0x01
+#define XLR_CHUNK_BKP_WITH_HOLE		0x02
+#define XLR_CHUNK_BLOCK_HAS_IMAGE	0x04
+#define XLR_CHUNK_BLOCK_HAS_DATA	0x08
+#define XLR_CHUNK_BLOCK_REFERENCE	0x10
 
 #endif   /* XLOGRECORD_H */
