From be9c70fa814a6eaa44f84c0c32b5375107aa19d2 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <pg@bowt.ie>
Date: Mon, 23 Jun 2025 22:46:05 -0400
Subject: [PATCH v4 1/2] Make handling of redundant nbtree keys more robust.

nbtree preprocessing's handling of redundant (and contradictory) keys
created problems for scans with = arrays.  It was just about possible
for a scan with an = array and one or more redundant keys (keys that
preprocessing could not eliminate due an incomplete opfamily and a
cross-type key) to get stuck.  Testing has shown that infinite cycling
where the scan never manages to make forward progress was possible.
This could happen when the scan's arrays were reset in _bt_readpage's
forcenonrequired=true path (added by bugfix commit 5f4d98d4) when the
arrays weren't at least advanced up to the same point that they were in
at the start of the _bt_readpage call.  Earlier redundant keys prevented
the finaltup call to _bt_advance_array_keys from reaching lower-order
keys that needed to be used to sufficiently advance the scan's arrays.

It seems best to fix the bug by making preprocessing leave the scan's
keys in a state that is at least as close as possible to how it usually
leaves them (in the event of being unable to determine which keys are
truly redundant).  The design of _bt_advance_array_keys tacitly assumes
that there can be little interference from such redundant keys, so this
approach makes sense on general robustness grounds.

Now nbtree preprocessing reliably leaves behind at most one required
>/>= key per index column, and at most one required </<= key per index
column.  Columns that have one or more = keys that are eligible to be
marked required (based on the traditional rules) will now leave behind
only one of the = keys as the index column's only required key.
Standard preprocessing steps continue to mark the keys required by
following the traditional rules.  A new preprocessing step (used only
when preprocessing fails to handle a redundancy comprehensively) can now
take place after that.  This new step unmarks a subset of the keys, in
such a way as to allow _bt_advance_array_keys (and related code) to not
have to deal with redundant required keys at all.

Keys that are not marked required (whether due to the new preprocessing
step running or for some other reason) are relocated to the end of the
so->keyData[] array as needed.  That way they'll always be evaluated
after the scan's required keys, and so cannot prevent code in places
like _bt_advance_array_keys from reaching any required key.

Also teach _bt_first to decide which initial positioning keys to use
based on the same requiredness markings that have long been used by
_bt_checkkeys/_bt_advance_array_keys.  This is a necessary condition for
reliably avoiding infinite cycling.  _bt_advance_array_keys expects to
be able to reason about what'll happen in the next _bt_first call should
it start another primitive index scan, by evaluating inequality keys
that were marked required in the opposite-to-scan scan direction only.
Now everybody (_bt_first, _bt_checkkeys, and _bt_advance_array_keys)
will always agree on which exact key will be used on each index column
to start and/or end the scan (except when row compare keys are involved,
which have similar problems not addressed by this commit).

An upcoming commit will finish off the work started by this commit by
harmonizing how _bt_first, _bt_checkkeys, and _bt_advance_array_keys
apply row compare keys to start and end scans.

This fixes what was arguably an oversight in either commit 5f4d98d4 or
commit 8a510275.

Author: Peter Geoghegan <pg@bowt.ie>
Discussion: https://postgr.es/m/CAH2-Wz=ds4M+3NXMgwxYxqU8MULaLf696_v5g=9WNmWL2=Uo2A@mail.gmail.com
---
 src/backend/access/nbtree/nbtpreprocesskeys.c | 380 ++++++++++++++++--
 src/backend/access/nbtree/nbtsearch.c         |  54 ++-
 src/backend/access/nbtree/nbtutils.c          | 136 +------
 3 files changed, 371 insertions(+), 199 deletions(-)

diff --git a/src/backend/access/nbtree/nbtpreprocesskeys.c b/src/backend/access/nbtree/nbtpreprocesskeys.c
index a136e4bbf..e4cb48359 100644
--- a/src/backend/access/nbtree/nbtpreprocesskeys.c
+++ b/src/backend/access/nbtree/nbtpreprocesskeys.c
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/nbtree.h"
+#include "common/int.h"
 #include "lib/qunique.h"
 #include "utils/array.h"
 #include "utils/lsyscache.h"
@@ -56,6 +57,8 @@ static void _bt_skiparray_strat_decrement(IndexScanDesc scan, ScanKey arraysk,
 										  BTArrayKeyInfo *array);
 static void _bt_skiparray_strat_increment(IndexScanDesc scan, ScanKey arraysk,
 										  BTArrayKeyInfo *array);
