Please find the new version of the patch attached. It now includes WAL
functionality.
You can find a detailed description of the feature in the README draft:
https://goo.gl/50O8Q0
This patch is pretty complicated, so I ask everyone who is interested in
this feature to help with reviewing and testing it. I will be grateful for
any feedback. But please don't complain about code style; it is still a work
in progress.
Next things I'm going to do:
1. More debugging and testing. I'm going to attach a couple of SQL scripts
for testing in the next message (a rough sketch of the kind of workload they
will cover follows this list).
2. Fix NULL processing.
3. Add a flag to pg_index that allows enabling/disabling compression
for each particular index.
4. Recheck locking considerations. I tried to keep the code as non-invasive
as possible, but we need to make sure that the algorithm is still
correct.
5. Change BTMaxItemSize.
6. Bring back microvacuum functionality.
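
Until those scripts are ready, here is a rough sketch of the kind of
workload the compression targets (the table and index names below are just
examples, not part of the patch): a non-unique btree index over a column
with many duplicate values, plus a VACUUM to exercise the posting-list
cleanup path.

CREATE TABLE tst (id int4, val text);
INSERT INTO tst SELECT i % 100, md5(i::text)
    FROM generate_series(1, 1000000) AS i;
CREATE INDEX tst_id_idx ON tst (id);    -- duplicates get packed into posting lists
SELECT pg_size_pretty(pg_relation_size('tst_id_idx'));
DELETE FROM tst WHERE id % 2 = 0;
VACUUM tst;                             -- exercises posting-list vacuuming

Comparing pg_relation_size() of the index with and without the patch
should show the effect of the compression.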
--
Anastasia Lubennikova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e3c55eb..72acc0f 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -24,6 +24,8 @@
#include "storage/predicate.h"
#include "utils/tqual.h"
+#include "catalog/catalog.h"
+#include "utils/datum.h"
typedef struct
{
@@ -82,6 +84,7 @@ static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
+
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
@@ -113,6 +116,11 @@ _bt_doinsert(Relation rel, IndexTuple itup,
BTStack stack;
Buffer buf;
OffsetNumber offset;
+ Page page;
+ TupleDesc itupdesc;
+ int nipd;
+ IndexTuple olditup;
+ Size sizetoadd;
/* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
@@ -190,6 +198,7 @@ top:
if (checkUnique != UNIQUE_CHECK_EXISTING)
{
+ bool updposting = false;
/*
* The only conflict predicate locking cares about for indexes is when
* an index tuple insert conflicts with an existing lock. Since the
@@ -201,7 +210,42 @@ top:
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
stack, heapRel);
- _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
+
+ /*
+ * Decide whether we can apply compression.
+ */
+ page = BufferGetPage(buf);
+
+ if(!IsSystemRelation(rel)
+ && !rel->rd_index->indisunique
+ && offset != InvalidOffsetNumber
+ && offset <= PageGetMaxOffsetNumber(page))
+ {
+ itupdesc = RelationGetDescr(rel);
+ sizetoadd = sizeof(ItemPointerData);
+ olditup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
+
+ if(_bt_isbinaryequal(itupdesc, olditup,
+ rel->rd_index->indnatts, itup))
+ {
+ if (!BtreeTupleIsPosting(olditup))
+ {
+ nipd = 1;
+ sizetoadd = sizetoadd*2;
+ }
+ else
+ nipd = BtreeGetNPosting(olditup);
+
+ if ((IndexTupleSize(olditup) + sizetoadd) <= BTMaxItemSize(page)
+ && PageGetFreeSpace(page) > sizetoadd)
+ updposting = true;
+ }
+ }
+
+ if (updposting)
+ _bt_pgupdtup(rel, buf, offset, itup, olditup, nipd);
+ else
+ _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
}
else
{
@@ -1042,6 +1086,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
itemid = PageGetItemId(origpage, P_HIKEY);
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(origpage, itemid);
+
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
false, false) == InvalidOffsetNumber)
{
@@ -1072,13 +1117,39 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(origpage, itemid);
}
- if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+
+ if (BtreeTupleIsPosting(item))
+ {
+ Size hikeysize = BtreeGetPostingOffset(item);
+ IndexTuple hikey = palloc0(hikeysize);
+
+ /* Truncate the posting list before inserting it as a hikey. */
+ memcpy(hikey, item, hikeysize);
+ hikey->t_info &= ~INDEX_SIZE_MASK;
+ hikey->t_info |= hikeysize;
+ ItemPointerSet(&(hikey->t_tid), origpagenumber, P_HIKEY);
+
+ if (PageAddItem(leftpage, (Item) hikey, hikeysize, leftoff,
false, false) == InvalidOffsetNumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
+
+ pfree(hikey);
+ }
+ else
{
- memset(rightpage, 0, BufferGetPageSize(rbuf));
- elog(ERROR, "failed to add hikey to the left sibling"
- " while splitting block %u of index \"%s\"",
- origpagenumber, RelationGetRelationName(rel));
+ if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+ false, false) == InvalidOffsetNumber)
+ {
+ memset(rightpage, 0, BufferGetPageSize(rbuf));
+ elog(ERROR, "failed to add hikey to the left sibling"
+ " while splitting block %u of index \"%s\"",
+ origpagenumber, RelationGetRelationName(rel));
+ }
}
leftoff = OffsetNumberNext(leftoff);
@@ -2103,6 +2174,120 @@ _bt_pgaddtup(Page page,
}
/*
+ * _bt_pgupdtup() -- update a tuple in place.
+ * This function is used to deduplicate item pointers.
+ *
+ * If the new tuple to insert is equal to a tuple that already exists
+ * on the page, we can avoid inserting the key again and just add a new item pointer.
+ *
+ * offset is the position of olditup on the page.
+ * itup is the new tuple to insert.
+ * olditup is the old tuple itself.
+ * nipd is the number of item pointers in the old tuple.
+ * The caller is responsible for checking free space on the page.
+ */
+void
+_bt_pgupdtup(Relation rel, Buffer buf, OffsetNumber offset, IndexTuple itup,
+ IndexTuple olditup, int nipd)
+{
+ ItemPointerData *ipd;
+ IndexTuple newitup;
+ Size newitupsz;
+ Page page;
+
+ page = BufferGetPage(buf);
+
+ ipd = palloc0(sizeof(ItemPointerData)*(nipd + 1));
+
+ /* copy item pointers from old tuple into ipd */
+ if (BtreeTupleIsPosting(olditup))
+ memcpy(ipd, BtreeGetPosting(olditup), sizeof(ItemPointerData)*nipd);
+ else
+ memcpy(ipd, olditup, sizeof(ItemPointerData));
+
+ /* add item pointer of the new tuple into ipd */
+ memcpy(ipd+nipd, itup, sizeof(ItemPointerData));
+
+ newitup = BtreeReformPackedTuple(itup, ipd, nipd+1);
+
+ /*
+ * Update the tuple in place. We have already checked that the
+ * new tuple will fit on this page, so it's safe to delete the
+ * old tuple and insert the new one without any side effects.
+ */
+ newitupsz = IndexTupleDSize(*newitup);
+ newitupsz = MAXALIGN(newitupsz);
+
+
+ START_CRIT_SECTION();
+
+ PageIndexTupleDelete(page, offset);
+
+ if (!_bt_pgaddtup(page, newitupsz, newitup, offset))
+ elog(ERROR, "failed to insert compressed item in index \"%s\"",
+ RelationGetRelationName(rel));
+
+ MarkBufferDirty(buf);
+
+ /* Xlog stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_btree_insert xlrec;
+ uint8 xlinfo;
+ XLogRecPtr recptr;
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ xlrec.offnum = offset;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfBtreeInsert);
+
+ /* TODO: add some xlog handling for inner pages?
+ * Not sure whether we really need it. */
+ Assert(P_ISLEAF(pageop));
+ xlinfo = XLOG_BTREE_UPDATE_TUPLE;
+
+ /* Read comments in _bt_pgaddtup */
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+
+ XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
+
+ recptr = XLogInsert(RM_BTREE_ID, xlinfo);
+
+ PageSetLSN(page, recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ pfree(ipd);
+ pfree(newitup);
+ _bt_relbuf(rel, buf);
+}
+
+/*
+ * _bt_pgrewritetup() -- rewrite a tuple in place.
+ * This function is used for handling compressed tuples:
+ * it updates a compressed tuple after vacuuming
+ * and rewrites the hikey while building the index.
+ * offset is the position of the old tuple on the page.
+ * itup is the new tuple to insert.
+ * The caller is responsible for checking free space on the page.
+ */
+void
+_bt_pgrewritetup(Relation rel, Buffer buf, Page page, OffsetNumber offset, IndexTuple itup)
+{
+ START_CRIT_SECTION();
+
+ PageIndexTupleDelete(page, offset);
+
+ if (!_bt_pgaddtup(page, IndexTupleSize(itup), itup, offset))
+ elog(ERROR, "failed to rewrite compressed item in index \"%s\"",
+ RelationGetRelationName(rel));
+
+ END_CRIT_SECTION();
+}
+
+/*
* _bt_isequal - used in _bt_doinsert in check for duplicates.
*
* This is very similar to _bt_compare, except for NULL handling.
@@ -2151,6 +2336,63 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
}
/*
+ * _bt_isbinaryequal - used in _bt_doinsert and _bt_load
+ * to check for duplicates. It is very similar to the heap_tuple_attr_equals
+ * subroutine. It differs from _bt_isequal in that it requires
+ * strict binary equality of the tuples.
+ */
+bool
+_bt_isbinaryequal(TupleDesc itupdesc, IndexTuple itup,
+ int nindatts, IndexTuple ituptoinsert)
+{
+ AttrNumber attno;
+
+ for (attno = 1; attno <= nindatts; attno++)
+ {
+ Datum datum1,
+ datum2;
+ bool isnull1,
+ isnull2;
+ Form_pg_attribute att;
+
+ datum1 = index_getattr(itup, attno, itupdesc, &isnull1);
+ datum2 = index_getattr(ituptoinsert, attno, itupdesc, &isnull2);
+
+ /*
+ * If one value is NULL and the other is not, then they are certainly not
+ * equal
+ */
+ if (isnull1 != isnull2)
+ return false;
+ /*
+ * We do simple binary comparison of the two datums. This may be overly
+ * strict because there can be multiple binary representations for the
+ * same logical value. But we should be OK as long as there are no false
+ * positives. Using a type-specific equality operator is messy because
+ * there could be multiple notions of equality in different operator
+ * classes; furthermore, we cannot safely invoke user-defined functions
+ * while holding exclusive buffer lock.
+ */
+ if (attno <= 0)
+ {
+ /* The only allowed system columns are OIDs, so do this */
+ if (DatumGetObjectId(datum1) != DatumGetObjectId(datum2))
+ return false;
+ }
+ else
+ {
+ Assert(attno <= itupdesc->natts);
+ att = itupdesc->attrs[attno - 1];
+ if(!datumIsEqual(datum1, datum2, att->attbyval, att->attlen))
+ return false;
+ }
+ }
+
+ /* if we get here, the keys are equal */
+ return true;
+}
+
+/*
* _bt_vacuum_one_page - vacuum just one index page.
*
* Try to remove LP_DEAD items from the given page. The passed buffer
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 67755d7..53c30d2 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -787,15 +787,36 @@ _bt_page_recyclable(Page page)
void
_bt_delitems_vacuum(Relation rel, Buffer buf,
OffsetNumber *itemnos, int nitems,
+ OffsetNumber *remainingoffset, IndexTuple *remaining, int nremaining,
BlockNumber lastBlockVacuumed)
{
Page page = BufferGetPage(buf);
BTPageOpaque opaque;
+ int i;
+ Size itemsz;
/* No ereport(ERROR) until changes are logged */
START_CRIT_SECTION();
- /* Fix the page */
+ /* Handle compressed tuples here. */
+ for (i = 0; i < nremaining; i++)
+ {
+ /* First, delete the old tuple. */
+ PageIndexTupleDelete(page, remainingoffset[i]);
+
+ itemsz = IndexTupleSize(remaining[i]);
+ itemsz = MAXALIGN(itemsz);
+
+ /* Add the tuple with the remaining ItemPointers to the page. */
+ if (PageAddItem(page, (Item) remaining[i], itemsz, remainingoffset[i],
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "failed to rewrite compressed item in index while doing vacuum");
+ }
+
+ /*
+ * Fix the page. After dealing with the posting tuples,
+ * just delete all the tuples that are to be deleted.
+ */
if (nitems > 0)
PageIndexMultiDelete(page, itemnos, nitems);
@@ -824,12 +845,28 @@ _bt_delitems_vacuum(Relation rel, Buffer buf,
xl_btree_vacuum xlrec_vacuum;
xlrec_vacuum.lastBlockVacuumed = lastBlockVacuumed;
+ xlrec_vacuum.nremaining = nremaining;
+ xlrec_vacuum.ndeleted = nitems;
XLogBeginInsert();
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterData((char *) &xlrec_vacuum, SizeOfBtreeVacuum);
/*
+ * Here we save the offnums and the remaining tuples themselves.
+ * It's important to restore them in the correct order:
+ * first handle the remaining tuples, and only after that
+ * the other deleted items.
+ */
+ if (nremaining > 0)
+ {
+ int i;
+ XLogRegisterBufData(0, (char *) remainingoffset, nremaining * sizeof(OffsetNumber));
+ for (i = 0; i < nremaining; i++)
+ XLogRegisterBufData(0, (char *) remaining[i], IndexTupleSize(remaining[i]));
+ }
+
+ /*
* The target-offsets array is not in the buffer, but pretend that it
* is. When XLogInsert stores the whole buffer, the offsets array
* need not be stored too.
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index f2905cb..39e125f 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -74,7 +74,8 @@ static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
BTCycleId cycleid);
static void btvacuumpage(BTVacState *vstate, BlockNumber blkno,
BlockNumber orig_blkno);
-
+static ItemPointer btreevacuumPosting(BTVacState *vstate,
+ ItemPointerData *items, int nitem, int *nremaining);
/*
* Btree handler function: return IndexAmRoutine with access method parameters
@@ -861,7 +862,7 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
RBM_NORMAL, info->strategy);
LockBufferForCleanup(buf);
_bt_checkpage(rel, buf);
- _bt_delitems_vacuum(rel, buf, NULL, 0, vstate.lastBlockVacuumed);
+ _bt_delitems_vacuum(rel, buf, NULL, 0, NULL, NULL, 0, vstate.lastBlockVacuumed);
_bt_relbuf(rel, buf);
}
@@ -962,6 +963,9 @@ restart:
OffsetNumber offnum,
minoff,
maxoff;
+ IndexTuple remaining[MaxOffsetNumber];
+ OffsetNumber remainingoffset[MaxOffsetNumber];
+ int nremaining;
/*
* Trade in the initial read lock for a super-exclusive write lock on
@@ -998,6 +1002,7 @@ restart:
* callback function.
*/
ndeletable = 0;
+ nremaining = 0;
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
if (callback)
@@ -1011,31 +1016,75 @@ restart:
itup = (IndexTuple) PageGetItem(page,
PageGetItemId(page, offnum));
- htup = &(itup->t_tid);
-
- /*
- * During Hot Standby we currently assume that
- * XLOG_BTREE_VACUUM records do not produce conflicts. That is
- * only true as long as the callback function depends only
- * upon whether the index tuple refers to heap tuples removed
- * in the initial heap scan. When vacuum starts it derives a
- * value of OldestXmin. Backends taking later snapshots could
- * have a RecentGlobalXmin with a later xid than the vacuum's
- * OldestXmin, so it is possible that row versions deleted
- * after OldestXmin could be marked as killed by other
- * backends. The callback function *could* look at the index
- * tuple state in isolation and decide to delete the index
- * tuple, though currently it does not. If it ever did, we
- * would need to reconsider whether XLOG_BTREE_VACUUM records
- * should cause conflicts. If they did cause conflicts they
- * would be fairly harsh conflicts, since we haven't yet
- * worked out a way to pass a useful value for
- * latestRemovedXid on the XLOG_BTREE_VACUUM records. This
- * applies to *any* type of index that marks index tuples as
- * killed.
- */
- if (callback(htup, callback_state))
- deletable[ndeletable++] = offnum;
+ if(BtreeTupleIsPosting(itup))
+ {
+ ItemPointer newipd;
+ int nipd,
+ nnewipd;
+
+ nipd = BtreeGetNPosting(itup);
+
+ /*
+ * Delete from the posting list all ItemPointers
+ * that are no longer valid. newipd contains the list of remaining
+ * ItemPointers, or NULL if none of the items need to be removed.
+ */
+ newipd = btreevacuumPosting(vstate, BtreeGetPosting(itup), nipd, &nnewipd);
+
+ if (newipd != NULL)
+ {
+ if (nnewipd > 0)
+ {
+ /*
+ * There are still some live tuples in the posting list,
+ * so we should update this tuple in place. That is done later
+ * in _bt_delitems_vacuum(). To do it we save information
+ * about the tuple: remainingoffset is the offset of the
+ * old tuple to be deleted, and remaining is the new tuple,
+ * containing the remaining ItemPointers, to insert at the same position.
+ */
+ remainingoffset[nremaining] = offnum;
+ remaining[nremaining] = BtreeReformPackedTuple(itup, newipd, nnewipd);
+ nremaining++;
+ }
+ else
+ {
+ /*
+ * If all the ItemPointers are to be deleted,
+ * we can delete this tuple in the regular way.
+ */
+ deletable[ndeletable++] = offnum;
+ }
+ }
+ }
+ else
+ {
+ htup = &(itup->t_tid);
+
+ /*
+ * During Hot Standby we currently assume that
+ * XLOG_BTREE_VACUUM records do not produce conflicts. That is
+ * only true as long as the callback function depends only
+ * upon whether the index tuple refers to heap tuples removed
+ * in the initial heap scan. When vacuum starts it derives a
+ * value of OldestXmin. Backends taking later snapshots could
+ * have a RecentGlobalXmin with a later xid than the vacuum's
+ * OldestXmin, so it is possible that row versions deleted
+ * after OldestXmin could be marked as killed by other
+ * backends. The callback function *could* look at the index
+ * tuple state in isolation and decide to delete the index
+ * tuple, though currently it does not. If it ever did, we
+ * would need to reconsider whether XLOG_BTREE_VACUUM records
+ * should cause conflicts. If they did cause conflicts they
+ * would be fairly harsh conflicts, since we haven't yet
+ * worked out a way to pass a useful value for
+ * latestRemovedXid on the XLOG_BTREE_VACUUM records. This
+ * applies to *any* type of index that marks index tuples as
+ * killed.
+ */
+ if (callback(htup, callback_state))
+ deletable[ndeletable++] = offnum;
+ }
}
}
@@ -1043,7 +1092,7 @@ restart:
* Apply any needed deletes. We issue just one _bt_delitems_vacuum()
* call per page, so as to minimize WAL traffic.
*/
- if (ndeletable > 0)
+ if (ndeletable > 0 || nremaining > 0)
{
BlockNumber lastBlockVacuumed = InvalidBlockNumber;
@@ -1070,7 +1119,7 @@ restart:
* doesn't seem worth the amount of bookkeeping it'd take to avoid
* that.
*/
- _bt_delitems_vacuum(rel, buf, deletable, ndeletable,
+ _bt_delitems_vacuum(rel, buf, deletable, ndeletable, remainingoffset, remaining, nremaining,
lastBlockVacuumed);
/*
@@ -1160,3 +1209,50 @@ btcanreturn(Relation index, int attno)
{
return true;
}
+
+/*
+ * btreevacuumPosting() -- vacuums a posting list.
+ * The size of the list must be specified via the number of items (nitem).
+ *
+ * If none of the items need to be removed, returns NULL. Otherwise returns
+ * a new palloc'd array with the remaining items. The number of remaining
+ * items is returned via nremaining.
+ */
+ItemPointer
+btreevacuumPosting(BTVacState *vstate, ItemPointerData *items,
+ int nitem, int *nremaining)
+{
+ int i,
+ remaining = 0;
+ ItemPointer tmpitems = NULL;
+ IndexBulkDeleteCallback callback = vstate->callback;
+ void *callback_state = vstate->callback_state;
+
+ /*
+ * Iterate over TIDs array
+ */
+ for (i = 0; i < nitem; i++)
+ {
+ if (callback(items + i, callback_state))
+ {
+ if (!tmpitems)
+ {
+ /*
+ * First TID to be deleted: allocate memory to hold the
+ * remaining items.
+ */
+ tmpitems = palloc(sizeof(ItemPointerData) * nitem);
+ memcpy(tmpitems, items, sizeof(ItemPointerData) * i);
+ }
+ }
+ else
+ {
+ if (tmpitems)
+ tmpitems[remaining] = items[i];
+ remaining++;
+ }
+ }
+
+ *nremaining = remaining;
+ return tmpitems;
+}
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 14dffe0..2cb1769 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -29,6 +29,8 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
OffsetNumber offnum);
static void _bt_saveitem(BTScanOpaque so, int itemIndex,
OffsetNumber offnum, IndexTuple itup);
+static void _bt_savePostingitem(BTScanOpaque so, int itemIndex,
+ OffsetNumber offnum, ItemPointer iptr, IndexTuple itup, int i);
static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
static Buffer _bt_walk_left(Relation rel, Buffer buf);
static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
@@ -1134,6 +1136,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
int itemIndex;
IndexTuple itup;
bool continuescan;
+ int i;
/*
* We must have the buffer pinned and locked, but the usual macro can't be
@@ -1168,6 +1171,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
/* initialize tuple workspace to empty */
so->currPos.nextTupleOffset = 0;
+ so->currPos.prevTupleOffset = 0;
/*
* Now that the current page has been made consistent, the macro should be
@@ -1188,8 +1192,19 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
if (itup != NULL)
{
/* tuple passes all scan key conditions, so remember it */
- _bt_saveitem(so, itemIndex, offnum, itup);
- itemIndex++;
+ if (BtreeTupleIsPosting(itup))
+ {
+ for (i = 0; i < BtreeGetNPosting(itup); i++)
+ {
+ _bt_savePostingitem(so, itemIndex, offnum, BtreeGetPostingN(itup, i), itup, i);
+ itemIndex++;
+ }
+ }
+ else
+ {
+ _bt_saveitem(so, itemIndex, offnum, itup);
+ itemIndex++;
+ }
}
if (!continuescan)
{
@@ -1201,7 +1216,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
offnum = OffsetNumberNext(offnum);
}
- Assert(itemIndex <= MaxIndexTuplesPerPage);
+ Assert(itemIndex <= MaxPackedIndexTuplesPerPage);
so->currPos.firstItem = 0;
so->currPos.lastItem = itemIndex - 1;
so->currPos.itemIndex = 0;
@@ -1209,7 +1224,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
else
{
/* load items[] in descending order */
- itemIndex = MaxIndexTuplesPerPage;
+ itemIndex = MaxPackedIndexTuplesPerPage;
offnum = Min(offnum, maxoff);
@@ -1219,8 +1234,20 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
if (itup != NULL)
{
/* tuple passes all scan key conditions, so remember it */
- itemIndex--;
- _bt_saveitem(so, itemIndex, offnum, itup);
+ if (BtreeTupleIsPosting(itup))
+ {
+ for (i = 0; i < BtreeGetNPosting(itup); i++)
+ {
+ itemIndex--;
+ _bt_savePostingitem(so, itemIndex, offnum, BtreeGetPostingN(itup, i), itup, i);
+ }
+ }
+ else
+ {
+ itemIndex--;
+ _bt_saveitem(so, itemIndex, offnum, itup);
+ }
+
}
if (!continuescan)
{
@@ -1234,8 +1261,8 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
Assert(itemIndex >= 0);
so->currPos.firstItem = itemIndex;
- so->currPos.lastItem = MaxIndexTuplesPerPage - 1;
- so->currPos.itemIndex = MaxIndexTuplesPerPage - 1;
+ so->currPos.lastItem = MaxPackedIndexTuplesPerPage - 1;
+ so->currPos.itemIndex = MaxPackedIndexTuplesPerPage - 1;
}
return (so->currPos.firstItem <= so->currPos.lastItem);
@@ -1261,6 +1288,37 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
}
/*
+ * Save an index item into so->currPos.items[itemIndex].
+ * For an index-only scan, handle the first element separately:
+ * save the key only once, and connect it with the posting TIDs via tupleOffset.
+ */
+static void
+_bt_savePostingitem(BTScanOpaque so, int itemIndex,
+ OffsetNumber offnum, ItemPointer iptr, IndexTuple itup, int i)
+{
+ BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+ currItem->heapTid = *iptr;
+ currItem->indexOffset = offnum;
+
+ if (so->currTuples)
+ {
+ if (i == 0)
+ {
+ /* Save the key; it is the same for all tuples in the posting list. */
+ Size itupsz = BtreeGetPostingOffset(itup);
+ currItem->tupleOffset = so->currPos.nextTupleOffset;
+ memcpy(so->currTuples + so->currPos.nextTupleOffset, itup, itupsz);
+ so->currPos.nextTupleOffset += MAXALIGN(itupsz);
+ so->currPos.prevTupleOffset = currItem->tupleOffset;
+ }
+ else
+ currItem->tupleOffset = so->currPos.prevTupleOffset;
+ }
+}
+
+
+/*
* _bt_steppage() -- Step to next page containing valid data for scan
*
* On entry, if so->currPos.buf is valid the buffer is pinned but not locked;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 99a014e..906b9df 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -75,7 +75,7 @@
#include "utils/rel.h"
#include "utils/sortsupport.h"
#include "utils/tuplesort.h"
-
+#include "catalog/catalog.h"
/*
* Status record for spooling/sorting phase. (Note we may have two of
@@ -136,6 +136,9 @@ static void _bt_sortaddtup(Page page, Size itemsize,
static void _bt_buildadd(BTWriteState *wstate, BTPageState *state,
IndexTuple itup);
static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state);
+static SortSupport _bt_prepare_SortSupport(BTWriteState *wstate, int keysz);
+static int _bt_call_comparator(SortSupport sortKeys, int i,
+ IndexTuple itup, IndexTuple itup2, TupleDesc tupdes);
static void _bt_load(BTWriteState *wstate,
BTSpool *btspool, BTSpool *btspool2);
@@ -527,15 +530,120 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
Assert(last_off > P_FIRSTKEY);
ii = PageGetItemId(opage, last_off);
oitup = (IndexTuple) PageGetItem(opage, ii);
- _bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
/*
- * Move 'last' into the high key position on opage
+ * If the item is a posting tuple, we can truncate it, because the HIKEY
+ * is not considered real data and it need not keep any
+ * ItemPointerData at all, let alone a whole posting list.
+ * But if the item had a big posting list, there would be plenty of
+ * free space left on opage. In that case we must split the posting
+ * tuple into two pieces.
*/
- hii = PageGetItemId(opage, P_HIKEY);
- *hii = *ii;
- ItemIdSetUnused(ii); /* redundant */
- ((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+ if (BtreeTupleIsPosting(oitup))
+ {
+ IndexTuple keytup;
+ Size keytupsz;
+ int nipd,
+ ntocut,
+ ntoleave;
+
+ nipd = BtreeGetNPosting(oitup);
+ ntocut = (sizeof(ItemIdData) + BtreeGetPostingOffset(oitup))/sizeof(ItemPointerData);
+ ntocut++; /* round up to be sure that we cut enough */
+ ntoleave = nipd - ntocut;
+
+ /*
+ * 0) Form a key tuple that doesn't contain any ipd.
+ * NOTE: the key tuple will have blkno & offset suitable for P_HIKEY;
+ * any function that uses keytup should handle them itself.
+ */
+ keytupsz = BtreeGetPostingOffset(oitup);
+ keytup = palloc0(keytupsz);
+ memcpy (keytup, oitup, keytupsz);
+ keytup->t_info &= ~INDEX_SIZE_MASK;
+ keytup->t_info |= keytupsz;
+ ItemPointerSet(&(keytup->t_tid), oblkno, P_HIKEY);
+
+ if (ntocut < nipd)
+ {
+ ItemPointerData *newipd;
+ IndexTuple newitup,
+ newlasttup;
+ /*
+ * 1) Cut off the part of the old tuple to shift to npage
+ * and insert it as P_FIRSTKEY.
+ * This tuple is based on keytup.
+ * Blkno & offnum are reset in BtreeFormPackedTuple.
+ */
+ newipd = palloc0(sizeof(ItemPointerData)*ntocut);
+ /* Note that we cut the last 'ntocut' items */
+ memcpy(newipd, BtreeGetPosting(oitup)+ntoleave, sizeof(ItemPointerData)*ntocut);
+ newitup = BtreeFormPackedTuple(keytup, newipd, ntocut);
+
+ _bt_sortaddtup(npage, IndexTupleSize(newitup), newitup, P_FIRSTKEY);
+ pfree(newipd);
+ pfree(newitup);
+
+ /*
+ * 2) Set the last item to the P_HIKEY linp.
+ * Move 'last' into the high key position on opage.
+ * NOTE: Do this because of the index tuple deletion algorithm, which
+ * doesn't allow deleting an item while there is an unused one before it.
+ */
+ hii = PageGetItemId(opage, P_HIKEY);
+ *hii = *ii;
+ ItemIdSetUnused(ii); /* redundant */
+ ((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+
+ /* 3) delete "wrong" high key, insert keytup as P_HIKEY. */
+ _bt_pgrewritetup(wstate->index, InvalidBuffer, opage, P_HIKEY, keytup);
+
+ /* 4) Form the part of the old tuple with ntoleave ipds and insert it as the last tuple. */
+ newlasttup = BtreeFormPackedTuple(keytup, BtreeGetPosting(oitup), ntoleave);
+
+ _bt_sortaddtup(opage, IndexTupleSize(newlasttup), newlasttup, PageGetMaxOffsetNumber(opage)+1);
+
+ pfree(newlasttup);
+ }
+ else
+ {
+ /* The tuple isn't big enough to split. Handle it as a regular tuple. */
+
+ /*
+ * 1) Shift the last tuple to npage.
+ * Insert it as P_FIRSTKEY.
+ */
+ _bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
+
+ /* 2) set last item to the P_HIKEY linp */
+ /* Move 'last' into the high key position on opage */
+ hii = PageGetItemId(opage, P_HIKEY);
+ *hii = *ii;
+ ItemIdSetUnused(ii); /* redundant */
+ ((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+
+ /* 3) delete "wrong" high key, insert keytup as P_HIKEY. */
+ _bt_pgrewritetup(wstate->index, InvalidBuffer, opage, P_HIKEY, keytup);
+
+ }
+ pfree(keytup);
+ }
+ else
+ {
+ /*
+ * 1) Shift the last tuple to npage.
+ * Insert it as P_FIRSTKEY.
+ */
+ _bt_sortaddtup(npage, ItemIdGetLength(ii), oitup, P_FIRSTKEY);
+
+ /* 2) set last item to the P_HIKEY linp */
+ /* Move 'last' into the high key position on opage */
+ hii = PageGetItemId(opage, P_HIKEY);
+ *hii = *ii;
+ ItemIdSetUnused(ii); /* redundant */
+ ((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+ }
/*
* Link the old page into its parent, using its minimum key. If we
@@ -547,6 +655,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
Assert(state->btps_minkey != NULL);
ItemPointerSet(&(state->btps_minkey->t_tid), oblkno, P_HIKEY);
+
_bt_buildadd(wstate, state->btps_next, state->btps_minkey);
pfree(state->btps_minkey);
@@ -554,8 +663,12 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
* Save a copy of the minimum key for the new page. We have to copy
* it off the old page, not the new one, in case we are not at leaf
* level.
+ * We cannot just copy oitup, because it could be a posting tuple,
+ * so it's safer to fetch the newly inserted hikey.
*/
- state->btps_minkey = CopyIndexTuple(oitup);
+ ItemId iihk = PageGetItemId(opage, P_HIKEY);
+ IndexTuple hikey = (IndexTuple) PageGetItem(opage, iihk);
+ state->btps_minkey = CopyIndexTuple(hikey);
/*
* Set the sibling links for both pages.
@@ -590,7 +703,29 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
if (last_off == P_HIKEY)
{
Assert(state->btps_minkey == NULL);
- state->btps_minkey = CopyIndexTuple(itup);
+
+ if (BtreeTupleIsPosting(itup))
+ {
+ Size keytupsz;
+ IndexTuple keytup;
+
+ /*
+ * 0) Form a key tuple that doesn't contain any ipd.
+ * NOTE: the key tuple will have blkno & offset suitable for P_HIKEY;
+ * any function that uses keytup should handle them itself.
+ */
+ keytupsz = BtreeGetPostingOffset(itup);
+ keytup = palloc0(keytupsz);
+ memcpy (keytup, itup, keytupsz);
+
+ keytup->t_info &= ~INDEX_SIZE_MASK;
+ keytup->t_info |= keytupsz;
+ ItemPointerSet(&(keytup->t_tid), nblkno, P_HIKEY);
+
+ state->btps_minkey = CopyIndexTuple(keytup);
+ }
+ else
+ state->btps_minkey = CopyIndexTuple(itup);
}
/*
@@ -670,6 +805,71 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
}
/*
+ * Prepare a SortSupport structure for index tuple comparison
+ */
+static SortSupport
+_bt_prepare_SortSupport(BTWriteState *wstate, int keysz)
+{
+ ScanKey indexScanKey;
+ SortSupport sortKeys;
+ int i;
+
+ /* Prepare SortSupport data for each column */
+ indexScanKey = _bt_mkscankey_nodata(wstate->index);
+ sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
+
+ for (i = 0; i < keysz; i++)
+ {
+ SortSupport sortKey = sortKeys + i;
+ ScanKey scanKey = indexScanKey + i;
+ int16 strategy;
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = scanKey->sk_collation;
+ sortKey->ssup_nulls_first =
+ (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+ sortKey->ssup_attno = scanKey->sk_attno;
+ /* Abbreviation is not supported here */
+ sortKey->abbreviate = false;
+
+ AssertState(sortKey->ssup_attno != 0);
+
+ strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+ BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+ PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
+ }
+
+ _bt_freeskey(indexScanKey);
+ return sortKeys;
+}
+
+/*
+ * Compare two tuples using sortKey on attribute i
+ */
+static int
+_bt_call_comparator(SortSupport sortKeys, int i,
+ IndexTuple itup, IndexTuple itup2, TupleDesc tupdes)
+{
+ SortSupport entry;
+ Datum attrDatum1,
+ attrDatum2;
+ bool isNull1,
+ isNull2;
+ int32 compare;
+
+ entry = sortKeys + i - 1;
+ attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
+ attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
+
+ compare = ApplySortComparator(attrDatum1, isNull1,
+ attrDatum2, isNull2,
+ entry);
+
+ return compare;
+}
+
+/*
* Read tuples in correct sort order from tuplesort, and load them into
* btree leaves.
*/
@@ -679,16 +879,20 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
BTPageState *state = NULL;
bool merge = (btspool2 != NULL);
IndexTuple itup,
- itup2 = NULL;
+ itup2 = NULL,
+ itupprev = NULL;
bool should_free,
should_free2,
load1;
TupleDesc tupdes = RelationGetDescr(wstate->index);
int i,
keysz = RelationGetNumberOfAttributes(wstate->index);
- ScanKey indexScanKey = NULL;
+ int ntuples = 0;
SortSupport sortKeys;
+ /* Prepare SortSupport structure for indextuples comparison */
+ sortKeys = (SortSupport)_bt_prepare_SortSupport(wstate, keysz);
+
if (merge)
{
/*
@@ -701,34 +905,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
true, &should_free);
itup2 = tuplesort_getindextuple(btspool2->sortstate,
true, &should_free2);
- indexScanKey = _bt_mkscankey_nodata(wstate->index);
-
- /* Prepare SortSupport data for each column */
- sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
-
- for (i = 0; i < keysz; i++)
- {
- SortSupport sortKey = sortKeys + i;
- ScanKey scanKey = indexScanKey + i;
- int16 strategy;
-
- sortKey->ssup_cxt = CurrentMemoryContext;
- sortKey->ssup_collation = scanKey->sk_collation;
- sortKey->ssup_nulls_first =
- (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
- sortKey->ssup_attno = scanKey->sk_attno;
- /* Abbreviation is not supported here */
- sortKey->abbreviate = false;
-
- AssertState(sortKey->ssup_attno != 0);
-
- strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
- BTGreaterStrategyNumber : BTLessStrategyNumber;
-
- PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
- }
-
- _bt_freeskey(indexScanKey);
for (;;)
{
@@ -742,20 +918,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
{
for (i = 1; i <= keysz; i++)
{
- SortSupport entry;
- Datum attrDatum1,
- attrDatum2;
- bool isNull1,
- isNull2;
- int32 compare;
-
- entry = sortKeys + i - 1;
- attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
- attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
-
- compare = ApplySortComparator(attrDatum1, isNull1,
- attrDatum2, isNull2,
- entry);
+ int32 compare = _bt_call_comparator(sortKeys, i, itup, itup2, tupdes);
+
if (compare > 0)
{
load1 = false;
@@ -794,16 +958,123 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
else
{
/* merge is unnecessary */
- while ((itup = tuplesort_getindextuple(btspool->sortstate,
+ Relation indexRelation = wstate->index;
+ Form_pg_index index = indexRelation->rd_index;
+
+ if (IsSystemRelation(indexRelation) || index->indisunique)
+ {
+ /* Do not use compression. */
+ while ((itup = tuplesort_getindextuple(btspool->sortstate,
true, &should_free)) != NULL)
+ {
+ /* When we see first tuple, create first index page */
+ if (state == NULL)
+ state = _bt_pagestate(wstate, 0);
+
+ _bt_buildadd(wstate, state, itup);
+ if (should_free)
+ pfree(itup);
+ }
+ }
+ else
{
- /* When we see first tuple, create first index page */
- if (state == NULL)
- state = _bt_pagestate(wstate, 0);
+ ItemPointerData *ipd = NULL;
+ IndexTuple postingtuple;
+ Size maxitemsize = 0,
+ maxpostingsize = 0;
- _bt_buildadd(wstate, state, itup);
- if (should_free)
- pfree(itup);
+ while ((itup = tuplesort_getindextuple(btspool->sortstate,
+ true, &should_free)) != NULL)
+ {
+ /* When we see first tuple, create first index page */
+ if (state == NULL)
+ {
+ state = _bt_pagestate(wstate, 0);
+ maxitemsize = BTMaxItemSize(state->btps_page);
+ }
+
+ /*
+ * Compare the current tuple with the previous one.
+ * If the tuples are equal, we can merge them into a posting list.
+ */
+ if (itupprev != NULL)
+ {
+ if (_bt_isbinaryequal(tupdes, itupprev, index->indnatts, itup))
+ {
+ /* Tuples are equal. Create or update posting */
+ if (ntuples == 0)
+ {
+ /*
+ * We don't have a suitable posting list yet, so allocate
+ * one and save both itupprev and the current tuple.
+ */
+ ipd = palloc0(maxitemsize);
+
+ memcpy(ipd, itupprev, sizeof(ItemPointerData));
+ ntuples++;
+ memcpy(ipd + ntuples, itup, sizeof(ItemPointerData));
+ ntuples++;
+ }
+ else
+ {
+ if ((ntuples+1)*sizeof(ItemPointerData) < maxpostingsize)
+ {
+ memcpy(ipd + ntuples, itup, sizeof(ItemPointerData));
+ ntuples++;
+ }
+ else
+ {
+ postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+ _bt_buildadd(wstate, state, postingtuple);
+ ntuples = 0;
+ pfree(ipd);
+ }
+ }
+
+ }
+ else
+ {
+ /* Tuples are not equal. Insert itupprev into index. */
+ if (ntuples == 0)
+ _bt_buildadd(wstate, state, itupprev);
+ else
+ {
+ postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+ _bt_buildadd(wstate, state, postingtuple);
+ ntuples = 0;
+ pfree(ipd);
+ }
+ }
+ }
+
+ /*
+ * Copy the tuple into the temp variable itupprev
+ * to compare it with the following tuple
+ * and maybe merge them into a posting tuple.
+ */
+ itupprev = CopyIndexTuple(itup);
+ if (should_free)
+ pfree(itup);
+
+ /* compute max size of ipd list */
+ maxpostingsize = maxitemsize - IndexInfoFindDataOffset(itupprev->t_info) - MAXALIGN(IndexTupleSize(itupprev));
+ }
+
+ /* Handle the last item.*/
+ if (ntuples == 0)
+ {
+ if (itupprev != NULL)
+ _bt_buildadd(wstate, state, itupprev);
+ }
+ else
+ {
+ Assert(ipd!=NULL);
+ Assert(itupprev != NULL);
+ postingtuple = BtreeFormPackedTuple(itupprev, ipd, ntuples);
+ _bt_buildadd(wstate, state, postingtuple);
+ ntuples = 0;
+ pfree(ipd);
+ }
}
}
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index b714b2c..53fcbcc 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -1814,7 +1814,9 @@ _bt_killitems(IndexScanDesc scan)
ItemId iid = PageGetItemId(page, offnum);
IndexTuple ituple = (IndexTuple) PageGetItem(page, iid);
- if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
+ /* No microvacuum for posting tuples */
+ if (!BtreeTupleIsPosting(ituple)
+ && (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid)))
{
/* found the item */
ItemIdMarkDead(iid);
@@ -2056,3 +2058,69 @@ btoptions(Datum reloptions, bool validate)
{
return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
}
+
+/*
+ * BtreeFormPackedTuple() -- form a posting tuple from a basic
+ * index tuple that already contains the key datum.
+ */
+IndexTuple
+BtreeFormPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd)
+{
+ uint32 newsize;
+ IndexTuple itup = CopyIndexTuple(tuple);
+
+ /*
+ * Determine and store offset to the posting list.
+ */
+ newsize = IndexTupleSize(itup);
+ newsize = SHORTALIGN(newsize);
+
+ /*
+ * Set meta info about the posting list.
+ */
+ BtreeSetPostingOffset(itup, newsize);
+ BtreeSetNPosting(itup, nipd);
+ /*
+ * Add space needed for posting list, if any. Then check that the tuple
+ * won't be too big to store.
+ */
+ newsize += sizeof(ItemPointerData)*nipd;
+ newsize = MAXALIGN(newsize);
+
+ /*
+ * Resize tuple if needed
+ */
+ if (newsize != IndexTupleSize(itup))
+ {
+ itup = repalloc(itup, newsize);
+
+ /*
+ * PostgreSQL 9.3 and earlier did not clear this new space, so we
+ * might find uninitialized padding when reading tuples from disk.
+ */
+ memset((char *) itup + IndexTupleSize(itup),
+ 0, newsize - IndexTupleSize(itup));
+ /* set new size in tuple header */
+ itup->t_info &= ~INDEX_SIZE_MASK;
+ itup->t_info |= newsize;
+ }
+
+ /*
+ * Copy data into the posting tuple
+ */
+ memcpy(BtreeGetPosting(itup), data, sizeof(ItemPointerData)*nipd);
+ return itup;
+}
+
+IndexTuple
+BtreeReformPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd)
+{
+ int size;
+ if (BtreeTupleIsPosting(tuple))
+ {
+ size = BtreeGetPostingOffset(tuple);
+ tuple->t_info &= ~INDEX_SIZE_MASK;
+ tuple->t_info |= size;
+ }
+
+ return BtreeFormPackedTuple(tuple, data, nipd);
+}
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 0d094ca..6ced76c 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -475,14 +475,40 @@ btree_xlog_vacuum(XLogReaderState *record)
if (len > 0)
{
- OffsetNumber *unused;
- OffsetNumber *unend;
+ OffsetNumber *offset;
+ IndexTuple remaining;
+ int i;
+ Size itemsz;
- unused = (OffsetNumber *) ptr;
- unend = (OffsetNumber *) ((char *) ptr + len);
+ offset = (OffsetNumber *) ptr;
+ remaining = (IndexTuple)(ptr + xlrec->nremaining*sizeof(OffsetNumber));
+
+ /* Handle posting tuples */
+ for(i = 0; i < xlrec->nremaining; i++)
+ {
+ PageIndexTupleDelete(page, offset[i]);
+
+ itemsz = IndexTupleSize(remaining);
+ itemsz = MAXALIGN(itemsz);
+
+ if (PageAddItem(page, (Item) remaining, itemsz, offset[i],
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "btree_xlog_vacuum: failed to add remaining item");
- if ((unend - unused) > 0)
- PageIndexMultiDelete(page, unused, unend - unused);
+ remaining = (IndexTuple)((char*) remaining + itemsz);
+ }
+
+ if (xlrec->ndeleted > 0)
+ {
+ OffsetNumber *unused;
+ OffsetNumber *unend;
+
+ unused = (OffsetNumber *) ((char *)remaining);
+ unend = (OffsetNumber *) ((char *) ptr + len);
+
+ if ((unend - unused) > 0)
+ PageIndexMultiDelete(page, unused, unend - unused);
+ }
}
/*
@@ -713,6 +739,75 @@ btree_xlog_delete(XLogReaderState *record)
UnlockReleaseBuffer(buffer);
}
+/*
+ * Applies changes performed by _bt_pgupdtup().
+ * TODO: Add handling for inner pages. Not sure whether we really need it.
+ * See comment in _bt_pgupdtup().
+ */
+static void
+btree_xlog_update(bool isleaf, XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ Size datalen;
+ char *datapos = XLogRecGetBlockData(record, 0, &datalen);
+ ItemPointerData *ipd;
+ IndexTuple olditup,
+ newitup;
+ Size newitupsz;
+ int nipd;
+
+ /* TODO: The following code needs some refactoring; maybe factor out one more function. */
+ page = BufferGetPage(buffer);
+
+ olditup = (IndexTuple) PageGetItem(page, PageGetItemId(page, xlrec->offnum));
+
+ if (!BtreeTupleIsPosting(olditup))
+ nipd = 1;
+ else
+ nipd = BtreeGetNPosting(olditup);
+
+ ipd = palloc0(sizeof(ItemPointerData)*(nipd + 1));
+
+ /* copy item pointers from old tuple into ipd */
+ if (BtreeTupleIsPosting(olditup))
+ memcpy(ipd, BtreeGetPosting(olditup), sizeof(ItemPointerData)*nipd);
+ else
+ memcpy(ipd, olditup, sizeof(ItemPointerData));
+
+ /* add item pointer of the new tuple into ipd */
+ memcpy(ipd+nipd, (Item) datapos, sizeof(ItemPointerData));
+
+ newitup = BtreeReformPackedTuple((IndexTuple) datapos, ipd, nipd+1);
+
+ /*
+ * Update the tuple in place. We have already checked that the
+ * new tuple would fit into this page, so it's safe to delete
+ * old tuple and insert the new one without any side effects.
+ */
+ newitupsz = IndexTupleDSize(*newitup);
+ newitupsz = MAXALIGN(newitupsz);
+
+ PageIndexTupleDelete(page, xlrec->offnum);
+
+ if (PageAddItem(page, (Item) newitup, newitupsz, xlrec->offnum,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "failed to update compressed tuple while doing recovery");
+
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
@@ -988,6 +1083,9 @@ btree_redo(XLogReaderState *record)
case XLOG_BTREE_INSERT_META:
btree_xlog_insert(false, true, record);
break;
+ case XLOG_BTREE_UPDATE_TUPLE:
+ btree_xlog_update(true, record);
+ break;
case XLOG_BTREE_SPLIT_L:
btree_xlog_split(true, false, record);
break;
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index 8350fa0..3dd19c0 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -138,7 +138,6 @@ typedef IndexAttributeBitMapData *IndexAttributeBitMap;
((int) ((BLCKSZ - SizeOfPageHeaderData) / \
(MAXALIGN(sizeof(IndexTupleData) + 1) + sizeof(ItemIdData))))
-
/* routines in indextuple.c */
extern IndexTuple index_form_tuple(TupleDesc tupleDescriptor,
Datum *values, bool *isnull);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 9046b16..5496e94 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -122,6 +122,15 @@ typedef struct BTMetaPageData
MAXALIGN(sizeof(BTPageOpaqueData))) / 3)
/*
+ * If compression is applied, the page can contain more item pointers
+ * than it could hold uncompressed tuples, so we need a new maximum.
+ * Note that this is a rough upper estimate.
+ */
+#define MaxPackedIndexTuplesPerPage \
+ ((int) ((BLCKSZ - SizeOfPageHeaderData) / \
+ (sizeof(ItemPointerData))))
+
+/*
* The leaf-page fillfactor defaults to 90% but is user-adjustable.
* For pages above the leaf level, we use a fixed 70% fillfactor.
* The fillfactor is applied during index build and when splitting
@@ -226,6 +235,7 @@ typedef struct BTMetaPageData
* vacuum */
#define XLOG_BTREE_REUSE_PAGE 0xD0 /* old page is about to be reused from
* FSM */
+#define XLOG_BTREE_UPDATE_TUPLE 0xE0 /* update index tuple in place */
/*
* All that we need to regenerate the meta-data page
@@ -348,15 +358,31 @@ typedef struct xl_btree_reuse_page
*
* Note that the *last* WAL record in any vacuum of an index is allowed to
* have a zero length array of offsets. Earlier records must have at least one.
+ * TODO: update this comment
*/
typedef struct xl_btree_vacuum
{
BlockNumber lastBlockVacuumed;
- /* TARGET OFFSET NUMBERS FOLLOW */
+ /*
+ * This field helps us find the beginning of the remaining tuples,
+ * which follow the array of offset numbers.
+ */
+ int nremaining;
+
+ /*
+ * TODO: Not sure whether we really need the following variable;
+ * maybe just a flag would be enough to determine
+ * whether there is any data about deleted tuples
+ */
+ int ndeleted;
+
+ /* REMAINING OFFSET NUMBERS FOLLOW (nremaining values) */
+ /* REMAINING TUPLES TO INSERT FOLLOW (if nremaining > 0) */
+ /* TARGET OFFSET NUMBERS FOLLOW (if any) */
} xl_btree_vacuum;
-#define SizeOfBtreeVacuum (offsetof(xl_btree_vacuum, lastBlockVacuumed) + sizeof(BlockNumber))
+#define SizeOfBtreeVacuum (offsetof(xl_btree_vacuum, lastBlockVacuumed) + sizeof(BlockNumber) + 2*sizeof(int))
/*
* This is what we need to know about marking an empty branch for deletion.
@@ -538,6 +564,8 @@ typedef struct BTScanPosData
* location in the associated tuple storage workspace.
*/
int nextTupleOffset;
+ /* prevTupleOffset is for posting list handling */
+ int prevTupleOffset;
/*
* The items array is always ordered in index order (ie, increasing
@@ -550,7 +578,7 @@ typedef struct BTScanPosData
int lastItem; /* last valid index in items[] */
int itemIndex; /* current index in items[] */
- BTScanPosItem items[MaxIndexTuplesPerPage]; /* MUST BE LAST */
+ BTScanPosItem items[MaxPackedIndexTuplesPerPage]; /* MUST BE LAST */
} BTScanPosData;
typedef BTScanPosData *BTScanPos;
@@ -650,6 +678,27 @@ typedef BTScanOpaqueData *BTScanOpaque;
#define SK_BT_DESC (INDOPTION_DESC << SK_BT_INDOPTION_SHIFT)
#define SK_BT_NULLS_FIRST (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT)
+
+/*
+ * We use our own ItemPointerGet(BlockNumber|OffsetNumber)
+ * to avoid Asserts, since sometimes the ip_posid isn't "valid"
+ */
+#define BtreeItemPointerGetBlockNumber(pointer) \
+ BlockIdGetBlockNumber(&(pointer)->ip_blkid)
+
+#define BtreeItemPointerGetOffsetNumber(pointer) \
+ ((pointer)->ip_posid)
+
+#define BT_POSTING (1<<31)
+#define BtreeGetNPosting(itup) BtreeItemPointerGetOffsetNumber(&(itup)->t_tid)
+#define BtreeSetNPosting(itup,n) ItemPointerSetOffsetNumber(&(itup)->t_tid,n)
+
+#define BtreeGetPostingOffset(itup) (BtreeItemPointerGetBlockNumber(&(itup)->t_tid) & (~BT_POSTING))
+#define BtreeSetPostingOffset(itup,n) ItemPointerSetBlockNumber(&(itup)->t_tid,(n)|BT_POSTING)
+#define BtreeTupleIsPosting(itup) (BtreeItemPointerGetBlockNumber(&(itup)->t_tid) & BT_POSTING)
+#define BtreeGetPosting(itup) (ItemPointerData*) ((char*)(itup) + BtreeGetPostingOffset(itup))
+#define BtreeGetPostingN(itup,n) (ItemPointerData*) (BtreeGetPosting(itup) + n)
+
/*
* prototypes for functions in nbtree.c (external entry points for btree)
*/
@@ -683,6 +732,10 @@ extern bool _bt_doinsert(Relation rel, IndexTuple itup,
IndexUniqueCheck checkUnique, Relation heapRel);
extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
extern void _bt_finish_split(Relation rel, Buffer bbuf, BTStack stack);
+extern void _bt_pgupdtup(Relation rel, Buffer buf, OffsetNumber offset, IndexTuple itup,
+ IndexTuple olditup, int nipd);
+extern void _bt_pgrewritetup(Relation rel, Buffer buf, Page page, OffsetNumber offset, IndexTuple itup);
+extern bool _bt_isbinaryequal(TupleDesc itupdesc, IndexTuple itup, int nindatts, IndexTuple ituptoinsert);
/*
* prototypes for functions in nbtpage.c
@@ -702,6 +755,8 @@ extern void _bt_delitems_delete(Relation rel, Buffer buf,
OffsetNumber *itemnos, int nitems, Relation heapRel);
extern void _bt_delitems_vacuum(Relation rel, Buffer buf,
OffsetNumber *itemnos, int nitems,
+ OffsetNumber *remainingoffset,
+ IndexTuple *remaining, int nremaining,
BlockNumber lastBlockVacuumed);
extern int _bt_pagedel(Relation rel, Buffer buf);
@@ -714,8 +769,8 @@ extern BTStack _bt_search(Relation rel,
extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
int access);
-extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
- ScanKey scankey, bool nextkey);
+extern OffsetNumber _bt_binsrch( Relation rel, Buffer buf, int keysz,
+ ScanKey scankey, bool nextkey);
extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
Page page, OffsetNumber offnum);
extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
@@ -746,6 +801,8 @@ extern void _bt_end_vacuum_callback(int code, Datum arg);
extern Size BTreeShmemSize(void);
extern void BTreeShmemInit(void);
extern bytea *btoptions(Datum reloptions, bool validate);
+extern IndexTuple BtreeFormPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd);
+extern IndexTuple BtreeReformPackedTuple(IndexTuple tuple, ItemPointerData *data, int nipd);
/*
* prototypes for functions in nbtvalidate.c
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers