*** a/src/backend/access/common/heaptuple.c
--- b/src/backend/access/common/heaptuple.c
***************
*** 60,66 ****
--- 60,70 ----
  #include "access/sysattr.h"
  #include "access/tuptoaster.h"
  #include "executor/tuptable.h"
+ #include "utils/datum.h"
+ #include "utils/pg_lzcompress.h"
  
+ /* guc variable for EWT compression ratio*/
+ int			wal_update_compression_ratio = 25;
  
  /* Does att's datatype allow packing into the 1-byte-header varlena format? */
  #define ATT_IS_PACKABLE(att) \
***************
*** 69,74 ****
--- 73,80 ----
  #define VARLENA_ATT_IS_PACKABLE(att) \
  	((att)->attstorage != 'p')
  
+ static void heap_get_attr_offsets (TupleDesc tupleDesc, HeapTuple Tuple,
+   							  	  int32 **offsets, int *noffsets);
  
  /* ----------------------------------------------------------------
   *						misc support routines
***************
*** 617,622 **** heap_copytuple_with_tuple(HeapTuple src, HeapTuple dest)
--- 623,766 ----
  	memcpy((char *) dest->t_data, (char *) src->t_data, src->t_len);
  }
  
+ /* ----------------
+  * heap_get_attr_offsets
+  *
+  *		Given a tuple, extract it's column starting offsets including null
+  *	columns also. For null columns the offset will be same as next attribute
+  *  offset.
+  * ----------------
+  */
+ static void
+ heap_get_attr_offsets (TupleDesc tupleDesc, HeapTuple Tuple,
+   							  	  int32 **offsets, int *noffsets)
+ {
+ 	HeapTupleHeader tup = Tuple->t_data;
+ 	Form_pg_attribute *att = tupleDesc->attrs;
+ 	bool		hasnulls = HeapTupleHasNulls(Tuple);
+ 	bits8	   *bp = Tuple->t_data->t_bits;		/* ptr to null bitmap in tuple */
+ 	bool		slow = false;	/* can we use/set attcacheoff? */
+ 	char	   *tp;				/* ptr to tuple data */
+ 	long		off;			/* offset in tuple data */
+ 	int			natts;
+ 	int			attnum;
+ 
+ 	natts = HeapTupleHeaderGetNatts(Tuple->t_data);
+ 
+ 	*offsets = palloc(natts * sizeof(int32));
+ 
+ 	*noffsets = 0;
+ 
+ 	/* copied from heap_deform_tuple */
+ 	tp = (char *) tup + tup->t_hoff;
+ 	off = 0;
+ 	for (attnum = 0; attnum < natts; attnum++)
+ 	{
+ 		Form_pg_attribute thisatt = att[attnum];
+ 
+ 		if (hasnulls && att_isnull(attnum, bp))
+ 		{
+ 			slow = true;		/* can't use attcacheoff anymore */
+ 			(*offsets)[(*noffsets)++] = off;
+ 			continue;
+ 		}
+ 
+ 		if (!slow && thisatt->attcacheoff >= 0)
+ 			off = thisatt->attcacheoff;
+ 		else if (thisatt->attlen == -1)
+ 		{
+ 			/*
+ 			 * We can only cache the offset for a varlena attribute if the
+ 			 * offset is already suitably aligned, so that there would be no
+ 			 * pad bytes in any case: then the offset will be valid for either
+ 			 * an aligned or unaligned value.
+ 			 */
+ 			if (!slow &&
+ 				off == att_align_nominal(off, thisatt->attalign))
+ 				thisatt->attcacheoff = off;
+ 			else
+ 			{
+ 				off = att_align_pointer(off, thisatt->attalign, -1,
+ 										tp + off);
+ 				slow = true;
+ 			}
+ 		}
+ 		else
+ 		{
+ 			/* not varlena, so safe to use att_align_nominal */
+ 			off = att_align_nominal(off, thisatt->attalign);
+ 
+ 			if (!slow)
+ 				thisatt->attcacheoff = off;
+ 		}
+ 
+ 		(*offsets)[(*noffsets)++] = off;
+ 
+ 		off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+ 
+ 		if (thisatt->attlen <= 0)
+ 			slow = true;		/* can't use attcacheoff anymore */
+ 
+ 	}
+ 
+ }
+ 
+ /* ----------------
+  * heap_delta_encode
+  *
+  *		Calculate the delta between two tuples, using pglz. The result is
+  * stored in *encdata. *encdata must point to a PGLZ_header buffer, with at
+  * least PGLZ_MAX_OUTPUT(newtup->t_len) bytes.
+  * ----------------
+  */
+ bool
+ heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup, HeapTuple newtup,
+ 				  char *encdata)
+ {
+ 	int32	   *hoffsets,
+ 			   *newoffsets;
+ 	int			noffsets;
+ 	PGLZ_Strategy strategy;
+ 	int32 		newbitmaplen,
+ 				hbitmpalen;
+ 
+ 	newbitmaplen = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+ 	hbitmpalen = oldtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+ 
+ 	/*
+ 	 * Deform and get the old and new tuple column boundary offsets. Which are
+ 	 * required for calculating delta between old and new tuples.
+ 	 */
+ 	heap_get_attr_offsets(tupleDesc, oldtup, &hoffsets, &noffsets);
+ 	heap_get_attr_offsets(tupleDesc, newtup, &newoffsets, &noffsets);
+ 
+ 	strategy = *PGLZ_strategy_always;
+ 	strategy.min_comp_rate = wal_update_compression_ratio;
+ 
+ 	return pglz_compress_with_history((char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ 									  newtup->t_len - offsetof(HeapTupleHeaderData, t_bits),
+ 									  (char *) oldtup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ 									  oldtup->t_len - offsetof(HeapTupleHeaderData, t_bits),
+ 									  newoffsets, hoffsets, noffsets,
+ 									  newbitmaplen, hbitmpalen,
+ 									  (PGLZ_Header *) encdata, &strategy);
+ }
+ 
+ /* ----------------
+  * heap_delta_decode
+  *
+  *		Decode a tuple using delta-encoded WAL tuple and old tuple version.
+  * ----------------
+  */
+ void
+ heap_delta_decode(char *encdata, HeapTuple oldtup, HeapTuple newtup)
+ {
+ 	return pglz_decompress_with_history((char *) encdata,
+ 										(char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+ 										&newtup->t_len,
+ 										(char *) oldtup->t_data + offsetof(HeapTupleHeaderData, t_bits));
+ }
+ 
  /*
   * heap_form_tuple
   *		construct a tuple from the given values[] and isnull[] arrays,
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 70,75 ****
--- 70,76 ----
  #include "utils/snapmgr.h"
  #include "utils/syscache.h"
  #include "utils/tqual.h"
+ #include "utils/pg_lzcompress.h"
  
  
  /* GUC variable */
***************
*** 5765,5770 **** log_heap_update(Relation reln, Buffer oldbuf,
--- 5766,5781 ----
  	XLogRecPtr	recptr;
  	XLogRecData rdata[4];
  	Page		page = BufferGetPage(newbuf);
+ 	char	   *newtupdata;
+ 	int			newtuplen;
+ 	bool		compressed = false;
+ 
+ 	/* Structure which holds EWT */
+ 	struct
+ 	{
+ 		PGLZ_Header pglzheader;
+ 		char		buf[MaxHeapTupleSize];
+ 	}			buf;
  
  	/* Caller should not call me on a non-WAL-logged relation */
  	Assert(RelationNeedsWAL(reln));
***************
*** 5774,5788 **** log_heap_update(Relation reln, Buffer oldbuf,
  	else
  		info = XLOG_HEAP_UPDATE;
  
  	xlrec.target.node = reln->rd_node;
  	xlrec.target.tid = oldtup->t_self;
  	xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
  	xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
  											  oldtup->t_data->t_infomask2);
  	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
! 	xlrec.all_visible_cleared = all_visible_cleared;
  	xlrec.newtid = newtup->t_self;
! 	xlrec.new_all_visible_cleared = new_all_visible_cleared;
  
  	rdata[0].data = (char *) &xlrec;
  	rdata[0].len = SizeOfHeapUpdate;
--- 5785,5830 ----
  	else
  		info = XLOG_HEAP_UPDATE;
  
+  	newtupdata = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
+ 	newtuplen = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ 
+ 	/*
+ 	 * EWT can be generated for all new tuple versions created by Update
+ 	 * operation. Currently we do it when both the old and new tuple versions
+ 	 * are on same page, because during recovery if the page containing old
+ 	 * tuple is corrupt, it should not cascade that corruption to other pages.
+ 	 * Under the general assumption that for long runs most updates tend to
+ 	 * create new tuple version on same page, there should not be significant
+ 	 * impact on WAL reduction or performance.
+ 	 *
+ 	 * We should not generate EWT when we need to backup the whole bolck in
+ 	 * WAL as in that case there is no saving by reduced WAL size.
+ 	 */
+ 	if ((oldbuf == newbuf) && !XLogCheckBufferNeedsBackup(newbuf))
+ 	{
+ 		/* Delta-encode the new tuple using the old tuple */
+ 		if (heap_delta_encode(reln->rd_att, oldtup, newtup, (char *) &buf.pglzheader))
+ 		{
+ 			compressed = true;
+ 			newtupdata = (char *) &buf.pglzheader;
+ 			newtuplen = VARSIZE(&buf.pglzheader);
+ 		}
+ 	}
+ 
+ 	xlrec.flags = 0;
  	xlrec.target.node = reln->rd_node;
  	xlrec.target.tid = oldtup->t_self;
  	xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
  	xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
  											  oldtup->t_data->t_infomask2);
  	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
! 	if (all_visible_cleared)
! 		xlrec.flags |= XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED;
  	xlrec.newtid = newtup->t_self;
! 	if (new_all_visible_cleared)
! 		xlrec.flags |= XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED;
! 	if (compressed)
! 		xlrec.flags |= XL_HEAP_UPDATE_DELTA_ENCODED;
  
  	rdata[0].data = (char *) &xlrec;
  	rdata[0].len = SizeOfHeapUpdate;
***************
*** 5809,5817 **** log_heap_update(Relation reln, Buffer oldbuf,
  	rdata[2].buffer_std = true;
  	rdata[2].next = &(rdata[3]);
  
! 	/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
! 	rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
! 	rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
  	rdata[3].buffer = newbuf;
  	rdata[3].buffer_std = true;
  	rdata[3].next = NULL;
--- 5851,5862 ----
  	rdata[2].buffer_std = true;
  	rdata[2].next = &(rdata[3]);
  
! 	/*
! 	 * PG73FORMAT: write bitmap [+ padding] [+ oid] + data follows .........
! 	 * OR PG93FORMAT [If encoded]: LZ header + Encoded data follows
! 	 */
! 	rdata[3].data = newtupdata;
! 	rdata[3].len = newtuplen;
  	rdata[3].buffer = newbuf;
  	rdata[3].buffer_std = true;
  	rdata[3].next = NULL;
***************
*** 6614,6620 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
--- 6659,6668 ----
  	Page		page;
  	OffsetNumber offnum;
  	ItemId		lp = NULL;
+ 	HeapTupleData newtup;
+ 	HeapTupleData oldtup;
  	HeapTupleHeader htup;
+ 	HeapTupleHeader oldtupdata = NULL;
  	struct
  	{
  		HeapTupleHeaderData hdr;
***************
*** 6629,6635 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
  	 * The visibility map may need to be fixed even if the heap page is
  	 * already up-to-date.
  	 */
! 	if (xlrec->all_visible_cleared)
  	{
  		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
  		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
--- 6677,6683 ----
  	 * The visibility map may need to be fixed even if the heap page is
  	 * already up-to-date.
  	 */
! 	if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED)
  	{
  		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
  		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
***************
*** 6689,6695 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
  	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
  		elog(PANIC, "heap_update_redo: invalid lp");
  
! 	htup = (HeapTupleHeader) PageGetItem(page, lp);
  
  	htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
  	htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
--- 6737,6743 ----
  	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
  		elog(PANIC, "heap_update_redo: invalid lp");
  
! 	oldtupdata = htup = (HeapTupleHeader) PageGetItem(page, lp);
  
  	htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
  	htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
***************
*** 6707,6713 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
! 	if (xlrec->all_visible_cleared)
  		PageClearAllVisible(page);
  
  	/*
--- 6755,6761 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
! 	if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED)
  		PageClearAllVisible(page);
  
  	/*
***************
*** 6732,6738 **** newt:;
  	 * The visibility map may need to be fixed even if the heap page is
  	 * already up-to-date.
  	 */
! 	if (xlrec->new_all_visible_cleared)
  	{
  		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
  		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
--- 6780,6786 ----
  	 * The visibility map may need to be fixed even if the heap page is
  	 * already up-to-date.
  	 */
! 	if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED)
  	{
  		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
  		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
***************
*** 6795,6804 **** newsame:;
  		   SizeOfHeapHeader);
  	htup = &tbuf.hdr;
  	MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
! 	/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
! 	memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
! 		   (char *) xlrec + hsize,
! 		   newlen);
  	newlen += offsetof(HeapTupleHeaderData, t_bits);
  	htup->t_infomask2 = xlhdr.t_infomask2;
  	htup->t_infomask = xlhdr.t_infomask;
--- 6843,6874 ----
  		   SizeOfHeapHeader);
  	htup = &tbuf.hdr;
  	MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
! 
! 	/*
! 	 * If the record is EWT then decode it.
! 	 */
! 	if (xlrec->flags & XL_HEAP_UPDATE_DELTA_ENCODED)
! 	{
! 		/*
! 		 * PG93FORMAT: Header + Control byte + history reference (2 - 3)bytes
! 		 * + New data (1 byte length + variable data)+ ...
! 		 */
! 		PGLZ_Header *encoded_data = (PGLZ_Header *) (((char *) xlrec) + hsize);
! 
! 		oldtup.t_data = oldtupdata;
! 		newtup.t_data = htup;
! 
! 		heap_delta_decode((char *) encoded_data, &oldtup, &newtup);
! 		newlen = newtup.t_len;
! 	}
! 	else
! 	{
! 		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
! 		memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
! 			   (char *) xlrec + hsize,
! 			   newlen);
! 	}
! 
  	newlen += offsetof(HeapTupleHeaderData, t_bits);
  	htup->t_infomask2 = xlhdr.t_infomask2;
  	htup->t_infomask = xlhdr.t_infomask;
***************
*** 6814,6820 **** newsame:;
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_update_redo: failed to add tuple");
  
! 	if (xlrec->new_all_visible_cleared)
  		PageClearAllVisible(page);
  
  	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
--- 6884,6890 ----
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_update_redo: failed to add tuple");
  
! 	if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED)
  		PageClearAllVisible(page);
  
  	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 1209,1214 **** begin:;
--- 1209,1236 ----
  }
  
  /*
+  * Determine whether the buffer referenced has to be backed up. Since we don't
+  * yet have the insert lock, fullPageWrites and forcePageWrites could change
+  * later, but will not cause any problem because this function is used only to
+  * identify whether EWT is required for WAL update.
+  */
+ bool
+ XLogCheckBufferNeedsBackup(Buffer buffer)
+ {
+ 	bool		doPageWrites;
+ 	Page		page;
+ 
+ 	page = BufferGetPage(buffer);
+ 
+ 	doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
+ 
+ 	if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
+ 		return true;			/* buffer requires backup */
+ 
+ 	return false;				/* buffer does not need to be backed up */
+ }
+ 
+ /*
   * Determine whether the buffer referenced by an XLogRecData item has to
   * be backed up, and if so fill a BkpBlock struct for it.  In any case
   * save the buffer's LSN at *lsn.
*** a/src/backend/utils/adt/pg_lzcompress.c
--- b/src/backend/utils/adt/pg_lzcompress.c
***************
*** 471,476 **** pglz_find_match(PGLZ_HistEntry **hstart, const char *input, const char *end,
--- 471,516 ----
  	return 0;
  }
  
+ /* ----------
+  * pglz_find_match -
+  *
+  *		Lookup the history table if the actual input stream matches
+  *		another sequence of characters, starting somewhere earlier
+  *		in the input buffer.
+  * ----------
+  */
+ static inline int
+ pglz_find_match_with_history(const char *input, const char *end,
+ 				const char *history, const char *hend, int *lenp)
+ {
+ 	const char *ip = input;
+ 	const char *hp = history;
+ 
+ 	/*
+ 	 * Determine length of match. A better match must be larger than the
+ 	 * best so far. And if we already have a match of 16 or more bytes,
+ 	 * it's worth the call overhead to use memcmp() to check if this match
+ 	 * is equal for the same size. After that we must fallback to
+ 	 * character by character comparison to know the exact position where
+ 	 * the diff occurred.
+ 	 */
+ 	while (ip < end && hp < hend && *ip == *hp && *lenp < PGLZ_MAX_MATCH)
+ 	{
+ 		(*lenp)++;
+ 		ip++;
+ 		hp++;
+ 	}
+ 
+ 	/*
+ 	 * Return match information only if it results at least in one byte
+ 	 * reduction.
+ 	 */
+ 	if (*lenp > 2)
+ 		return 1;
+ 
+ 	return 0;
+ }
+ 
  
  /* ----------
   * pglz_compress -
***************
*** 637,642 **** pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,
--- 677,879 ----
  	return true;
  }
  
+ /* ----------
+  * pglz_compress_with_history
+  *
+  * Like pglz_compress, but performs delta encoding rather than compression.
+  * The references are offsets from the start of history data, rather
+  * than current output position. 'hoffsets' and 'newoffsets' are array of
+  * offsets in the history and source to consider. We could scan the history
+  * string for possible matches on which offsets are likely to be interesting
+  * (attribute boundaries, when encoding tuples, for example), this is a lot
+  * faster.
+  * For attributes having NULL value, the offset will be same as next attribute
+  * offset. When old tuple contains NULL and new tuple has non-NULL value,
+  * it will copy it as New Data in Encoded WAL Tuple. When new tuple has NULL
+  * value and old tuple has non-NULL value, the old tuple value will be ignored.
+  * ----------
+  */
+ bool
+ pglz_compress_with_history(const char *source, int32 slen,
+ 								   const char *history, int32 hlen,
+ 								   int32 *newoffsets, int32 *hoffsets, int32 noffsets,
+ 								   int32 newbitmaplen, int32 hbitmaplen,
+ 								   PGLZ_Header *dest, const PGLZ_Strategy *strategy)
+ {
+ 	unsigned char *bp = ((unsigned char *) dest) + sizeof(PGLZ_Header);
+ 	unsigned char *bstart = bp;
+ 	const char *dp = source;
+ 	const char *dend = source + slen;
+ 	unsigned char ctrl_dummy = 0;
+ 	unsigned char *ctrlp = &ctrl_dummy;
+ 	unsigned char ctrlb = 0;
+ 	unsigned char ctrl = 0;
+ 	bool		found_match = false;
+ 	int32		match_len = 0;
+ 	int32		match_off;
+ 	int32		result_size;
+ 	int32		result_max;
+ 	int			i;
+ 	int32		need_rate;
+ 	const char *hp = history;
+ 	const char *hend = history + hlen;
+ 
+ 	/*
+ 	 * Tuples of length greater than PGLZ_HISTORY_SIZE are not allowed for
+ 	 * delta encode as this is the maximum size of history offset.
+ 	 */
+ 	if (hlen >= PGLZ_HISTORY_SIZE)
+ 		return false;
+ 
+ 	/*
+ 	 * Our fallback strategy is the default.
+ 	 */
+ 	if (strategy == NULL)
+ 		strategy = PGLZ_strategy_default;
+ 
+ 	/*
+ 	 * If the strategy forbids compression (at all or if source chunk size out
+ 	 * of range), fail.
+ 	 */
+ 	if (strategy->match_size_good <= 0 ||
+ 		slen < strategy->min_input_size ||
+ 		slen > strategy->max_input_size)
+ 		return false;
+ 
+ 	/*
+ 	 * Save the original source size in the header.
+ 	 */
+ 	dest->rawsize = slen;
+ 
+ 	need_rate = strategy->min_comp_rate;
+ 	if (need_rate < 0)
+ 		need_rate = 0;
+ 	else if (need_rate > 99)
+ 		need_rate = 99;
+ 
+ 	/*
+ 	 * Compute the maximum result size allowed by the strategy, namely the
+ 	 * input size minus the minimum wanted compression rate.  This had better
+ 	 * be <= slen, else we might overrun the provided output buffer.
+ 	 */
+ 	if (slen > (INT_MAX / 100))
+ 	{
+ 		/* Approximate to avoid overflow */
+ 		result_max = (slen / 100) * (100 - need_rate);
+ 	}
+ 	else
+ 		result_max = (slen * (100 - need_rate)) / 100;
+ 
+ 	/*
+ 	 * Compress the source directly into the output buffer until bitmaplen.
+ 	 */
+ 	dend = source + newbitmaplen;
+ 	while (dp < dend)
+ 	{
+ 		if (bp - bstart >= result_max)
+ 			return false;
+ 
+ 		pglz_out_literal(ctrlp, ctrlb, ctrl, bp, *dp);
+ 		dp++;				/* Do not do this ++ in the line above! */
+ 	}
+ 
+ 	/*
+ 	 * Loop through all attributes offsets, if the attribute data differs with
+ 	 * history refering offsets, store the [Offset,Length] reffering history
+ 	 * version till the match and store the changed data as New data.
+ 	 */
+ 	match_off = hbitmaplen;
+ 	hp = history + hbitmaplen;
+ 	for (i = 0; i < noffsets; i++)
+ 	{
+ 		dend = source + ((i + 1 == noffsets) ? slen : newoffsets[i + 1] + newbitmaplen);
+ 		hend = history + ((i + 1 == noffsets) ? hlen : hoffsets[i + 1] + hbitmaplen);
+ 
+ 		MATCH_AGAIN:
+ 		/*
+ 		 * If we already exceeded the maximum result size, fail.
+ 		 *
+ 		 * We check once per loop; since the loop body could emit as many as 4
+ 		 * bytes (a control byte and 3-byte tag), PGLZ_MAX_OUTPUT() had better
+ 		 * allow 4 slop bytes.
+ 		 */
+ 		if (bp - bstart >= result_max)
+ 			return false;
+ 
+ 		/*
+ 		 * Try to find a match in the history
+ 		 */
+ 		if (pglz_find_match_with_history(dp + match_len, dend, hp + match_len,
+ 										 hend, &match_len))
+ 		{
+ 			found_match = true;
+ 
+ 			/* Finding the maximum match across the offsets */
+ 			if ((i + 1 == noffsets)
+ 				|| ((dp + match_len) < dend)
+ 				|| ((hp + match_len < hend)))
+ 			{
+ 				/*
+ 				 * Create the tag and add history entries for all matched
+ 				 * characters.
+ 				 */
+ 				pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off);
+ 				match_off += match_len;
+ 				dp += match_len;
+ 				hp += match_len;
+ 
+ 				if (match_len == PGLZ_MAX_MATCH)
+ 				{
+ 					match_len = 0;
+ 					goto MATCH_AGAIN;
+ 				}
+ 				else
+ 				{
+ 					hp = hend;
+ 					match_off = hend - history;
+ 					match_len = 0;
+ 				}
+ 			}
+ 		}
+ 		else
+ 		{
+ 			hp = hend;
+ 			match_off = hend - history;
+ 			match_len = 0;
+ 		}
+ 
+ 		/* copy the unmatched data to output buffer directly from source */
+ 		while ((dp + match_len) < dend)
+ 		{
+ 			if (bp - bstart >= result_max)
+ 				return false;
+ 
+ 			pglz_out_literal(ctrlp, ctrlb, ctrl, bp, *dp);
+ 			dp++;				/* Do not do this ++ in the line above! */
+ 		}
+ 	}
+ 
+ 	if (!found_match)
+ 		return false;
+ 
+ 	/*
+ 	 * Write out the last control byte and check that we haven't overrun the
+ 	 * output size allowed by the strategy.
+ 	 */
+ 	*ctrlp = ctrlb;
+ 	result_size = bp - bstart;
+ 
+ #ifdef DELTA_DEBUG
+ 	elog(LOG, "old %d new %d compressed %d", hlen, slen, result_size);
+ #endif
+ 
+ 	/*
+ 	 * Success - need only fill in the actual length of the compressed datum.
+ 	 */
+ 	SET_VARSIZE_COMPRESSED(dest, result_size + sizeof(PGLZ_Header));
+ 
+ 	return true;
+ }
  
  /* ----------
   * pglz_decompress -
***************
*** 647,661 **** pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,
  void
  pglz_decompress(const PGLZ_Header *source, char *dest)
  {
  	const unsigned char *sp;
  	const unsigned char *srcend;
  	unsigned char *dp;
  	unsigned char *destend;
  
  	sp = ((const unsigned char *) source) + sizeof(PGLZ_Header);
! 	srcend = ((const unsigned char *) source) + VARSIZE(source);
  	dp = (unsigned char *) dest;
! 	destend = dp + source->rawsize;
  
  	while (sp < srcend && dp < destend)
  	{
--- 884,921 ----
  void
  pglz_decompress(const PGLZ_Header *source, char *dest)
  {
+ 	pglz_decompress_with_history((char *) source, dest, NULL, NULL);
+ }
+ 
+ /* ----------
+  * pglz_decompress_with_history -
+  *
+  *		Decompresses source into dest.
+  *		To decompress, it uses history if provided.
+  * ----------
+  */
+ void
+ pglz_decompress_with_history(const char *source, char *dest, uint32 *destlen,
+ 							 const char *history)
+ {
+ 	PGLZ_Header src;
  	const unsigned char *sp;
  	const unsigned char *srcend;
  	unsigned char *dp;
  	unsigned char *destend;
  
+ 	/* To avoid the unaligned access of PGLZ_Header */
+ 	memcpy((char *) &src, source, sizeof(PGLZ_Header));
+ 
  	sp = ((const unsigned char *) source) + sizeof(PGLZ_Header);
! 	srcend = ((const unsigned char *) source) + VARSIZE(&src);
  	dp = (unsigned char *) dest;
! 	destend = dp + src.rawsize;
! 
! 	if (destlen)
! 	{
! 		*destlen = src.rawsize;
! 	}
  
  	while (sp < srcend && dp < destend)
  	{
***************
*** 699,724 **** pglz_decompress(const PGLZ_Header *source, char *dest)
  					break;
  				}
  
! 				/*
! 				 * Now we copy the bytes specified by the tag from OUTPUT to
! 				 * OUTPUT. It is dangerous and platform dependent to use
! 				 * memcpy() here, because the copied areas could overlap
! 				 * extremely!
! 				 */
! 				while (len--)
  				{
! 					*dp = dp[-off];
! 					dp++;
  				}
  			}
  			else
  			{
  				/*
! 				 * An unset control bit means LITERAL BYTE. So we just copy
! 				 * one from INPUT to OUTPUT.
  				 */
! 				if (dp >= destend)		/* check for buffer overrun */
! 					break;		/* do not clobber memory */
  
  				*dp++ = *sp++;
  			}