+static void _bt_unmark_extra_keys(IndexScanDesc scan, int *keyDataMap);
+static int	_bt_reorder_array_cmp(const void *a, const void *b);
 static ScanKey _bt_preprocess_array_keys(IndexScanDesc scan, int *new_numberOfKeys);
 static void _bt_preprocess_array_keys_final(IndexScanDesc scan, int *keyDataMap);
 static int	_bt_num_array_keys(IndexScanDesc scan, Oid *skip_eq_ops_out,
@@ -96,7 +99,7 @@ static int	_bt_compare_array_elements(const void *a, const void *b, void *arg);
  * incomplete sets of cross-type operators, we may fail to detect redundant
  * or contradictory keys, but we can survive that.)
  *
- * The output keys must be sorted by index attribute.  Presently we expect
+ * Required output keys are sorted by index attribute.  Presently we expect
  * (but verify) that the input keys are already so sorted --- this is done
  * by match_clauses_to_index() in indxpath.c.  Some reordering of the keys
  * within each attribute may be done as a byproduct of the processing here.
@@ -127,29 +130,25 @@ static int	_bt_compare_array_elements(const void *a, const void *b, void *arg);
  * This has the potential to be much more efficient than a full index scan
  * (though it behaves like a full scan when there's many distinct "x" values).
  *
- * If possible, redundant keys are eliminated: we keep only the tightest
+ * Typically, redundant keys are eliminated: we keep only the tightest
  * >/>= bound and the tightest </<= bound, and if there's an = key then
  * that's the only one returned.  (So, we return either a single = key,
  * or one or two boundary-condition keys for each attr.)  However, if we
  * cannot compare two keys for lack of a suitable cross-type operator,
- * we cannot eliminate either.  If there are two such keys of the same
- * operator strategy, the second one is just pushed into the output array
- * without further processing here.  We may also emit both >/>= or both
- * </<= keys if we can't compare them.  The logic about required keys still
- * works if we don't eliminate redundant keys.
+ * we cannot eliminate either key.
  *
- * Note that one reason we need direction-sensitive required-key flags is
- * precisely that we may not be able to eliminate redundant keys.  Suppose
+ * When all redundant keys could not be eliminated, we'll still output a
+ * so->keyData[] array that's as close as possible to what we'd output in the
+ * typical case where all redundant keys could be fully eliminated.  Suppose
  * we have "x > 4::int AND x > 10::bigint", and we are unable to determine
  * which key is more restrictive for lack of a suitable cross-type operator.
- * _bt_first will arbitrarily pick one of the keys to do the initial
- * positioning with.  If it picks x > 4, then the x > 10 condition will fail
- * until we reach index entries > 10; but we can't stop the scan just because
- * x > 10 is failing.  On the other hand, if we are scanning backwards, then
- * failure of either key is indeed enough to stop the scan.  (In general, when
- * inequality keys are present, the initial-positioning code only promises to
- * position before the first possible match, not exactly at the first match,
- * for a forward scan; or after the last match for a backward scan.)
+ * We'll arbitrarily pick one of the keys to mark required/do the initial
+ * positioning with.  If x > 4 is chosen then the x > 10 condition will fail
+ * until we reach index entries > 10.  x > 10 will be output at the end of
+ * so->keyData[], after the x > 4 key (i.e. after all required keys).  Note
+ * that this means that so->keyData[] won't always have keys in index
+ * attribute order (only the initial group of required keys are reliably
+ * output in attribute order).
  *
  * As a byproduct of this work, we can detect contradictory quals such
  * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = false,
@@ -193,6 +192,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
 	ScanKey		arrayKeyData;
 	int		   *keyDataMap = NULL;
 	int			arrayidx = 0;
+	bool		comparisonfailed = false;
 
 	if (so->numberOfKeys > 0)
 	{
@@ -388,7 +388,8 @@ _bt_preprocess_keys(IndexScanDesc scan)
 						xform[j].inkey = NULL;
 						xform[j].inkeyi = -1;
 					}
-					/* else, cannot determine redundancy, keep both keys */
+					else
+						comparisonfailed = true;
 				}
 				/* track number of attrs for which we have "=" keys */
 				numberOfEqualCols++;
@@ -409,6 +410,8 @@ _bt_preprocess_keys(IndexScanDesc scan)
 					else
 						xform[BTLessStrategyNumber - 1].inkey = NULL;
 				}
+				else
+					comparisonfailed = true;
 			}
 
 			/* try to keep only one of >, >= */
@@ -426,6 +429,8 @@ _bt_preprocess_keys(IndexScanDesc scan)
 					else
 						xform[BTGreaterStrategyNumber - 1].inkey = NULL;
 				}
+				else
+					comparisonfailed = true;
 			}
 
 			/*
@@ -466,25 +471,6 @@ _bt_preprocess_keys(IndexScanDesc scan)
 		/* check strategy this key's operator corresponds to */
 		j = inkey->sk_strategy - 1;
 
