29.01.2016 19:01, Thom Brown:
On 29 January 2016 at 15:47, Aleksander Alekseev
<a.aleks...@postgrespro.ru> wrote:
I tested this patch on x64 and ARM servers for a few hours today. The
only problem I could find is that INSERT works considerably slower after
applying the patch. Besides that, everything looks fine - no crashes, tests
pass, memory doesn't seem to leak, etc.
Thank you for testing. I rechecked it, and insertions really are very slow. It looks like a bug.
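(A rough way to quantify the INSERT slowdown - just a sketch, the table, index, and script names below are made up for illustration rather than taken from the original test:

CREATE TABLE ins_test (val int);
CREATE INDEX ins_test_val_idx ON ins_test (val);
-- ins.sql contains a single line: INSERT INTO ins_test VALUES ((random() * 1000000)::int);
pgbench -n -f ins.sql -c 4 -T 60 <dbname>

Comparing the reported tps with and without the patch applied should make the regression visible.)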
Okay, now for some badness.  I've restored a database containing 2
tables, one 318MB, another 24kB.  The 318MB table contains 5 million
rows with a sequential id column.  I get a problem if I try to delete
many rows from it:
# delete from contacts where id % 3 != 0 ;
WARNING:  out of shared memory
WARNING:  out of shared memory
WARNING:  out of shared memory
I didn't manage to reproduce this. Thom, could you describe the exact steps
to reproduce this issue, please?
Sure. I used my pg_rep_test tool to create a primary (pg_rep_test
-r0), which creates an instance with a custom config as follows:

shared_buffers = 8MB
max_connections = 7
wal_level = 'hot_standby'
cluster_name = 'primary'
max_wal_senders = 3
wal_keep_segments = 6

Then create a pgbench data set (I didn't originally use pgbench, but
you can get the same results with it):

createdb -p 5530 pgbench
pgbench -p 5530 -i -s 100 pgbench

And delete some stuff:

thom@swift:~/Development/test$ psql -p 5530 pgbench
Timing is on.
psql (9.6devel)
Type "help" for help.


  ➤ psql://thom@[local]:5530/pgbench

# DELETE FROM pgbench_accounts WHERE aid % 3 != 0;
WARNING:  out of shared memory
WARNING:  out of shared memory
WARNING:  out of shared memory
WARNING:  out of shared memory
WARNING:  out of shared memory
WARNING:  out of shared memory
WARNING:  out of shared memory
...
WARNING:  out of shared memory
WARNING:  out of shared memory
DELETE 6666667
Time: 22218.804 ms

There were 358 lines of that warning message.  I don't get these
messages without the patch.

Thom

Thank you for this report.
I tried to reproduce it but couldn't. Debugging will be much easier now.

I hope I'll fix these issues within the next few days.

BTW, I found a silly mistake: the previous patch contained some unrelated changes. I fixed that in the new version (attached).
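(A quick way to sanity-check the space savings from merging duplicates - again only a sketch, with made-up table and index names:

CREATE TABLE dup_test (v int);
INSERT INTO dup_test SELECT i % 10 FROM generate_series(1, 1000000) i;
CREATE INDEX dup_test_v_idx ON dup_test (v);
SELECT pg_size_pretty(pg_relation_size('dup_test_v_idx'));

If the compression works as intended, the index built with the patched server should come out noticeably smaller than one built without the patch, since each distinct key is stored once with a posting list of TIDs.)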

--
Anastasia Lubennikova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e3c55eb..3908cc1 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -24,6 +24,7 @@
 #include "storage/predicate.h"
 #include "utils/tqual.h"
 
+#include "catalog/catalog.h"
 
 typedef struct
 {
@@ -60,7 +61,8 @@ static void _bt_findinsertloc(Relation rel,
 				  ScanKey scankey,
 				  IndexTuple newtup,
 				  BTStack stack,
-				  Relation heapRel);
+				  Relation heapRel,
+				  bool *updposing);
 static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
 			   BTStack stack,
 			   IndexTuple itup,
@@ -113,6 +115,7 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 	BTStack		stack;
 	Buffer		buf;
 	OffsetNumber offset;
+	bool updposting = false;
 
 	/* we need an insertion scan key to do our search, so build one */
 	itup_scankey = _bt_mkscankey(rel, itup);
@@ -162,8 +165,9 @@ top:
 	{
 		TransactionId xwait;
 		uint32		speculativeToken;
+		bool fakeupdposting = false; /* Never update posting in unique index */
 
-		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false, &fakeupdposting);
 		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
 								 checkUnique, &is_unique, &speculativeToken);
 
@@ -200,8 +204,54 @@ top:
 		CheckForSerializableConflictIn(rel, NULL, buf);
 		/* do the insertion */
 		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
-						  stack, heapRel);
-		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
+						  stack, heapRel, &updposting);
+
+		if (IsSystemRelation(rel))
+			updposting = false;
+
+		/*
+		 * The new tuple has the same key as a tuple already on the page.
+		 * Merge them into one posting tuple.
+		 */
+		if (updposting)
+		{
+			Page		page;
+			IndexTuple olditup, newitup;
+			ItemPointerData *ipd;
+			int nipd;
+
+			page = BufferGetPage(buf);
+			olditup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
+
+			if (BtreeTupleIsPosting(olditup))
+				nipd = BtreeGetNPosting(olditup);
+			else
+				nipd = 1;
+
+			ipd = palloc0(sizeof(ItemPointerData)*(nipd + 1));
+			/* copy item pointers from old tuple into ipd */
+			if (BtreeTupleIsPosting(olditup))
+				memcpy(ipd, BtreeGetPosting(olditup), sizeof(ItemPointerData)*nipd);
+			else
+				memcpy(ipd, olditup, sizeof(ItemPointerData));
+
+			/* add item pointer of the new tuple into ipd */
+			memcpy(ipd+nipd, itup, sizeof(ItemPointerData));
+
+			/*
+			 * Form posting tuple, then delete old tuple and insert posting tuple.
+			 */
+			newitup = BtreeReformPackedTuple(itup, ipd, nipd+1);
+			PageIndexTupleDelete(page, offset);
+			_bt_insertonpg(rel, buf, InvalidBuffer, stack, newitup, offset, false);
+
+			pfree(ipd);
+			pfree(newitup);
+		}
+		else
+		{
+			_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
+		}
 	}
 	else
 	{
@@ -306,6 +356,8 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 
 				/* okay, we gotta fetch the heap tuple ... */
 				curitup = (IndexTuple) PageGetItem(page, curitemid);
+
+				Assert (!BtreeTupleIsPosting(curitup));
 				htid = curitup->t_tid;
 
 				/*
@@ -535,7 +587,8 @@ _bt_findinsertloc(Relation rel,
 				  ScanKey scankey,
 				  IndexTuple newtup,
 				  BTStack stack,
-				  Relation heapRel)
+				  Relation heapRel,
+				  bool *updposting)
 {
 	Buffer		buf = *bufptr;
 	Page		page = BufferGetPage(buf);
@@ -681,7 +734,7 @@ _bt_findinsertloc(Relation rel,
 	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
 		newitemoff = firstlegaloff;
 	else
-		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
+		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false, updposting);
 
 	*bufptr = buf;
 	*offsetptr = newitemoff;
@@ -1042,6 +1095,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
 		itemid = PageGetItemId(origpage, P_HIKEY);
 		itemsz = ItemIdGetLength(itemid);
 		item = (IndexTuple) PageGetItem(origpage, itemid);
+
+		Assert(!BtreeTupleIsPosting(item));
+
 		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
 						false, false) == InvalidOffsetNumber)
 		{
@@ -1072,13 +1128,40 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
 		itemsz = ItemIdGetLength(itemid);
 		item = (IndexTuple) PageGetItem(origpage, itemid);
 	}
-	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+
+	if (BtreeTupleIsPosting(item))
+	{
+		Size hikeysize =  BtreeGetPostingOffset(item);
+		IndexTuple hikey = palloc0(hikeysize);
+		/*
+		 * Truncate the posting before inserting it as a hikey.
+		 */
+		memcpy (hikey, item, hikeysize);
+		hikey->t_info &= ~INDEX_SIZE_MASK;
+		hikey->t_info |= hikeysize;
+		ItemPointerSet(&(hikey->t_tid), origpagenumber, P_HIKEY);
+
+		if (PageAddItem(leftpage, (Item) hikey, hikeysize, leftoff,
 					false, false) == InvalidOffsetNumber)
+		{
+			memset(rightpage, 0, BufferGetPageSize(rbuf));
+			elog(ERROR, "failed to add hikey to the left sibling"
+				" while splitting block %u of index \"%s\"",
+				origpagenumber, RelationGetRelationName(rel));
+		}
+
+		pfree(hikey);
+	}
+	else
 	{
-		memset(rightpage, 0, BufferGetPageSize(rbuf));
-		elog(ERROR, "failed to add hikey to the left sibling"
-			 " while splitting block %u of index \"%s\"",
-			 origpagenumber, RelationGetRelationName(rel));
+		if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+						false, false) == InvalidOffsetNumber)
+		{
+			memset(rightpage, 0, BufferGetPageSize(rbuf));
+			elog(ERROR, "failed to add hikey to the left sibling"
+				" while splitting block %u of index \"%s\"",
+				origpagenumber, RelationGetRelationName(rel));
+		}
 	}
 	leftoff = OffsetNumberNext(leftoff);
 
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index f2905cb..f56c90f 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -75,6 +75,9 @@ static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 static void btvacuumpage(BTVacState *vstate, BlockNumber blkno,
 			 BlockNumber orig_blkno);
 
+static ItemPointer
+btreevacuumPosting(BTVacState *vstate, ItemPointerData *items,
+					  int nitem, int *nremaining);
 
 /*
  * Btree handler function: return IndexAmRoutine with access method parameters
@@ -962,6 +965,7 @@ restart:
 		OffsetNumber offnum,
 					minoff,
 					maxoff;
+		IndexTupleData *remaining;
 
 		/*
 		 * Trade in the initial read lock for a super-exclusive write lock on
@@ -1011,31 +1015,62 @@ restart:
 
 				itup = (IndexTuple) PageGetItem(page,
 												PageGetItemId(page, offnum));
-				htup = &(itup->t_tid);
-
-				/*
-				 * During Hot Standby we currently assume that
-				 * XLOG_BTREE_VACUUM records do not produce conflicts. That is
-				 * only true as long as the callback function depends only
-				 * upon whether the index tuple refers to heap tuples removed
-				 * in the initial heap scan. When vacuum starts it derives a
-				 * value of OldestXmin. Backends taking later snapshots could
-				 * have a RecentGlobalXmin with a later xid than the vacuum's
-				 * OldestXmin, so it is possible that row versions deleted
-				 * after OldestXmin could be marked as killed by other
-				 * backends. The callback function *could* look at the index
-				 * tuple state in isolation and decide to delete the index
-				 * tuple, though currently it does not. If it ever did, we
-				 * would need to reconsider whether XLOG_BTREE_VACUUM records
-				 * should cause conflicts. If they did cause conflicts they
-				 * would be fairly harsh conflicts, since we haven't yet
-				 * worked out a way to pass a useful value for
-				 * latestRemovedXid on the XLOG_BTREE_VACUUM records. This
-				 * applies to *any* type of index that marks index tuples as
-				 * killed.
-				 */
-				if (callback(htup, callback_state))
-					deletable[ndeletable++] = offnum;
+				if(BtreeTupleIsPosting(itup))
+				{
+					int nipd, nnewipd;
+					ItemPointer newipd;
+
+					nipd = BtreeGetNPosting(itup);
+					newipd = btreevacuumPosting(vstate, BtreeGetPosting(itup), nipd, &nnewipd);
+
+					if (newipd != NULL)
+					{
+						if (nnewipd > 0)
+						{
+							/* There are still live tuples in the posting:
+							 * 1) form a new posting tuple that contains the remaining ipds,
+							 * 2) delete the old posting tuple,
+							 * 3) insert the new posting tuple back into the page.
+							 */
+							remaining = BtreeReformPackedTuple(itup, newipd, nnewipd);
+							PageIndexTupleDelete(page, offnum);
+
+							if (PageAddItem(page, (Item) remaining, IndexTupleSize(remaining), offnum, false, false) != offnum)
+								elog(ERROR, "failed to add vacuumed posting tuple to index page in \"%s\"",
+										RelationGetRelationName(info->index));
+						}
+						else
+							deletable[ndeletable++] = offnum;
+					}
+				}
+				else
+				{
+					htup = &(itup->t_tid);
+
+					/*
+					* During Hot Standby we currently assume that
+					* XLOG_BTREE_VACUUM records do not produce conflicts. That is
+					* only true as long as the callback function depends only
+					* upon whether the index tuple refers to heap tuples removed
+					* in the initial heap scan. When vacuum starts it derives a
+					* value of OldestXmin. Backends taking later snapshots could
+					* have a RecentGlobalXmin with a later xid than the vacuum's
+					* OldestXmin, so it is possible that row versions deleted
+					* after OldestXmin could be marked as killed by other
+					* backends. The callback function *could* look at the index
+					* tuple state in isolation and decide to delete the index
+					* tuple, though currently it does not. If it ever did, we
+					* would need to reconsider whether XLOG_BTREE_VACUUM records
+					* should cause conflicts. If they did cause conflicts they
+					* would be fairly harsh conflicts, since we haven't yet
+					* worked out a way to pass a useful value for
+					* latestRemovedXid on the XLOG_BTREE_VACUUM records. This
+					* applies to *any* type of index that marks index tuples as
+					* killed.
+					*/
+					if (callback(htup, callback_state))
+						deletable[ndeletable++] = offnum;
+				}
 			}
 		}
 
@@ -1160,3 +1195,51 @@ btcanreturn(Relation index, int attno)
 {
 	return true;
 }
+
+
+/*
+ * Vacuums a posting list. The size of the list must be specified
+ * via number of items (nitems).
+ *
+ * If none of the items need to be removed, returns NULL. Otherwise returns
+ * a new palloc'd array with the remaining items. The number of remaining
+ * items is returned via nremaining.
+ */
+ItemPointer
+btreevacuumPosting(BTVacState *vstate, ItemPointerData *items,
+					  int nitem, int *nremaining)
+{
+	int			i,
+				remaining = 0;
+	ItemPointer tmpitems = NULL;
+	IndexBulkDeleteCallback callback = vstate->callback;
+	void	   *callback_state = vstate->callback_state;
+
+	/*
+	 * Iterate over TIDs array
+	 */
+	for (i = 0; i < nitem; i++)
+	{
+		if (callback(items + i, callback_state))
+		{
+			if (!tmpitems)
+			{
+				/*
+				 * First TID to be deleted: allocate memory to hold the
+				 * remaining items.
+				 */
+				tmpitems = palloc(sizeof(ItemPointerData) * nitem);
+				memcpy(tmpitems, items, sizeof(ItemPointerData) * i);
+			}
+		}
+		else
+		{
+			if (tmpitems)
+				tmpitems[remaining] = items[i];
+			remaining++;
+		}
+	}
+
+	*nremaining = remaining;
+	return tmpitems;
+}
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 3db32e8..0428f04 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -29,6 +29,8 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
 			 OffsetNumber offnum);
 static void _bt_saveitem(BTScanOpaque so, int itemIndex,
 			 OffsetNumber offnum, IndexTuple itup);
+static void _bt_savePostingitem(BTScanOpaque so, int itemIndex,
+			 OffsetNumber offnum, ItemPointer iptr, IndexTuple itup, int i);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
 static Buffer _bt_walk_left(Relation rel, Buffer buf);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
@@ -90,6 +92,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
 		   Buffer *bufP, int access)
 {
 	BTStack		stack_in = NULL;
+	bool fakeupdposting = false; /* fake variable for _bt_binsrch */
 
 	/* Get the root page to start with */
 	*bufP = _bt_getroot(rel, access);
@@ -136,7 +139,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
 		 * Find the appropriate item on the internal page, and get the child
 		 * page that it points to.
 		 */
-		offnum = _bt_binsrch(rel, *bufP, keysz, scankey, nextkey);
+		offnum = _bt_binsrch(rel, *bufP, keysz, scankey, nextkey, &fakeupdposting);
 		itemid = PageGetItemId(page, offnum);
 		itup = (IndexTuple) PageGetItem(page, itemid);
 		blkno = ItemPointerGetBlockNumber(&(itup->t_tid));
@@ -310,7 +313,8 @@ _bt_binsrch(Relation rel,
 			Buffer buf,
 			int keysz,
 			ScanKey scankey,
-			bool nextkey)
+			bool nextkey,
+			bool *updposing)
 {
 	Page		page;
 	BTPageOpaque opaque;
@@ -373,7 +377,17 @@ _bt_binsrch(Relation rel,
 	 * scan key), which could be the last slot + 1.
 	 */
 	if (P_ISLEAF(opaque))
+	{
+		if (low <= PageGetMaxOffsetNumber(page))
+		{
+			IndexTuple oitup = (IndexTuple) PageGetItem(page, PageGetItemId(page, low));
+			/* extra equality check, to detect a possible posting-tuple update or creation */
+			if ((_bt_compare(rel, keysz, scankey, page, low) == 0)
+				&& (IndexTupleSize(oitup) + sizeof(ItemPointerData) < BTMaxItemSize(page)))
+				*updposing = true;
+		}
 		return low;
+	}
 
 	/*
 	 * On a non-leaf page, return the last key < scan key (resp. <= scan key).
@@ -536,6 +550,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	int			i;
 	StrategyNumber strat_total;
 	BTScanPosItem *currItem;
+	bool fakeupdposing = false; /* fake variable for _bt_binsrch */
 
 	Assert(!BTScanPosIsValid(so->currPos));
 
@@ -1003,7 +1018,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	so->markItemIndex = -1;		/* ditto */
 
 	/* position to the precise item on the page */
-	offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
+	offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey, &fakeupdposing);
 
 	/*
 	 * If nextkey = false, we are positioned at the first item >= scan key, or
@@ -1161,6 +1176,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 	int			itemIndex;
 	IndexTuple	itup;
 	bool		continuescan;
+	int i;
 
 	/*
 	 * We must have the buffer pinned and locked, but the usual macro can't be
@@ -1195,6 +1211,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
 	/* initialize tuple workspace to empty */
 	so->currPos.nextTupleOffset = 0;
+	so->currPos.prevTupleOffset = 0;
 
 	/*
 	 * Now that the current page has been made consistent, the macro should be
@@ -1215,8 +1232,19 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 			if (itup != NULL)
 			{
 				/* tuple passes all scan key conditions, so remember it */
-				_bt_saveitem(so, itemIndex, offnum, itup);
-				itemIndex++;
+				if (BtreeTupleIsPosting(itup))
+				{
+					for (i = 0; i < BtreeGetNPosting(itup); i++)
+					{
+						_bt_savePostingitem(so, itemIndex, offnum, BtreeGetPostingN(itup, i), itup, i);
+						itemIndex++;
+					}
+				}
+				else
+				{
+					_bt_saveitem(so, itemIndex, offnum, itup);
+					itemIndex++;
+				}
 			}
 			if (!continuescan)
 			{
@@ -1228,7 +1256,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 			offnum = OffsetNumberNext(offnum);
 		}
 
-		Assert(itemIndex <= MaxIndexTuplesPerPage);
+		Assert(itemIndex <= MaxPackedIndexTuplesPerPage);
 		so->currPos.firstItem = 0;
 		so->currPos.lastItem = itemIndex - 1;
 		so->currPos.itemIndex = 0;
@@ -1236,7 +1264,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 	else
 	{
 		/* load items[] in descending order */
-		itemIndex = MaxIndexTuplesPerPage;
+		itemIndex = MaxPackedIndexTuplesPerPage;
 
 		offnum = Min(offnum, maxoff);
 
@@ -1246,8 +1274,20 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 			if (itup != NULL)
 			{
 				/* tuple passes all scan key conditions, so remember it */
-				itemIndex--;
-				_bt_saveitem(so, itemIndex, offnum, itup);
+				if (BtreeTupleIsPosting(itup))
+				{
+					for (i = 0; i < BtreeGetNPosting(itup); i++)
+					{
+						itemIndex--;
+						_bt_savePostingitem(so, itemIndex, offnum, BtreeGetPostingN(itup, i), itup, i);
+					}
+				}
+				else
+				{
+					itemIndex--;
+					_bt_saveitem(so, itemIndex, offnum, itup);
+				}
+
 			}
 			if (!continuescan)
 			{
@@ -1261,8 +1301,8 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
 		Assert(itemIndex >= 0);
 		so->currPos.firstItem = itemIndex;
-		so->currPos.lastItem = MaxIndexTuplesPerPage - 1;
-		so->currPos.itemIndex = MaxIndexTuplesPerPage - 1;
+		so->currPos.lastItem = MaxPackedIndexTuplesPerPage - 1;
+		so->currPos.itemIndex = MaxPackedIndexTuplesPerPage - 1;
 	}
 
 	return (so->currPos.firstItem <= so->currPos.lastItem);
@@ -1275,6 +1315,8 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
 {
 	BTScanPosItem *currItem = &so->currPos.items[itemIndex];
 
+	Assert (!BtreeTupleIsPosting(itup));
+
 	currItem->heapTid = itup->t_tid;
 	currItem->indexOffset = offnum;
 	if (so->currTuples)
@@ -1288,6 +1330,37 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
 }
 
 /*
+ * Save an index item into so->currPos.items[itemIndex]
+ * For an index-only scan, handle the first element of a posting separately:
+ * save the key once, and link the posting TIDs to it via tupleOffset.
+ */
+static void
+_bt_savePostingitem(BTScanOpaque so, int itemIndex,
+			 OffsetNumber offnum, ItemPointer iptr, IndexTuple itup, int i)
+{
+	BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+	currItem->heapTid = *iptr;
+	currItem->indexOffset = offnum;
+
+	if (so->currTuples)
+	{
+		if (i == 0)
+		{
+			/* save key. the same for all tuples in the posting */
+			Size		itupsz = BtreeGetPostingOffset(itup);
+			currItem->tupleOffset = so->currPos.nextTupleOffset;
+			memcpy(so->currTuples + so->currPos.nextTupleOffset, itup, itupsz);
+			so->currPos.nextTupleOffset += MAXALIGN(itupsz);
+			so->currPos.prevTupleOffset = currItem->tupleOffset;
+		}
+		else
+			currItem->tupleOffset = so->currPos.prevTupleOffset;
+	}
+}
+
+
+/*
  *	_bt_steppage() -- Step to next page containing valid data for scan
  *
  * On entry, if so->currPos.buf is valid the buffer is pinned but not locked;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 99a014e..e29d63f 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -75,6 +75,7 @@
 #include "utils/rel.h"
 #include "utils/sortsupport.h"
 #include "utils/tuplesort.h"
+#include "catalog/catalog.h"
 
 
 /*
@@ -527,15 +528,120 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 		Assert(last_off > P_FIRSTKEY);
 		ii = PageGetItemId(opage, last_off);
 		oitup = (IndexTuple) PageGetItem(opage, ii);
-		_bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
 
 		/*
-		 * Move 'last' into the high key position on opage
+		 * If the item is a posting tuple, we can truncate it, because the
+		 * HIKEY is not considered real data and doesn't need to keep any
+		 * ItemPointerData at all, let alone a whole posting list.
+		 * But if the tuple had a big posting list, there would be plenty of
+		 * free space left on the opage, so we must split the posting tuple in two.
 		 */
-		hii = PageGetItemId(opage, P_HIKEY);
-		*hii = *ii;
-		ItemIdSetUnused(ii);	/* redundant */
-		((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+		 if (BtreeTupleIsPosting(oitup))
+		 {
+			int nipd, ntocut, ntoleave;
+			Size keytupsz;
+			IndexTuple keytup;
+			nipd = BtreeGetNPosting(oitup);
+			ntocut = (sizeof(ItemIdData) + BtreeGetPostingOffset(oitup))/sizeof(ItemPointerData);
+			ntocut++; /* round up to be sure that we cut enough */
+			ntoleave = nipd - ntocut;
+
+			/*
+			 * 0) Form a key tuple that doesn't contain any ipd.
+			 * NOTE: the key tuple will have blkno & offset suitable for P_HIKEY;
+			 * any function that uses keytup must handle them itself.
+			 */
+			keytupsz =  BtreeGetPostingOffset(oitup);
+			keytup = palloc0(keytupsz);
+			memcpy (keytup, oitup, keytupsz);
+			keytup->t_info &= ~INDEX_SIZE_MASK;
+			keytup->t_info |= keytupsz;
+			ItemPointerSet(&(keytup->t_tid), oblkno, P_HIKEY);
+
+			if (ntocut < nipd)
+			{
+				ItemPointerData *newipd;
+				IndexTuple newitup, newlasttup;
+				/*
+				 * 1) Cut part of old tuple to shift to npage.
+				 * And insert it as P_FIRSTKEY.
+				 * This tuple is based on keytup.
+				 * Blkno & offnum are reset in BtreeFormPackedTuple.
+				 */
+				newipd = palloc0(sizeof(ItemPointerData)*ntocut);
+				/* Note, that we cut last 'ntocut' items */
+				memcpy(newipd, BtreeGetPosting(oitup)+ntoleave, sizeof(ItemPointerData)*ntocut);
+				newitup = BtreeFormPackedTuple(keytup, newipd, ntocut);
+
+				_bt_sortaddtup(npage, IndexTupleSize(newitup), newitup, P_FIRSTKEY);
+				pfree(newipd);
+				pfree(newitup);
+
+				/*
+				 * 2) set last item to the P_HIKEY linp
+				 * Move 'last' into the high key position on opage
+				 * NOTE: Do this because of indextuple deletion algorithm, which
+				 * doesn't allow to delete an item while we have unused one before it.
+				 */
+				hii = PageGetItemId(opage, P_HIKEY);
+				*hii = *ii;
+				ItemIdSetUnused(ii);	/* redundant */
+				((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+
+				/* 3) delete "wrong" high key */
+				PageIndexTupleDelete(opage, P_HIKEY);
+
+				/* 4)Insert keytup as P_HIKEY. */
+				_bt_sortaddtup(opage, IndexTupleSize(keytup), keytup,  P_HIKEY);
+
+				/* 5) form the part of old tuple with ntoleave ipds. And insert it as last tuple. */
+				newlasttup = BtreeFormPackedTuple(keytup, BtreeGetPosting(oitup), ntoleave);
+
+				_bt_sortaddtup(opage, IndexTupleSize(newlasttup), newlasttup, PageGetMaxOffsetNumber(opage)+1);
+
+				pfree(newlasttup);
+			}
+			else
+			{
+				/* The tuple isn't big enough to split it. Handle it as a normal tuple. */
+
+				/*
+				 * 1) Shift the last tuple to npage.
+				 * Insert it as P_FIRSTKEY.
+				 */
+				_bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
+
+				/* 2) set last item to the P_HIKEY linp */
+				/* Move 'last' into the high key position on opage */
+				hii = PageGetItemId(opage, P_HIKEY);
+				*hii = *ii;
+				ItemIdSetUnused(ii);	/* redundant */
+				((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+
+				/* 3) delete "wrong" high key */
+				PageIndexTupleDelete(opage, P_HIKEY);
+
+				/* 4)Insert keytup as P_HIKEY. */
+				_bt_sortaddtup(opage, IndexTupleSize(keytup), keytup,  P_HIKEY);
+
+			}
+			pfree(keytup);
+		 }
+		 else
+		 {
+			/*
+			 * 1) Shift the last tuple to npage.
+			 * Insert it as P_FIRSTKEY.
+			 */
+			_bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
+
+			/* 2) set last item to the P_HIKEY linp */
+			/* Move 'last' into the high key position on opage */
+			hii = PageGetItemId(opage, P_HIKEY);
+			*hii = *ii;
+			ItemIdSetUnused(ii);	/* redundant */
+			((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+		}
 
 		/*
 		 * Link the old page into its parent, using its minimum key. If we
@@ -547,6 +653,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 
 		Assert(state->btps_minkey != NULL);
 		ItemPointerSet(&(state->btps_minkey->t_tid), oblkno, P_HIKEY);
+
 		_bt_buildadd(wstate, state->btps_next, state->btps_minkey);
 		pfree(state->btps_minkey);
 
@@ -555,7 +662,9 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 		 * it off the old page, not the new one, in case we are not at leaf
 		 * level.
 		 */
-		state->btps_minkey = CopyIndexTuple(oitup);
+		ItemId iihk = PageGetItemId(opage, P_HIKEY);
+		IndexTuple hikey = (IndexTuple) PageGetItem(opage, iihk);
+		state->btps_minkey = CopyIndexTuple(hikey);
 
 		/*
 		 * Set the sibling links for both pages.
@@ -590,7 +699,29 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 	if (last_off == P_HIKEY)
 	{
 		Assert(state->btps_minkey == NULL);
-		state->btps_minkey = CopyIndexTuple(itup);
+
+		if (BtreeTupleIsPosting(itup))
+		{
+			Size keytupsz;
+			IndexTuple keytup;
+
+			/*
+			 * 0) Form a key tuple that doesn't contain any ipd.
+			 * NOTE: the key tuple will have blkno & offset suitable for P_HIKEY;
+			 * any function that uses keytup must handle them itself.
+			 */
+			keytupsz =  BtreeGetPostingOffset(itup);
+			keytup = palloc0(keytupsz);
+			memcpy (keytup, itup, keytupsz);
+
+			keytup->t_info &= ~INDEX_SIZE_MASK;
+			keytup->t_info |= keytupsz;
+			ItemPointerSet(&(keytup->t_tid), nblkno, P_HIKEY);
+
+			state->btps_minkey = CopyIndexTuple(keytup);
+		}
+		else
+			state->btps_minkey = CopyIndexTuple(itup);
 	}
 
 	/*
@@ -670,6 +801,67 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 }
 
 /*
+ * Prepare SortSupport structure for indextuples comparison
+ */
+SortSupport
+_bt_prepare_SortSupport(BTWriteState *wstate, int keysz)
+{
+	/* Prepare SortSupport data for each column */
+	ScanKey		indexScanKey = _bt_mkscankey_nodata(wstate->index);
+	SortSupport sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
+	int i;
+
+	for (i = 0; i < keysz; i++)
+	{
+		SortSupport sortKey = sortKeys + i;
+		ScanKey		scanKey = indexScanKey + i;
+		int16		strategy;
+
+		sortKey->ssup_cxt = CurrentMemoryContext;
+		sortKey->ssup_collation = scanKey->sk_collation;
+		sortKey->ssup_nulls_first =
+			(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+		sortKey->ssup_attno = scanKey->sk_attno;
+		/* Abbreviation is not supported here */
+		sortKey->abbreviate = false;
+
+		AssertState(sortKey->ssup_attno != 0);
+
+		strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+			BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+		PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
+	}
+
+	_bt_freeskey(indexScanKey);
+	return sortKeys;
+}
+
+/*
+ * Compare two tuples using sortKey i
+ */
+int _bt_call_comparator(SortSupport sortKeys, int i,
+						 IndexTuple itup, IndexTuple itup2, TupleDesc tupdes)
+{
+		SortSupport entry;
+		Datum		attrDatum1,
+					attrDatum2;
+		bool		isNull1,
+					isNull2;
+		int32		compare;
+
+		entry = sortKeys + i - 1;
+		attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
+		attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
+
+		compare = ApplySortComparator(attrDatum1, isNull1,
+										attrDatum2, isNull2,
+										entry);
+
+		return compare;
+}
+
+/*
  * Read tuples in correct sort order from tuplesort, and load them into
  * btree leaves.
  */
@@ -679,16 +871,20 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	BTPageState *state = NULL;
 	bool		merge = (btspool2 != NULL);
 	IndexTuple	itup,
-				itup2 = NULL;
+				itup2 = NULL,
+				itupprev = NULL;
 	bool		should_free,
 				should_free2,
 				load1;
 	TupleDesc	tupdes = RelationGetDescr(wstate->index);
 	int			i,
 				keysz = RelationGetNumberOfAttributes(wstate->index);
-	ScanKey		indexScanKey = NULL;
+	int			ntuples = 0;
 	SortSupport sortKeys;
 
+	/* Prepare SortSupport data */
+	sortKeys = (SortSupport)_bt_prepare_SortSupport(wstate, keysz);
+
 	if (merge)
 	{
 		/*
@@ -701,34 +897,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 									   true, &should_free);
 		itup2 = tuplesort_getindextuple(btspool2->sortstate,
 										true, &should_free2);
-		indexScanKey = _bt_mkscankey_nodata(wstate->index);
-
-		/* Prepare SortSupport data for each column */
-		sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
-
-		for (i = 0; i < keysz; i++)
-		{
-			SortSupport sortKey = sortKeys + i;
-			ScanKey		scanKey = indexScanKey + i;
-			int16		strategy;
-
-			sortKey->ssup_cxt = CurrentMemoryContext;
-			sortKey->ssup_collation = scanKey->sk_collation;
-			sortKey->ssup_nulls_first =
-				(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
-			sortKey->ssup_attno = scanKey->sk_attno;
-			/* Abbreviation is not supported here */
-			sortKey->abbreviate = false;
-
-			AssertState(sortKey->ssup_attno != 0);
-
-			strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
-				BTGreaterStrategyNumber : BTLessStrategyNumber;
-
-			PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
-		}
-
-		_bt_freeskey(indexScanKey);
 
 		for (;;)
 		{
@@ -742,20 +910,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 			{
 				for (i = 1; i <= keysz; i++)
 				{
-					SortSupport entry;
-					Datum		attrDatum1,
-								attrDatum2;
-					bool		isNull1,
-								isNull2;
-					int32		compare;
-
-					entry = sortKeys + i - 1;
-					attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
-					attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
-
-					compare = ApplySortComparator(attrDatum1, isNull1,
-												  attrDatum2, isNull2,
-												  entry);
+					int32 compare = _bt_call_comparator(sortKeys, i, itup, itup2, tupdes);
+
 					if (compare > 0)
 					{
 						load1 = false;
@@ -794,19 +950,137 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	else
 	{
 		/* merge is unnecessary */
-		while ((itup = tuplesort_getindextuple(btspool->sortstate,
+
+		Relation indexRelation = wstate->index;
+		Form_pg_index index = indexRelation->rd_index;
+
+		if (index->indisunique)
+		{
+			/* Do not use compression for unique indexes. */
+			while ((itup = tuplesort_getindextuple(btspool->sortstate,
 											   true, &should_free)) != NULL)
+			{
+				/* When we see first tuple, create first index page */
+				if (state == NULL)
+					state = _bt_pagestate(wstate, 0);
+
+				_bt_buildadd(wstate, state, itup);
+				if (should_free)
+					pfree(itup);
+			}
+		}
+		else
 		{
-			/* When we see first tuple, create first index page */
-			if (state == NULL)
-				state = _bt_pagestate(wstate, 0);
+			ItemPointerData *ipd = NULL;
+			IndexTuple 		postingtuple;
+			Size			maxitemsize = 0,
+							maxpostingsize = 0;
+			int32 			compare = 0;
 
-			_bt_buildadd(wstate, state, itup);
-			if (should_free)
-				pfree(itup);
+			while ((itup = tuplesort_getindextuple(btspool->sortstate,
+											   true, &should_free)) != NULL)
+			{
+				/* When we see first tuple, create first index page */
+				if (state == NULL)
+				{
+					state = _bt_pagestate(wstate, 0);
+					maxitemsize = BTMaxItemSize(state->btps_page);
+				}
+
+				/*
+				 * Compare the current tuple with the previous one.
+				 * If they are equal, we can merge them into a posting list.
+				 */
+				if (itupprev != NULL)
+				{
+					/* compare tuples */
+					compare = 0;
+					for (i = 1; i <= keysz; i++)
+					{
+						compare = _bt_call_comparator(sortKeys, i, itup, itupprev, tupdes);
+						if (compare != 0)
+							break;
+					}
+
+					if (compare == 0)
+					{
+						/* Tuples are equal. Create or update posting */
+						if (ntuples == 0)
+						{
+							/*
+							 * We don't have a suitable posting list yet, so allocate
+							 * one and save both itupprev and the current tuple.
+							 */
+
+							ipd = palloc0(maxitemsize);
+
+							memcpy(ipd, itupprev, sizeof(ItemPointerData));
+							ntuples++;
+							memcpy(ipd + ntuples, itup, sizeof(ItemPointerData));
+							ntuples++;
+						}
+						else
+						{
+							if ((ntuples+1)*sizeof(ItemPointerData) < maxpostingsize)
+							{
+								memcpy(ipd + ntuples, itup, sizeof(ItemPointerData));
+								ntuples++;
+							}
+							else
+							{
+								postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+								_bt_buildadd(wstate, state, postingtuple);
+								ntuples = 0;
+								pfree(ipd);
+							}
+						}
+
+					}
+					else
+					{
+						/* Tuples aren't equal. Insert itupprev into index. */
+						if (ntuples == 0)
+							_bt_buildadd(wstate, state, itupprev);
+						else
+						{
+							postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+							_bt_buildadd(wstate, state, postingtuple);
+							ntuples = 0;
+							pfree(ipd);
+						}
+					}
+				}
+
+				/*
+				 * Copy the tuple into temp variable itupprev
+				 * to compare it with the following tuple
+				 * and maybe unite them into a posting tuple
+				 */
+				itupprev = CopyIndexTuple(itup);
+				if (should_free)
+					pfree(itup);
+
+				/* compute max size of ipd list */
+				maxpostingsize = maxitemsize - IndexInfoFindDataOffset(itupprev->t_info) - MAXALIGN(IndexTupleSize(itupprev));
+			}
+
+			/* Handle the last item.*/
+			if (ntuples == 0)
+			{
+				if (itupprev != NULL)
+					_bt_buildadd(wstate, state, itupprev);
+			}
+			else
+			{
+				Assert(ipd!=NULL);
+				Assert(itupprev != NULL);
+				postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+				_bt_buildadd(wstate, state, postingtuple);
+				ntuples = 0;
+				pfree(ipd);
+			}
 		}
 	}
-
 	/* Close down final pages and write the metapage */
 	_bt_uppershutdown(wstate, state);
 
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index c850b48..0291342 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1821,7 +1821,9 @@ _bt_killitems(IndexScanDesc scan)
 			ItemId		iid = PageGetItemId(page, offnum);
 			IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);
 
-			if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
+			/* No microvacuum for posting tuples */
+			if (!BtreeTupleIsPosting(ituple)
+				&& (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid)))
 			{
 				/* found the item */
 				ItemIdMarkDead(iid);
@@ -2063,3 +2065,71 @@ btoptions(Datum reloptions, bool validate)
 {
 	return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
 }
+
+
+/*
+ * Already have basic index tuple that contains key datum
+ */
+IndexTuple
+BtreeFormPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd)
+{
+	int i;
+	uint32		newsize;
+	IndexTuple itup = CopyIndexTuple(tuple);
+
+	/*
+	 * Determine and store offset to the posting list.
+	 */
+	newsize = IndexTupleSize(itup);
+	newsize = SHORTALIGN(newsize);
+
+	/*
+	 * Set meta info about the posting list.
+	 */
+	BtreeSetPostingOffset(itup, newsize);
+	BtreeSetNPosting(itup, nipd);
+	/*
+	 * Add space needed for posting list, if any.  Then check that the tuple
+	 * won't be too big to store.
+	 */
+	newsize += sizeof(ItemPointerData)*nipd;
+	newsize = MAXALIGN(newsize);
+
+	/*
+	 * Resize tuple if needed
+	 */
+	if (newsize != IndexTupleSize(itup))
+	{
+		itup = repalloc(itup, newsize);
+
+		/*
+		 * PostgreSQL 9.3 and earlier did not clear this new space, so we
+		 * might find uninitialized padding when reading tuples from disk.
+		 */
+		memset((char *) itup + IndexTupleSize(itup),
+			   0, newsize - IndexTupleSize(itup));
+		/* set new size in tuple header */
+		itup->t_info &= ~INDEX_SIZE_MASK;
+		itup->t_info |= newsize;
+	}
+
+	/*
+	 * Copy data into the posting tuple
+	 */
+	memcpy(BtreeGetPosting(itup), data, sizeof(ItemPointerData)*nipd);
+	return itup;
+}
+
+IndexTuple
+BtreeReformPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd)
+{
+	int size;
+	if (BtreeTupleIsPosting(tuple))
+	{
+		size = BtreeGetPostingOffset(tuple);
+		tuple->t_info &= ~INDEX_SIZE_MASK;
+		tuple->t_info |= size;
+	}
+
+	return BtreeFormPackedTuple(tuple, data, nipd);
+}
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index 8350fa0..eb4467a 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -137,7 +137,12 @@ typedef IndexAttributeBitMapData *IndexAttributeBitMap;
 #define MaxIndexTuplesPerPage	\
 	((int) ((BLCKSZ - SizeOfPageHeaderData) / \
 			(MAXALIGN(sizeof(IndexTupleData) + 1) + sizeof(ItemIdData))))
-
+#define MaxPackedIndexTuplesPerPage	\
+	((int) ((BLCKSZ - SizeOfPageHeaderData) / \
+			(sizeof(ItemPointerData))))
+// #define MaxIndexTuplesPerPage	\
+// 	((int) ((BLCKSZ - SizeOfPageHeaderData) / \
+// 			(sizeof(ItemPointerData))))
 
 /* routines in indextuple.c */
 extern IndexTuple index_form_tuple(TupleDesc tupleDescriptor,
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 06822fa..41e407d 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -75,6 +75,7 @@ typedef BTPageOpaqueData *BTPageOpaque;
 #define BTP_SPLIT_END	(1 << 5)	/* rightmost page of split group */
 #define BTP_HAS_GARBAGE (1 << 6)	/* page has LP_DEAD tuples */
 #define BTP_INCOMPLETE_SPLIT (1 << 7)	/* right sibling's downlink is missing */
+#define BTP_HAS_POSTING (1 << 8)		/* page contains compressed duplicates (only for leaf pages) */
 
 /*
  * The max allowed value of a cycle ID is a bit less than 64K.  This is
@@ -181,6 +182,8 @@ typedef struct BTMetaPageData
 #define P_IGNORE(opaque)		((opaque)->btpo_flags & (BTP_DELETED|BTP_HALF_DEAD))
 #define P_HAS_GARBAGE(opaque)	((opaque)->btpo_flags & BTP_HAS_GARBAGE)
 #define P_INCOMPLETE_SPLIT(opaque)	((opaque)->btpo_flags & BTP_INCOMPLETE_SPLIT)
+#define P_HAS_POSTING(opaque)		((opaque)->btpo_flags & BTP_HAS_POSTING)
+
 
 /*
  *	Lehman and Yao's algorithm requires a ``high key'' on every non-rightmost
@@ -538,6 +541,8 @@ typedef struct BTScanPosData
 	 * location in the associated tuple storage workspace.
 	 */
 	int			nextTupleOffset;
+	/* prevTupleOffset is for Posting list handling*/
+	int			prevTupleOffset;
 
 	/*
 	 * The items array is always ordered in index order (ie, increasing
@@ -550,7 +555,7 @@ typedef struct BTScanPosData
 	int			lastItem;		/* last valid index in items[] */
 	int			itemIndex;		/* current index in items[] */
 
-	BTScanPosItem items[MaxIndexTuplesPerPage]; /* MUST BE LAST */
+	BTScanPosItem items[MaxPackedIndexTuplesPerPage]; /* MUST BE LAST */
 } BTScanPosData;
 
 typedef BTScanPosData *BTScanPos;
@@ -651,6 +656,28 @@ typedef BTScanOpaqueData *BTScanOpaque;
 #define SK_BT_DESC			(INDOPTION_DESC << SK_BT_INDOPTION_SHIFT)
 #define SK_BT_NULLS_FIRST	(INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT)
 
+
+/*
+ * We use our own ItemPointerGet(BlockNumber|OffsetNumber)
+ * to avoid Asserts, since sometimes the ip_posid isn't "valid"
+ */
+#define BtreeItemPointerGetBlockNumber(pointer) \
+	BlockIdGetBlockNumber(&(pointer)->ip_blkid)
+
+#define BtreeItemPointerGetOffsetNumber(pointer) \
+	((pointer)->ip_posid)
+
+#define BT_POSTING (1<<31)
+#define BtreeGetNPosting(itup)			BtreeItemPointerGetOffsetNumber(&(itup)->t_tid)
+#define BtreeSetNPosting(itup,n)		ItemPointerSetOffsetNumber(&(itup)->t_tid,n)
+
+#define BtreeGetPostingOffset(itup)		(BtreeItemPointerGetBlockNumber(&(itup)->t_tid) & (~BT_POSTING))
+#define BtreeSetPostingOffset(itup,n)	ItemPointerSetBlockNumber(&(itup)->t_tid,(n)|BT_POSTING)
+#define BtreeTupleIsPosting(itup)    	(BtreeItemPointerGetBlockNumber(&(itup)->t_tid) & BT_POSTING)
+#define BtreeGetPosting(itup)			(ItemPointerData*) ((char*)(itup) + BtreeGetPostingOffset(itup))
+#define BtreeGetPostingN(itup,n)		(ItemPointerData*) (BtreeGetPosting(itup) + n)
+
+
 /*
  * prototypes for functions in nbtree.c (external entry points for btree)
  */
@@ -715,8 +742,8 @@ extern BTStack _bt_search(Relation rel,
 extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
 			  ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
 			  int access);
-extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
-			ScanKey scankey, bool nextkey);
+extern OffsetNumber _bt_binsrch( Relation rel, Buffer buf, int keysz,
+								ScanKey scankey, bool nextkey, bool* updposting);
 extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
 			Page page, OffsetNumber offnum);
 extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
@@ -747,6 +774,8 @@ extern void _bt_end_vacuum_callback(int code, Datum arg);
 extern Size BTreeShmemSize(void);
 extern void BTreeShmemInit(void);
 extern bytea *btoptions(Datum reloptions, bool validate);
+extern IndexTuple BtreeFormPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd);
+extern IndexTuple BtreeReformPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd);
 
 /*
  * prototypes for functions in nbtvalidate.c