--- 959,996 ----
  					break;
  				}
  
! 				if (history)
! 				{
! 					/*
! 					 * Now we copy the bytes specified by the tag from history
! 					 * to OUTPUT.
! 					 */
! 					memcpy(dp, history + off, len);
! 					dp += len;
! 				}
! 				else
  				{
! 					/*
! 					 * Now we copy the bytes specified by the tag from OUTPUT
! 					 * to OUTPUT. It is dangerous and platform dependent to
! 					 * use memcpy() here, because the copied areas could
! 					 * overlap extremely!
! 					 */
! 					while (len--)
! 					{
! 						*dp = dp[-off];
! 						dp++;
! 					}
  				}
  			}
  			else
  			{
  				/*
! 				 * An unset control bit means LITERAL BYTE. So we just
! 				 * copy one from INPUT to OUTPUT.
  				 */
! 				if (dp >= destend)	/* check for buffer overrun */
! 					break;	/* do not clobber memory */
  
  				*dp++ = *sp++;
  			}
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 123,128 **** extern int	CommitSiblings;
--- 123,129 ----
  extern char *default_tablespace;
  extern char *temp_tablespaces;
  extern bool synchronize_seqscans;
+ extern int	wal_update_compression_ratio;
  extern int	ssl_renegotiation_limit;
  extern char *SSLCipherSuites;
  