-		/* if row comparison, push it directly to the output array */
-		if (inkey->sk_flags & SK_ROW_HEADER)
-		{
-			ScanKey		outkey = &so->keyData[new_numberOfKeys++];
-
-			memcpy(outkey, inkey, sizeof(ScanKeyData));
-			if (arrayKeyData)
-				keyDataMap[new_numberOfKeys - 1] = i;
-			if (numberOfEqualCols == attno - 1)
-				_bt_mark_scankey_required(outkey);
-
-			/*
-			 * We don't support RowCompare using equality; such a qual would
-			 * mess up the numberOfEqualCols tracking.
-			 */
-			Assert(j != (BTEqualStrategyNumber - 1));
-			continue;
-		}
-
 		if (inkey->sk_strategy == BTEqualStrategyNumber &&
 			(inkey->sk_flags & SK_SEARCHARRAY))
 		{
@@ -591,11 +577,6 @@ _bt_preprocess_keys(IndexScanDesc scan)
 				 * We can't determine which key is more restrictive.  Push
 				 * xform[j] directly to the output array, then set xform[j] to
 				 * the new scan key.
-				 *
-				 * Note: We do things this way around so that our arrays are
-				 * always in the same order as their corresponding scan keys,
-				 * even with incomplete opfamilies.  _bt_advance_array_keys
-				 * depends on this.
 				 */
 				ScanKey		outkey = &so->keyData[new_numberOfKeys++];
 
@@ -607,6 +588,7 @@ _bt_preprocess_keys(IndexScanDesc scan)
 				xform[j].inkey = inkey;
 				xform[j].inkeyi = i;
 				xform[j].arrayidx = arrayidx;
+				comparisonfailed = true;
 			}
 		}
 	}
@@ -622,6 +604,16 @@ _bt_preprocess_keys(IndexScanDesc scan)
 	if (arrayKeyData)
 		_bt_preprocess_array_keys_final(scan, keyDataMap);
 
+	/*
+	 * If all prior preprocessing steps failed to eliminate any keys due to a
+	 * lack of suitable cross-type support (and a cross-type key), unset their
+	 * required markings now.  This ensures that there's never ambiguity about
+	 * which key must be used to determine where the scan starts and/or ends
+	 * for a given attribute (even when it's an _implicit_ NOT NULL key).
+	 */
+	if (unlikely(comparisonfailed && so->qual_ok))
+		_bt_unmark_extra_keys(scan, keyDataMap);
+
 	/* Could pfree arrayKeyData/keyDataMap now, but not worth the cycles */
 }
 
@@ -847,8 +839,7 @@ _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
 				cmp_op;
 	StrategyNumber strat;
 
