-- Targeting PG15; if too early / noise then please ignore.

I've noticed there are a lot of places in the btree index
infrastructure (and also some other index AMs) that effectively
iterate over the attributes of the index tuple, but won't use
index_deform_tuple for reasons. However, this implies that they must
repeatedly call index_getattr, which in the worst case is O(n) for the
n-th attribute, slowing down extraction of multi-column indexes
significantly. As such, I've added some API that allows for iteration
(ish) over index attributes.

Please find attached patch 0001 that improves the runtime complexity
of many of these places by storing and reusing the offset of the last
extracted attribute. This improves the worst-case runtime of
extracting all attributes to O(n) for incremental attribute extraction
(from O(n*n)). Note that finding the first offsets is still an O(n)
worst case for starting at the n-th attribute, but nothing can be done
about that.

Almost all workloads for multi-column nbtree indexes that cannot use
attcacheoff should see a benefit from this patch; only those that only
use row scans cannot use this optimization. Additionally, multi-column
gist indexes could also see some (albeit limited) benefit, which is
indeed useful when considering the newly added INCLUDE support in the
gist AM.

Also attached is 0002, which dynamically truncates attribute prefixes
of tuples whilst _binsrch-ing through a nbtree page. It greatly uses
the improved performance of 0001; they work very well together. The
problems that Peter (cc-ed) mentions in [0] only result in invalid
search bounds when traversing the tree, but on the page level valid
bounds can be constructed.

This is patchset 1 of a series of patches I'm starting for eventually
adding static prefix truncation into nbtree infrastructure in
PostgreSQL. I've put up a wiki page [1] with my current research and
thoughts on that topic.

Performance
-----------

I've run some tests with regards to performance on my laptop; which
tests nbtree index traversal. The test is based on a recent UK land
registry sales prices dataset (25744780 rows), being copied from one
table into an unlogged table with disabled autovacuum, with one index
as specified by the result. Master @ 99964c4a, patched is with both
0001 and 0002. The results are averages over 3 runs, with plain
configure, compiled by gcc (Debian 6.3.0-18+deb9u1).

INSERT (index definition)                | master (s) | patched (s) | improv(%)
UNIQUE (transaction)                     |     256851 |      251705 | 2.00
(county, city, locality)                 |     154529 |      147495 | 4.55
(county COLLATE "en_US", city, locality) |     174028 |      164165 | 5.67
(always_null, county, city, locality)    |     173090 |      166851 | 3.60

Some testing for reindex indicates improvements there as well: Same
compiled version; all indexes on an unlogged table; REINDEX run 4
times on each index, last 3 were averaged.

REINDEX (index definition)               | master (s) | patched (s) | improv(%)
UNIQUE (transaction)                     |      11623 |       11692 | -0.6
(county, city, locality)                 |      58299 |       54770 |  6.1
(county COLLATE "en_US", city, locality) |      61790 |       55887 |  9.6
(always_null, county, city, locality)    |      69703 |       63925 |  8.3

I am quite suprised with the results for the single-column unique
index insertions, as that was one of the points where I was suspecting
a slight decrease in performance for inserts. I haven't really checked
why the performance increased, but I suspect it has to do with an
improved fast-path for finding the first attribute (we know it always
starts at offset 0 of the data section), but it might also just as
well be due to throttling (sadly, I do not have a stable benchmarking
machine, so my laptop will do).

I'm also slightly disappointed with the results of the always_null
insert load; I had hoped for better results there, seeing the results
for the other 2 multi-column indexes.


With regards,

Matthias van de Meent.

[0] 
https://www.postgresql.org/message-id/cah2-wzn_nayk4pr0hrwo0stwhmxjp5qyu+x8vppt030xpqr...@mail.gmail.com
[1] https://wiki.postgresql.org/wiki/NBTree_Prefix_Truncation
From b832318acad276ef991785141f606d624c67cb69 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm@gmail.com>
Date: Mon, 22 Feb 2021 14:13:13 +0100
Subject: [PATCH v1 2/2] Implement page-level dynamic prefix truncation for
 _bt_binsrch*

Because tuples are ordered on the page, if some prefix of the scan columns on
both sides of the compared tuple are equal to the scankey, then the current
tuple that is being compared must also have those prefixing columns that equal
the scankey.

We cannot propagate this information to _binsrch on subsequent pages, as this
downstream page may concurrently have split and/or have merged with its
deleted left neighbour (see [0]), moving the keyspace of the linked page, so
we can only trust the current state of the page for this optimization, which
means we must validate this state each time we pin this page.

[0] https://www.postgresql.org/message-id/CAH2-Wzn_NAyK4pR0HRWO0StwHmxjP5qyu+X8vppt030XpqrO6w@mail.gmail.com
---
 contrib/amcheck/verify_nbtree.c       | 25 +++++----
 src/backend/access/nbtree/README      | 24 +++++++++
 src/backend/access/nbtree/nbtinsert.c | 28 ++++++----
 src/backend/access/nbtree/nbtsearch.c | 74 +++++++++++++++++++++------
 src/include/access/nbtree.h           |  9 ++--
 5 files changed, 120 insertions(+), 40 deletions(-)

diff --git a/contrib/amcheck/verify_nbtree.c b/contrib/amcheck/verify_nbtree.c
index 3d06be5563..1bcb67648f 100644
--- a/contrib/amcheck/verify_nbtree.c
+++ b/contrib/amcheck/verify_nbtree.c
@@ -2651,6 +2651,7 @@ bt_rootdescend(BtreeCheckState *state, IndexTuple itup)
 	BTStack		stack;
 	Buffer		lbuf;
 	bool		exists;
+	AttrNumber	cmpcol = 1;
 
 	key = _bt_mkscankey(state->rel, itup);
 	Assert(key->heapkeyspace && key->scantid != NULL);
@@ -2681,13 +2682,13 @@ bt_rootdescend(BtreeCheckState *state, IndexTuple itup)
 		insertstate.buf = lbuf;
 
 		/* Get matching tuple on leaf page */
-		offnum = _bt_binsrch_insert(state->rel, &insertstate);
+		offnum = _bt_binsrch_insert(state->rel, &insertstate, 1);
 		/* Compare first >= matching item on leaf page, if any */
 		page = BufferGetPage(lbuf);
 		/* Should match on first heap TID when tuple has a posting list */
 		if (offnum <= PageGetMaxOffsetNumber(page) &&
 			insertstate.postingoff <= 0 &&
-			_bt_compare(state->rel, key, page, offnum) == 0)
+			_bt_compare(state->rel, key, page, offnum, &cmpcol) == 0)
 			exists = true;
 		_bt_relbuf(state->rel, lbuf);
 	}