***************
*** 2382,2387 **** static struct config_int ConfigureNamesInt[] =
--- 2383,2399 ----
  		NULL, NULL, NULL
  	},
  
+ 	{
+ 		/* Not for general use */
+ 		{"wal_update_compression_ratio", PGC_USERSET, DEVELOPER_OPTIONS,
+ 			gettext_noop("Sets the compression ratio of delta record for wal update"),
+ 			NULL,
+ 		},
+ 		&wal_update_compression_ratio,
+ 		25, 1, 99,
+ 		NULL, NULL, NULL
+ 	},
+ 
  	/* End-of-list marker */
  	{
  		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
*** a/src/include/access/heapam_xlog.h
--- b/src/include/access/heapam_xlog.h
***************
*** 147,159 **** typedef struct xl_heap_update
  	TransactionId old_xmax;		/* xmax of the old tuple */
  	TransactionId new_xmax;		/* xmax of the new tuple */
  	ItemPointerData newtid;		/* new inserted tuple id */
! 	uint8		old_infobits_set;	/* infomask bits to set on old tuple */
! 	bool		all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
! 	bool		new_all_visible_cleared;		/* same for the page of newtid */
  	/* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;
  
! #define SizeOfHeapUpdate	(offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))
  
  /*
   * This is what we need to know about vacuum page cleanup/redirect
--- 147,168 ----
  	TransactionId old_xmax;		/* xmax of the old tuple */
  	TransactionId new_xmax;		/* xmax of the new tuple */
  	ItemPointerData newtid;		/* new inserted tuple id */
! 	uint8		old_infobits_set;		/* infomask bits to set on old tuple */
! 	int			flags;			/* flag bits, see below */
  	/* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;
  
! #define XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED		0x01	/* Indicates as old
! 														 * page's all visible
! 														 * bit is cleared */
! #define XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED	0x02	/* Indicates as new
! 														 * page's all visible
! 														 * bit is cleared */
! #define XL_HEAP_UPDATE_DELTA_ENCODED			0x04	/* Indicates as the
! 														 * update operation is
! 														 * delta encoded */
! 
! #define SizeOfHeapUpdate	(offsetof(xl_heap_update, flags) + sizeof(int))
  
  /*
   * This is what we need to know about vacuum page cleanup/redirect
*** a/src/include/access/htup_details.h
--- b/src/include/access/htup_details.h
***************
*** 687,692 **** extern HeapTuple heap_modify_tuple(HeapTuple tuple,
--- 687,697 ----
  extern void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc,
  				  Datum *values, bool *isnull);
  
+ extern bool heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup,
+ 				HeapTuple newtup, char *encdata);
+ extern void heap_delta_decode (char *encdata, HeapTuple oldtup,
+ 				HeapTuple newtup);
+ 
  /* these three are deprecated versions of the three above: */
  extern HeapTuple heap_formtuple(TupleDesc tupleDescriptor,
  			   Datum *values, char *nulls);
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 261,266 **** typedef struct CheckpointStatsData
--- 261,267 ----
  extern CheckpointStatsData CheckpointStats;
  
  extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
+ extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
  extern void XLogFlush(XLogRecPtr RecPtr);
  extern bool XLogBackgroundFlush(void);
  extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
*** a/src/include/utils/pg_lzcompress.h
--- b/src/include/utils/pg_lzcompress.h
***************
*** 107,112 **** extern const PGLZ_Strategy *const PGLZ_strategy_always;
--- 107,119 ----
   */
  extern bool pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,
  			  const PGLZ_Strategy *strategy);