-	Assert(!((leftarg->sk_flags | rightarg->sk_flags) &
-			 (SK_ROW_HEADER | SK_ROW_MEMBER)));
+	Assert(!((leftarg->sk_flags | rightarg->sk_flags) & SK_ROW_MEMBER));
 
 	/*
 	 * First, deal with cases where one or both args are NULL.  This should
@@ -924,6 +915,16 @@ _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
 		return true;
 	}
 
+	/*
+	 * We don't yet know how to determine redundancy when it involves a row
+	 * compare key (barring simple cases involving IS NULL/IS NOT NULL)
+	 */
+	if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ROW_HEADER)
+	{
+		Assert(!((leftarg->sk_flags | rightarg->sk_flags) & SK_BT_SKIP));
+		return false;
+	}
+
 	/*
 	 * If either leftarg or rightarg are equality-type array scankeys, we need
 	 * specialized handling (since by now we know that IS NULL wasn't used)
@@ -1467,6 +1468,297 @@ _bt_skiparray_strat_increment(IndexScanDesc scan, ScanKey arraysk,
 	}
 }
 
+/*
+ *	_bt_unmark_extra_keys() -- make "extra" so->keyData[] keys nonrequired.
+ *
+ * When _bt_preprocess_keys failed to eliminate one or more redundant keys, it
+ * calls here to make sure that no index attribute has more than one > or >=
+ * key marked required, and no more than one required < or <= key.  Attributes
+ * with = keys will always get one = key as their required key.  All other
+ * keys that were initially marked required get "unmarked" here.  This ensures
+ * that _bt_first and _bt_checkkeys reliably have exactly the same ideas about
+ * which keys to use to start and end the scan.  Scans with array keys rely on
+ * this guarantee.  They expect to be able to unambiguously determine whether
+ * starting another primscan (given the current array key elements, and all
+ * other keys) will land the scan on some later leaf page.
+ *
+ * We also relocate keys that become/started out nonrequired to the end of
+ * so->keyData[].  That way, _bt_first/_bt_checkkeys/_bt_advance_array_keys
+ * cannot fail to reach a required key due to some earlier nonrequired key.
+ *
+ * so->keyData[] won't be in strict attribute order when we're done with it,
+ * but that's okay; the group of remaining required scan keys that'll appear
+ * first in so->keyData[] retain their original order, which is good enough.
+ * (The group of nonrequired keys that'll appear at the end of so->keyData[]
+ * also retain their original order, though any order works for these keys.)
+ *
+ * Only call here when _bt_compare_scankey_args returned false at least once
+ * (otherwise, calling here will just waste cycles).  We always unmark at
+ * least one key per call to _bt_compare_scankey_args that reported failure.
+ */
+static void
+_bt_unmark_extra_keys(IndexScanDesc scan, int *keyDataMap)
+{
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	AttrNumber	curattr;
+	ScanKey		origkey;
+	bool	   *unmarkikey;
+	int			nunmark,
+				nunmarked,
+				nkept,
+				firsti;
+	ScanKey		keepKeys,
+				unmarkKeys;
+	FmgrInfo   *keepOrderProcs = NULL,
+			   *unmarkOrderProcs = NULL;
+	bool		haveReqEquals,
+				haveReqForward,
+				haveReqBackward;
+
+	/*
+	 * Do an initial pass over so->keyData[] that determines which keys to
+	 * keeo as required, and which to mark non-required.
+	 *
+	 * When both equality and inequality keys remain on a single attribute, we
+	 * *must* leave only the equality key as required to continue the scan.
+	 * Keys can only be considered required when they're on the first index
+	 * column, or when all earlier index columns have a required = key.
+	 */
+	unmarkikey = palloc0(so->numberOfKeys * sizeof(bool));
+	nunmark = 0;
+
+	/* Set things up for first key's attr */
+	origkey = so->keyData;
+	curattr = origkey->sk_attno;
+	firsti = 0;
+	haveReqEquals = false;
+	haveReqForward = false;
+	haveReqBackward = false;
+	for (int i = 0; i < so->numberOfKeys; origkey++, i++)
+	{
+		if (origkey->sk_attno != curattr)
+		{
+			/* Reset for next attr */
+			curattr = origkey->sk_attno;
+			firsti = i;
+
+			haveReqEquals = false;
+			haveReqForward = false;
+			haveReqBackward = false;
+		}
+
+		/* First consider equalities */
+		if (haveReqEquals)
+		{
+			/*
+			 * We've already found the first "=" key for attr.  We already
+			 * decided that every other key on the same attr is to be unmarked
+			 */
+			Assert(!(origkey->sk_flags & SK_SEARCHNULL));
+			unmarkikey[i] = true;
+			nunmark++;
+			continue;
+		}
+		else if ((origkey->sk_flags & SK_BT_REQFWD) &&
+				 (origkey->sk_flags & SK_BT_REQBKWD))
+		{
+			/*
+			 * Found the first "=" key for attr.  All other keys on the same
+			 * attr will be unmarked.
+			 */
+			Assert(origkey->sk_strategy == BTEqualStrategyNumber);
+
+			haveReqEquals = true;
+			for (int j = firsti; j < i; j++)
+			{
+				/* We'll unmark prior keys for the same attr after all */
+				if (!unmarkikey[j])
+				{
+					unmarkikey[j] = true;
+					nunmark++;
+				}
+			}
+			continue;
+		}
+
+		/* Next consider inequalities */
+		if ((origkey->sk_flags & SK_BT_REQFWD) && !haveReqForward)
+		{
+			haveReqForward = true;
+			continue;
+		}
+		else if ((origkey->sk_flags & SK_BT_REQBKWD) && !haveReqBackward)
+		{
+			haveReqBackward = true;
+			continue;
+		}
+		unmarkikey[i] = true;
+		nunmark++;
+	}
+
+	/*
+	 * We're only supposed to be called when _bt_compare_scankey_args reported
+	 * failure when called from the main _bt_preprocess_keys loop
+	 */
+	Assert(nunmark > 0);
+
+	/*
+	 * Next, allocate temp arrays: one set for unchanged keys, another for
+	 * keys that will be unmarked/made non-required
+	 */
+	unmarkKeys = palloc(so->numberOfKeys * sizeof(ScanKeyData));
+	keepKeys = palloc(so->numberOfKeys * sizeof(ScanKeyData));
+	nunmarked = 0;
+	nkept = 0;
+	if (so->numArrayKeys)
+	{
+		unmarkOrderProcs = palloc(so->numberOfKeys * sizeof(FmgrInfo));
+		keepOrderProcs = palloc(so->numberOfKeys * sizeof(FmgrInfo));
+	}
+
+	/*
+	 * Next, copy the contents of so->keyData[] into the appropriate temp
+	 * array.
+	 *
+	 * Scans with = array keys need us to maintain invariants around the order
+	 * of so->orderProcs[] and so->arrayKeys[] relative to so->keyData[].  See
+	 * _bt_preprocess_array_keys_final for a full explanation.
+	 */
+	for (int i = 0; i < so->numberOfKeys; i++)
+	{
+		ScanKey		unmark;
+
+		origkey = so->keyData + i;
+
+		if (!unmarkikey[i])
+		{
+			/*
+			 * Key was chosen above all other keys on this key's attr (among
+			 * those keys with the same SK_BT_REQFWD/SK_BT_REQBKWD marking).
+			 *
+			 * Key will stay in its original position, unless we've unmarked
+			 * some earlier key (in which case this key gets placed earlier).
+			 */
+			memcpy(keepKeys + nkept, origkey, sizeof(ScanKeyData));
+
+			if (so->numArrayKeys)
+			{
+				keyDataMap[i] = nkept;
+				memcpy(keepOrderProcs + nkept, &so->orderProcs[i],
+					   sizeof(FmgrInfo));
+			}
+
+			nkept++;
+			continue;
+		}
+
+		/*
+		 * Key was _not_ chosen above all other keys on this key's attr (among
+		 * those keys with the same SK_BT_REQFWD/SK_BT_REQBKWD marking).
+		 *
+		 * Key will be unmarked, and moved to the end of the array, next to
+		 * other keys that we'll unmark (or that were always nonrequired).
+		 */
+		unmark = unmarkKeys + nunmarked;
+		memcpy(unmark, origkey, sizeof(ScanKeyData));
+
+		if (so->numArrayKeys)
+		{
+			keyDataMap[i] = (so->numberOfKeys - nunmark) + nunmarked;
+			memcpy(&unmarkOrderProcs[nunmarked], &so->orderProcs[i],
+				   sizeof(FmgrInfo));
+		}
+
+		/*
+		 * Preprocessing only generates skip arrays when it knows that they're
+		 * the only = key on the attr.  We favor = strategy keys over keys
+		 * with other strategies, so we should never unmark a skip array here.
+		 */
+		Assert(!(unmark->sk_flags & SK_BT_SKIP));
+
+		/*
+		 * Also shouldn't have to unmark an IS NULL or an IS NOT NULL key.
+		 * They aren't cross-type, so an incomplete opfamily can't matter.
+		 */
+		Assert(!(unmark->sk_flags & SK_ISNULL) ||
+			   !(unmark->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)));
+
+		/* Clear requiredness flags on redundant key (and on any subkeys) */
+		unmark->sk_flags &= ~(SK_BT_REQFWD | SK_BT_REQBKWD);
+		if (unmark->sk_flags & SK_ROW_HEADER)
+		{
+			ScanKey		subkey = (ScanKey) DatumGetPointer(origkey->sk_argument);
+
+			Assert(subkey->sk_strategy == origkey->sk_strategy);
+			for (;;)
+			{
+				Assert(subkey->sk_flags & SK_ROW_MEMBER);
+				subkey->sk_flags &= ~(SK_BT_REQFWD | SK_BT_REQBKWD);
+				if (subkey->sk_flags & SK_ROW_END)
+					break;
+				subkey++;
+			}
+		}
+
+		nunmarked++;
+	}
+
+	/*
+	 * Copy temp arrays back into so->keyData[]
+	 */
+	Assert(nkept == so->numberOfKeys - nunmark);
+	Assert(nunmarked == nunmark);
+	memcpy(so->keyData, keepKeys, sizeof(ScanKeyData) * nkept);
+	memcpy(so->keyData + nkept, unmarkKeys, sizeof(ScanKeyData) * nunmarked);
+
+	/* Done with temp arrays */
+	pfree(unmarkikey);
+	pfree(keepKeys);
+	pfree(unmarkKeys);
+
+	/*
+	 * Now copy so->orderProcs[] temp entries needed by scans with = array
+	 * keys back (just like with the so->keyData[] temp arrays)
+	 */
+	if (so->numArrayKeys)
+	{
+		memcpy(so->orderProcs, keepOrderProcs, sizeof(FmgrInfo) * nkept);
+		memcpy(so->orderProcs + nkept, unmarkOrderProcs,
+			   sizeof(FmgrInfo) * nunmarked);
+
+		/* Also fix-up array->scan_key references */
+		for (int arridx = 0; arridx < so->numArrayKeys; arridx++)
+		{
+			BTArrayKeyInfo *array = &so->arrayKeys[arridx];
+
+			array->scan_key = keyDataMap[array->scan_key];
+		}
+
+		/*
+		 * Also make sure that the scan's arrays appear in the expected order.
+		 * This must match corresponding scan key/so->orderProcs[] entries.
+		 */
+		qsort(so->arrayKeys, so->numArrayKeys, sizeof(BTArrayKeyInfo),
+			  _bt_reorder_array_cmp);
+
+		/* Done with temp arrays */
+		pfree(unmarkOrderProcs);
+		pfree(keepOrderProcs);
+	}
+}
+
+/*
+ * qsort comparator for reordering so->arrayKeys[] BTArrayKeyInfo entries
+ */
+static int
+_bt_reorder_array_cmp(const void *a, const void *b)
+{
+	BTArrayKeyInfo *arraya = (BTArrayKeyInfo *) a;
+	BTArrayKeyInfo *arrayb = (BTArrayKeyInfo *) b;
+
+	return pg_cmp_s32(arraya->scan_key, arrayb->scan_key);
+}
+
 /*
  *	_bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys
  *
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 36544ecfd..0b997282e 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -960,6 +960,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 
 	/*----------
 	 * Examine the scan keys to discover where we need to start the scan.
+	 * The selected scan keys (at most one per index column) are remembered by
+	 * storing their addresses into the local startKeys[] array.
 	 *
 	 * We want to identify the keys that can be used as starting boundaries;
 	 * these are =, >, or >= keys for a forward scan or =, <, <= keys for
@@ -968,25 +970,20 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	 * a > or < boundary or find an attribute with no boundary (which can be
 	 * thought of as the same as "> -infinity"), we can't use keys for any
 	 * attributes to its right, because it would break our simplistic notion
-	 * of what initial positioning strategy to use.
+	 * of what initial positioning strategy to use.  We can only pick keys
+	 * that were marked required (in the direction opposite our own scan's)
+	 * during preprocessing.  In general, keys that we use to find an initial
+	 * position when scanning forwards are the same keys that'll end the scan
+	 * when scanning backwards (and vice-versa).
 	 *
 	 * When the scan keys include cross-type operators, _bt_preprocess_keys
-	 * may not be able to eliminate redundant keys; in such cases we will
-	 * arbitrarily pick a usable one for each attribute.  This is correct
-	 * but possibly not optimal behavior.  (For example, with keys like
-	 * "x >= 4 AND x >= 5" we would elect to scan starting at x=4 when
-	 * x=5 would be more efficient.)  Since the situation only arises given
-	 * a poorly-worded query plus an incomplete opfamily, live with it.
-	 *
-	 * When both equality and inequality keys appear for a single attribute
-	 * (again, only possible when cross-type operators appear), we *must*
-	 * select one of the equality keys for the starting point, because
-	 * _bt_checkkeys() will stop the scan as soon as an equality qual fails.
-	 * For example, if we have keys like "x >= 4 AND x = 10" and we elect to
-	 * start at x=4, we will fail and stop before reaching x=10.  If multiple
-	 * equality quals survive preprocessing, however, it doesn't matter which
-	 * one we use --- by definition, they are either redundant or
-	 * contradictory.
+	 * may not be able to eliminate redundant keys; in such cases it will
+	 * arbitrarily pick a usable one for each attribute, making sure that the
+	 * other key isn't marked required to continue the scan.  It'll still be
+	 * clear which key we should use here, since _bt_preprocess_keys outputs
+	 * the non-required key at the end of so->keyData[], after all of the
+	 * scan's required keys; we stop considering further keys once we reach
+	 * the first nonrequired key (which won't be in index attribute order!).
 	 *
 	 * In practice we rarely see any "attribute boundary key gaps" here.
 	 * Preprocessing can usually backfill skip array keys for any attributes
@@ -996,10 +993,6 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	 * This happens with range skip arrays, which store inequality keys in the
 	 * array's low_compare/high_compare fields (used to find the first/last
 	 * set of matches, when = key will lack a usable sk_argument value).
-	 * These are always preferred over any redundant "standard" inequality
-	 * keys on the same column (per the usual rule about preferring = keys).
-	 * Note also that any column with an = skip array key can never have an
-	 * additional, contradictory = key.
 	 *
 	 * All keys (with the exception of SK_SEARCHNULL keys and SK_BT_SKIP
 	 * array keys whose array is "null_elem=true") imply a NOT NULL qualifier.
@@ -1014,9 +1007,6 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	 * first (leftmost) columns.  We'll add on lower-order columns of the row
 	 * comparison below, if possible.
 	 *
-	 * The selected scan keys (at most one per index column) are remembered by
-	 * storing their addresses into the local startKeys[] array.
-	 *
 	 * _bt_checkkeys/_bt_advance_array_keys decide whether and when to start
 	 * the next primitive index scan (for scans with array keys) based in part
 	 * on an understanding of how it'll enable us to reposition the scan.
@@ -1058,6 +1048,9 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 		{
 			if (i >= so->numberOfKeys || cur->sk_attno != curattr)
 			{
+				Assert(chosen == NULL ||
+					   (chosen->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)));
+
 				/*
 				 * Done looking at keys for curattr.
 				 *
@@ -1180,17 +1173,16 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 
 				/*
 				 * Done if that was the last scan key output by preprocessing.
-				 * Also done if there is a gap index attribute that lacks a
-				 * usable key (only possible when preprocessing was unable to
-				 * generate a skip array key to "fill in the gap").
+				 * Also done if we've now examined all keys marked required.
 				 */
 				if (i >= so->numberOfKeys ||
-					cur->sk_attno != curattr + 1)
+					!(cur->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)))
 					break;
 
 				/*
 				 * Reset for next attr.
 				 */