@@ -2748,7 +2749,8 @@ invariant_l_offset(BtreeCheckState *state, BTScanInsert key,
 				   OffsetNumber upperbound)
 {
 	ItemId		itemid;
-	int32		cmp;
+	int32		cmp,
+				cmpcol = 1;
 
 	Assert(key->pivotsearch);
 
@@ -2759,7 +2761,7 @@ invariant_l_offset(BtreeCheckState *state, BTScanInsert key,
 	if (!key->heapkeyspace)
 		return invariant_leq_offset(state, key, upperbound);
 
-	cmp = _bt_compare(state->rel, key, state->target, upperbound);
+	cmp = _bt_compare(state->rel, key, state->target, upperbound, &cmpcol);
 
 	/*
 	 * _bt_compare() is capable of determining that a scankey with a
@@ -2810,11 +2812,12 @@ static inline bool
 invariant_leq_offset(BtreeCheckState *state, BTScanInsert key,
 					 OffsetNumber upperbound)
 {
-	int32		cmp;
+	int32		cmp,
+				cmpcol = 1;
 
 	Assert(key->pivotsearch);
 
-	cmp = _bt_compare(state->rel, key, state->target, upperbound);
+	cmp = _bt_compare(state->rel, key, state->target, upperbound, &cmpcol);
 
 	return cmp <= 0;
 }
@@ -2833,11 +2836,12 @@ static inline bool
 invariant_g_offset(BtreeCheckState *state, BTScanInsert key,
 				   OffsetNumber lowerbound)
 {
-	int32		cmp;
+	int32		cmp,
+				cmpcol = 1;
 
 	Assert(key->pivotsearch);
 
-	cmp = _bt_compare(state->rel, key, state->target, lowerbound);
+	cmp = _bt_compare(state->rel, key, state->target, lowerbound, &cmpcol);
 
 	/* pg_upgrade'd indexes may legally have equal sibling tuples */
 	if (!key->heapkeyspace)
@@ -2871,14 +2875,15 @@ invariant_l_nontarget_offset(BtreeCheckState *state, BTScanInsert key,
 							 OffsetNumber upperbound)
 {
 	ItemId		itemid;
-	int32		cmp;
+	int32		cmp,
+				cmpcol = 1;
 
 	Assert(key->pivotsearch);
 
 	/* Verify line pointer before checking tuple */
 	itemid = PageGetItemIdCareful(state, nontargetblock, nontarget,
 								  upperbound);
-	cmp = _bt_compare(state->rel, key, nontarget, upperbound);
+	cmp = _bt_compare(state->rel, key, nontarget, upperbound, &cmpcol);
 
 	/* pg_upgrade'd indexes may legally have equal sibling tuples */
 	if (!key->heapkeyspace)
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index bfe33b6b43..79a179afad 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -874,6 +874,30 @@ large groups of duplicates, maximizing space utilization.  Note also that
 deduplication more efficient.  Deduplication can be performed infrequently,
 without merging together existing posting list tuples too often.
 
+Notes about dynamic prefix truncation
+-------------------------------------
+
+Because NBTrees have a sorted keyspace, when we have determined that some
+prefixing columns of tuples on both sides of the tuple that is being
+compared are equal to the scankey, then the current tuple must also share
+this prefix with the scankey. This allows us to skip comparing those columns,
+potentially saving cycles.
+
+We can only use this constraint if we have proven this information while we
+hold a pin on the page, so this is only useful on the page level: Concurrent
+page deletions and splits may have moved the keyspace of the page referenced
+by an inner page to the right. If we re-used high- and low-column-prefixes,
+we would not be able to detect a change of keyspace from e.g. (2,2) to (1,2),
+and subsequently return invalid results. This race condition can only be
+prevented by re-establishing the prefix-equal-columns for each page.
+
+The positive part of this, is that we already have results of the highest
+value of a page: a pages' highkey is compared to the scankey while we have
+a pin on the page in the _bt_moveright procedure. The _bt_binsrch procedure
+will use this result as a rightmost prefix compare, and for each step in the
+binary search (that does not compare less than the insert key) improve the
+equal-prefix bounds.
+
 Notes about deduplication
 -------------------------
 
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 6ac205c98e..c5b32e7ce5 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -325,6 +325,7 @@ _bt_search_insert(Relation rel, BTInsertState insertstate)
 		{
 			Page		page;
 			BTPageOpaque opaque;
+			AttrNumber	comparecol = 1;
 
 			_bt_checkpage(rel, insertstate->buf);
 			page = BufferGetPage(insertstate->buf);
@@ -343,7 +344,7 @@ _bt_search_insert(Relation rel, BTInsertState insertstate)
 				!P_IGNORE(opaque) &&
 				PageGetFreeSpace(page) > insertstate->itemsz &&
 				PageGetMaxOffsetNumber(page) >= P_HIKEY &&
-				_bt_compare(rel, insertstate->itup_key, page, P_HIKEY) > 0)
+				_bt_compare(rel, insertstate->itup_key, page, P_HIKEY, &comparecol) > 0)
 			{
 				/*
 				 * Caller can use the fastpath optimization because cached
@@ -437,7 +438,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 	 * in the fastpath below, but also in the _bt_findinsertloc() call later.
 	 */
 	Assert(!insertstate->bounds_valid);
-	offset = _bt_binsrch_insert(rel, insertstate);
+	offset = _bt_binsrch_insert(rel, insertstate, 1);
 
 	/*
 	 * Scan over all equal tuples, looking for live conflicts.
@@ -447,6 +448,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 	Assert(itup_key->scantid == NULL);
 	for (;;)
 	{
+		AttrNumber	cmpcol = 1;
 		/*
 		 * Each iteration of the loop processes one heap TID, not one index
 		 * tuple.  Current offset number for page isn't usually advanced on
@@ -482,7 +484,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				Assert(insertstate->bounds_valid);
 				Assert(insertstate->low >= P_FIRSTDATAKEY(opaque));
 				Assert(insertstate->low <= insertstate->stricthigh);
-				Assert(_bt_compare(rel, itup_key, page, offset) < 0);
+				Assert(_bt_compare(rel, itup_key, page, offset, &cmpcol) < 0);
 				break;
 			}
 
@@ -507,7 +509,7 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				if (!inposting)
 				{
 					/* Plain tuple, or first TID in posting list tuple */
-					if (_bt_compare(rel, itup_key, page, offset) != 0)
+					if (_bt_compare(rel, itup_key, page, offset, &cmpcol) != 0)
 						break;	/* we're past all the equal tuples */
 
 					/* Advanced curitup */
@@ -717,11 +719,12 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 		else
 		{
 			int			highkeycmp;
+			cmpcol = 1;
 
 			/* If scankey == hikey we gotta check the next page too */
 			if (P_RIGHTMOST(opaque))
 				break;
-			highkeycmp = _bt_compare(rel, itup_key, page, P_HIKEY);
+			highkeycmp = _bt_compare(rel, itup_key, page, P_HIKEY, &cmpcol);
 			Assert(highkeycmp <= 0);
 			if (highkeycmp != 0)
 				break;
@@ -823,6 +826,7 @@ _bt_findinsertloc(Relation rel,
 	Page		page = BufferGetPage(insertstate->buf);
 	BTPageOpaque opaque;
 	OffsetNumber newitemoff;
+	AttrNumber	cmpcol = 1;
 
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
@@ -867,6 +871,7 @@ _bt_findinsertloc(Relation rel,
 
 			for (;;)
 			{
+				cmpcol = 1;
 				/*
 				 * Does the new tuple belong on this page?
 				 *
@@ -884,7 +889,7 @@ _bt_findinsertloc(Relation rel,
 
 				/* Test '<=', not '!=', since scantid is set now */
 				if (P_RIGHTMOST(opaque) ||
-					_bt_compare(rel, itup_key, page, P_HIKEY) <= 0)
+					_bt_compare(rel, itup_key, page, P_HIKEY, &cmpcol) <= 0)
 					break;
 
 				_bt_stepright(rel, insertstate, stack);
@@ -937,6 +942,7 @@ _bt_findinsertloc(Relation rel,
 		 */
 		while (PageGetFreeSpace(page) < insertstate->itemsz)
 		{
+			cmpcol = 1;
 			/*
 			 * Before considering moving right, see if we can obtain enough
 			 * space by erasing LP_DEAD items
@@ -967,7 +973,7 @@ _bt_findinsertloc(Relation rel,
 				break;
 
 			if (P_RIGHTMOST(opaque) ||
-				_bt_compare(rel, itup_key, page, P_HIKEY) != 0 ||
+				_bt_compare(rel, itup_key, page, P_HIKEY, &cmpcol) != 0 ||
 				random() <= (MAX_RANDOM_VALUE / 100))
 				break;
 
@@ -982,10 +988,10 @@ _bt_findinsertloc(Relation rel,
 	 * We should now be on the correct page.  Find the offset within the page
 	 * for the new tuple. (Possibly reusing earlier search bounds.)
 	 */
-	Assert(P_RIGHTMOST(opaque) ||
-		   _bt_compare(rel, itup_key, page, P_HIKEY) <= 0);
+	Assert(((cmpcol = 1), (P_RIGHTMOST(opaque) ||
+			   _bt_compare(rel, itup_key, page, P_HIKEY, &cmpcol) <= 0)));
 
-	newitemoff = _bt_binsrch_insert(rel, insertstate);
+	newitemoff = _bt_binsrch_insert(rel, insertstate, cmpcol);
 
 	if (insertstate->postingoff == -1)
 	{
@@ -1004,7 +1010,7 @@ _bt_findinsertloc(Relation rel,
 		 */
 		Assert(!insertstate->bounds_valid);
 		insertstate->postingoff = 0;
-		newitemoff = _bt_binsrch_insert(rel, insertstate);
+		newitemoff = _bt_binsrch_insert(rel, insertstate, cmpcol);
 		Assert(insertstate->postingoff == 0);
 	}
 
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index e002c11e8b..214e699274 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -25,7 +25,7 @@
 
 
 static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
-static OffsetNumber _bt_binsrch(Relation rel, BTScanInsert key, Buffer buf);
+static OffsetNumber _bt_binsrch(Relation rel, BTScanInsert key, Buffer buf, AttrNumber highkeycmpcol);
 static int	_bt_binsrch_posting(BTScanInsert key, Page page,
 								OffsetNumber offnum);
 static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
@@ -121,6 +121,7 @@ _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access,
 		IndexTuple	itup;
 		BlockNumber child;
 		BTStack		new_stack;
+		AttrNumber	comparecol = 1;
 
 		/*
 		 * Race -- the page we just grabbed may have split since we read its
@@ -135,7 +136,7 @@ _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access,
 		 * opportunity to finish splits of internal pages too.
 		 */
 		*bufP = _bt_moveright(rel, key, *bufP, (access == BT_WRITE), stack_in,
-							  page_access, snapshot);
+							  page_access, snapshot, &comparecol);
 
 		/* if this is a leaf page, we're done */
 		page = BufferGetPage(*bufP);
@@ -147,7 +148,7 @@ _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access,
 		 * Find the appropriate pivot tuple on this page.  Its downlink points
 		 * to the child page that we're about to descend to.
 		 */
-		offnum = _bt_binsrch(rel, key, *bufP);
+		offnum = _bt_binsrch(rel, key, *bufP, comparecol);
 		itemid = PageGetItemId(page, offnum);
 		itup = (IndexTuple) PageGetItem(page, itemid);
 		Assert(BTreeTupleIsPivot(itup) || !key->heapkeyspace);
@@ -186,6 +187,7 @@ _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access,
 	 */
 	if (access == BT_WRITE && page_access == BT_READ)
 	{
+		AttrNumber	comparecol = 1;
 		/* trade in our read lock for a write lock */
 		_bt_unlockbuf(rel, *bufP);
 		_bt_lockbuf(rel, *bufP, BT_WRITE);
@@ -196,7 +198,7 @@ _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access,
 		 * move right to its new sibling.  Do that.
 		 */
 		*bufP = _bt_moveright(rel, key, *bufP, true, stack_in, BT_WRITE,
-							  snapshot);
+							  snapshot, &comparecol);
 	}
 
 	return stack_in;
@@ -238,18 +240,22 @@ _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access,
  * positioning for an insert or delete, so NULL is used for those cases.
  */
 Buffer
-_bt_moveright(Relation rel,
+_bt_moveright(
+			  Relation rel,
 			  BTScanInsert key,
 			  Buffer buf,
 			  bool forupdate,
 			  BTStack stack,
 			  int access,
-			  Snapshot snapshot)
+			  Snapshot snapshot,
+			  AttrNumber *comparecol)
 {
 	Page		page;
 	BTPageOpaque opaque;
 	int32		cmpval;
 
+	Assert(PointerIsValid(comparecol));
+
 	/*
 	 * When nextkey = false (normal case): if the scan key that brought us to
 	 * this page is > the high key stored on the page, then the page has split
@@ -271,12 +277,16 @@ _bt_moveright(Relation rel,
 
 	for (;;)
 	{
+		AttrNumber cmpcol = 1;
 		page = BufferGetPage(buf);
 		TestForOldSnapshot(snapshot, rel, page);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
 		if (P_RIGHTMOST(opaque))
+		{
+			*comparecol = cmpcol;
 			break;
+		}
 
 		/*
 		 * Finish any incomplete splits we encounter along the way.
@@ -302,14 +312,18 @@ _bt_moveright(Relation rel,
 			continue;
 		}
 
-		if (P_IGNORE(opaque) || _bt_compare(rel, key, page, P_HIKEY) >= cmpval)
+		if (P_IGNORE(opaque) || _bt_compare(rel, key, page, P_HIKEY, &cmpcol) >= cmpval)
 		{
 			/* step right one page */
 			buf = _bt_relandgetbuf(rel, buf, opaque->btpo_next, access);
+			*comparecol = 1;
 			continue;
 		}
 		else
+		{
+			*comparecol = cmpcol;
 			break;
+		}
 	}
 
 	if (P_IGNORE(opaque))
@@ -342,7 +356,8 @@ _bt_moveright(Relation rel,
 static OffsetNumber
 _bt_binsrch(Relation rel,
 			BTScanInsert key,
-			Buffer buf)
+			Buffer buf,
+			AttrNumber highkeycmpcol)
 {
 	Page		page;
 	BTPageOpaque opaque;
@@ -350,6 +365,9 @@ _bt_binsrch(Relation rel,
 				high;
 	int32		result,
 				cmpval;
+	AttrNumber	curcmpcol = 1,
+				highcmpcol = highkeycmpcol,
+				lowcmpcol = 1;
 
 	page = BufferGetPage(buf);
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -394,12 +412,20 @@ _bt_binsrch(Relation rel,
 
 		/* We have low <= mid < high, so mid points at a real slot */
 
-		result = _bt_compare(rel, key, page, mid);
+		result = _bt_compare(rel, key, page, mid, &curcmpcol);
+		Assert(((curcmpcol = 1), result == _bt_compare(rel, key, page, mid, &curcmpcol)));
 
 		if (result >= cmpval)
+		{
 			low = mid + 1;
+			lowcmpcol = curcmpcol;
+		}
 		else
+		{
 			high = mid;
+			highcmpcol = curcmpcol;
+		}
+		curcmpcol = Min(highcmpcol, lowcmpcol);
 	}
 
 	/*
@@ -444,7 +470,7 @@ _bt_binsrch(Relation rel,
  * list split).
  */
 OffsetNumber
-_bt_binsrch_insert(Relation rel, BTInsertState insertstate)
+_bt_binsrch_insert(Relation rel, BTInsertState insertstate, AttrNumber highcmpcol)
 {
 	BTScanInsert key = insertstate->itup_key;
 	Page		page;
@@ -454,6 +480,9 @@ _bt_binsrch_insert(Relation rel, BTInsertState insertstate)
 				stricthigh;
 	int32		result,
 				cmpval;
+	AttrNumber	cmpcol = 1,
+				highcol = highcmpcol,
+				lowcol = 1;
 
 	page = BufferGetPage(insertstate->buf);
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -507,17 +536,23 @@ _bt_binsrch_insert(Relation rel, BTInsertState insertstate)
 
 		/* We have low <= mid < high, so mid points at a real slot */
 
-		result = _bt_compare(rel, key, page, mid);
+		result = _bt_compare(rel, key, page, mid, &cmpcol);
 
 		if (result >= cmpval)
+		{
 			low = mid + 1;
+			lowcol = cmpcol;
+		}
 		else
 		{
 			high = mid;
+			highcol = cmpcol;
 			if (result != 0)
 				stricthigh = high;
 		}
 
+		cmpcol = Min(highcol, lowcol);
+
 		/*
 		 * If tuple at offset located by binary search is a posting list whose
 		 * TID range overlaps with caller's scantid, perform posting list
@@ -644,7 +679,8 @@ int32
 _bt_compare(Relation rel,
 			BTScanInsert key,
 			Page page,
-			OffsetNumber offnum)
+			OffsetNumber offnum,
+			AttrNumber *comparecol)
 {
 	TupleDesc	itupdesc = RelationGetDescr(rel);
 	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -659,6 +695,7 @@ _bt_compare(Relation rel,
 	Assert(_bt_check_natts(rel, key->heapkeyspace, page, offnum));
 	Assert(key->keysz <= IndexRelationGetNumberOfKeyAttributes(rel));
 	Assert(key->heapkeyspace || key->scantid == NULL);
+	Assert(*comparecol >= 1 && *comparecol <= key->keysz + 1);
 
 	/*
 	 * Force result ">" if target item is first data item on an internal page
@@ -685,11 +722,11 @@ _bt_compare(Relation rel,
 	ncmpkey = Min(ntupatts, key->keysz);
 	Assert(key->heapkeyspace || ncmpkey == key->keysz);
 	Assert(!BTreeTupleIsPosting(itup) || key->allequalimage);
-	scankey = key->scankeys;
+	scankey = key->scankeys + (*comparecol - 1);
 
-	index_attiterinit(itup, 1, itupdesc, &iter);
+	index_attiterinit(itup, *comparecol, itupdesc, &iter);
 
-	for (int i = 1; i <= ncmpkey; i++)
+	for (int i = *comparecol; i <= ncmpkey; i++)
 	{
 		Datum		datum;
 
@@ -732,7 +769,10 @@ _bt_compare(Relation rel,
 
 		/* if the keys are unequal, return the difference */
 		if (result != 0)
+		{
+			*comparecol = i;
 			return result;
+		}
 
 		scankey++;
 	}
@@ -746,6 +786,8 @@ _bt_compare(Relation rel,
 	 * scankey won't, so explicitly excluding non-key attributes isn't
 	 * necessary.
 	 */
+	*comparecol = ncmpkey + 1;
+
 	if (key->keysz > ntupatts)
 		return 1;
 
@@ -1383,7 +1425,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	_bt_initialize_more_data(so, dir);
 
 	/* position to the precise item on the page */
-	offnum = _bt_binsrch(rel, &inskey, buf);
+	offnum = _bt_binsrch(rel, &inskey, buf, 1);
 
 	/*
 	 * If nextkey = false, we are positioned at the first item >= scan key, or
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index a645c42e68..154945db05 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -1224,9 +1224,12 @@ extern void _bt_pendingfsm_finalize(Relation rel, BTVacState *vstate);
 extern BTStack _bt_search(Relation rel, BTScanInsert key, Buffer *bufP,
 						  int access, Snapshot snapshot);
 extern Buffer _bt_moveright(Relation rel, BTScanInsert key, Buffer buf,
-							bool forupdate, BTStack stack, int access, Snapshot snapshot);
-extern OffsetNumber _bt_binsrch_insert(Relation rel, BTInsertState insertstate);
-extern int32 _bt_compare(Relation rel, BTScanInsert key, Page page, OffsetNumber offnum);
+							bool forupdate, BTStack stack, int access, Snapshot snapshot,
+							AttrNumber *comparecol);
+extern OffsetNumber _bt_binsrch_insert(Relation rel, BTInsertState insertstate,
+									   AttrNumber highcmpcol);
+extern int32 _bt_compare(Relation rel, BTScanInsert key, Page page, OffsetNumber offnum,
+						 AttrNumber *comparecol);
 extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
 extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
 extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
-- 
2.20.1

From b7bc4759f0f68ee4a75bcfc208aaf190c2ab484a Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm@gmail.com>
Date: Sat, 10 Apr 2021 19:01:50 +0200
Subject: [PATCH v1 1/2] Implement and use index tuple attribute iteration

index_getattr's fast-path only works for fixed-size prefixes with non-null
attrubutes, and null attributes, but for all other cases this is a O(n)
lookup. Because it is also often called in a loop for each attribute, we
can re-use the offset results from earlier attribute lookups, and using
that speed up this attribute lookup.
---
 src/backend/access/common/indextuple.c | 149 +++++++++++++++++++++++++
 src/backend/access/gist/gistutil.c     |  32 ++++--
 src/backend/access/nbtree/nbtsearch.c  |  11 +-
 src/backend/access/nbtree/nbtsort.c    |  15 ++-
 src/backend/access/nbtree/nbtutils.c   |  75 ++++++++-----
 src/backend/utils/sort/tuplesort.c     |  28 ++---
 src/include/access/itup.h              |  85 ++++++++++++++
 7 files changed, 335 insertions(+), 60 deletions(-)

diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index a4cb8914cc..25c2ad6497 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -433,6 +433,155 @@ nocache_index_getattr(IndexTuple tup,
 	return fetchatt(TupleDescAttr(tupleDesc, attnum), tp + off);
 }
 
+/*
+ * Initiate an index attribute iterator to attribute attnum, 
+ * and return the corresponding datum.
+ *
+ * This is nearly the same as index_deform_tuple, except that this
+ * returns the internal state up to attnum, instead of populating the
+ * datum- and isnull-arrays
+ */
+void
+nocache_index_attiterinit(IndexTuple tup, AttrNumber attnum, TupleDesc tupleDesc, IAttrIterState iter)
+{
+	bool		hasnulls = IndexTupleHasNulls(tup);
+	int			curatt;
+	char	   *tp;				/* ptr to tuple data */
+	int			off;			/* offset in tuple data */
+	bits8	   *bp;				/* ptr to null bitmap in tuple */
+	bool		slow = false;	/* can we use/set attcacheoff? */
+	bool		null = false;
+
+	/* Assert to protect callers */
+	Assert(tupleDesc->natts <= INDEX_MAX_KEYS);
+	Assert(attnum <= tupleDesc->natts);
+	Assert(attnum > 0);
+
+	/* XXX "knows" t_bits are just after fixed tuple header! */
+	bp = (bits8 *) ((char *) tup + sizeof(IndexTupleData));
+
+	tp = (char *) tup + IndexInfoFindDataOffset(tup->t_info);
+	off = 0;
+
+	for (curatt = 0; curatt < attnum; curatt++)
+	{
+		Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, curatt);
+
+		if (hasnulls && att_isnull(curatt, bp))
+		{
+			null = true;
+			slow = true;		/* can't use attcacheoff anymore */
+			continue;
+		}
+
+		null = false;
+
+		if (!slow && thisatt->attcacheoff >= 0)
+			off = thisatt->attcacheoff;
+		else if (thisatt->attlen == -1)
+		{
+			/*
+			 * We can only cache the offset for a varlena attribute if the
+			 * offset is already suitably aligned, so that there would be no
+			 * pad bytes in any case: then the offset will be valid for either
+			 * an aligned or unaligned value.
+			 */
+			if (!slow &&
+				off == att_align_nominal(off, thisatt->attalign))
+				thisatt->attcacheoff = off;
+			else
+			{
+				off = att_align_pointer(off, thisatt->attalign, -1,
+										tp + off);
+				slow = true;
+			}
+		}
+		else
+		{
+			/* not varlena, so safe to use att_align_nominal */
+			off = att_align_nominal(off, thisatt->attalign);
+
+			if (!slow)
+				thisatt->attcacheoff = off;
+		}
+
+		off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+		if (thisatt->attlen <= 0)
+			slow = true;		/* can't use attcacheoff anymore */
+	}
+
+	iter->isNull = null;
+	iter->offset = off;
+	iter->slow = slow;
+}
+
+Datum
+nocache_index_attiternext(IndexTuple tup, AttrNumber attnum, TupleDesc tupleDesc, IAttrIterState iter)
+{
+	bool		hasnulls = IndexTupleHasNulls(tup);
+	char	   *tp;				/* ptr to tuple data */
+	bits8	   *bp;				/* ptr to null bitmap in tuple */
+	Datum		datum;
+
+	Assert(tupleDesc->natts <= INDEX_MAX_KEYS);
+	Assert(attnum <= tupleDesc->natts);
+	Assert(attnum > 0);
+
+	bp = (bits8 *) ((char *) tup + sizeof(IndexTupleData));
+
+	tp = (char *) tup + IndexInfoFindDataOffset(tup->t_info);
+	Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum - 1);
+
+	if (hasnulls && att_isnull(attnum - 1, bp))
+	{
+		iter->isNull = true;
+		iter->slow = true;		/* can't use attcacheoff anymore */
+		return (Datum) 0;
+	}
+
+	iter->isNull = false;
+
+	if (!iter->slow && thisatt->attcacheoff >= 0)
+		iter->offset = thisatt->attcacheoff;
+	else if (thisatt->attlen == -1)
+	{
+		/*
+		 * We can only cache the offset for a varlena attribute if the
+		 * offset is already suitably aligned, so that there would be no
+		 * pad bytes in any case: then the offset will be valid for either
+		 * an aligned or unaligned value.
+		 */
+		if (!iter->slow &&
+			iter->offset == att_align_nominal(iter->offset, thisatt->attalign))
+			thisatt->attcacheoff = iter->offset;
+		else
+		{
+			iter->offset = att_align_pointer(iter->offset, thisatt->attalign, -1,
+											 tp + iter->offset);
+			iter->slow = true;
+		}
+	}
+	else
+	{
+		/* not varlena, so safe to use att_align_nominal */
+		iter->offset = att_align_nominal(iter->offset, thisatt->attalign);
+
+		if (!iter->slow)
+			thisatt->attcacheoff = iter->offset;
+	}
+
+	datum = fetchatt(thisatt, tp + iter->offset);
+
+	iter->offset = att_addlength_pointer(iter->offset, thisatt->attlen, tp + iter->offset);
+
+	if (thisatt->attlen <= 0)
+		iter->slow = true;		/* can't use attcacheoff anymore */
+
+	return datum;
+}
+
+
 /*
  * Convert an index tuple into Datum/isnull arrays.
  *
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index 8dcd53c457..4455ec0ff9 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -296,15 +296,18 @@ gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
 				  OffsetNumber o, GISTENTRY *attdata, bool *isnull)
 {
 	int			i;
+	IAttrIterStateData iter;
+	index_attiterinit(tuple, 1, giststate->leafTupdesc, &iter);
 
 	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
 	{
 		Datum		datum;
 
-		datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);
+		datum = index_attiternext(tuple, i + 1, giststate->leafTupdesc, &iter);
+		isnull[i] = iter.isNull;
 		gistdentryinit(giststate, i, &attdata[i],
 					   datum, r, p, o,
-					   false, isnull[i]);
+					   false, iter.isNull);
 	}
 }
 
@@ -439,6 +442,9 @@ gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
 		IndexTuple	itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
 		bool		zero_penalty;
 		int			j;
+		IAttrIterStateData iter;
+
+		index_attiterinit(itup, 1, giststate->leafTupdesc, &iter);
 
 		zero_penalty = true;
 
@@ -447,14 +453,13 @@ gistchoose(Relation r, Page p, IndexTuple it,	/* it has compressed entry */
 		{
 			Datum		datum;
 			float		usize;
-			bool		IsNull;
 
 			/* Compute penalty for this column. */
-			datum = index_getattr(itup, j + 1, giststate->leafTupdesc,
-								  &IsNull);
+			datum = index_attiternext(itup, j + 1, giststate->leafTupdesc,
+								  &iter);
 			gistdentryinit(giststate, j, &entry, datum, r, p, i,
-						   false, IsNull);
-			usize = gistpenalty(giststate, j, &entry, IsNull,
+						   false, iter.isNull);
+			usize = gistpenalty(giststate, j, &entry, iter.isNull,
 								&identry[j], isnull[j]);
 			if (usize > 0)
 				zero_penalty = false;
@@ -668,13 +673,17 @@ gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
 	MemoryContext oldcxt = MemoryContextSwitchTo(giststate->tempCxt);
 	Datum		fetchatt[INDEX_MAX_KEYS];
 	bool		isnull[INDEX_MAX_KEYS];
+	IAttrIterStateData iter;
 	int			i;
 
+	index_attiterinit(tuple, 1, giststate->leafTupdesc, &iter);
+
 	for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(r); i++)
 	{
 		Datum		datum;
 
-		datum = index_getattr(tuple, i + 1, giststate->leafTupdesc, &isnull[i]);
+		datum = index_attiternext(tuple, i + 1, giststate->leafTupdesc, &iter);
+		isnull[i] = iter.isNull;
 
 		if (giststate->fetchFn[i].fn_oid != InvalidOid)
 		{
@@ -707,12 +716,13 @@ gistFetchTuple(GISTSTATE *giststate, Relation r, IndexTuple tuple)
 	}
 
 	/*
-	 * Get each included attribute.
+	 * Get each INCLUDEd attribute.
 	 */
 	for (; i < r->rd_att->natts; i++)
 	{
-		fetchatt[i] = index_getattr(tuple, i + 1, giststate->leafTupdesc,
-									&isnull[i]);
+		fetchatt[i] = index_attiternext(tuple, i + 1, giststate->leafTupdesc,
+										&iter);
+		isnull[i] = iter.isNull;
 	}
 	MemoryContextSwitchTo(oldcxt);
 
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index d1177d8772..e002c11e8b 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -654,6 +654,7 @@ _bt_compare(Relation rel,
 	int			ncmpkey;
 	int			ntupatts;
 	int32		result;
+	IAttrIterStateData iter;
 
 	Assert(_bt_check_natts(rel, key->heapkeyspace, page, offnum));
 	Assert(key->keysz <= IndexRelationGetNumberOfKeyAttributes(rel));
@@ -685,23 +686,25 @@ _bt_compare(Relation rel,
 	Assert(key->heapkeyspace || ncmpkey == key->keysz);
 	Assert(!BTreeTupleIsPosting(itup) || key->allequalimage);
 	scankey = key->scankeys;
+
+	index_attiterinit(itup, 1, itupdesc, &iter);
+
 	for (int i = 1; i <= ncmpkey; i++)
 	{
 		Datum		datum;
-		bool		isNull;
 
-		datum = index_getattr(itup, scankey->sk_attno, itupdesc, &isNull);
+		datum = index_attiternext(itup, scankey->sk_attno, itupdesc, &iter);
 
 		if (scankey->sk_flags & SK_ISNULL)	/* key is NULL */
 		{
-			if (isNull)
+			if (iter.isNull)
 				result = 0;		/* NULL "=" NULL */
 			else if (scankey->sk_flags & SK_BT_NULLS_FIRST)
 				result = -1;	/* NULL "<" NOT_NULL */
 			else
 				result = 1;		/* NULL ">" NOT_NULL */
 		}
-		else if (isNull)		/* key is NOT_NULL and item is NULL */
+		else if (iter.isNull)		/* key is NOT_NULL and item is NULL */
 		{
 			if (scankey->sk_flags & SK_BT_NULLS_FIRST)
 				result = 1;		/* NOT_NULL ">" NULL */
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 2c4d7f6e25..07c1d3838a 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1241,21 +1241,24 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 			else if (itup != NULL)
 			{
 				int32		compare = 0;
+				IAttrIterStateData iter1;
+				IAttrIterStateData iter2;
+
+				index_attiterinit(itup, 1, tupdes, &iter1);
+				index_attiterinit(itup2, 1, tupdes, &iter2);
 
 				for (i = 1; i <= keysz; i++)
 				{
 					SortSupport entry;
 					Datum		attrDatum1,
 								attrDatum2;
-					bool		isNull1,
-								isNull2;
 
 					entry = sortKeys + i - 1;
-					attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
-					attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
+					attrDatum1 = index_attiternext(itup, i, tupdes, &iter1);
+					attrDatum2 = index_attiternext(itup2, i, tupdes, &iter2);
 
-					compare = ApplySortComparator(attrDatum1, isNull1,
-												  attrDatum2, isNull2,
+					compare = ApplySortComparator(attrDatum1, iter1.isNull,
+												  attrDatum2, iter2.isNull,
 												  entry);
 					if (compare > 0)
 					{
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index d524310723..87b0075fdc 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -96,6 +96,7 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
 	int16	   *indoption;
 	int			tupnatts;
 	int			i;
+	IAttrIterStateData iter;
 
 	itupdesc = RelationGetDescr(rel);
 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
@@ -126,11 +127,13 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
 	key->scantid = key->heapkeyspace && itup ?
 		BTreeTupleGetHeapTID(itup) : NULL;
 	skey = key->scankeys;
+
+	index_attiterinit(itup, 1, itupdesc, &iter);
+
 	for (i = 0; i < indnkeyatts; i++)
 	{
 		FmgrInfo   *procinfo;
 		Datum		arg;
-		bool		null;
 		int			flags;
 
 		/*
@@ -145,13 +148,13 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
 		 * should never be used.
 		 */
 		if (i < tupnatts)
-			arg = index_getattr(itup, i + 1, itupdesc, &null);
+			arg = index_attiternext(itup, i + 1, itupdesc, &iter);
 		else
 		{
 			arg = (Datum) 0;
-			null = true;
+			iter.isNull = true;
 		}
-		flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
+		flags = (iter.isNull ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
 		ScanKeyEntryInitializeWithInfo(&skey[i],
 									   flags,
 									   (AttrNumber) (i + 1),
@@ -161,7 +164,7 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
 									   procinfo,
 									   arg);
 		/* Record if any key attribute is NULL (or truncated) */
-		if (null)
+		if (iter.isNull)
 			key->anynullkeys = true;
 	}
 
@@ -1360,6 +1363,9 @@ _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, int tupnatts,
 	int			keysz;
 	int			ikey;
 	ScanKey		key;
+	IAttrIterStateData iter;
+	Datum		datum;
+	int			curattnum;
 
 	Assert(BTreeTupleGetNAtts(tuple, scan->indexRelation) == tupnatts);
 
@@ -1369,10 +1375,12 @@ _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, int tupnatts,
 	so = (BTScanOpaque) scan->opaque;
 	keysz = so->numberOfKeys;
 
+	index_attiterinit(tuple, 1, tupdesc, &iter);
+	curattnum = 0;
+	datum = (Datum) 0;
+
 	for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
 	{
-		Datum		datum;
-		bool		isNull;
 		Datum		test;
 
 		if (key->sk_attno > tupnatts)
@@ -1397,23 +1405,31 @@ _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, int tupnatts,
 			return false;
 		}
 
-		datum = index_getattr(tuple,
-							  key->sk_attno,
-							  tupdesc,
-							  &isNull);
+		/*
+		 * Attributes are received in sorded order, so we iterate until we
+		 * have the correct attribute. We will not see preceding attribute
+		 * again.
+		 * 
+		 * Note that we can see the same attribute many times; in which
+		 * case we will skip the index_attiternext call.
+		 */
+		for(; key->sk_attno > curattnum; curattnum++)
+		{
+			datum = index_attiternext(tuple, curattnum + 1, tupdesc, &iter);
+		}
 
 		if (key->sk_flags & SK_ISNULL)
 		{
 			/* Handle IS NULL/NOT NULL tests */
 			if (key->sk_flags & SK_SEARCHNULL)
 			{
-				if (isNull)
+				if (iter.isNull)
 					continue;	/* tuple satisfies this qual */
 			}
 			else
 			{
 				Assert(key->sk_flags & SK_SEARCHNOTNULL);
-				if (!isNull)
+				if (!iter.isNull)
 					continue;	/* tuple satisfies this qual */
 			}
 
@@ -1435,7 +1451,7 @@ _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, int tupnatts,
 			return false;
 		}
 
-		if (isNull)
+		if (iter.isNull)
 		{
 			if (key->sk_flags & SK_BT_NULLS_FIRST)
 			{
@@ -2349,6 +2365,8 @@ _bt_keep_natts(Relation rel, IndexTuple lastleft, IndexTuple firstright,
 	TupleDesc	itupdesc = RelationGetDescr(rel);
 	int			keepnatts;
 	ScanKey		scankey;
+	IAttrIterStateData iter1;
+	IAttrIterStateData iter2;
 
 	/*
 	 * _bt_compare() treats truncated key attributes as having the value minus
@@ -2360,20 +2378,22 @@ _bt_keep_natts(Relation rel, IndexTuple lastleft, IndexTuple firstright,
 
 	scankey = itup_key->scankeys;
 	keepnatts = 1;
+
+	index_attiterinit(lastleft, 1, itupdesc, &iter1);
+	index_attiterinit(firstright, 1, itupdesc, &iter2);
+
 	for (int attnum = 1; attnum <= nkeyatts; attnum++, scankey++)
 	{
 		Datum		datum1,
 					datum2;
-		bool		isNull1,
-					isNull2;
 
-		datum1 = index_getattr(lastleft, attnum, itupdesc, &isNull1);
-		datum2 = index_getattr(firstright, attnum, itupdesc, &isNull2);
+		datum1 = index_attiternext(lastleft, attnum, itupdesc, &iter1);
+		datum2 = index_attiternext(firstright, attnum, itupdesc, &iter2);
 
-		if (isNull1 != isNull2)
+		if (iter1.isNull != iter2.isNull)
 			break;
 
-		if (!isNull1 &&
+		if (!iter1.isNull &&
 			DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
 											scankey->sk_collation,
 											datum1,
@@ -2421,24 +2441,27 @@ _bt_keep_natts_fast(Relation rel, IndexTuple lastleft, IndexTuple firstright)
 	TupleDesc	itupdesc = RelationGetDescr(rel);
 	int			keysz = IndexRelationGetNumberOfKeyAttributes(rel);
 	int			keepnatts;
+	IAttrIterStateData iter1;
+	IAttrIterStateData iter2;
+	
+	index_attiterinit(lastleft, 1, itupdesc, &iter1);
+	index_attiterinit(firstright, 1, itupdesc, &iter2);
 
 	keepnatts = 1;
 	for (int attnum = 1; attnum <= keysz; attnum++)
 	{
 		Datum		datum1,
 					datum2;
-		bool		isNull1,
-					isNull2;
 		Form_pg_attribute att;
 
-		datum1 = index_getattr(lastleft, attnum, itupdesc, &isNull1);
-		datum2 = index_getattr(firstright, attnum, itupdesc, &isNull2);
+		datum1 = index_attiternext(lastleft, attnum, itupdesc, &iter1);
+		datum2 = index_attiternext(firstright, attnum, itupdesc, &iter2);
 		att = TupleDescAttr(itupdesc, attnum - 1);
 
-		if (isNull1 != isNull2)
+		if (iter1.isNull != iter2.isNull)
 			break;
 
-		if (!isNull1 &&
+		if (!iter1.isNull &&
 			!datum_image_eq(datum1, datum2, att->attbyval, att->attlen))
 			break;
 
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 22972071ff..18fa22cb26 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -4215,9 +4215,8 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
 	int32		compare;
 	Datum		datum1,
 				datum2;
-	bool		isnull1,
-				isnull2;
-
+	IAttrIterStateData iter1;
+	IAttrIterStateData iter2;
 
 	/* Compare the leading sort key */
 	compare = ApplySortComparator(a->datum1, a->isnull1,
@@ -4231,14 +4230,17 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
 	tuple2 = (IndexTuple) b->tuple;
 	keysz = state->nKeys;
 	tupDes = RelationGetDescr(state->indexRel);
+	
+	index_attiterinit(tuple1, 1, tupDes, &iter1);
+	index_attiterinit(tuple2, 1, tupDes, &iter2);
+
+	datum1 = index_attiternext(tuple1, 1, tupDes, &iter1);
+	datum2 = index_attiternext(tuple2, 1, tupDes, &iter2);
 
 	if (sortKey->abbrev_converter)
 	{
-		datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
-		datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
-
-		compare = ApplySortAbbrevFullComparator(datum1, isnull1,
-												datum2, isnull2,
+		compare = ApplySortAbbrevFullComparator(datum1, iter1.isNull,
+												datum2, iter2.isNull,
 												sortKey);
 		if (compare != 0)
 			return compare;
@@ -4251,17 +4253,17 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
 	sortKey++;
 	for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
 	{
-		datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
-		datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
+		datum1 = index_attiternext(tuple1, nkey, tupDes, &iter1);
+		datum2 = index_attiternext(tuple2, nkey, tupDes, &iter2);
 
-		compare = ApplySortComparator(datum1, isnull1,
-									  datum2, isnull2,
+		compare = ApplySortComparator(datum1, iter1.isNull,
+									  datum2, iter2.isNull,
 									  sortKey);
 		if (compare != 0)
 			return compare;		/* done when we find unequal attributes */
 
 		/* they are equal, so we only need to examine one null flag */
-		if (isnull1)
+		if (iter1.isNull)
 			equal_hasnull = true;
 	}
 
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index 1917375cde..78aa137ab0 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -59,6 +59,15 @@ typedef struct IndexAttributeBitMapData
 
 typedef IndexAttributeBitMapData * IndexAttributeBitMap;
 
+typedef struct IAttrIterStateData
+{
+	int			offset;
+	bool		slow;
+	bool		isNull;
+} IAttrIterStateData;
+
+typedef IAttrIterStateData * IAttrIterState;
+
 /*
  * t_info manipulation macros
  */
@@ -126,6 +135,80 @@ typedef IndexAttributeBitMapData * IndexAttributeBitMap;
 	) \
 )
 
+/* ----------------
+ *		index_attiterinit
+ *
+ *		This gets called many times, so we macro the cacheable and NULL
+ *		lookups, and call nocache_index_attiterinit() for the rest.
+ *
+ *		tup - the tuple being iterated on
+ *		attnum - the attribute number that we start the iteration with 
+ *				 in the first index_attiternext call
+ *		tupdesc - the tuple description
+ *
+ * ----------------
+ */
+#define index_attiterinit(tup, attnum, tupleDesc, iter) \
+( \
+	((attnum) == 1) ? \
+	( \
+		*(iter) = (IAttrIterStateData) { 0, false, false } \
+	) \
+	: \
+	!IndexTupleHasNulls(tup) && TupleDescAttr((tupleDesc), (attnum)-1)->attcacheoff >= 0 ? \
+	( \
+		*(iter) = (IAttrIterStateData) { \
+			TupleDescAttr((tupleDesc), (attnum)-1)->attcacheoff, /* offset */ \
+			TupleDescAttr((tupleDesc), (attnum)-1)->attlen >= 0, /* slow */ \
+			false /* isNull */ \
+		} \
+	) \
+	: \
+	nocache_index_attiterinit((tup), (attnum) - 1, (tupleDesc), (iter)) \
+)
+
+/* ----------------
+ *		index_attiternext
+ *
+ *		This gets called many times, so we macro the cacheable and NULL
+ *		lookups, and call nocache_index_attiternext() for the rest.
+ *
+ * ----------------
+ */
+#define index_attiternext(itup, attnum, tupleDesc, iter) \
+( \
+	AssertMacro(PointerIsValid(iter) && (attnum) > 0), \
+	(!IndexTupleHasNulls(itup)) ? \
+	( \
+		!(iter)->slow && TupleDescAttr((tupleDesc), (attnum) - 1)->attcacheoff >= 0 ? \
+		( \
+			(iter)->offset = att_addlength_pointer(TupleDescAttr((tupleDesc), \
+				(attnum) - 1)->attcacheoff, TupleDescAttr((tupleDesc), \
+				(attnum) - 1)->attlen, (char *) (itup) + \
+				IndexInfoFindDataOffset((itup)->t_info) + \
+				TupleDescAttr((tupleDesc), (attnum) - 1)->attcacheoff), \
+			(iter)->isNull = false,\
+			(iter)->slow = TupleDescAttr((tupleDesc), (attnum) - 1)->attlen < 0, \
+			(Datum) fetchatt(TupleDescAttr((tupleDesc), (attnum) - 1), \
+				(char *) (itup) + IndexInfoFindDataOffset((itup)->t_info) + \
+				TupleDescAttr((tupleDesc), (attnum) - 1)->attcacheoff) \
+		) \
+		: \
+		nocache_index_attiternext((itup), (attnum), (tupleDesc), (iter)) \
+	) \
+	: \
+	( \
+		att_isnull((attnum) - 1, (char *) (itup) + sizeof(IndexTupleData)) ? \
+		( \
+			(iter)->isNull = true, \
+			(iter)->slow = true, \
+			(Datum) 0 \
+		) \
+		: \
+		nocache_index_attiternext((itup), (attnum), (tupleDesc), (iter)) \
+	) \
+)
+
 /*
  * MaxIndexTuplesPerPage is an upper bound on the number of tuples that can
  * fit on one index page.  An index tuple must have either data or a null
@@ -160,5 +243,7 @@ extern void index_deform_tuple_internal(TupleDesc tupleDescriptor,
 extern IndexTuple CopyIndexTuple(IndexTuple source);
 extern IndexTuple index_truncate_tuple(TupleDesc sourceDescriptor,
 									   IndexTuple source, int leavenatts);
+extern void nocache_index_attiterinit(IndexTuple tup, AttrNumber attnum, TupleDesc tupleDesc, IAttrIterState iter);
+extern Datum nocache_index_attiternext(IndexTuple tup, AttrNumber attnum, TupleDesc tupleDesc, IAttrIterState iter);
 
 #endif							/* ITUP_H */
-- 
2.20.1

Reply via email to