diff --git a/contrib/worker_spi/worker_spi.c b/contrib/worker_spi/worker_spi.c
index 328c722..a9e4d43 100644
--- a/contrib/worker_spi/worker_spi.c
+++ b/contrib/worker_spi/worker_spi.c
@@ -180,6 +180,13 @@ worker_spi_main(Datum main_arg)
 	/* We're now ready to receive signals */
 	BackgroundWorkerUnblockSignals();
 
+	/*
+	 * Allocate memory to store compressed and uncompressed backup blocks.
+	 * This is used if backup blocks are to be compressed when full-page
+	 * writes are written to WAL.
+	 */
+	CompressBackupBlocksPagesAlloc();
+
 	/* Connect to our database */
 	BackgroundWorkerInitializeConnection("postgres", NULL);
 
@@ -243,6 +250,13 @@ worker_spi_main(Datum main_arg)
 		{
 			got_sighup = false;
 			ProcessConfigFile(PGC_SIGHUP);
+
+			/*
+			 * Allocate memory to store compressed and uncompressed backup
+			 * blocks, in case full_page_writes was changed.  This is used
+			 * if backup blocks need to be compressed when full-page writes
+			 * are written to WAL.
+			 */
+			CompressBackupBlocksPagesAlloc();
 		}
 
 		/*
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 47b1192..f5f8cbe 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2181,14 +2181,14 @@ include_dir 'conf.d'
      </varlistentry>
 
      <varlistentry id="guc-full-page-writes" xreflabel="full_page_writes">
-      <term><varname>full_page_writes</varname> (<type>boolean</type>)
+      <term><varname>full_page_writes</varname> (<type>enum</type>)</term>
       <indexterm>
        <primary><varname>full_page_writes</> configuration parameter</primary>
       </indexterm>
-      </term>
       <listitem>
        <para>
-        When this parameter is on, the <productname>PostgreSQL</> server
+        When this parameter is <literal>on</> or <literal>compress</>,
+        the <productname>PostgreSQL</> server
         writes the entire content of each disk page to WAL during the
         first modification of that page after a checkpoint.
         This is needed because
@@ -2206,6 +2206,11 @@ include_dir 'conf.d'
        </para>
 
        <para>
+        Valid values are <literal>on</>, <literal>compress</>, and <literal>off</>.
+        The default is <literal>on</>.
+       </para>
+
+       <para>
         Turning this parameter off speeds normal operation, but
         might lead to either unrecoverable data corruption, or silent
         data corruption, after a system failure. The risks are similar to turning off
@@ -2220,9 +2225,13 @@ include_dir 'conf.d'
        </para>
 
        <para>
+        Setting this parameter to <literal>compress</> compresses each
+        full page image written to WAL, reducing the amount of WAL data
+        at the cost of some additional CPU time.
+       </para>
+
+       <para>
         This parameter can only be set in the <filename>postgresql.conf</>
         file or on the server command line.
-        The default is <literal>on</>.
        </para>
       </listitem>
      </varlistentry>
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index e0957ff..2e59db6 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -31,6 +31,21 @@ const struct config_enum_entry wal_level_options[] = {
 	{NULL, 0, false}
 };
 
+/* Textual form of a full_page_writes setting, for WAL record descriptions */
+static const char *
+full_page_writes_str(FullPageWritesLevel level)
+{
+	switch (level)
+	{
+		case FULL_PAGE_WRITES_ON:
+			return "true";
+		case FULL_PAGE_WRITES_COMPRESS:
+			return "compress";
+		case FULL_PAGE_WRITES_OFF:
+			return "false";
+	}
+	return "unrecognized";
+}
+
 void
 xlog_desc(StringInfo buf, XLogRecord *record)
 {
@@ -49,7 +64,7 @@ xlog_desc(StringInfo buf, XLogRecord *record)
 				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
 						 checkpoint->ThisTimeLineID,
 						 checkpoint->PrevTimeLineID,
-						 checkpoint->fullPageWrites ? "true" : "false",
+						 full_page_writes_str(checkpoint->fullPageWrites),
 						 checkpoint->nextXidEpoch, checkpoint->nextXid,
 						 checkpoint->nextOid,
 						 checkpoint->nextMulti,
@@ -118,10 +133,10 @@ xlog_desc(StringInfo buf, XLogRecord *record)
 	}
 	else if (info == XLOG_FPW_CHANGE)
 	{
-		bool		fpw;
+		int		fpw;
 
-		memcpy(&fpw, rec, sizeof(bool));
-		appendStringInfo(buf, "%s", fpw ? "true" : "false");
+		memcpy(&fpw, rec, sizeof(int));
+		appendStringInfo(buf, "full_page_writes: %s", full_page_writes_str(fpw));
 	}
 	else if (info == XLOG_END_OF_RECOVERY)
 	{
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3c9aeae..0d8bf90 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -61,6 +61,7 @@
 #include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/pg_lzcompress.h"
 #include "utils/ps_status.h"
 #include "utils/relmapper.h"
 #include "utils/snapmgr.h"
@@ -84,7 +85,7 @@ int			XLogArchiveTimeout = 0;
 bool		XLogArchiveMode = false;
 char	   *XLogArchiveCommand = NULL;
 bool		EnableHotStandby = false;
-bool		fullPageWrites = true;
+int			fullPageWrites = FULL_PAGE_WRITES_ON;
 bool		wal_log_hints = false;
 bool		log_checkpoints = false;
 int			sync_method = DEFAULT_SYNC_METHOD;
@@ -178,7 +179,7 @@ static TimeLineID receiveTLI = 0;
  * that the recovery starting checkpoint record indicates, and then updated
  * each time XLOG_FPW_CHANGE record is replayed.
  */
-static bool lastFullPageWrites;
+static int lastFullPageWrites;
 
 /*
  * Local copy of SharedRecoveryInProgress variable. True actually means "not
@@ -456,7 +457,7 @@ typedef struct XLogCtlInsert
 	 */
 	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
 	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
-	bool		fullPageWrites;
+	int		fullPageWrites;
 
 	/*
 	 * exclusiveBackup is true if a backup started with pg_start_backup() is
@@ -747,6 +748,11 @@ static bool holdingAllLocks = false;
 static MemoryContext walDebugCxt = NULL;
 #endif
 
+/*
+ * Buffers for storing backup blocks before and after compression.
+ * outOfMem is set if allocating them fails, which disables compression.
+ */
+static char *compressedPages;
+static char *uncompressedPages;
+static bool outOfMem = false;
+
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
 static bool recoveryStopsBefore(XLogRecord *record);
@@ -820,6 +826,8 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 				  XLogRecPtr *PrevPtr);
 static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
+static char *CompressBackupBlocks(char *page, uint32 orig_len, char *dest, uint32 *len);
+void CompressBackupBlocksPagesAlloc(void);
 static char *GetXLogBuffer(XLogRecPtr ptr);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
@@ -870,6 +878,13 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 	static XLogRecord *rechdr;
 	XLogRecPtr	StartPos;
 	XLogRecPtr	EndPos;
+	int			fpw;
+
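+	/* State used when compressing the backup blocks of this record */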
+	char	   *compressed_blocks;
+	uint32		compressed_len = 0;
+	uint32		orig_len = 0;
+	bool		compressed = false;
+	BkpBlock   *bkpb;
 
 	if (rechdr == NULL)
 	{
@@ -914,6 +929,10 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 	 * chain if we have to loop back here.
 	 */
 begin:;
+
+	orig_len = 0;
+	compressed = false;
+
 	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
 	{
 		dtbuf[i] = InvalidBuffer;
@@ -922,12 +941,13 @@ begin:;
 
 	/*
 	 * Decide if we need to do full-page writes in this XLOG record: true if
-	 * full_page_writes is on or we have a PITR request for it.  Since we
-	 * don't yet have an insertion lock, fullPageWrites and forcePageWrites
-	 * could change under us, but we'll recheck them once we have a lock.
+	 * full_page_writes is needed (i.e., on or compress) or we have a PITR
+	 * request for it.  Since we don't yet have an insertion lock,
+	 * fullPageWrites and forcePageWrites could change under us, but we'll
+	 * recheck them once we have a lock.
 	 */
-	doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
-
+	fpw = Insert->fullPageWrites;
+	doPageWrites = (fpw != FULL_PAGE_WRITES_OFF) || Insert->forcePageWrites;
+
 	len = 0;
 	for (rdt = rdata;;)
 	{
@@ -1049,6 +1069,49 @@ begin:;
 			write_len += rdt->len;
 			rdt->next = NULL;
 		}
+
+	}
+
+	/*
+	 * If backup blocks may be compressed (full_page_writes is off or
+	 * compress) and the compression buffers were allocated, replace the
+	 * rdata nodes of the backup blocks added in the loop above with a
+	 * single rdata node that contains the compressed backup blocks and
+	 * their headers, except the header of the first block, which is kept
+	 * uncompressed to store information about compression.
+	 */
+	if ((fpw == FULL_PAGE_WRITES_OFF || fpw == FULL_PAGE_WRITES_COMPRESS) &&
+		!outOfMem && rdt_lastnormal->next != NULL)
+	{
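+		/*
+		 * rdt_lastnormal->next is the rdata node for the first backup
+		 * block's header, which stays uncompressed; start copying from the
+		 * node that follows it.
+		 */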
+		rdt = rdt_lastnormal->next;
+		rdt = rdt->next;
+		for (; rdt != NULL; rdt = rdt->next)
+		{
+			memcpy(uncompressedPages + orig_len, rdt->data, rdt->len);
+			orig_len += rdt->len;
+		}
+		if (orig_len != 0)
+		{
+			/* Compress the backup blocks before including it in rdata chain */
+			compressed_blocks = CompressBackupBlocks(uncompressedPages, orig_len,
+													compressedPages, &(compressed_len));
+			if (compressed_blocks != NULL)
+			{
+				/*
+				 * write_len covers the rmgr data, the first backup block
+				 * header, and the compressed blocks including their
+				 * varlena header.
+				 */
+				rdt = (rdt_lastnormal->next)->next;
+				rdt->data = compressed_blocks;
+				rdt->len = compressed_len;
+				write_len = len + sizeof(BkpBlock);
+				write_len += rdt->len;
+				rdt->next = NULL;
+				compressed = true;
+			}
+		}
+		/* Record whether compression was used in the first block's header */
+		bkpb = (BkpBlock *)rdt_lastnormal->next->data;
+		if (!compressed)
+			bkpb->flag_compress = BKPBLOCKS_UNCOMPRESSED;
+		else
+			bkpb->flag_compress = BKPBLOCKS_COMPRESSED;
 	}
 
 	/*
@@ -1159,12 +1222,14 @@ begin:;
 	}
 
 	/*
-	 * Also check to see if fullPageWrites or forcePageWrites was just turned
-	 * on; if we weren't already doing full-page writes then go back and
-	 * recompute. (If it was just turned off, we could recompute the record
-	 * without full pages, but we choose not to bother.)
+	 * Also check to see if fullPageWrites was just changed to on or compress,
+	 * or if forcePageWrites was just turned on; if we weren't already doing
+	 * full-page writes then go back and recompute. (If it was just turned off,
+	 * we could recompute the record without full pages, but we choose not
+	 * to bother.)
 	 */
-	if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
+	if ((Insert->fullPageWrites != FULL_PAGE_WRITES_OFF || Insert->forcePageWrites) &&
+		!doPageWrites)
 	{
 		/* Oops, must redo it with full-page data. */
 		WALInsertLockRelease();
@@ -1778,6 +1843,42 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 }
 
 /*
+ * Create a compressed version of the backup blocks of a WAL record.
+ *
+ * If successful, return a pointer to the compressed result and set 'len'
+ * to its length.  Otherwise (i.e., the compressed result is actually bigger
+ * than the original), return NULL.
+ */
+static char *
+CompressBackupBlocks(char *page, uint32 orig_len, char *dest, uint32 *len)
+{
+	struct varlena *buf = (struct varlena *) dest;
+	bool		ret;
+
+	ret = pglz_compress(page, orig_len,
+						(PGLZ_Header *) buf, PGLZ_strategy_default);
+
+	/* pglz_compress returns false if it was unable to compress the data */
+	if (!ret)
+		return NULL;
+	/*
+	 * We recheck the actual size even if pglz_compress() report success,
+	 * because it might be satisfied with having saved as little as one byte
+	 * in the compressed data --- which could turn into a net loss once you
+	 * consider header and alignment padding.  Worst case, the compressed
+	 * format might require three padding bytes (plus header, which is
+	 * included in VARSIZE(buf)), whereas the uncompressed format would take
+	 * only one header byte and no padding if the value is short enough.  So
+	 * we insist on a savings of more than 2 bytes to ensure we have a gain.
+	 */
+	if (VARSIZE(buf) >= orig_len - 2)
+	{
+		return NULL;
+	}
+	*len = VARSIZE(buf);
+	return (char *) buf;
+}
+
+/*
  * Get a pointer to the right location in the WAL buffer containing the
  * given XLogRecPtr.
  *
@@ -2083,7 +2184,7 @@ XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock,
 			bkpb->hole_offset = 0;
 			bkpb->hole_length = 0;
 		}
-
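+		/*
+		 * Backup blocks start out marked as uncompressed; XLogInsert may
+		 * compress them later.
+		 */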
+		bkpb->flag_compress = BKPBLOCKS_UNCOMPRESSED;
 		return true;			/* buffer requires backup */
 	}
 
@@ -3985,22 +4086,31 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
 
 	/* Locate requested BkpBlock in the record */
 	blk = (char *) XLogRecGetData(record) + record->xl_len;
+
+	memcpy(&bkpb, blk, sizeof(BkpBlock));
+	blk = blk + sizeof(BkpBlock);
+
+	/* Check if blocks in WAL record are compressed */
+	if (bkpb.flag_compress == BKPBLOCKS_COMPRESSED)
+	{
+		/* The check that decompression succeeded is made inside the function */
+		pglz_decompress((PGLZ_Header *) blk, uncompressedPages);
+		blk = uncompressedPages;
+	}
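+
+	/*
+	 * The header of the first backup block was read above; the header of
+	 * each subsequent block is read at the end of the previous iteration,
+	 * after skipping over that block's data.
+	 */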
 	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
 	{
 		if (!(record->xl_info & XLR_BKP_BLOCK(i)))
 			continue;
 
-		memcpy(&bkpb, blk, sizeof(BkpBlock));
-		blk += sizeof(BkpBlock);
-
 		if (i == block_index)
 		{
 			/* Found it, apply the update */
 			return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
 											  keep_buffer);
 		}
-
 		blk += BLCKSZ - bkpb.hole_length;
+		memcpy(&bkpb, blk, sizeof(BkpBlock));
+		blk += sizeof(BkpBlock);
 	}
 
 	/* Caller specified a bogus block_index */
@@ -6026,6 +6137,9 @@ StartupXLOG(void)
 	 */
 	ValidateXLOGDirectoryStructure();
 
+	/* Allocate memory to store compressed and uncompressed backup blocks */
+	CompressBackupBlocksPagesAlloc();
+
 	/*
 	 * Clear out any old relcache cache files.  This is *necessary* if we do
 	 * any WAL replay, since that would probably result in the cache files
@@ -6382,6 +6496,11 @@ StartupXLOG(void)
 	{
 		int			rmid;
 
+		/*
+		 * Make sure a buffer for decompression exists: the WAL to be
+		 * replayed may contain compressed backup blocks even if this
+		 * server does not use full_page_writes = compress.
+		 */
+		if (uncompressedPages == NULL)
+			uncompressedPages = (char *) palloc(XLR_TOTAL_BLCKSZ);
+
 		/*
 		 * Update pg_control to show that we are recovering and to show the
 		 * selected checkpoint as the place we are starting from. We also mark
@@ -7612,6 +7731,50 @@ InitXLOGAccess(void)
 
 	/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
 	(void) GetRedoRecPtr();
+	/* Allocate memory to store compressed backup blocks */
+	CompressBackupBlocksPagesAlloc();
+}
+
+/*
+ * Allocate buffers to store compressed and uncompressed backup blocks,
+ * once per backend.  Their size depends on the compression algorithm used.
+ * These buffers persist until the end of the backend process.  If memory
+ * allocation fails, compression of backup blocks is disabled entirely.
+ */
+void
+CompressBackupBlocksPagesAlloc(void)
+{
+	/*
+	 * Free the memory used for compression if the full_page_writes GUC
+	 * has been changed to 'on' at runtime.
+	 */
+	if (fullPageWrites == FULL_PAGE_WRITES_ON && compressedPages != NULL)
+	{
+		free(compressedPages);
+		compressedPages = NULL;
+	}
+	if (fullPageWrites == FULL_PAGE_WRITES_ON && uncompressedPages != NULL)
+	{
+		free(uncompressedPages);
+		uncompressedPages = NULL;
+	}
+
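+	/*
+	 * Allocate buffers large enough for the largest possible set of backup
+	 * blocks (and their headers) in a single WAL record.
+	 */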
+	if (fullPageWrites != FULL_PAGE_WRITES_ON &&
+		compressedPages == NULL)
+	{
+		size_t		buffer_size = VARHDRSZ;
+
+		buffer_size += PGLZ_MAX_OUTPUT(XLR_TOTAL_BLCKSZ);
+		compressedPages = (char *) malloc(buffer_size);
+		if (compressedPages == NULL)
+			outOfMem = true;
+	}
+	if (fullPageWrites != FULL_PAGE_WRITES_ON &&
+		uncompressedPages == NULL)
+	{
+		uncompressedPages = (char *) malloc(XLR_TOTAL_BLCKSZ);
+		if (uncompressedPages == NULL)
+			outOfMem = true;
+	}
 }
 
 /*
@@ -9088,10 +9251,10 @@ UpdateFullPageWrites(void)
 	 * setting it to false, first write the WAL record and then set the global
 	 * flag.
 	 */
-	if (fullPageWrites)
+	if (fullPageWrites != FULL_PAGE_WRITES_OFF)
 	{
 		WALInsertLockAcquireExclusive();
-		Insert->fullPageWrites = true;
+		Insert->fullPageWrites = fullPageWrites;
 		WALInsertLockRelease();
 	}
 
@@ -9104,17 +9267,17 @@ UpdateFullPageWrites(void)
 		XLogRecData rdata;
 
 		rdata.data = (char *) (&fullPageWrites);
-		rdata.len = sizeof(bool);
+		rdata.len = sizeof(int);
 		rdata.buffer = InvalidBuffer;
 		rdata.next = NULL;
 
 		XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE, &rdata);
 	}
 
-	if (!fullPageWrites)
+	if (fullPageWrites == FULL_PAGE_WRITES_OFF)
 	{
 		WALInsertLockAcquireExclusive();
-		Insert->fullPageWrites = false;
+		Insert->fullPageWrites = fullPageWrites;
 		WALInsertLockRelease();
 	}
 	END_CRIT_SECTION();
@@ -9457,16 +9620,16 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 	}
 	else if (info == XLOG_FPW_CHANGE)
 	{
-		bool		fpw;
+		int		fpw;
 
-		memcpy(&fpw, XLogRecGetData(record), sizeof(bool));
+		memcpy(&fpw, XLogRecGetData(record), sizeof(int));
 
 		/*
 		 * Update the LSN of the last replayed XLOG_FPW_CHANGE record so that
 		 * do_pg_start_backup() and do_pg_stop_backup() can check whether
 		 * full_page_writes has been disabled during online backup.
 		 */
-		if (!fpw)
+		if (fpw == FULL_PAGE_WRITES_OFF)
 		{
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->lastFpwDisableRecPtr < ReadRecPtr)
@@ -9808,7 +9971,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 
 		do
 		{
-			bool		checkpointfpw;
+			int		checkpointfpw;
 
 			/*
 			 * Force a CHECKPOINT.  Aside from being necessary to prevent torn
@@ -9857,7 +10020,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 				recptr = XLogCtl->lastFpwDisableRecPtr;
 				SpinLockRelease(&XLogCtl->info_lck);
 
-				if (!checkpointfpw || startpoint <= recptr)
+				if (checkpointfpw == FULL_PAGE_WRITES_OFF || startpoint <= recptr)
 					ereport(ERROR,
 						  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 						   errmsg("WAL generated with full_page_writes=off was replayed "
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f06daa2..2fcb7ea 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -674,6 +675,8 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
 	BkpBlock	bkpb;
 	char	   *blk;
 	size_t		remaining = record->xl_tot_len;
+	struct varlena *tmp;
+	uint32 b_tot_len;
 
 	/* First the rmgr data */
 	if (remaining < SizeOfXLogRecord + len)
@@ -689,52 +692,82 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
 
 	/* Add in the backup blocks, if any */
 	blk = (char *) XLogRecGetData(record) + len;
-	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+	if (remaining != 0)
 	{
-		uint32		blen;
-
-		if (!(record->xl_info & XLR_BKP_BLOCK(i)))
-			continue;
-
 		if (remaining < sizeof(BkpBlock))
 		{
 			report_invalid_record(state,
 							  "invalid backup block size in record at %X/%X",
 								  (uint32) (recptr >> 32), (uint32) recptr);
 			return false;
 		}
 		memcpy(&bkpb, blk, sizeof(BkpBlock));
 
-		if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
+		if (bkpb.flag_compress == BKPBLOCKS_UNCOMPRESSED)
 		{
-			report_invalid_record(state,
-								  "incorrect hole size in record at %X/%X",
-								  (uint32) (recptr >> 32), (uint32) recptr);
-			return false;
+			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+			{
+				uint32		blen;
+
+				if (!(record->xl_info & XLR_BKP_BLOCK(i)))
+					continue;
+
+				if (remaining < sizeof(BkpBlock))
+				{
+					report_invalid_record(state,
+									"invalid backup block size in record at %X/%X",
+									(uint32) (recptr >> 32), (uint32) recptr);
+					return false;
+				}
+				memcpy(&bkpb, blk, sizeof(BkpBlock));
+
+				if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
+				{
+					report_invalid_record(state,
+									"incorrect hole size in record at %X/%X",
+									(uint32) (recptr >> 32), (uint32) recptr);
+					return false;
+				}
+
+				blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
+
+				if (remaining < blen)
+				{
+					report_invalid_record(state,
+									"invalid backup block size in record at %X/%X",
+									(uint32) (recptr >> 32), (uint32) recptr);
+					return false;
+				}
+				remaining -= blen;
+				COMP_CRC32(crc, blk, blen);
+				blk += blen;
+			}
+			/* Check that xl_tot_len agrees with our calculation */
+			if (remaining != 0)
+			{
+				report_invalid_record(state,
+								"incorrect total length in record at %X/%X",
+								(uint32) (recptr >> 32), (uint32) recptr);
+				return false;
+			}
 		}
-		blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
-		if (remaining < blen)
+		else
 		{
-			report_invalid_record(state,
-							  "invalid backup block size in record at %X/%X",
-								  (uint32) (recptr >> 32), (uint32) recptr);
-			return false;
-		}
-		remaining -= blen;
-		COMP_CRC32(crc, blk, blen);
-		blk += blen;
-	}
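+			/*
+			 * Backup blocks are compressed: a single varlena following the
+			 * first BkpBlock header holds all the block data.  Validate its
+			 * size against xl_tot_len and include the whole area in the CRC.
+			 */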
+			tmp = (struct varlena *) (blk + sizeof(BkpBlock));
+			b_tot_len = VARSIZE(tmp);
 
-	/* Check that xl_tot_len agrees with our calculation */
-	if (remaining != 0)
-	{
-		report_invalid_record(state,
-							  "incorrect total length in record at %X/%X",
-							  (uint32) (recptr >> 32), (uint32) recptr);
-		return false;
+			/*
+			 * Check that the total length of the compressed blocks, stored
+			 * as a varlena, agrees with the xl_tot_len stored in the
+			 * XLogRecord.
+			 */
+			if ((remaining - sizeof(BkpBlock)) != b_tot_len)
+			{
+				report_invalid_record(state,
+									  "invalid backup block size in record at %X/%X",
+									  (uint32) (recptr >> 32), (uint32) recptr);
+				return false;
+			}
+			COMP_CRC32(crc, blk, remaining);
+		}
 	}
-
 	/* Finally include the record header */
 	COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
 	FIN_CRC32(crc);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 61f17bf..7b89303 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -199,6 +199,9 @@ static void drop_unnamed_stmt(void);
 static void SigHupHandler(SIGNAL_ARGS);
 static void log_disconnections(int code, Datum arg);
 
+/* in access/transam/xlog.c */
+extern void CompressBackupBlocksPagesAlloc(void);
 
 /* ----------------------------------------------------------------
  *		routines to obtain user input
@@ -3983,6 +3986,8 @@ PostgresMain(int argc, char *argv[],
 		{
 			got_SIGHUP = false;
 			ProcessConfigFile(PGC_SIGHUP);
+			/* Allocate memory to store compressed and uncompressed backup blocks */
+			CompressBackupBlocksPagesAlloc();
 		}
 
 		/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index d7142d2..5f8dbee 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -419,6 +419,23 @@ static const struct config_enum_entry row_security_options[] = {
 };
 
 /*
+ * Although only "on", "off", and "compress" are documented, we
+ * accept all the likely variants of "on" and "off".
+ */
+static const struct config_enum_entry full_page_writes_options[] = {
+	{"compress", FULL_PAGE_WRITES_COMPRESS, false},
+	{"on", FULL_PAGE_WRITES_ON, false},
+	{"off", FULL_PAGE_WRITES_OFF, false},
+	{"true", FULL_PAGE_WRITES_ON, true},
+	{"false", FULL_PAGE_WRITES_OFF, true},
+	{"yes", FULL_PAGE_WRITES_ON, true},
+	{"no", FULL_PAGE_WRITES_OFF, true},
+	{"1", FULL_PAGE_WRITES_ON, true},
+	{"0", FULL_PAGE_WRITES_OFF, true},
+	{NULL, 0, false}
+};
+
+/*
  * Options for enum values stored in other modules
  */
 extern const struct config_enum_entry wal_level_options[];
@@ -895,20 +912,6 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 	{
-		{"full_page_writes", PGC_SIGHUP, WAL_SETTINGS,
-			gettext_noop("Writes full pages to WAL when first modified after a checkpoint."),
-			gettext_noop("A page write in process during an operating system crash might be "
-						 "only partially written to disk.  During recovery, the row changes "
-			  "stored in WAL are not enough to recover.  This option writes "
-						 "pages when first modified after a checkpoint to WAL so full recovery "
-						 "is possible.")
-		},
-		&fullPageWrites,
-		true,
-		NULL, NULL, NULL
-	},
-
-	{
 		{"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS,
 			gettext_noop("Writes full pages to WAL when first modified after a checkpoint, even for a non-critical modifications"),
 			NULL
@@ -3436,6 +3439,20 @@ static struct config_enum ConfigureNamesEnum[] =
 	},
 
 	{
+		{"full_page_writes", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Writes full pages to WAL when first modified after a checkpoint."),
+			gettext_noop("A page write in process during an operating system crash might be "
+						 "only partially written to disk.  During recovery, the row changes "
+			  "stored in WAL are not enough to recover.  This option writes "
+						 "pages when first modified after a checkpoint to WAL so full recovery "
+						 "is possible.")
+		},
+		&fullPageWrites,
+		FULL_PAGE_WRITES_ON, full_page_writes_options,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"trace_recovery_messages", PGC_SIGHUP, DEVELOPER_OPTIONS,
 			gettext_noop("Enables logging of recovery-related debugging information."),
 			gettext_noop("Each level includes all the levels that follow it. The later"
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index dac6776..9f51e30 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -185,7 +185,8 @@
 					#   fsync
 					#   fsync_writethrough
 					#   open_sync
-#full_page_writes = on			# recover from partial page writes
+#full_page_writes = on			# recover from partial page writes;
+					# off, compress, or on
 #wal_log_hints = off			# also do full page writes of non-critical updates
 #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
 					# (change requires restart)
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 32cc100..9e732e2 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -250,7 +250,7 @@ main(int argc, char *argv[])
 	printf(_("Latest checkpoint's PrevTimeLineID:   %u\n"),
 		   ControlFile.checkPointCopy.PrevTimeLineID);
 	printf(_("Latest checkpoint's full_page_writes: %s\n"),
-		   ControlFile.checkPointCopy.fullPageWrites ? _("on") : _("off"));
+		   FullPageWritesStr(ControlFile.checkPointCopy.fullPageWrites));
 	printf(_("Latest checkpoint's NextXID:          %u/%u\n"),
 		   ControlFile.checkPointCopy.nextXidEpoch,
 		   ControlFile.checkPointCopy.nextXid);
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index f4c1eaf..00714a6 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -516,7 +516,7 @@ GuessControlValues(void)
 	ControlFile.checkPointCopy.redo = SizeOfXLogLongPHD;
 	ControlFile.checkPointCopy.ThisTimeLineID = 1;
 	ControlFile.checkPointCopy.PrevTimeLineID = 1;
-	ControlFile.checkPointCopy.fullPageWrites = false;
+	ControlFile.checkPointCopy.fullPageWrites = FULL_PAGE_WRITES_OFF;
 	ControlFile.checkPointCopy.nextXidEpoch = 0;
 	ControlFile.checkPointCopy.nextXid = FirstNormalTransactionId;
 	ControlFile.checkPointCopy.nextOid = FirstBootstrapObjectId;
@@ -600,7 +600,7 @@ PrintControlValues(bool guessed)
 	printf(_("Latest checkpoint's TimeLineID:       %u\n"),
 		   ControlFile.checkPointCopy.ThisTimeLineID);
 	printf(_("Latest checkpoint's full_page_writes: %s\n"),
-		   ControlFile.checkPointCopy.fullPageWrites ? _("on") : _("off"));
+		   FullPageWritesStr(ControlFile.checkPointCopy.fullPageWrites));
 	printf(_("Latest checkpoint's NextXID:          %u/%u\n"),
 		   ControlFile.checkPointCopy.nextXidEpoch,
 		   ControlFile.checkPointCopy.nextXid);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0ae110f..90ce4aa 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -47,10 +47,10 @@ typedef struct XLogRecord
 	uint32		xl_len;			/* total len of rmgr data */
 	uint8		xl_info;		/* flag bits, see below */
 	RmgrId		xl_rmid;		/* resource manager for this record */
-	/* 2 bytes of padding here, initialize to zero */
+	/* 1 byte of padding here, initialize to zero */
+	uint8		xl_compress;	/* compression of full-page writes in record */
 	XLogRecPtr	xl_prev;		/* ptr to previous record in log */
 	pg_crc32	xl_crc;			/* CRC for this record */
-
 	/* If MAXALIGN==8, there are 4 wasted bytes here */
 
 	/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
@@ -191,7 +191,6 @@ extern int	XLogArchiveTimeout;
 extern bool XLogArchiveMode;
 extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
-extern bool fullPageWrites;
 extern bool wal_log_hints;
 extern bool log_checkpoints;
 
@@ -205,6 +204,17 @@ typedef enum WalLevel
 } WalLevel;
 extern int	wal_level;
 
+typedef enum FullPageWritesLevel
+{
+	FULL_PAGE_WRITES_OFF = 0,
+	FULL_PAGE_WRITES_COMPRESS,
+	FULL_PAGE_WRITES_ON
+} FullPageWritesLevel;
+extern int	fullPageWrites;
+#define FullPageWritesStr(fpw)	\
+	((fpw) == FULL_PAGE_WRITES_ON ? _("on") :	\
+	 ((fpw) == FULL_PAGE_WRITES_OFF ? _("off") : _("compress")))
+
 #define XLogArchivingActive()	(XLogArchiveMode && wal_level >= WAL_LEVEL_ARCHIVE)
 #define XLogArchiveCommandSet() (XLogArchiveCommand[0] != '\0')
 
@@ -281,6 +291,9 @@ typedef struct CheckpointStatsData
 
 extern CheckpointStatsData CheckpointStats;
 
+/* Total size of maximum number of backup blocks including their headers in an XLOG record */
+#define XLR_TOTAL_BLCKSZ	(XLR_MAX_BKP_BLOCKS * BLCKSZ + \
+							 XLR_MAX_BKP_BLOCKS * sizeof(BkpBlock))
+
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
 extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
 extern void XLogFlush(XLogRecPtr RecPtr);
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 27b9899..ed2b9a5 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -46,12 +46,15 @@ typedef struct BkpBlock
 	RelFileNode node;			/* relation containing block */
 	ForkNumber	fork;			/* fork within the relation */
 	BlockNumber block;			/* block number */
-	uint16		hole_offset;	/* number of bytes before "hole" */
-	uint16		hole_length;	/* number of bytes in "hole" */
-
+	unsigned	hole_offset:15, /* number of bytes before "hole" */
+				flag_compress:2, /* flag to store compression information */
+				hole_length:15; /* number of bytes in "hole" */
 	/* ACTUAL BLOCK DATA FOLLOWS AT END OF STRUCT */
 } BkpBlock;
 
+#define BKPBLOCKS_UNCOMPRESSED	0	/* uncompressed */
+#define BKPBLOCKS_COMPRESSED	1	/* compressed */
+
 /*
  * Each page of XLOG file has a header like this:
  */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index ba79d25..6a536fc 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -35,7 +35,7 @@ typedef struct CheckPoint
 	TimeLineID	ThisTimeLineID; /* current TLI */
 	TimeLineID	PrevTimeLineID; /* previous TLI, if this record begins a new
 								 * timeline (equals ThisTimeLineID otherwise) */
-	bool		fullPageWrites; /* current full_page_writes */
+	int			fullPageWrites; /* current full_page_writes */
 	uint32		nextXidEpoch;	/* higher-order bits of nextXid */
 	TransactionId nextXid;		/* next free XID */
 	Oid			nextOid;		/* next free OID */