+				Assert(cur->sk_attno == curattr + 1);
 				curattr = cur->sk_attno;
 				chosen = NULL;
 				impliesNN = NULL;
@@ -1216,8 +1208,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 					}
 					break;
 				case BTEqualStrategyNumber:
-					/* override any non-equality choice */
-					chosen = cur;
+					/* first equality key always wins */
+					if (chosen == NULL)
+						chosen = cur;
+					Assert(chosen->sk_strategy == BTEqualStrategyNumber);
 					break;
 				case BTGreaterEqualStrategyNumber:
 				case BTGreaterStrategyNumber:
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index c71d1b6f2..eb6dbfda3 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -44,7 +44,6 @@ static bool _bt_array_decrement(Relation rel, ScanKey skey, BTArrayKeyInfo *arra
 static bool _bt_array_increment(Relation rel, ScanKey skey, BTArrayKeyInfo *array);
 static bool _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir,
 											 bool *skip_array_set);
-static void _bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir);
 static bool _bt_tuple_before_array_skeys(IndexScanDesc scan, ScanDirection dir,
 										 IndexTuple tuple, TupleDesc tupdesc, int tupnatts,
 										 bool readpagetup, int sktrig, bool *scanBehind);
@@ -52,7 +51,6 @@ static bool _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate,
 								   IndexTuple tuple, int tupnatts, TupleDesc tupdesc,
 								   int sktrig, bool sktrig_required);
 #ifdef USE_ASSERT_CHECKING