+ extern bool pglz_compress_with_history(const char *source, int32 slen,
+ 									const char *history, int32 hlen,
+ 									int32 *newoffsets, int32 *hoffsets, int32 noffsets,
+ 									int32 newbitmaplen, int32 hbitmaplen,
+ 									PGLZ_Header *dest, const PGLZ_Strategy *strategy);
  extern void pglz_decompress(const PGLZ_Header *source, char *dest);
+ extern void pglz_decompress_with_history(const char *source, char *dest,
+ 								uint32 *destlen, const char *history);
  
  #endif   /* _PG_LZCOMPRESS_H_ */
*** a/src/test/regress/expected/update.out
--- b/src/test/regress/expected/update.out
***************
*** 97,99 **** SELECT a, b, char_length(c) FROM update_test;
--- 97,169 ----
  (2 rows)
  
  DROP TABLE update_test;
+ --
+ -- Test to update continuos and non continuos columns
+ --
+ DROP TABLE IF EXISTS update_test;
+ NOTICE:  table "update_test" does not exist, skipping
+ CREATE TABLE update_test (
+ 		bser bigserial,
+ 		bln boolean,
+ 		ename VARCHAR(25),
+ 		perf_f float(8),
+ 		grade CHAR,
+ 		dept CHAR(5) NOT NULL,
+ 		dob DATE,
+ 		idnum INT,
+ 		addr VARCHAR(30) NOT NULL,
+ 		destn CHAR(6),
+ 		Gend CHAR,
+ 		samba BIGINT,
+ 		hgt float,
+ 		ctime TIME
+ );
+ INSERT INTO update_test VALUES (
+ 		nextval('update_test_bser_seq'::regclass),
+ 		TRUE,
+ 		'Test',
+ 		7.169,
+ 		'B',
+ 		'CSD',
+ 		'2000-01-01',
+ 		520,
+ 		'road2,
+ 		streeeeet2,
+ 		city2',
+ 		'dcy2',
+ 		'M',
+ 		12000,
+ 		50.4,
+ 		'00:00:00.0'
+ );
+ SELECT * from update_test;
+  bser | bln | ename | perf_f | grade | dept  |    dob     | idnum |            addr             | destn  | gend | samba | hgt  |  ctime   
+ ------+-----+-------+--------+-------+-------+------------+-------+-----------------------------+--------+------+-------+------+----------
+     1 | t   | Test  |  7.169 | B     | CSD   | 01-01-2000 |   520 | road2,                     +| dcy2   | M    | 12000 | 50.4 | 00:00:00
+       |     |       |        |       |       |            |       |                 streeeeet2,+|        |      |       |      | 
+       |     |       |        |       |       |            |       |                 city2       |        |      |       |      | 
+ (1 row)
+ 
+ -- update first column
+ UPDATE update_test SET bser = bser - 1 + 1;
+ -- update middle column
+ UPDATE update_test SET perf_f = 8.9;
+ -- update last column
+ UPDATE update_test SET ctime = '00:00:00.1';
+ -- update 3 continuos columns
+ UPDATE update_test SET destn = 'dcy2', samba = 0 WHERE Gend = 'M' and dept = 'CSD';
+ -- update two non continuos columns
+ UPDATE update_test SET destn = 'moved', samba = 0;
+ UPDATE update_test SET bln = FALSE, hgt = 10.1;
+ -- update causing some column alignment difference
+ UPDATE update_test SET ename = 'Tes';
+ UPDATE update_test SET dept = 'Test';
+ SELECT * from update_test;
+  bser | bln | ename | perf_f | grade | dept  |    dob     | idnum |            addr             | destn  | gend | samba | hgt  |   ctime    
+ ------+-----+-------+--------+-------+-------+------------+-------+-----------------------------+--------+------+-------+------+------------
+     1 | f   | Tes   |    8.9 | B     | Test  | 01-01-2000 |   520 | road2,                     +| moved  | M    |     0 | 10.1 | 00:00:00.1
+       |     |       |        |       |       |            |       |                 streeeeet2,+|        |      |       |      | 
+       |     |       |        |       |       |            |       |                 city2       |        |      |       |      | 
+ (1 row)
+ 
+ DROP TABLE update_test;
*** a/src/test/regress/sql/update.sql
--- b/src/test/regress/sql/update.sql
***************
*** 59,61 **** UPDATE update_test SET c = repeat('x', 10000) WHERE c = 'car';
--- 59,128 ----
  SELECT a, b, char_length(c) FROM update_test;
  
  DROP TABLE update_test;
