Please find the new version of the patch attached. It now includes WAL functionality.

A detailed description of the feature can be found in the README draft: https://goo.gl/50O8Q0

This patch is pretty complicated, so I ask everyone who is interested in this feature
to help with reviewing and testing it. I will be grateful for any feedback.
But please don't complain about code style; it is still a work in progress.

The next things I'm going to do:
1. More debugging and testing. I'm going to attach a couple of SQL scripts for testing in a following message.
2. Fix NULL processing.
3. Add a flag to pg_index that allows enabling/disabling compression for each particular index.
4. Recheck locking considerations. I tried to make the code as non-invasive as possible, but we need to make sure that the algorithm is still correct.
5. Change BTMaxItemSize.
6. Bring back the microvacuum functionality.

--
Anastasia Lubennikova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e3c55eb..72acc0f 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -24,6 +24,8 @@
 #include "storage/predicate.h"
 #include "utils/tqual.h"
 
+#include "catalog/catalog.h"
+#include "utils/datum.h"
 
 typedef struct
 {
@@ -82,6 +84,7 @@ static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
 			 OffsetNumber itup_off);
 static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
 			int keysz, ScanKey scankey);
+
 static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
 
 
@@ -113,6 +116,11 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 	BTStack		stack;
 	Buffer		buf;
 	OffsetNumber offset;
+	Page 		page;
+	TupleDesc	itupdesc;
+	int			nipd;
+	IndexTuple 	olditup;
+	Size 		sizetoadd;
 
 	/* we need an insertion scan key to do our search, so build one */
 	itup_scankey = _bt_mkscankey(rel, itup);
@@ -190,6 +198,7 @@ top:
 
 	if (checkUnique != UNIQUE_CHECK_EXISTING)
 	{
+		bool updposting = false;
 		/*
 		 * The only conflict predicate locking cares about for indexes is when
 		 * an index tuple insert conflicts with an existing lock.  Since the
@@ -201,7 +210,42 @@ top:
 		/* do the insertion */
 		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
 						  stack, heapRel);
-		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
+
+		/*
+		 * Decide whether we can apply compression.
+		 */
+		page = BufferGetPage(buf);
+
+		if(!IsSystemRelation(rel)
+			&& !rel->rd_index->indisunique
+			&& offset != InvalidOffsetNumber
+			&& offset <= PageGetMaxOffsetNumber(page))
+		{
+			itupdesc = RelationGetDescr(rel);
+			sizetoadd = sizeof(ItemPointerData);
+			olditup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
+
+			if(_bt_isbinaryequal(itupdesc, olditup,
+									rel->rd_index->indnatts, itup))
+			{
+				if (!BtreeTupleIsPosting(olditup))
+				{
+					nipd = 1;
+					sizetoadd = sizetoadd*2;
+				}
+				else
+					nipd = BtreeGetNPosting(olditup);
+
+				if ((IndexTupleSize(olditup) + sizetoadd) <= BTMaxItemSize(page)
+					&& PageGetFreeSpace(page) > sizetoadd)
+					updposting = true;
+			}
+		}
+
+		if (updposting)
+			_bt_pgupdtup(rel, buf, offset, itup, olditup, nipd);
+		else
+			_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
 	}
 	else
 	{
@@ -1042,6 +1086,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
 		itemid = PageGetItemId(origpage, P_HIKEY);
 		itemsz = ItemIdGetLength(itemid);
 		item = (IndexTuple) PageGetItem(origpage, itemid);
+
 		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
 						false, false) == InvalidOffsetNumber)
 		{
@@ -1072,13 +1117,39 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
 		itemsz = ItemIdGetLength(itemid);
 		item = (IndexTuple) PageGetItem(origpage, itemid);
 	}
-	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+
+	if (BtreeTupleIsPosting(item))
+	{
+		Size hikeysize =  BtreeGetPostingOffset(item);
+		IndexTuple hikey = palloc0(hikeysize);
+
+		/* Truncate the posting list before inserting it as a hikey. */
+		memcpy (hikey, item, hikeysize);
+		hikey->t_info &= ~INDEX_SIZE_MASK;
+		hikey->t_info |= hikeysize;
+		ItemPointerSet(&(hikey->t_tid), origpagenumber, P_HIKEY);
+
+		if (PageAddItem(leftpage, (Item) hikey, hikeysize, leftoff,
 					false, false) == InvalidOffsetNumber)
+		{
+			memset(rightpage, 0, BufferGetPageSize(rbuf));
+			elog(ERROR, "failed to add hikey to the left sibling"
+				" while splitting block %u of index \"%s\"",
+				origpagenumber, RelationGetRelationName(rel));
+		}
+
+		pfree(hikey);
+	}
+	else
 	{
-		memset(rightpage, 0, BufferGetPageSize(rbuf));
-		elog(ERROR, "failed to add hikey to the left sibling"
-			 " while splitting block %u of index \"%s\"",
-			 origpagenumber, RelationGetRelationName(rel));
+		if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+						false, false) == InvalidOffsetNumber)
+		{
+			memset(rightpage, 0, BufferGetPageSize(rbuf));
+			elog(ERROR, "failed to add hikey to the left sibling"
+				" while splitting block %u of index \"%s\"",
+				origpagenumber, RelationGetRelationName(rel));
+		}
 	}
 	leftoff = OffsetNumberNext(leftoff);
 
@@ -2103,6 +2174,120 @@ _bt_pgaddtup(Page page,
 }
 
 /*
+ * _bt_pgupdtup() -- update a tuple in place.
+ * This function is used to deduplicate item pointers.
+ *
+ * If the new tuple to insert is equal to a tuple that already exists
+ * on the page, we can avoid inserting the key again and just add a new
+ * item pointer.
+ *
+ * offset is the position of olditup on the page.
+ * itup is the new tuple to insert.
+ * olditup is the old tuple itself.
+ * nipd is the number of item pointers in the old tuple.
+ * The caller is responsible for checking that the page has enough free space.
+ */
+void
+_bt_pgupdtup(Relation rel, Buffer buf, OffsetNumber offset, IndexTuple itup,
+			 IndexTuple olditup, int nipd)
+{
+	ItemPointerData *ipd;
+	IndexTuple 		newitup;
+	Size 			newitupsz;
+	Page			page;
+
+	page = BufferGetPage(buf);
+
+	ipd = palloc0(sizeof(ItemPointerData)*(nipd + 1));
+
+	/* copy item pointers from old tuple into ipd */
+	if (BtreeTupleIsPosting(olditup))
+		memcpy(ipd, BtreeGetPosting(olditup), sizeof(ItemPointerData)*nipd);
+	else
+		memcpy(ipd, olditup, sizeof(ItemPointerData));
+
+	/* add item pointer of the new tuple into ipd */
+	memcpy(ipd+nipd, itup, sizeof(ItemPointerData));
+
+	newitup = BtreeReformPackedTuple(itup, ipd, nipd+1);
+
+	/*
+	 * Update the tuple in place.  We have already checked that the new
+	 * tuple will fit on this page, so it's safe to delete the old tuple
+	 * and insert the new one without any side effects.
+	 */
+	newitupsz = IndexTupleDSize(*newitup);
+	newitupsz = MAXALIGN(newitupsz);
+
+
+	START_CRIT_SECTION();
+
+	PageIndexTupleDelete(page, offset);
+
+	if (!_bt_pgaddtup(page, newitupsz, newitup, offset))
+		elog(ERROR, "failed to insert compressed item in index \"%s\"",
+			 RelationGetRelationName(rel));
+
+	MarkBufferDirty(buf);
+
+	/* Xlog stuff */
+	if (RelationNeedsWAL(rel))
+	{
+		xl_btree_insert xlrec;
+		uint8		xlinfo;
+		XLogRecPtr	recptr;
+		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+		xlrec.offnum = offset;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
+
+		/*
+		 * TODO: add some xlog stuff for inner pages? Not sure whether
+		 * we really need it.
+		 */
+		Assert(P_ISLEAF(pageop));
+		xlinfo = XLOG_BTREE_UPDATE_TUPLE;
+
+		/* Read comments in _bt_pgaddtup */
+		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+
+		XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
+
+		recptr = XLogInsert(RM_BTREE_ID, xlinfo);
+
+		PageSetLSN(page, recptr);
+	}
+
+	END_CRIT_SECTION();
+
+	pfree(ipd);
+	pfree(newitup);
+	_bt_relbuf(rel, buf);
+}
+
+/*
+ * _bt_pgrewritetup() -- rewrite a tuple in place.
+ * This function is used for handling compressed tuples.
+ * It is used to update a compressed tuple after vacuuming
+ * and to rewrite the hikey while building an index.
+ * offset is the position of the old tuple on the page.
+ * itup is the new tuple to insert.
+ * The caller is responsible for checking that the page has enough free space.
+ */
+void
+_bt_pgrewritetup(Relation rel, Buffer buf, Page page, OffsetNumber offset, IndexTuple itup)
+{
+	START_CRIT_SECTION();
+
+	PageIndexTupleDelete(page, offset);
+
+	if (!_bt_pgaddtup(page, IndexTupleSize(itup), itup, offset))
+		elog(ERROR, "failed to rewrite compressed item in index \"%s\"",
+			 RelationGetRelationName(rel));
+
+	END_CRIT_SECTION();
+}
+
+/*
  * _bt_isequal - used in _bt_doinsert in check for duplicates.
  *
  * This is very similar to _bt_compare, except for NULL handling.
@@ -2151,6 +2336,63 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
 }
 
 /*
+ * _bt_isbinaryequal - used by _bt_doinsert and _bt_load
+ * to check for duplicates. This is very similar to the
+ * heap_tuple_attr_equals subroutine. It differs from _bt_isequal
+ * in that it requires strict binary equality of tuples.
+ */
+bool
+_bt_isbinaryequal(TupleDesc itupdesc, IndexTuple itup,
+			int nindatts, IndexTuple ituptoinsert)
+{
+	AttrNumber	attno;
+
+	for (attno = 1; attno <= nindatts; attno++)
+	{
+		Datum		datum1,
+					datum2;
+		bool		isnull1,
+					isnull2;
+		Form_pg_attribute att;
+
+		datum1 = index_getattr(itup, attno, itupdesc, &isnull1);
+		datum2 = index_getattr(ituptoinsert, attno, itupdesc, &isnull2);
+
+		/*
+		 * If one value is NULL and other is not, then they are certainly not
+		 * equal
+		 */
+		if (isnull1 != isnull2)
+			return false;
+		/*
+		 * We do simple binary comparison of the two datums.  This may be overly
+		 * strict because there can be multiple binary representations for the
+		 * same logical value.  But we should be OK as long as there are no false
+		 * positives.  Using a type-specific equality operator is messy because
+		 * there could be multiple notions of equality in different operator
+		 * classes; furthermore, we cannot safely invoke user-defined functions
+		 * while holding exclusive buffer lock.
+		 */
+		if (attno <= 0)
+		{
+			/* The only allowed system columns are OIDs, so do this */
+			if (DatumGetObjectId(datum1) != DatumGetObjectId(datum2))
+				return false;
+		}
+		else
+		{
+			Assert(attno <= itupdesc->natts);
+			att = itupdesc->attrs[attno - 1];
+			if(!datumIsEqual(datum1, datum2, att->attbyval, att->attlen))
+				return false;
+		}
+	}
+
+	/* if we get here, the keys are equal */
+	return true;
+}
+
+/*
  * _bt_vacuum_one_page - vacuum just one index page.
  *
  * Try to remove LP_DEAD items from the given page.  The passed buffer
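
A note for reviewers of _bt_pgupdtup(): the assembly of the new posting list is easy to
model outside the server. Below is a minimal standalone sketch of it (my illustration
only, not part of the patch; TID is a stand-in for ItemPointerData):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct { unsigned blk; unsigned off; } TID;	/* stand-in for ItemPointerData */

/*
 * Model of the ipd assembly in _bt_pgupdtup(): collect the TIDs of the
 * existing tuple (one for a plain tuple, nipd for a posting tuple) and
 * append the incoming tuple's TID; the combined array then becomes the
 * posting list of the rewritten tuple.
 */
static TID *
merge_tids(const TID *oldtids, int nipd, TID newtid)
{
	TID	   *ipd = calloc(nipd + 1, sizeof(TID));

	memcpy(ipd, oldtids, nipd * sizeof(TID));	/* old posting list */
	ipd[nipd] = newtid;							/* TID of the new tuple */
	return ipd;
}

int
main(void)
{
	TID		old[] = {{10, 1}, {10, 2}};
	TID		newtid = {11, 5};
	TID	   *ipd = merge_tids(old, 2, newtid);
	int		i;

	for (i = 0; i < 3; i++)
		printf("(%u,%u) ", ipd[i].blk, ipd[i].off);
	printf("\n");		/* prints: (10,1) (10,2) (11,5) */
	free(ipd);
	return 0;
}
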
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 67755d7..53c30d2 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -787,15 +787,36 @@ _bt_page_recyclable(Page page)
 void
 _bt_delitems_vacuum(Relation rel, Buffer buf,
 					OffsetNumber *itemnos, int nitems,
+					OffsetNumber *remainingoffset, IndexTuple *remaining, int nremaining,
 					BlockNumber lastBlockVacuumed)
 {
 	Page		page = BufferGetPage(buf);
 	BTPageOpaque opaque;
+	int i;
+	Size itemsz;
 
 	/* No ereport(ERROR) until changes are logged */
 	START_CRIT_SECTION();
 
-	/* Fix the page */
+	/* Handle compressed tuples here. */
+	for (i = 0; i < nremaining; i++)
+	{
+		/* First, delete the old tuple. */
+		PageIndexTupleDelete(page, remainingoffset[i]);
+
+		itemsz = IndexTupleSize(remaining[i]);
+		itemsz = MAXALIGN(itemsz);
+
+		/* Add a tuple with the remaining ItemPointers to the page. */
+		if (PageAddItem(page, (Item) remaining[i], itemsz, remainingoffset[i],
+							false, false) == InvalidOffsetNumber)
+				elog(PANIC, "failed to rewrite compressed item in index while doing vacuum");
+	}
+
+	/*
+	 * Fix the page.  After dealing with the posting tuples,
+	 * just delete all the tuples that are marked for deletion.
+	 */
 	if (nitems > 0)
 		PageIndexMultiDelete(page, itemnos, nitems);
 
@@ -824,12 +845,28 @@ _bt_delitems_vacuum(Relation rel, Buffer buf,
 		xl_btree_vacuum xlrec_vacuum;
 
 		xlrec_vacuum.lastBlockVacuumed = lastBlockVacuumed;
+		xlrec_vacuum.nremaining = nremaining;
+		xlrec_vacuum.ndeleted = nitems;
 
 		XLogBeginInsert();
 		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
 		XLogRegisterData((char *) &xlrec_vacuum, SizeOfBtreeVacuum);
 
 		/*
+		 * Here we save the offsets and the remaining tuples themselves.
+		 * It's important to restore them in the correct order:
+		 * the remaining tuples must be handled first, and only after
+		 * that the other deleted items.
+		 */
+		if (nremaining > 0)
+		{
+			int i;
+			XLogRegisterBufData(0, (char *) remainingoffset, nremaining * sizeof(OffsetNumber));
+			for (i = 0; i < nremaining; i++)
+				XLogRegisterBufData(0, (char *) remaining[i], IndexTupleSize(remaining[i]));
+		}
+
+		/*
 		 * The target-offsets array is not in the buffer, but pretend that it
 		 * is.  When XLogInsert stores the whole buffer, the offsets array
 		 * need not be stored too.
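
To make the new XLOG_BTREE_VACUUM payload easier to review, here is a standalone
round-trip of the byte layout that _bt_delitems_vacuum() registers and
btree_xlog_vacuum() parses. This is only a model under simplifying assumptions (mine,
not part of the patch): NUL-terminated strings stand in for IndexTuples, OffsetNumber
is modeled as uint16_t, and a formed posting tuple's size is taken to be MAXALIGN'd,
which is what lets the decoder step through the tuples.

#include <stdio.h>
#include <string.h>
#include <stdint.h>

#define MAXALIGN(LEN)	(((uintptr_t) (LEN) + 7) & ~(uintptr_t) 7)

typedef uint16_t OffsetNumber;

int
main(void)
{
	char		buf[128];
	char	   *p = buf;
	OffsetNumber remainingoffset[2] = {4, 9};		/* hypothetical page offsets */
	const char *tup[2] = {"tuple-A", "tuple-BB"};	/* stand-ins for IndexTuples */
	OffsetNumber deleted[3] = {2, 7, 11};
	int			nremaining = 2;
	size_t		len;
	int			i;

	/* Encode: remaining offsets, then remaining tuples, then deleted offsets */
	memcpy(p, remainingoffset, sizeof(remainingoffset));
	p += sizeof(remainingoffset);
	for (i = 0; i < nremaining; i++)
	{
		memcpy(p, tup[i], strlen(tup[i]) + 1);
		p += MAXALIGN(strlen(tup[i]) + 1);
	}
	memcpy(p, deleted, sizeof(deleted));
	len = (size_t) (p + sizeof(deleted) - buf);

	/* Decode, mirroring btree_xlog_vacuum() */
	{
		char	   *ptr = buf;
		OffsetNumber *offset = (OffsetNumber *) ptr;
		char	   *remaining = ptr + nremaining * sizeof(OffsetNumber);
		OffsetNumber *unused;
		OffsetNumber *unend;

		for (i = 0; i < nremaining; i++)
		{
			printf("rewrite offset %u with \"%s\"\n",
				   (unsigned) offset[i], remaining);
			remaining += MAXALIGN(strlen(remaining) + 1);
		}

		unused = (OffsetNumber *) remaining;
		unend = (OffsetNumber *) (ptr + len);
		while (unused < unend)
			printf("delete offset %u\n", (unsigned) *unused++);
	}
	return 0;
}
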
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index f2905cb..39e125f 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -74,7 +74,8 @@ static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 			 BTCycleId cycleid);
 static void btvacuumpage(BTVacState *vstate, BlockNumber blkno,
 			 BlockNumber orig_blkno);
-
+static ItemPointer btreevacuumPosting(BTVacState *vstate,
+						ItemPointerData *items,int nitem, int *nremaining);
 
 /*
  * Btree handler function: return IndexAmRoutine with access method parameters
@@ -861,7 +862,7 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 								 RBM_NORMAL, info->strategy);
 		LockBufferForCleanup(buf);
 		_bt_checkpage(rel, buf);
-		_bt_delitems_vacuum(rel, buf, NULL, 0, vstate.lastBlockVacuumed);
+		_bt_delitems_vacuum(rel, buf, NULL, 0, NULL, NULL, 0, vstate.lastBlockVacuumed);
 		_bt_relbuf(rel, buf);
 	}
 
@@ -962,6 +963,9 @@ restart:
 		OffsetNumber offnum,
 					minoff,
 					maxoff;
+		IndexTuple	remaining[MaxOffsetNumber];
+		OffsetNumber remainingoffset[MaxOffsetNumber];
+		int			nremaining;
 
 		/*
 		 * Trade in the initial read lock for a super-exclusive write lock on
@@ -998,6 +1002,7 @@ restart:
 		 * callback function.
 		 */
 		ndeletable = 0;
+		nremaining = 0;
 		minoff = P_FIRSTDATAKEY(opaque);
 		maxoff = PageGetMaxOffsetNumber(page);
 		if (callback)
@@ -1011,31 +1016,75 @@ restart:
 
 				itup = (IndexTuple) PageGetItem(page,
 												PageGetItemId(page, offnum));
-				htup = &(itup->t_tid);
-
-				/*
-				 * During Hot Standby we currently assume that
-				 * XLOG_BTREE_VACUUM records do not produce conflicts. That is
-				 * only true as long as the callback function depends only
-				 * upon whether the index tuple refers to heap tuples removed
-				 * in the initial heap scan. When vacuum starts it derives a
-				 * value of OldestXmin. Backends taking later snapshots could
-				 * have a RecentGlobalXmin with a later xid than the vacuum's
-				 * OldestXmin, so it is possible that row versions deleted
-				 * after OldestXmin could be marked as killed by other
-				 * backends. The callback function *could* look at the index
-				 * tuple state in isolation and decide to delete the index
-				 * tuple, though currently it does not. If it ever did, we
-				 * would need to reconsider whether XLOG_BTREE_VACUUM records
-				 * should cause conflicts. If they did cause conflicts they
-				 * would be fairly harsh conflicts, since we haven't yet
-				 * worked out a way to pass a useful value for
-				 * latestRemovedXid on the XLOG_BTREE_VACUUM records. This
-				 * applies to *any* type of index that marks index tuples as
-				 * killed.
-				 */
-				if (callback(htup, callback_state))
-					deletable[ndeletable++] = offnum;
+				if(BtreeTupleIsPosting(itup))
+				{
+					ItemPointer newipd;
+					int 		nipd,
+								nnewipd;
+
+					nipd = BtreeGetNPosting(itup);
+
+					/*
+					 * Delete from the posting list all ItemPointers
+					 * which are no longer valid. newipd contains the list
+					 * of remaining ItemPointers, or NULL if none of the
+					 * items need to be removed.
+					 */
+					newipd = btreevacuumPosting(vstate, BtreeGetPosting(itup), nipd, &nnewipd);
+
+					if (newipd != NULL)
+					{
+						if (nnewipd > 0)
+						{
+							 * There are still some live tuples in the posting list,
+							 * so we should update this tuple in place. That is done
+							 * later, in _bt_delitems_vacuum(). To do that we need to
+							 * save information about the tuple: remainingoffset is
+							 * the offset of the old tuple to be deleted, and the new
+							 * tuple to insert at the same position contains the
+							 * remaining ItemPointers.
+							 * position, which contains remaining ItemPointers.
+							 */
+							remainingoffset[nremaining] = offnum;
+							remaining[nremaining] = BtreeReformPackedTuple(itup, newipd, nnewipd);
+							nremaining++;
+						}
+						else
+						{
+							/*
+							 * If all ItemPointers should be deleted,
+							 * we can delete this tuple in a regular way.
+							 */
+							deletable[ndeletable++] = offnum;
+						}
+					}
+				}
+				else
+				{
+					htup = &(itup->t_tid);
+
+					/*
+					* During Hot Standby we currently assume that
+					* XLOG_BTREE_VACUUM records do not produce conflicts. That is
+					* only true as long as the callback function depends only
+					* upon whether the index tuple refers to heap tuples removed
+					* in the initial heap scan. When vacuum starts it derives a
+					* value of OldestXmin. Backends taking later snapshots could
+					* have a RecentGlobalXmin with a later xid than the vacuum's
+					* OldestXmin, so it is possible that row versions deleted
+					* after OldestXmin could be marked as killed by other
+					* backends. The callback function *could* look at the index
+					* tuple state in isolation and decide to delete the index
+					* tuple, though currently it does not. If it ever did, we
+					* would need to reconsider whether XLOG_BTREE_VACUUM records
+					* should cause conflicts. If they did cause conflicts they
+					* would be fairly harsh conflicts, since we haven't yet
+					* worked out a way to pass a useful value for
+					* latestRemovedXid on the XLOG_BTREE_VACUUM records. This
+					* applies to *any* type of index that marks index tuples as
+					* killed.
+					*/
+					if (callback(htup, callback_state))
+						deletable[ndeletable++] = offnum;
+				}
 			}
 		}
 
@@ -1043,7 +1092,7 @@ restart:
 		 * Apply any needed deletes.  We issue just one _bt_delitems_vacuum()
 		 * call per page, so as to minimize WAL traffic.
 		 */
-		if (ndeletable > 0)
+		if (ndeletable > 0 || nremaining > 0)
 		{
 			BlockNumber	lastBlockVacuumed = InvalidBlockNumber;
 
@@ -1070,7 +1119,7 @@ restart:
 			 * doesn't seem worth the amount of bookkeeping it'd take to avoid
 			 * that.
 			 */
-			_bt_delitems_vacuum(rel, buf, deletable, ndeletable,
+			_bt_delitems_vacuum(rel, buf, deletable, ndeletable, remainingoffset, remaining, nremaining,
 								lastBlockVacuumed);
 
 			/*
@@ -1160,3 +1209,50 @@ btcanreturn(Relation index, int attno)
 {
 	return true;
 }
+
+/*
+ * btreevacuumPosting() -- vacuums a posting list.
+ * The size of the list must be specified via number of items (nitems).
+ *
+ * If none of the items need to be removed, returns NULL. Otherwise returns
+ * a new palloc'd array with the remaining items. The number of remaining
+ * items is returned via nremaining.
+ */
+ItemPointer
+btreevacuumPosting(BTVacState *vstate, ItemPointerData *items,
+				   int nitem, int *nremaining)
+{
+	int			i,
+				remaining = 0;
+	ItemPointer tmpitems = NULL;
+	IndexBulkDeleteCallback callback = vstate->callback;
+	void	   *callback_state = vstate->callback_state;
+
+	/*
+	 * Iterate over TIDs array
+	 */
+	for (i = 0; i < nitem; i++)
+	{
+		if (callback(items + i, callback_state))
+		{
+			if (!tmpitems)
+			{
+				/*
+				 * First TID to be deleted: allocate memory to hold the
+				 * remaining items.
+				 */
+				tmpitems = palloc(sizeof(ItemPointerData) * nitem);
+				memcpy(tmpitems, items, sizeof(ItemPointerData) * i);
+			}
+		}
+		else
+		{
+			if (tmpitems)
+				tmpitems[remaining] = items[i];
+			remaining++;
+		}
+	}
+
+	*nremaining = remaining;
+	return tmpitems;
+}
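
Since btreevacuumPosting() is a pure function over the TID array, its contract
(return NULL when nothing is removed, otherwise a compacted copy plus a count) can be
tested in isolation. A standalone model with the same control flow (my own harness,
not part of the patch; TID is just an int here and is_odd is a toy callback):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>

typedef int TID;	/* stand-in for ItemPointerData */
typedef bool (*DeleteCallback) (TID *tid, void *state);

static TID *
vacuum_posting(TID *items, int nitem, int *nremaining,
			   DeleteCallback callback, void *state)
{
	TID	   *tmpitems = NULL;
	int		remaining = 0;
	int		i;

	for (i = 0; i < nitem; i++)
	{
		if (callback(items + i, state))
		{
			/* First TID to delete: allocate and copy the survivors so far */
			if (!tmpitems)
			{
				tmpitems = malloc(sizeof(TID) * nitem);
				memcpy(tmpitems, items, sizeof(TID) * i);
			}
		}
		else
		{
			if (tmpitems)
				tmpitems[remaining] = items[i];
			remaining++;
		}
	}
	*nremaining = remaining;
	return tmpitems;	/* NULL means nothing was removed */
}

static bool
is_odd(TID *tid, void *state)
{
	(void) state;
	return *tid % 2 != 0;
}

int
main(void)
{
	TID		items[] = {2, 3, 4, 5, 6};
	int		n,
			i;
	TID	   *out = vacuum_posting(items, 5, &n, is_odd, NULL);

	for (i = 0; i < n; i++)
		printf("%d ", out[i]);
	printf("(n=%d)\n", n);	/* prints: 2 4 6 (n=3) */
	free(out);
	return 0;
}
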
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 14dffe0..2cb1769 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -29,6 +29,8 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
 			 OffsetNumber offnum);
 static void _bt_saveitem(BTScanOpaque so, int itemIndex,
 			 OffsetNumber offnum, IndexTuple itup);
+static void _bt_savePostingitem(BTScanOpaque so, int itemIndex,
+			 OffsetNumber offnum, ItemPointer iptr, IndexTuple itup, int i);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
 static Buffer _bt_walk_left(Relation rel, Buffer buf);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
@@ -1134,6 +1136,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 	int			itemIndex;
 	IndexTuple	itup;
 	bool		continuescan;
+	int 		i;
 
 	/*
 	 * We must have the buffer pinned and locked, but the usual macro can't be
@@ -1168,6 +1171,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
 	/* initialize tuple workspace to empty */
 	so->currPos.nextTupleOffset = 0;
+	so->currPos.prevTupleOffset = 0;
 
 	/*
 	 * Now that the current page has been made consistent, the macro should be
@@ -1188,8 +1192,19 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 			if (itup != NULL)
 			{
 				/* tuple passes all scan key conditions, so remember it */
-				_bt_saveitem(so, itemIndex, offnum, itup);
-				itemIndex++;
+				if (BtreeTupleIsPosting(itup))
+				{
+					for (i = 0; i < BtreeGetNPosting(itup); i++)
+					{
+						_bt_savePostingitem(so, itemIndex, offnum, BtreeGetPostingN(itup, i), itup, i);
+						itemIndex++;
+					}
+				}
+				else
+				{
+					_bt_saveitem(so, itemIndex, offnum, itup);
+					itemIndex++;
+				}
 			}
 			if (!continuescan)
 			{
@@ -1201,7 +1216,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 			offnum = OffsetNumberNext(offnum);
 		}
 
-		Assert(itemIndex <= MaxIndexTuplesPerPage);
+		Assert(itemIndex <= MaxPackedIndexTuplesPerPage);
 		so->currPos.firstItem = 0;
 		so->currPos.lastItem = itemIndex - 1;
 		so->currPos.itemIndex = 0;
@@ -1209,7 +1224,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 	else
 	{
 		/* load items[] in descending order */
-		itemIndex = MaxIndexTuplesPerPage;
+		itemIndex = MaxPackedIndexTuplesPerPage;
 
 		offnum = Min(offnum, maxoff);
 
@@ -1219,8 +1234,20 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 			if (itup != NULL)
 			{
 				/* tuple passes all scan key conditions, so remember it */
-				itemIndex--;
-				_bt_saveitem(so, itemIndex, offnum, itup);
+				if (BtreeTupleIsPosting(itup))
+				{
+					for (i = 0; i < BtreeGetNPosting(itup); i++)
+					{
+						itemIndex--;
+						_bt_savePostingitem(so, itemIndex, offnum, BtreeGetPostingN(itup, i), itup, i);
+					}
+				}
+				else
+				{
+					itemIndex--;
+					_bt_saveitem(so, itemIndex, offnum, itup);
+				}
+
 			}
 			if (!continuescan)
 			{
@@ -1234,8 +1261,8 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
 		Assert(itemIndex >= 0);
 		so->currPos.firstItem = itemIndex;
-		so->currPos.lastItem = MaxIndexTuplesPerPage - 1;
-		so->currPos.itemIndex = MaxIndexTuplesPerPage - 1;
+		so->currPos.lastItem = MaxPackedIndexTuplesPerPage - 1;
+		so->currPos.itemIndex = MaxPackedIndexTuplesPerPage - 1;
 	}
 
 	return (so->currPos.firstItem <= so->currPos.lastItem);
@@ -1261,6 +1288,37 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
 }
 
 /*
+ * Save an index item into so->currPos.items[itemIndex].
+ * When performing an index-only scan, handle the first element separately:
+ * save the key once, and link the posting TIDs to it using tupleOffset.
+ */
+static void
+_bt_savePostingitem(BTScanOpaque so, int itemIndex,
+			 OffsetNumber offnum, ItemPointer iptr, IndexTuple itup, int i)
+{
+	BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+	currItem->heapTid = *iptr;
+	currItem->indexOffset = offnum;
+
+	if (so->currTuples)
+	{
+		if (i == 0)
+		{
+			/* Save the key; it is the same for all tuples in the posting list. */
+			Size itupsz = BtreeGetPostingOffset(itup);
+			currItem->tupleOffset = so->currPos.nextTupleOffset;
+			memcpy(so->currTuples + so->currPos.nextTupleOffset, itup, itupsz);
+			so->currPos.nextTupleOffset += MAXALIGN(itupsz);
+			so->currPos.prevTupleOffset = currItem->tupleOffset;
+		}
+		else
+			currItem->tupleOffset = so->currPos.prevTupleOffset;
+	}
+}
+
+
+/*
  *	_bt_steppage() -- Step to next page containing valid data for scan
  *
  * On entry, if so->currPos.buf is valid the buffer is pinned but not locked;
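
The prevTupleOffset bookkeeping in _bt_savePostingitem() stores the key bytes once and
makes every TID of the posting point at the same place in the tuple workspace. A
minimal standalone model of that bookkeeping (my sketch only, with the types reduced to
bare essentials):

#include <stdio.h>
#include <string.h>

typedef struct
{
	int		heapTid;		/* stand-in for ItemPointerData */
	int		tupleOffset;	/* where this item's key lives in the workspace */
} ScanPosItem;

int
main(void)
{
	char		currTuples[64];		/* stand-in for so->currTuples */
	int			nextTupleOffset = 0;
	int			prevTupleOffset = 0;
	ScanPosItem	items[3];
	int			tids[3] = {101, 102, 103};	/* TIDs from one posting tuple */
	int			i;

	for (i = 0; i < 3; i++)
	{
		items[i].heapTid = tids[i];
		if (i == 0)
		{
			/* Save the key once; all TIDs of the posting share it */
			memcpy(currTuples + nextTupleOffset, "key", 4);
			items[i].tupleOffset = nextTupleOffset;
			prevTupleOffset = nextTupleOffset;
			nextTupleOffset += 8;	/* MAXALIGN(key size) in the real code */
		}
		else
			items[i].tupleOffset = prevTupleOffset;	/* reuse the stored key */
	}

	for (i = 0; i < 3; i++)
		printf("tid=%d key=\"%s\" at offset %d\n",
			   items[i].heapTid, currTuples + items[i].tupleOffset,
			   items[i].tupleOffset);
	return 0;
}
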
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 99a014e..906b9df 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -75,7 +75,7 @@
 #include "utils/rel.h"
 #include "utils/sortsupport.h"
 #include "utils/tuplesort.h"
-
+#include "catalog/catalog.h"
 
 /*
  * Status record for spooling/sorting phase.  (Note we may have two of
@@ -136,6 +136,9 @@ static void _bt_sortaddtup(Page page, Size itemsize,
 static void _bt_buildadd(BTWriteState *wstate, BTPageState *state,
 			 IndexTuple itup);
 static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state);
+static SortSupport _bt_prepare_SortSupport(BTWriteState *wstate, int keysz);
+static int	_bt_call_comparator(SortSupport sortKeys, int i,
+				IndexTuple itup, IndexTuple itup2, TupleDesc tupdes);
 static void _bt_load(BTWriteState *wstate,
 		 BTSpool *btspool, BTSpool *btspool2);
 
@@ -527,15 +530,120 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 		Assert(last_off > P_FIRSTKEY);
 		ii = PageGetItemId(opage, last_off);
 		oitup = (IndexTuple) PageGetItem(opage, ii);
-		_bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
 
 		/*
-		 * Move 'last' into the high key position on opage
+		 * If the item is a posting tuple, we can truncate it, because
+		 * the HIKEY is not considered real data and need not keep any
+		 * ItemPointerData at all, let alone a whole posting list.
+		 * But if it had a big posting list, there would be plenty of
+		 * free space left on opage. In that case we must split the
+		 * posting tuple into two pieces.
 		 */
-		hii = PageGetItemId(opage, P_HIKEY);
-		*hii = *ii;
-		ItemIdSetUnused(ii);	/* redundant */
-		((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+		 if (BtreeTupleIsPosting(oitup))
+		 {
+			IndexTuple  keytup;
+			Size 		keytupsz;
+			int 		nipd,
+						ntocut,
+						ntoleave;
+
+			nipd = BtreeGetNPosting(oitup);
+			ntocut = (sizeof(ItemIdData) + BtreeGetPostingOffset(oitup))/sizeof(ItemPointerData);
+			ntocut++; /* round up to be sure that we cut enough */
+			ntoleave = nipd - ntocut;
+
+			/*
+			 * 0) Form a key tuple that doesn't contain any ipd.
+			 * NOTE: the key tuple will have blkno & offset suitable for
+			 * P_HIKEY; any function that uses keytup should handle them itself.
+			 */
+			keytupsz =  BtreeGetPostingOffset(oitup);
+			keytup = palloc0(keytupsz);
+			memcpy (keytup, oitup, keytupsz);
+			keytup->t_info &= ~INDEX_SIZE_MASK;
+			keytup->t_info |= keytupsz;
+			ItemPointerSet(&(keytup->t_tid), oblkno, P_HIKEY);
+
+			if (ntocut < nipd)
+			{
+				ItemPointerData *newipd;
+				IndexTuple		newitup,
+								newlasttup;
+				/*
+				 * 1) Cut part of old tuple to shift to npage.
+				 * And insert it as P_FIRSTKEY.
+				 * This tuple is based on keytup.
+				 * Blkno & offnum are reset in BtreeFormPackedTuple.
+				 */
+				newipd = palloc0(sizeof(ItemPointerData)*ntocut);
+				/* Note, that we cut last 'ntocut' items */
+				memcpy(newipd, BtreeGetPosting(oitup)+ntoleave, sizeof(ItemPointerData)*ntocut);
+				newitup = BtreeFormPackedTuple(keytup, newipd, ntocut);
+
+				_bt_sortaddtup(npage, IndexTupleSize(newitup), newitup, P_FIRSTKEY);
+				pfree(newipd);
+				pfree(newitup);
+
+				/*
+				 * 2) Set the last item's linp to P_HIKEY:
+				 * move 'last' into the high key position on opage.
+				 * NOTE: we do this because the index tuple deletion algorithm
+				 * doesn't allow deleting an item while an unused one precedes it.
+				 */
+				hii = PageGetItemId(opage, P_HIKEY);
+				*hii = *ii;
+				ItemIdSetUnused(ii);	/* redundant */
+				((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+
+				/* 3) delete "wrong" high key, insert keytup as P_HIKEY. */
+				_bt_pgrewritetup(wstate->index, InvalidBuffer, opage, P_HIKEY, keytup);
+
+				/* 4) form the part of old tuple with ntoleave ipds. And insert it as last tuple. */
+				newlasttup = BtreeFormPackedTuple(keytup, BtreeGetPosting(oitup), ntoleave);
+
+				_bt_sortaddtup(opage, IndexTupleSize(newlasttup), newlasttup, PageGetMaxOffsetNumber(opage)+1);
+
+				pfree(newlasttup);
+			}
+			else
+			{
+				/* The tuple isn't big enough to split. Handle it as a regular tuple. */
+
+				/*
+				 * 1) Shift the last tuple to npage.
+				 * Insert it as P_FIRSTKEY.
+				 */
+				_bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
+
+				/* 2) set last item to the P_HIKEY linp */
+				/* Move 'last' into the high key position on opage */
+				hii = PageGetItemId(opage, P_HIKEY);
+				*hii = *ii;
+				ItemIdSetUnused(ii);	/* redundant */
+				((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+
+				/* 3) delete "wrong" high key, insert keytup as P_HIKEY. */
+				_bt_pgrewritetup(wstate->index, InvalidBuffer, opage, P_HIKEY, keytup);
+
+			}
+			pfree(keytup);
+		 }
+		 else
+		 {
+			/*
+			 * 1) Shift the last tuple to npage.
+			 * Insert it as P_FIRSTKEY.
+			 */
+			_bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
+
+			/* 2) set last item to the P_HIKEY linp */
+			/* Move 'last' into the high key position on opage */
+			hii = PageGetItemId(opage, P_HIKEY);
+			*hii = *ii;
+			ItemIdSetUnused(ii);	/* redundant */
+			((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+		}
 
 		/*
 		 * Link the old page into its parent, using its minimum key. If we
@@ -547,6 +655,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 
 		Assert(state->btps_minkey != NULL);
 		ItemPointerSet(&(state->btps_minkey->t_tid), oblkno, P_HIKEY);
+
 		_bt_buildadd(wstate, state->btps_next, state->btps_minkey);
 		pfree(state->btps_minkey);
 
@@ -554,8 +663,12 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 		 * Save a copy of the minimum key for the new page.  We have to copy
 		 * it off the old page, not the new one, in case we are not at leaf
 		 * level.
+		 * We cannot just copy oitup, because it could be a posting tuple;
+		 * it is safer to fetch the newly inserted hikey instead.
 		 */
-		state->btps_minkey = CopyIndexTuple(oitup);
+		ItemId iihk = PageGetItemId(opage, P_HIKEY);
+		IndexTuple hikey = (IndexTuple) PageGetItem(opage, iihk);
+		state->btps_minkey = CopyIndexTuple(hikey);
 
 		/*
 		 * Set the sibling links for both pages.
@@ -590,7 +703,29 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 	if (last_off == P_HIKEY)
 	{
 		Assert(state->btps_minkey == NULL);
-		state->btps_minkey = CopyIndexTuple(itup);
+
+		if (BtreeTupleIsPosting(itup))
+		{
+			Size		keytupsz;
+			IndexTuple  keytup;
+
+			/*
+			 * 0) Form a key tuple that doesn't contain any ipd.
+			 * NOTE: the key tuple will have blkno & offset suitable for
+			 * P_HIKEY; any function that uses keytup should handle them itself.
+			 */
+			keytupsz =  BtreeGetPostingOffset(itup);
+			keytup = palloc0(keytupsz);
+			memcpy (keytup, itup, keytupsz);
+
+			keytup->t_info &= ~INDEX_SIZE_MASK;
+			keytup->t_info |= keytupsz;
+			ItemPointerSet(&(keytup->t_tid), nblkno, P_HIKEY);
+
+			state->btps_minkey = CopyIndexTuple(keytup);
+		}
+		else
+			state->btps_minkey = CopyIndexTuple(itup);
 	}
 
 	/*
@@ -670,6 +805,71 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 }
 
 /*
+ * Prepare SortSupport structure for indextuples comparison
+ */
+static SortSupport
+_bt_prepare_SortSupport(BTWriteState *wstate, int keysz)
+{
+	ScanKey		indexScanKey;
+	SortSupport sortKeys;
+	int 		i;
+
+	/* Prepare SortSupport data for each column */
+	indexScanKey = _bt_mkscankey_nodata(wstate->index);
+	sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
+
+	for (i = 0; i < keysz; i++)
+	{
+		SortSupport sortKey = sortKeys + i;
+		ScanKey		scanKey = indexScanKey + i;
+		int16		strategy;
+
+		sortKey->ssup_cxt = CurrentMemoryContext;
+		sortKey->ssup_collation = scanKey->sk_collation;
+		sortKey->ssup_nulls_first =
+			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+		sortKey->ssup_attno = scanKey->sk_attno;
+		/* Abbreviation is not supported here */
+		sortKey->abbreviate = false;
+
+		AssertState(sortKey->ssup_attno != 0);
+
+		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+			BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+		PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
+	}
+
+	_bt_freeskey(indexScanKey);
+	return sortKeys;
+}
+
+/*
+ * Compare two tuples using sortKey on attribute i
+ */
+static int
+_bt_call_comparator(SortSupport sortKeys, int i,
+						 IndexTuple itup, IndexTuple itup2, TupleDesc tupdes)
+{
+		SortSupport entry;
+		Datum		attrDatum1,
+					attrDatum2;
+		bool		isNull1,
+					isNull2;
+		int32		compare;
+
+		entry = sortKeys + i - 1;
+		attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
+		attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
+
+		compare = ApplySortComparator(attrDatum1, isNull1,
+										attrDatum2, isNull2,
+										entry);
+
+		return compare;
+}
+
+/*
  * Read tuples in correct sort order from tuplesort, and load them into
  * btree leaves.
  */
@@ -679,16 +879,20 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	BTPageState *state = NULL;
 	bool		merge = (btspool2 != NULL);
 	IndexTuple	itup,
-				itup2 = NULL;
+				itup2 = NULL,
+				itupprev = NULL;
 	bool		should_free,
 				should_free2,
 				load1;
 	TupleDesc	tupdes = RelationGetDescr(wstate->index);
 	int			i,
 				keysz = RelationGetNumberOfAttributes(wstate->index);
-	ScanKey		indexScanKey = NULL;
+	int			ntuples = 0;
 	SortSupport sortKeys;
 
+	/* Prepare SortSupport structure for indextuples comparison */
+	sortKeys = (SortSupport)_bt_prepare_SortSupport(wstate, keysz);
+
 	if (merge)
 	{
 		/*
@@ -701,34 +905,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 									   true, &should_free);
 		itup2 = tuplesort_getindextuple(btspool2->sortstate,
 										true, &should_free2);
-		indexScanKey = _bt_mkscankey_nodata(wstate->index);
-
-		/* Prepare SortSupport data for each column */
-		sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
-
-		for (i = 0; i < keysz; i++)
-		{
-			SortSupport sortKey = sortKeys + i;
-			ScanKey		scanKey = indexScanKey + i;
-			int16		strategy;
-
-			sortKey->ssup_cxt = CurrentMemoryContext;
-			sortKey->ssup_collation = scanKey->sk_collation;
-			sortKey->ssup_nulls_first =
-				(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
-			sortKey->ssup_attno = scanKey->sk_attno;
-			/* Abbreviation is not supported here */
-			sortKey->abbreviate = false;
-
-			AssertState(sortKey->ssup_attno != 0);
-
-			strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
-				BTGreaterStrategyNumber : BTLessStrategyNumber;
-
-			PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
-		}
-
-		_bt_freeskey(indexScanKey);
 
 		for (;;)
 		{
@@ -742,20 +918,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 			{
 				for (i = 1; i <= keysz; i++)
 				{
-					SortSupport entry;
-					Datum		attrDatum1,
-								attrDatum2;
-					bool		isNull1,
-								isNull2;
-					int32		compare;
-
-					entry = sortKeys + i - 1;
-					attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
-					attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
-
-					compare = ApplySortComparator(attrDatum1, isNull1,
-												  attrDatum2, isNull2,
-												  entry);
+					int32 compare = _bt_call_comparator(sortKeys, i, itup, itup2, tupdes);
+
 					if (compare > 0)
 					{
 						load1 = false;
@@ -794,16 +958,123 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	else
 	{
 		/* merge is unnecessary */
-		while ((itup = tuplesort_getindextuple(btspool->sortstate,
+		Relation indexRelation = wstate->index;
+		Form_pg_index index = indexRelation->rd_index;
+
+		if (IsSystemRelation(indexRelation) || index->indisunique)
+		{
+			/* Do not use compression. */
+			while ((itup = tuplesort_getindextuple(btspool->sortstate,
 											   true, &should_free)) != NULL)
+			{
+				/* When we see first tuple, create first index page */
+				if (state == NULL)
+					state = _bt_pagestate(wstate, 0);
+
+				_bt_buildadd(wstate, state, itup);
+				if (should_free)
+					pfree(itup);
+			}
+		}
+		else
 		{
-			/* When we see first tuple, create first index page */
-			if (state == NULL)
-				state = _bt_pagestate(wstate, 0);
+			ItemPointerData *ipd = NULL;
+			IndexTuple 		postingtuple;
+			Size			maxitemsize = 0,
+							maxpostingsize = 0;
 
-			_bt_buildadd(wstate, state, itup);
-			if (should_free)
-				pfree(itup);
+			while ((itup = tuplesort_getindextuple(btspool->sortstate,
+											   true, &should_free)) != NULL)
+			{
+				/* When we see first tuple, create first index page */
+				if (state == NULL)
+				{
+					state = _bt_pagestate(wstate, 0);
+					maxitemsize = BTMaxItemSize(state->btps_page);
+				}
+
+				/*
+				 * Compare current tuple with previous one.
+				 * If tuples are equal, we can unite them into a posting list.
+				 */
+				if (itupprev != NULL)
+				{
+					if (_bt_isbinaryequal(tupdes, itupprev, index->indnatts, itup))
+					{
+						/* Tuples are equal. Create or update posting */
+						if (ntuples == 0)
+						{
+							/*
+							 * We don't have a suitable posting list yet, so
+							 * allocate one and save both itupprev and the
+							 * current tuple.
+							 */
+							ipd = palloc0(maxitemsize);
+
+							memcpy(ipd, itupprev, sizeof(ItemPointerData));
+							ntuples++;
+							memcpy(ipd + ntuples, itup, sizeof(ItemPointerData));
+							ntuples++;
+						}
+						else
+						{
+							if ((ntuples+1)*sizeof(ItemPointerData) < maxpostingsize)
+							{
+								memcpy(ipd + ntuples, itup, sizeof(ItemPointerData));
+								ntuples++;
+							}
+							else
+							{
+								postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+								_bt_buildadd(wstate, state, postingtuple);
+								ntuples = 0;
+								pfree(ipd);
+							}
+						}
+
+					}
+					else
+					{
+						/* Tuples are not equal. Insert itupprev into index. */
+						if (ntuples == 0)
+							_bt_buildadd(wstate, state, itupprev);
+						else
+						{
+							postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+							_bt_buildadd(wstate, state, postingtuple);
+							ntuples = 0;
+							pfree(ipd);
+						}
+					}
+				}
+
+				/*
+				 * Copy the tuple into the temp variable itupprev to compare
+				 * it with the following tuple, and maybe unite them into
+				 * a posting tuple.
+				 */
+				itupprev = CopyIndexTuple(itup);
+				if (should_free)
+					pfree(itup);
+
+				/* compute max size of ipd list */
+				maxpostingsize = maxitemsize - IndexInfoFindDataOffset(itupprev->t_info) - MAXALIGN(IndexTupleSize(itupprev));
+			}
+
+			/* Handle the last item. */
+			if (ntuples == 0)
+			{
+				if (itupprev != NULL)
+					_bt_buildadd(wstate, state, itupprev);
+			}
+			else
+			{
+				Assert(ipd!=NULL);
+				Assert(itupprev != NULL);
+				postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+				_bt_buildadd(wstate, state, postingtuple);
+				ntuples = 0;
+				pfree(ipd);
+			}
 		}
 	}
 
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index b714b2c..53fcbcc 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1814,7 +1814,9 @@ _bt_killitems(IndexScanDesc scan)
 			ItemId		iid = PageGetItemId(page, offnum);
 			IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);
 
-			if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
+			/* No microvacuum for posting tuples */
+			if (!BtreeTupleIsPosting(ituple)
+				&& (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid)))
 			{
 				/* found the item */
 				ItemIdMarkDead(iid);
@@ -2056,3 +2058,69 @@ btoptions(Datum reloptions, bool validate)
 {
 	return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
 }
+
+/*
+ * BtreeFormPackedTuple() -- form a posting tuple from a basic index tuple
+ * that already contains the key datum.
+ */
+IndexTuple
+BtreeFormPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd)
+{
+	uint32	   newsize;
+	IndexTuple itup = CopyIndexTuple(tuple);
+
+	/*
+	 * Determine and store offset to the posting list.
+	 */
+	newsize = IndexTupleSize(itup);
+	newsize = SHORTALIGN(newsize);
+
+	/*
+	 * Set meta info about the posting list.
+	 */
+	BtreeSetPostingOffset(itup, newsize);
+	BtreeSetNPosting(itup, nipd);
+	/*
+	 * Add space needed for posting list, if any.  Then check that the tuple
+	 * won't be too big to store.
+	 */
+	newsize += sizeof(ItemPointerData)*nipd;
+	newsize = MAXALIGN(newsize);
+
+	/*
+	 * Resize tuple if needed
+	 */
+	if (newsize != IndexTupleSize(itup))
+	{
+		itup = repalloc(itup, newsize);
+
+		/*
+		 * PostgreSQL 9.3 and earlier did not clear this new space, so we
+		 * might find uninitialized padding when reading tuples from disk.
+		 */
+		memset((char *) itup + IndexTupleSize(itup),
+			   0, newsize - IndexTupleSize(itup));
+		/* set new size in tuple header */
+		itup->t_info &= ~INDEX_SIZE_MASK;
+		itup->t_info |= newsize;
+	}
+
+	/*
+	 * Copy data into the posting tuple
+	 */
+	memcpy(BtreeGetPosting(itup), data, sizeof(ItemPointerData)*nipd);
+	return itup;
+}
+
+IndexTuple
+BtreeReformPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd)
+{
+	int size;
+	if (BtreeTupleIsPosting(tuple))
+	{
+		size = BtreeGetPostingOffset(tuple);
+		tuple->t_info &= ~INDEX_SIZE_MASK;
+		tuple->t_info |= size;
+	}
+
+	return BtreeFormPackedTuple(tuple, data, nipd);
+}
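
For review of the size arithmetic in BtreeFormPackedTuple(): the posting list starts
right after the key at a SHORTALIGN'd offset, and the final tuple size is the
MAXALIGN'd sum. A standalone model of that computation (my sketch; the constants are
illustrative and ItemPointerData is assumed to be 6 bytes):

#include <stdio.h>
#include <stdint.h>

#define SHORTALIGN(LEN)	(((uintptr_t) (LEN) + 1) & ~(uintptr_t) 1)
#define MAXALIGN(LEN)	(((uintptr_t) (LEN) + 7) & ~(uintptr_t) 7)

int
main(void)
{
	size_t	keysize = 13;	/* IndexTupleSize() of the key-only tuple */
	int		nipd = 3;		/* number of heap TIDs in the posting list */
	size_t	ipdsize = 6;	/* sizeof(ItemPointerData) */

	/* BtreeSetPostingOffset(): the posting list starts after the key */
	size_t	postingoff = SHORTALIGN(keysize);

	/* Final size, as computed before the repalloc() */
	size_t	newsize = MAXALIGN(postingoff + ipdsize * (size_t) nipd);

	printf("posting offset = %zu, tuple size = %zu\n", postingoff, newsize);
	/* prints: posting offset = 14, tuple size = 32 */
	return 0;
}
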
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 0d094ca..6ced76c 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -475,14 +475,40 @@ btree_xlog_vacuum(XLogReaderState *record)
 
 		if (len > 0)
 		{
-			OffsetNumber *unused;
-			OffsetNumber *unend;
+			OffsetNumber *offset;
+			IndexTuple 	remaining;
+			int 		i;
+			Size		itemsz;
 
-			unused = (OffsetNumber *) ptr;
-			unend = (OffsetNumber *) ((char *) ptr + len);
+			offset = (OffsetNumber *) ptr;
+			remaining = (IndexTuple)(ptr + xlrec->nremaining*sizeof(OffsetNumber));
+
+			/* Handle posting tuples */
+			for(i = 0; i < xlrec->nremaining; i++)
+			{
+				PageIndexTupleDelete(page, offset[i]);
+
+				itemsz = IndexTupleSize(remaining);
+				itemsz = MAXALIGN(itemsz);
+
+				if (PageAddItem(page, (Item) remaining, itemsz, offset[i],
+						false, false) == InvalidOffsetNumber)
+						elog(PANIC, "btree_xlog_vacuum: failed to add remaining item");
 
-			if ((unend - unused) > 0)
-				PageIndexMultiDelete(page, unused, unend - unused);
+				remaining = (IndexTuple)((char*) remaining + itemsz);
+			}
+
+			if (xlrec->ndeleted > 0)
+			{
+				OffsetNumber *unused;
+				OffsetNumber *unend;
+
+				unused = (OffsetNumber *) ((char *)remaining);
+				unend = (OffsetNumber *) ((char *) ptr + len);
+
+				if ((unend - unused) > 0)
+					PageIndexMultiDelete(page, unused, unend - unused);
+			}
 		}
 
 		/*
@@ -713,6 +739,75 @@ btree_xlog_delete(XLogReaderState *record)
 		UnlockReleaseBuffer(buffer);
 }
 
+/*
+ * Applies changes performed by _bt_pgupdtup().
+ * TODO: add some stuff for inner pages? Not sure whether we really need it.
+ * See comment in _bt_pgupdtup().
+ */
+static void
+btree_xlog_update(bool isleaf, XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		Size		datalen;
+		char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);
+		ItemPointerData *ipd;
+		IndexTuple 		olditup,
+						newitup;
+		Size 			newitupsz;
+		int				nipd;
+
+		/* TODO: the following code needs refactoring; perhaps extract a helper function. */
+		page = BufferGetPage(buffer);
+
+		olditup = (IndexTuple) PageGetItem(page, PageGetItemId(page, xlrec->offnum));
+
+		if (!BtreeTupleIsPosting(olditup))
+			nipd = 1;
+		else
+			nipd = BtreeGetNPosting(olditup);
+
+		ipd = palloc0(sizeof(ItemPointerData)*(nipd + 1));
+
+		/* copy item pointers from old tuple into ipd */
+		if (BtreeTupleIsPosting(olditup))
+			memcpy(ipd, BtreeGetPosting(olditup), sizeof(ItemPointerData)*nipd);
+		else
+			memcpy(ipd, olditup, sizeof(ItemPointerData));
+
+		/* add item pointer of the new tuple into ipd */
+		memcpy(ipd+nipd, (Item) datapos, sizeof(ItemPointerData));
+
+		newitup = BtreeReformPackedTuple((IndexTuple) datapos, ipd, nipd+1);
+
+		/*
+		 * Update the tuple in place.  We have already checked that the new
+		 * tuple will fit on this page, so it's safe to delete the old tuple
+		 * and insert the new one without any side effects.
+		 */
+		newitupsz = IndexTupleDSize(*newitup);
+		newitupsz = MAXALIGN(newitupsz);
+
+		PageIndexTupleDelete(page, xlrec->offnum);
+
+		if (PageAddItem(page, (Item) newitup, newitupsz, xlrec->offnum,
+						false, false) == InvalidOffsetNumber)
+			elog(PANIC, "failed to update compressed tuple while doing recovery");
+
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
 static void
 btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
 {
@@ -988,6 +1083,9 @@ btree_redo(XLogReaderState *record)
 		case XLOG_BTREE_INSERT_META:
 			btree_xlog_insert(false, true, record);
 			break;
+		case XLOG_BTREE_UPDATE_TUPLE:
+			btree_xlog_update(true, record);
+			break;
 		case XLOG_BTREE_SPLIT_L:
 			btree_xlog_split(true, false, record);
 			break;
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index 8350fa0..3dd19c0 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -138,7 +138,6 @@ typedef IndexAttributeBitMapData *IndexAttributeBitMap;
 	((int) ((BLCKSZ - SizeOfPageHeaderData) / \
 			(MAXALIGN(sizeof(IndexTupleData) + 1) + sizeof(ItemIdData))))
 
-
 /* routines in indextuple.c */
 extern IndexTuple index_form_tuple(TupleDesc tupleDescriptor,
 				 Datum *values, bool *isnull);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 9046b16..5496e94 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -122,6 +122,15 @@ typedef struct BTMetaPageData
 				   MAXALIGN(sizeof(BTPageOpaqueData))) / 3)
 
 /*
+ * If compression is applied, the page can contain more tuples
+ * than if it held only uncompressed tuples, so we need a new maximum.
+ * Note that this is a rough upper estimate.
+ */
+#define MaxPackedIndexTuplesPerPage	\
+	((int) ((BLCKSZ - SizeOfPageHeaderData) / \
+			(sizeof(ItemPointerData))))
+
+/*
  * The leaf-page fillfactor defaults to 90% but is user-adjustable.
  * For pages above the leaf level, we use a fixed 70% fillfactor.
  * The fillfactor is applied during index build and when splitting
@@ -226,6 +235,7 @@ typedef struct BTMetaPageData
 										 * vacuum */
 #define XLOG_BTREE_REUSE_PAGE	0xD0	/* old page is about to be reused from
 										 * FSM */
+#define XLOG_BTREE_UPDATE_TUPLE	0xE0	/* update index tuple in place */
 
 /*
  * All that we need to regenerate the meta-data page
@@ -348,15 +358,31 @@ typedef struct xl_btree_reuse_page
  *
  * Note that the *last* WAL record in any vacuum of an index is allowed to
  * have a zero length array of offsets. Earlier records must have at least one.
+ * TODO: update this comment
  */
 typedef struct xl_btree_vacuum
 {
 	BlockNumber lastBlockVacuumed;
 
-	/* TARGET OFFSET NUMBERS FOLLOW */
+	/*
+	 * This field helps us find the beginning of the remaining tuples,
+	 * which follow the array of offset numbers.
+	 */
+	int			nremaining;
+
+	/*
+	 * TODO: not sure whether we need the following variable;
+	 * maybe just a flag would be enough to determine whether
+	 * there is any data about deleted tuples.
+	 */
+	int			ndeleted;
+
+	/* REMAINING OFFSET NUMBERS FOLLOW (nremaining values) */
+	/* REMAINING TUPLES TO INSERT FOLLOW (if nremaining > 0) */
+	/* TARGET OFFSET NUMBERS FOLLOW (if any) */
 } xl_btree_vacuum;
 
-#define SizeOfBtreeVacuum	(offsetof(xl_btree_vacuum, lastBlockVacuumed) + sizeof(BlockNumber))
+#define SizeOfBtreeVacuum	(offsetof(xl_btree_vacuum, lastBlockVacuumed) + sizeof(BlockNumber) + 2*sizeof(int))
 
 /*
  * This is what we need to know about marking an empty branch for deletion.
@@ -538,6 +564,8 @@ typedef struct BTScanPosData
 	 * location in the associated tuple storage workspace.
 	 */
 	int			nextTupleOffset;
+	/* prevTupleOffset is for posting list handling */
+	int			prevTupleOffset;
 
 	/*
 	 * The items array is always ordered in index order (ie, increasing
@@ -550,7 +578,7 @@ typedef struct BTScanPosData
 	int			lastItem;		/* last valid index in items[] */
 	int			itemIndex;		/* current index in items[] */
 
-	BTScanPosItem items[MaxIndexTuplesPerPage]; /* MUST BE LAST */
+	BTScanPosItem items[MaxPackedIndexTuplesPerPage]; /* MUST BE LAST */
 } BTScanPosData;
 
 typedef BTScanPosData *BTScanPos;
@@ -650,6 +678,27 @@ typedef BTScanOpaqueData *BTScanOpaque;
 #define SK_BT_DESC			(INDOPTION_DESC << SK_BT_INDOPTION_SHIFT)
 #define SK_BT_NULLS_FIRST	(INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT)
 
+
+/*
+ * We use our own ItemPointerGet(BlockNumber|OffsetNumber)
+ * to avoid Asserts, since sometimes the ip_posid isn't "valid".
+ */
+#define BtreeItemPointerGetBlockNumber(pointer) \
+	BlockIdGetBlockNumber(&(pointer)->ip_blkid)
+
+#define BtreeItemPointerGetOffsetNumber(pointer) \
+	((pointer)->ip_posid)
+
+#define BT_POSTING (1<<31)
+#define BtreeGetNPosting(itup)			BtreeItemPointerGetOffsetNumber(&(itup)->t_tid)
+#define BtreeSetNPosting(itup,n)		ItemPointerSetOffsetNumber(&(itup)->t_tid,n)
+
+#define BtreeGetPostingOffset(itup)		(BtreeItemPointerGetBlockNumber(&(itup)->t_tid) & (~BT_POSTING))
+#define BtreeSetPostingOffset(itup,n)	ItemPointerSetBlockNumber(&(itup)->t_tid,(n)|BT_POSTING)
+#define BtreeTupleIsPosting(itup)    	(BtreeItemPointerGetBlockNumber(&(itup)->t_tid) & BT_POSTING)
+#define BtreeGetPosting(itup)			(ItemPointerData*) ((char*)(itup) + BtreeGetPostingOffset(itup))
+#define BtreeGetPostingN(itup,n)		(ItemPointerData*) (BtreeGetPosting(itup) + n)
+
 /*
  * prototypes for functions in nbtree.c (external entry points for btree)
  */
@@ -683,6 +732,10 @@ extern bool _bt_doinsert(Relation rel, IndexTuple itup,
 			 IndexUniqueCheck checkUnique, Relation heapRel);
 extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
 extern void _bt_finish_split(Relation rel, Buffer bbuf, BTStack stack);
+extern void _bt_pgupdtup(Relation rel, Buffer buf, OffsetNumber offset, IndexTuple itup,
+			 IndexTuple olditup, int nipd);
+extern void _bt_pgrewritetup(Relation rel, Buffer buf, Page page, OffsetNumber offset, IndexTuple itup);
+extern bool _bt_isbinaryequal(TupleDesc itupdesc, IndexTuple itup, int nindatts, IndexTuple ituptoinsert);
 
 /*
  * prototypes for functions in nbtpage.c
@@ -702,6 +755,8 @@ extern void _bt_delitems_delete(Relation rel, Buffer buf,
 					OffsetNumber *itemnos, int nitems, Relation heapRel);
 extern void _bt_delitems_vacuum(Relation rel, Buffer buf,
 					OffsetNumber *itemnos, int nitems,
+					OffsetNumber *remainingoffset,
+					IndexTuple *remaining, int nremaining,
 					BlockNumber lastBlockVacuumed);
 extern int	_bt_pagedel(Relation rel, Buffer buf);
 
@@ -714,8 +769,8 @@ extern BTStack _bt_search(Relation rel,
 extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
 			  ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
 			  int access);
-extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
-			ScanKey scankey, bool nextkey);
+extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
+								ScanKey scankey, bool nextkey);
 extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
 			Page page, OffsetNumber offnum);
 extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
@@ -746,6 +801,8 @@ extern void _bt_end_vacuum_callback(int code, Datum arg);
 extern Size BTreeShmemSize(void);
 extern void BTreeShmemInit(void);
 extern bytea *btoptions(Datum reloptions, bool validate);
+extern IndexTuple BtreeFormPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd);
+extern IndexTuple BtreeReformPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd);
 
 /*
  * prototypes for functions in nbtvalidate.c
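
Finally, since the t_tid overloading is the trickiest part of the on-disk format, here
is a standalone round-trip of the flag encoding defined by the macros above (my sketch
only; plain integers stand in for the BlockIdData and OffsetNumber fields of t_tid).
Note that the BtreeTupleIsPosting() check relies on regular leaf tuples never carrying
a heap block number with bit 31 set, since that bit is taken by BT_POSTING.

#include <stdio.h>
#include <stdint.h>

#define BT_POSTING	0x80000000u		/* the patch writes (1<<31) */

int
main(void)
{
	uint32_t	ip_blkid;	/* stand-in for t_tid's block id field */
	uint16_t	ip_posid;	/* stand-in for t_tid's offset field */

	/* BtreeSetPostingOffset() / BtreeSetNPosting() */
	ip_blkid = 40 | BT_POSTING;	/* posting list starts 40 bytes into the tuple */
	ip_posid = 7;				/* the posting list holds 7 heap TIDs */

	/* BtreeTupleIsPosting() / BtreeGetPostingOffset() / BtreeGetNPosting() */
	if (ip_blkid & BT_POSTING)
		printf("posting tuple: list at offset %u, %u TIDs\n",
			   (unsigned) (ip_blkid & ~BT_POSTING), (unsigned) ip_posid);
	return 0;
}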