-static bool _bt_verify_arrays_bt_first(IndexScanDesc scan, ScanDirection dir);
 static bool _bt_verify_keys_with_arraykeys(IndexScanDesc scan);
 #endif
 static bool _bt_oppodir_checkkeys(IndexScanDesc scan, ScanDirection dir,
@@ -1034,73 +1032,6 @@ _bt_advance_array_keys_increment(IndexScanDesc scan, ScanDirection dir,
 	return false;
 }
 
-/*
- * _bt_rewind_nonrequired_arrays() -- Rewind SAOP arrays not marked required
- *
- * Called when _bt_advance_array_keys decides to start a new primitive index
- * scan on the basis of the current scan position being before the position
- * that _bt_first is capable of repositioning the scan to by applying an
- * inequality operator required in the opposite-to-scan direction only.
- *
- * Although equality strategy scan keys (for both arrays and non-arrays alike)
- * are either marked required in both directions or in neither direction,
- * there is a sense in which non-required arrays behave like required arrays.
- * With a qual such as "WHERE a IN (100, 200) AND b >= 3 AND c IN (5, 6, 7)",
- * the scan key on "c" is non-required, but nevertheless enables positioning
- * the scan at the first tuple >= "(100, 3, 5)" on the leaf level during the
- * first descent of the tree by _bt_first.  Later on, there could also be a
- * second descent, that places the scan right before tuples >= "(200, 3, 5)".
- * _bt_first must never be allowed to build an insertion scan key whose "c"
- * entry is set to a value other than 5, the "c" array's first element/value.
- * (Actually, it's the first in the current scan direction.  This example uses
- * a forward scan.)
- *
- * Calling here resets the array scan key elements for the scan's non-required
- * arrays.  This is strictly necessary for correctness in a subset of cases
- * involving "required in opposite direction"-triggered primitive index scans.
- * Not all callers are at risk of _bt_first using a non-required array like
- * this, but advancement always resets the arrays when another primitive scan
- * is scheduled, just to keep things simple.  Array advancement even makes
- * sure to reset non-required arrays during scans that have no inequalities.
- * (Advancement still won't call here when there are no inequalities, though
- * that's just because it's all handled indirectly instead.)
- *
- * Note: _bt_verify_arrays_bt_first is called by an assertion to enforce that
- * everybody got this right.
- *
- * Note: In practice almost all SAOP arrays are marked required during
- * preprocessing (if necessary by generating skip arrays).  It is hardly ever
- * truly necessary to call here, but consistently doing so is simpler.
- */
-static void
-_bt_rewind_nonrequired_arrays(IndexScanDesc scan, ScanDirection dir)
-{
-	Relation	rel = scan->indexRelation;
-	BTScanOpaque so = (BTScanOpaque) scan->opaque;
-	int			arrayidx = 0;
-
-	for (int ikey = 0; ikey < so->numberOfKeys; ikey++)
-	{
-		ScanKey		cur = so->keyData + ikey;
-		BTArrayKeyInfo *array = NULL;
-
-		if (!(cur->sk_flags & SK_SEARCHARRAY) ||
-			cur->sk_strategy != BTEqualStrategyNumber)
-			continue;
-
-		array = &so->arrayKeys[arrayidx++];
-		Assert(array->scan_key == ikey);
-
-		if ((cur->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)))
-			continue;
-
-		Assert(array->num_elems != -1); /* No non-required skip arrays */
-
-		_bt_array_set_low_or_high(rel, cur, array,
-								  ScanDirectionIsForward(dir));
-	}
-}
-
 /*
  * _bt_tuple_before_array_skeys() -- too early to advance required arrays?
  *
@@ -1380,8 +1311,6 @@ _bt_start_prim_scan(IndexScanDesc scan, ScanDirection dir)
 	 */
 	if (so->needPrimScan)
 	{
-		Assert(_bt_verify_arrays_bt_first(scan, dir));
-
 		/*
 		 * Flag was set -- must call _bt_first again, which will reset the
 		 * scan's needPrimScan flag
@@ -2007,14 +1936,7 @@ _bt_advance_array_keys(IndexScanDesc scan, BTReadPageState *pstate,
 	 */
 	else if (has_required_opposite_direction_only && pstate->finaltup &&
 			 unlikely(!_bt_oppodir_checkkeys(scan, dir, pstate->finaltup)))
-	{
-		/*
-		 * Make sure that any SAOP arrays that were not marked required by
-		 * preprocessing are reset to their first element for this direction
-		 */
-		_bt_rewind_nonrequired_arrays(scan, dir);
 		goto new_prim_scan;
-	}
 
 continue_scan:
 
@@ -2045,8 +1967,6 @@ continue_scan:
 		 */
 		so->oppositeDirCheck = has_required_opposite_direction_only;
 
-		_bt_rewind_nonrequired_arrays(scan, dir);
-
 		/*
 		 * skip by setting "look ahead" mechanism's offnum for forwards scans
 		 * (backwards scans check scanBehind flag directly instead)
@@ -2142,48 +2062,6 @@ end_toplevel_scan:
 }
 
 #ifdef USE_ASSERT_CHECKING
-/*
- * Verify that the scan's qual state matches what we expect at the point that
- * _bt_start_prim_scan is about to start a just-scheduled new primitive scan.
- *
- * We enforce a rule against non-required array scan keys: they must start out
- * with whatever element is the first for the scan's current scan direction.
- * See _bt_rewind_nonrequired_arrays comments for an explanation.
- */
-static bool
-_bt_verify_arrays_bt_first(IndexScanDesc scan, ScanDirection dir)
-{
-	BTScanOpaque so = (BTScanOpaque) scan->opaque;
-	int			arrayidx = 0;
-
-	for (int ikey = 0; ikey < so->numberOfKeys; ikey++)
-	{
-		ScanKey		cur = so->keyData + ikey;
-		BTArrayKeyInfo *array = NULL;
-		int			first_elem_dir;
-
-		if (!(cur->sk_flags & SK_SEARCHARRAY) ||
-			cur->sk_strategy != BTEqualStrategyNumber)
-			continue;
-
-		array = &so->arrayKeys[arrayidx++];
-
-		if (((cur->sk_flags & SK_BT_REQFWD) && ScanDirectionIsForward(dir)) ||
-			((cur->sk_flags & SK_BT_REQBKWD) && ScanDirectionIsBackward(dir)))
-			continue;
-
-		if (ScanDirectionIsForward(dir))
-			first_elem_dir = 0;
-		else
-			first_elem_dir = array->num_elems - 1;
-
-		if (array->cur_elem != first_elem_dir)
-			return false;
-	}
-
-	return _bt_verify_keys_with_arraykeys(scan);
-}
-
 /*
  * Verify that the scan's "so->keyData[]" scan keys are in agreement with
  * its array key state
@@ -2194,6 +2072,7 @@ _bt_verify_keys_with_arraykeys(IndexScanDesc scan)
 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
 	int			last_sk_attno = InvalidAttrNumber,
 				arrayidx = 0;
+	bool		nonrequiredseen = false;
 
 	if (!so->qual_ok)
 		return false;
@@ -2217,8 +2096,16 @@ _bt_verify_keys_with_arraykeys(IndexScanDesc scan)
 		if (array->num_elems != -1 &&
 			cur->sk_argument != array->elem_values[array->cur_elem])
 			return false;
-		if (last_sk_attno > cur->sk_attno)
-			return false;
+		if (cur->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD))
+		{
+			if (last_sk_attno > cur->sk_attno)
+				return false;
+			if (nonrequiredseen)
+				return false;
+		}
+		else
+			nonrequiredseen = true;
+
 		last_sk_attno = cur->sk_attno;
 	}
 
@@ -2551,7 +2438,6 @@ _bt_set_startikey(IndexScanDesc scan, BTReadPageState *pstate)
 		if (!(key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)))
 		{
 			/* Scan key isn't marked required (corner case) */
-			Assert(!(key->sk_flags & SK_ROW_HEADER));
 			break;				/* unsafe */
 		}
 		if (key->sk_flags & SK_ROW_HEADER)
-- 
2.50.0