+ 
+ 
+ --
+ -- Test to update continuos and non continuos columns
+ --
+ 
+ DROP TABLE IF EXISTS update_test;
+ CREATE TABLE update_test (
+ 		bser bigserial,
+ 		bln boolean,
+ 		ename VARCHAR(25),
+ 		perf_f float(8),
+ 		grade CHAR,
+ 		dept CHAR(5) NOT NULL,
+ 		dob DATE,
+ 		idnum INT,
+ 		addr VARCHAR(30) NOT NULL,
+ 		destn CHAR(6),
+ 		Gend CHAR,
+ 		samba BIGINT,
+ 		hgt float,
+ 		ctime TIME
+ );
+ 
+ INSERT INTO update_test VALUES (
+ 		nextval('update_test_bser_seq'::regclass),
+ 		TRUE,
+ 		'Test',
+ 		7.169,
+ 		'B',
+ 		'CSD',
+ 		'2000-01-01',
+ 		520,
+ 		'road2,
+ 		streeeeet2,
+ 		city2',
+ 		'dcy2',
+ 		'M',
+ 		12000,
+ 		50.4,
+ 		'00:00:00.0'
+ );
+ 
+ SELECT * from update_test;
+ 
+ -- update first column
+ UPDATE update_test SET bser = bser - 1 + 1;
+ 
+ -- update middle column
+ UPDATE update_test SET perf_f = 8.9;
+ 
+ -- update last column
+ UPDATE update_test SET ctime = '00:00:00.1';
+ 
+ -- update 3 continuos columns
+ UPDATE update_test SET destn = 'dcy2', samba = 0 WHERE Gend = 'M' and dept = 'CSD';
+ 
+ -- update two non continuos columns
+ UPDATE update_test SET destn = 'moved', samba = 0;
+ UPDATE update_test SET bln = FALSE, hgt = 10.1;
+ 
+ -- update causing some column alignment difference
+ UPDATE update_test SET ename = 'Tes';
+ UPDATE update_test SET dept = 'Test';
+ 
+ SELECT * from update_test;
+ DROP TABLE update_test;
