On Mon, Mar 31, 2014 at 7:35 PM, Peter Geoghegan <p...@heroku.com> wrote:
> Okay. Attached revision only trusts strxfrm() blobs (as far as that
> goes) when the buffer passed to strxfrm() was sufficiently large that
> the blob could fully fit.

Attached revision has been further polished. I've added two additional
optimizations:

* Squeeze the last byte out of each Datum, so that on a 64-bit system,
the full 8 bytes are available to store strxfrm() blobs.

* When the strcoll()-based bttextfastcmp_locale() comparator is called,
figure out whether it was called because a poor man's comparison
required it (and *not* because it's a non-leading key in the
traditional sense, which implies there are no poor man's normalized
keys in respect of this attribute at all). This allows us to try to get
away with a straight memcmp() if and when the lengths of the original
text strings match, on the assumption that when the initial poor man's
comparison didn't work out, and when the string lengths match, there is
a very good chance that both are equal, and on average it's a win to
avoid doing a strcoll() (along with the attendant copying around of
buffers for NUL-termination) entirely. Given that memcmp() is so much
cheaper than strcoll() anyway, this seems like a good trade-off.

-- 
Peter Geoghegan
*** a/src/backend/executor/nodeMergeAppend.c
--- b/src/backend/executor/nodeMergeAppend.c
*************** ExecInitMergeAppend(MergeAppend *node, E
*** 136,141 ****
--- 136,142 ----
  		sortKey->ssup_collation = node->collations[i];
  		sortKey->ssup_nulls_first = node->nullsFirst[i];
  		sortKey->ssup_attno = node->sortColIdx[i];
+ 		sortKey->leading = (i == 0);
  
  		PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
  	}
*** a/src/backend/utils/adt/varlena.c
--- b/src/backend/utils/adt/varlena.c
***************
*** 17,22 ****
--- 17,23 ----
  #include <ctype.h>
  #include <limits.h>
  
+ #include "access/nbtree.h"
  #include "access/tuptoaster.h"
  #include "catalog/pg_collation.h"
  #include "catalog/pg_type.h"
***************
*** 29,34 ****
--- 30,36 ----
  #include "utils/bytea.h"
  #include "utils/lsyscache.h"
  #include "utils/pg_locale.h"
+ #include "utils/sortsupport.h"
  
  
  /* GUC variable */
*************** typedef struct
*** 50,61 ****
--- 52,85 ----
  	int			skiptable[256]; /* skip distance for given mismatched char */
  } TextPositionState;
  
+ /*
+  * Working state for the text sort support comparators.  It is allocated by
+  * btpoorman_worker() and stored in ssup->ssup_extra.
+  */
+ typedef struct
+ {
+ 	char	   *buf1;			/* 1st string, or poorman original string buf */
+ 	char	   *buf2;			/* 2nd string, or leading key/poor man blob */
+ 	int			buflen1;		/* current allocated size of buf1, in bytes */
+ 	int			buflen2;		/* current allocated size of buf2, in bytes */
+ #ifdef HAVE_LOCALE_T
+ 	pg_locale_t locale;			/* set for non-default collations, else 0 */
+ #endif
+ } TextSortSupport;
+ 
+ /*
+  * This should be large enough that most strings will fit, but small enough
+  * that we feel comfortable putting it on the stack
+  */
+ #define TEXTBUFLEN		1024
+ 
  #define DatumGetUnknownP(X)			((unknown *) PG_DETOAST_DATUM(X))
  #define DatumGetUnknownPCopy(X)		((unknown *) PG_DETOAST_DATUM_COPY(X))
  #define PG_GETARG_UNKNOWN_P(n)		DatumGetUnknownP(PG_GETARG_DATUM(n))
  #define PG_GETARG_UNKNOWN_P_COPY(n) DatumGetUnknownPCopy(PG_GETARG_DATUM(n))
  #define PG_RETURN_UNKNOWN_P(x)		PG_RETURN_POINTER(x)
  
+ static void btpoorman_worker(SortSupport ssup, Oid collid);
+ static int bttextfastcmp_c(Datum x, Datum y, SortSupport ssup);
+ static int bttextfastcmp_locale(Datum x, Datum y, SortSupport ssup);
+ static int bttextfastcmp_locale_poorman(Datum x, Datum y, SortSupport ssup);
+ static Datum bttext_convert(Datum d, SortSupport ssup);
  static int32 text_length(Datum str);
  static text *text_catenate(text *t1, text *t2);
  static text *text_substring(Datum str,
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1356,1365 ****
  	}
  	else
  	{
! #define STACKBUFLEN		1024
! 
! 		char		a1buf[STACKBUFLEN];
! 		char		a2buf[STACKBUFLEN];
  		char	   *a1p,
  				   *a2p;
  
--- 1380,1387 ----
  	}
  	else
  	{
! 		char		a1buf[TEXTBUFLEN];
! 		char		a2buf[TEXTBUFLEN];
  		char	   *a1p,
  				   *a2p;
  
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1393,1416 ****
  			int			a2len;
  			int			r;
  
! 			if (len1 >= STACKBUFLEN / 2)
  			{
  				a1len = len1 * 2 + 2;
  				a1p = palloc(a1len);
  			}
  			else
  			{
! 				a1len = STACKBUFLEN;
  				a1p = a1buf;
  			}
! 			if (len2 >= STACKBUFLEN / 2)
  			{
  				a2len = len2 * 2 + 2;
  				a2p = palloc(a2len);
  			}
  			else
  			{
! 				a2len = STACKBUFLEN;
  				a2p = a2buf;
  			}
  
--- 1415,1438 ----
  			int			a2len;
  			int			r;
  
! 			if (len1 >= TEXTBUFLEN / 2)
  			{
  				a1len = len1 * 2 + 2;
  				a1p = palloc(a1len);
  			}
  			else
  			{
! 				a1len = TEXTBUFLEN;
  				a1p = a1buf;
  			}
! 			if (len2 >= TEXTBUFLEN / 2)
  			{
  				a2len = len2 * 2 + 2;
  				a2p = palloc(a2len);
  			}
  			else
  			{
! 				a2len = TEXTBUFLEN;
  				a2p = a2buf;
  			}
  
*************** varstr_cmp(char *arg1, int len1, char *a
*** 1475,1485 ****
  		}
  #endif   /* WIN32 */
  
! 		if (len1 >= STACKBUFLEN)
  			a1p = (char *) palloc(len1 + 1);
  		else
  			a1p = a1buf;
! 		if (len2 >= STACKBUFLEN)
  			a2p = (char *) palloc(len2 + 1);
  		else
  			a2p = a2buf;
--- 1497,1507 ----
  		}
  #endif   /* WIN32 */
  
! 		if (len1 >= TEXTBUFLEN)
  			a1p = (char *) palloc(len1 + 1);
  		else
  			a1p = a1buf;
! 		if (len2 >= TEXTBUFLEN)
  			a2p = (char *) palloc(len2 + 1);
  		else
  			a2p = a2buf;
*************** bttextcmp(PG_FUNCTION_ARGS)
*** 1683,1688 ****
--- 1705,2039 ----
  	PG_RETURN_INT32(result);
  }
  
+ /*
+  * bttextsortsupport
+  *
+  * SQL-callable sort support function for text.  Argument 0 is the
+  * SortSupport state to fill in; the real work is delegated to
+  * btpoorman_worker(), using the collation already recorded in that state.
+  */
+ Datum
+ bttextsortsupport(PG_FUNCTION_ARGS)
+ {
+ 	SortSupport		sortstate = (SortSupport) PG_GETARG_POINTER(0);
+ 
+ 	btpoorman_worker(sortstate, sortstate->ssup_collation);
+ 
+ 	PG_RETURN_VOID();
+ }
+ 
+ /*
+  * btpoorman_worker
+  *
+  * Fill in the sort support callbacks of "ssup" for sorting text under
+  * collation "collid".  Depending on the collation and on ssup->leading, this
+  * installs one of:
+  *
+  *	- bttextfastcmp_c, a memcmp()-based comparator (C collation);
+  *	- bttextfastcmp_locale, a strcoll()-based comparator (non-leading key);
+  *	- bttextfastcmp_locale_poorman together with the bttext_convert
+  *	  converter (leading key).  In that case ssup->proper is also built, via
+  *	  a recursive call, as a separate strcoll()-based state that serves as
+  *	  the reliable tie-breaker.
+  *
+  * Reports ERRCODE_INDETERMINATE_COLLATION when a locale-aware comparison is
+  * needed but collid is not a valid OID.
+  */
+ static void
+ btpoorman_worker(SortSupport ssup, Oid collid)
+ {
+ 	TextSortSupport	   *tss;
+ 
+ 	/*
+ 	 * If LC_COLLATE = C, we can make things quite a bit faster by using
+ 	 * memcmp() rather than strcoll().  To minimize the per-comparison
+ 	 * overhead, we make this decision just once for the whole sort.  For now,
+ 	 * don't apply the "small string optimization" to C-locale, even though
+ 	 * that should work (without any strxfrm() processing).
+ 	 */
+ 	if (lc_collate_is_c(collid))
+ 	{
+ 		ssup->comparator = bttextfastcmp_c;
+ 		ssup->converter = NULL;
+ 		ssup->proper = NULL;
+ 		return;
+ 	}
+ 
+ 	/*
+ 	 * WIN32 requires complex hacks when the database encoding is UTF-8
+ 	 * (except when using the "C" collation).  For now, we don't optimize that
+ 	 * case.
+ 	 *
+ 	 * This function returns void, so leave with a plain "return";
+ 	 * PG_RETURN_VOID() returns a Datum and is only meant for fmgr-callable
+ 	 * functions.
+ 	 *
+ 	 * NOTE(review): no comparator is installed on this path.  Confirm that
+ 	 * the caller copes with that (for instance by falling back to a
+ 	 * bttextcmp-based shim) rather than expecting one to be set.
+ 	 */
+ #ifdef WIN32
+ 	if (GetDatabaseEncoding() == PG_UTF8)
+ 	{
+ 		ssup->converter = NULL;
+ 		ssup->proper = NULL;
+ 		return;
+ 	}
+ #endif /* WIN32 */
+ 
+ 	/*
+ 	 * We need a collation-sensitive comparison.  To make things faster,
+ 	 * we'll figure out the collation based on the locale id and cache the
+ 	 * result.  Also, since strxfrm()/strcoll() require NUL-terminated inputs,
+ 	 * prepare one or two palloc'd buffers to use as temporary workspace.  In
+ 	 * the ad-hoc comparison case we only use palloc'd buffers when we need
+ 	 * more space than we're comfortable allocating on the stack, but here we
+ 	 * can keep the buffers around for the whole sort, so it makes sense to
+ 	 * allocate them once and use them unconditionally (although we won't
+ 	 * actually need them when sorting proper actually begins and strxfrm()
+ 	 * conversion has already occurred, when sorting a leading key).
+ 	 */
+ 	tss = MemoryContextAlloc(ssup->ssup_cxt, sizeof(TextSortSupport));
+ #ifdef HAVE_LOCALE_T
+ 	tss->locale = 0;
+ #endif
+ 
+ 	if (collid != DEFAULT_COLLATION_OID)
+ 	{
+ 		if (!OidIsValid(collid))
+ 		{
+ 			/*
+ 			 * This typically means that the parser could not resolve a
+ 			 * conflict of implicit collations, so report it that way.
+ 			 */
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_INDETERMINATE_COLLATION),
+ 					 errmsg("could not determine which collation to use for string comparison"),
+ 					 errhint("Use the COLLATE clause to set the collation explicitly.")));
+ 		}
+ #ifdef HAVE_LOCALE_T
+ 		tss->locale = pg_newlocale_from_collation(collid);
+ #endif
+ 	}
+ 
+ 	tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, TEXTBUFLEN);
+ 	tss->buflen1 = TEXTBUFLEN;
+ 	tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, TEXTBUFLEN);
+ 	tss->buflen2 = TEXTBUFLEN;
+ 
+ 	ssup->ssup_extra = tss;
+ 
+ 	/* Non-leading keys never get poor man's keys: plain strcoll() only */
+ 	if (!ssup->leading)
+ 	{
+ 		ssup->comparator = bttextfastcmp_locale;
+ 		ssup->converter = NULL;
+ 		ssup->proper = NULL;
+ 		return;
+ 	}
+ 
+ 	ssup->comparator = bttextfastcmp_locale_poorman;
+ 	ssup->converter = bttext_convert;
+ 
+ 	/*
+ 	 * Allocate the tie-breaker state in the same context as everything else
+ 	 * it points to (its own ssup_cxt is set to ssup->ssup_cxt just below),
+ 	 * rather than in whatever context happens to be current, so that it
+ 	 * cannot go away before the rest of the sort state does.
+ 	 */
+ 	ssup->proper = MemoryContextAllocZero(ssup->ssup_cxt,
+ 										  sizeof(SortSupportData));
+ 	ssup->proper->ssup_cxt = ssup->ssup_cxt;
+ 	ssup->proper->ssup_collation = ssup->ssup_collation;
+ 	ssup->proper->ssup_reverse = ssup->ssup_reverse;
+ 	ssup->proper->ssup_nulls_first = ssup->ssup_nulls_first;
+ 	ssup->proper->ssup_attno = ssup->ssup_attno;
+ 
+ 	/*
+ 	 * Initialize the "proper" sortsupport state with a reliable
+ 	 * strcoll()-based comparator for tie-breaking.  This is the same thing
+ 	 * that this routine does directly for non-leading keys, which never have
+ 	 * poor man's normalized keys.
+ 	 *
+ 	 * This is the "true" leading key, which is technically not actually
+ 	 * leading.  The pseudo leading poor man's key is the leading key for
+ 	 * tuplesort's purposes.
+ 	 *
+ 	 * Because leading is false here, the recursive call stops at the
+ 	 * non-leading branch above and does not recurse any further.
+ 	 */
+ 	ssup->proper->leading = false;
+ 	ssup->proper->true_leading = true;
+ 	btpoorman_worker(ssup->proper, collid);
+ }
+ 
+ /*
+  * bttextfastcmp_c
+  *
+  * Sort support comparator used when the collation is C: memcmp() over the
+  * common prefix of the two strings, with the shorter string sorting first
+  * when that prefix is identical.  Returns <0, 0 or >0.
+  */
+ static int
+ bttextfastcmp_c(Datum x, Datum y, SortSupport ssup)
+ {
+ 	text	   *lhs = DatumGetTextPP(x);
+ 	text	   *rhs = DatumGetTextPP(y);
+ 	int			lhslen = VARSIZE_ANY_EXHDR(lhs);
+ 	int			rhslen = VARSIZE_ANY_EXHDR(rhs);
+ 	int			cmp;
+ 
+ 	cmp = memcmp(VARDATA_ANY(lhs), VARDATA_ANY(rhs), Min(lhslen, rhslen));
+ 	if (cmp == 0 && lhslen != rhslen)
+ 		cmp = (lhslen < rhslen) ? -1 : 1;
+ 
+ 	/*
+ 	 * Release any copies DatumGetTextPP() had to make; leaking on every
+ 	 * comparison is not acceptable.
+ 	 */
+ 	if (PointerGetDatum(lhs) != x)
+ 		pfree(lhs);
+ 	if (PointerGetDatum(rhs) != y)
+ 		pfree(rhs);
+ 
+ 	return cmp;
+ }
+ 
+ /*
+  * bttextfastcmp_locale
+  *
+  * Sort support comparator for text under a non-C collation.  Both strings
+  * are copied into the NUL-terminated work buffers kept in the
+  * TextSortSupport state (ssup->ssup_extra) and compared with strcoll() or
+  * strcoll_l(), using strcmp() to break strcoll() ties.  Returns <0, 0 or >0.
+  *
+  * When ssup->true_leading is set and the lengths match, a memcmp() equality
+  * check is tried first so that strcoll() can be skipped for equal strings.
+  */
+ static int
+ bttextfastcmp_locale(Datum x, Datum y, SortSupport ssup)
+ {
+ 	text	   *arg1 = DatumGetTextPP(x);
+ 	text	   *arg2 = DatumGetTextPP(y);
+ 	TextSortSupport	   *tss = (TextSortSupport *) ssup->ssup_extra;
+ 	char	   *a1p,
+ 			   *a2p;
+ 	int			len1,
+ 				len2,
+ 				result;
+ 
+ 	a1p = VARDATA_ANY(arg1);
+ 	a2p = VARDATA_ANY(arg2);
+ 
+ 	len1 = VARSIZE_ANY_EXHDR(arg1);
+ 	len2 = VARSIZE_ANY_EXHDR(arg2);
+ 
+ 	/*
+ 	 * "True" leading key.  This indicates that we're being called as a
+ 	 * reliable tie-breaker for the poor man's normalized key comparison
+ 	 * (there may be other attributes that must be subsequently compared, but
+ 	 * we're consulted first).
+ 	 *
+ 	 * In general there is a pretty good chance that we are being consulted
+ 	 * because the key is actually fully equal, especially since the two text
+ 	 * strings are already known to have the same length.  As such, it seems
+ 	 * worth it to try and give an answer using only a cheap memcmp()
+ 	 * comparison in anticipation of that reliably indicating equality
+ 	 * frequently enough for it to be worth it on balance.
+ 	 *
+ 	 * Go through the common exit path rather than returning directly, so
+ 	 * that copies made by DatumGetTextPP() are still freed.
+ 	 */
+ 	if (ssup->true_leading && len1 == len2 && memcmp(a1p, a2p, len1) == 0)
+ 	{
+ 		result = 0;
+ 		goto done;
+ 	}
+ 
+ 	/*
+ 	 * Enlarge the work buffers as needed.  Doubling alone is not enough when
+ 	 * a string is at least twice the current buffer size, so always make
+ 	 * room for the string plus its terminating NUL.
+ 	 */
+ 	if (len1 >= tss->buflen1)
+ 	{
+ 		pfree(tss->buf1);
+ 		tss->buflen1 = Max(len1 + 1, tss->buflen1 * 2);
+ 		tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen1);
+ 	}
+ 	if (len2 >= tss->buflen2)
+ 	{
+ 		pfree(tss->buf2);
+ 		tss->buflen2 = Max(len2 + 1, tss->buflen2 * 2);
+ 		tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen2);
+ 	}
+ 
+ 	/* strcoll() needs NUL-terminated copies of both strings */
+ 	memcpy(tss->buf1, a1p, len1);
+ 	tss->buf1[len1] = '\0';
+ 	memcpy(tss->buf2, a2p, len2);
+ 	tss->buf2[len2] = '\0';
+ 
+ #ifdef HAVE_LOCALE_T
+ 	if (tss->locale)
+ 		result = strcoll_l(tss->buf1, tss->buf2, tss->locale);
+ 	else
+ #endif
+ 		result = strcoll(tss->buf1, tss->buf2);
+ 
+ 	/*
+ 	 * In some locales strcoll() can claim that nonidentical strings are equal.
+ 	 * Believing that would be bad news for a number of reasons, so we follow
+ 	 * Perl's lead and sort "equal" strings according to strcmp().
+ 	 */
+ 	if (result == 0)
+ 		result = strcmp(tss->buf1, tss->buf2);
+ 
+ done:
+ 	/* We can't afford to leak memory here. */
+ 	if (PointerGetDatum(arg1) != x)
+ 		pfree(arg1);
+ 	if (PointerGetDatum(arg2) != y)
+ 		pfree(arg2);
+ 
+ 	return result;
+ }
+ 
+ /*
+  * bttextfastcmp_locale_poorman
+  *
+  * Comparator for the leading column of a sort, where bttext_convert() has
+  * already replaced each value with a poor man's normalized key stored
+  * directly in the Datum.  Only a non-zero result can be trusted; a zero
+  * result has to be resolved by the reliable tie-breaker.
+  */
+ static int
+ bttextfastcmp_locale_poorman(Datum x, Datum y, SortSupport ssup)
+ {
+ 	const char *xkey = (const char *) &x;
+ 	const char *ykey = (const char *) &y;
+ 	int			xlen = sizeof(Datum);
+ 	int			ylen = sizeof(Datum);
+ 
+ 	/*
+ 	 * A key whose final byte is NUL did not fill the whole Datum, so its
+ 	 * length is given by strlen(); otherwise every Datum byte is significant.
+ 	 */
+ 	if (xkey[sizeof(Datum) - 1] == '\0')
+ 		xlen = strlen(xkey);
+ 	if (ykey[sizeof(Datum) - 1] == '\0')
+ 		ylen = strlen(ykey);
+ 
+ 	/*
+ 	 * Lengths here cannot reliably indicate original string length, so only
+ 	 * the common prefix is compared.  When that yields 0, leave it up to the
+ 	 * bttextfastcmp_locale() tie-breaker that the core system is obligated
+ 	 * to perform (rather than matching bttextfastcmp_c()'s length tie-break
+ 	 * behavior).  Even a strcmp() on two non-truncated strxfrm() blobs cannot
+ 	 * indicate equality in a way that is generally trustworthy, for the same
+ 	 * reason that strcoll() results get a strcmp() tie-breaker (there'd
+ 	 * still need to be a strcmp() tie-breaker on the *original* string).
+ 	 */
+ 	return memcmp(xkey, ykey, Min(xlen, ylen));
+ }
+ 
+ /*
+  * bttext_convert
+  *
+  * Sort support "converter" callback: turn the text datum "d" into a poor
+  * man's normalized key, i.e. a pass-by-value Datum whose bytes are the first
+  * sizeof(Datum) bytes of the string's strxfrm() blob, zero-padded when the
+  * blob is shorter than that.  Uses the work buffers in the TextSortSupport
+  * state (ssup->ssup_extra), enlarging them when necessary.
+  */
+ static Datum
+ bttext_convert(Datum d, SortSupport ssup)
+ {
+ 	Datum				res;
+ 	char			   *pres;
+ 	int					len;
+ 	TextSortSupport	   *tss = (TextSortSupport *) ssup->ssup_extra;
+ 	text			   *full = DatumGetTextPP(d);
+ 	Size				bsize;
+ 
+ 	/*
+ 	 * Convert text into a "poor man's normalized key".  This is a
+ 	 * pass-by-value Datum that is treated as a char array.
+ 	 */
+ 	pres = (char *) &res;
+ 	/* MemSet so non-copied bytes are always NUL */
+ 	MemSet(pres, 0, sizeof(Datum));
+ 	len = VARSIZE_ANY_EXHDR(full);
+ 
+ 	/*
+ 	 * By convention, we use buffer 1 for the original string.  Doubling
+ 	 * alone is not enough when the string is at least twice the current
+ 	 * buffer size, so always make room for the string plus its NUL.
+ 	 */
+ 	if (len >= tss->buflen1)
+ 	{
+ 		pfree(tss->buf1);
+ 		tss->buflen1 = Max(len + 1, tss->buflen1 * 2);
+ 		tss->buf1 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen1);
+ 	}
+ 
+ 	/* Just like strcoll(), strxfrm() expects a NUL-terminated string */
+ 	memcpy(tss->buf1, VARDATA_ANY(full), len);
+ 	tss->buf1[len] = '\0';
+ 
+ 	/*
+ 	 * buf1 now holds everything we need, so release any copy that
+ 	 * DatumGetTextPP() had to make, just as the comparators do.
+ 	 */
+ 	if (PointerGetDatum(full) != d)
+ 		pfree(full);
+ 
+ 	for (;;)
+ 	{
+ #ifdef HAVE_LOCALE_T
+ 		if (tss->locale)
+ 			bsize = strxfrm_l(tss->buf2, tss->buf1, tss->buflen2, tss->locale);
+ 		else
+ #endif
+ 			bsize = strxfrm(tss->buf2, tss->buf1, tss->buflen2);
+ 
+ 		if (bsize < tss->buflen2)
+ 			break;
+ 
+ 		/*
+ 		 * The standard requires that we fit the entire blob in temp buffer in
+ 		 * order for subsequent comparisons to be well-defined.  glibc doesn't
+ 		 * actually require this, but it doesn't seem to help performance to
+ 		 * depend on that.  Enlarge buffer 2 and try again.
+ 		 */
+ 		pfree(tss->buf2);
+ 		tss->buflen2 = Max(bsize + 1, tss->buflen2 * 2);
+ 		tss->buf2 = MemoryContextAlloc(ssup->ssup_cxt, tss->buflen2);
+ 	}
+ 
+ 	memcpy(pres, tss->buf2, Min(sizeof(Datum), bsize));
+ 
+ 	/*
+ 	 * Iff last byte isn't NUL, as in the common case where the entire Datum
+ 	 * is filled with blob bytes, that is interpreted as indicating that every
+ 	 * Datum byte should be compared.  Otherwise, the length is determined by
+ 	 * strlen().  This is safe because the strxfrm() blob is itself
+ 	 * NUL-terminated, leaving no danger of misinterpreting any NUL bytes not
+ 	 * intended to be interpreted as logically representing termination.
+ 	 */
+ 	return res;
+ }
  
  Datum
  text_larger(PG_FUNCTION_ARGS)
*** a/src/backend/utils/sort/sortsupport.c
--- b/src/backend/utils/sort/sortsupport.c
*************** PrepareSortSupportComparisonShim(Oid cmp
*** 82,87 ****
--- 82,89 ----
  
  	ssup->ssup_extra = extra;
  	ssup->comparator = comparison_shim;
+ 	ssup->converter = NULL;
+ 	ssup->proper = NULL;
  }
  
  /*
*************** PrepareSortSupportFromOrderingOp(Oid ord
*** 104,109 ****
--- 106,113 ----
  		elog(ERROR, "operator %u is not a valid ordering operator",
  			 orderingOp);
  
+ 	ssup->converter = NULL;
+ 
  	if (issupport)
  	{
  		/* The sort support function should provide a comparator */
*** a/src/backend/utils/sort/tuplesort.c
--- b/src/backend/utils/sort/tuplesort.c
*************** bool		optimize_bounded_sort = true;
*** 150,156 ****
   * When sorting single Datums, the data value is represented directly by
   * datum1/isnull1.	If the datatype is pass-by-reference and isnull1 is false,
   * then datum1 points to a separately palloc'd data value that is also pointed
!  * to by the "tuple" pointer; otherwise "tuple" is NULL.
   *
   * While building initial runs, tupindex holds the tuple's run number.  During
   * merge passes, we re-use it to hold the input tape number that each tuple in
--- 150,159 ----
   * When sorting single Datums, the data value is represented directly by
   * datum1/isnull1.	If the datatype is pass-by-reference and isnull1 is false,
   * then datum1 points to a separately palloc'd data value that is also pointed
!  * to by the "tuple" pointer; otherwise "tuple" is NULL.  Note that there are
!  * some exceptions, as when the sort support infrastructure provides a "poor
!  * man's normalized key" representation.   When that occurs, extra precautions
!  * are taken when a comparison involving a pair of datum1s returns 0.
   *
   * While building initial runs, tupindex holds the tuple's run number.  During
   * merge passes, we re-use it to hold the input tape number that each tuple in
*************** tuplesort_begin_heap(TupleDesc tupDesc,
*** 647,657 ****
  		sortKey->ssup_collation = sortCollations[i];
  		sortKey->ssup_nulls_first = nullsFirstFlags[i];
  		sortKey->ssup_attno = attNums[i];
  
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
! 
! 	if (nkeys == 1)
  		state->onlyKey = state->sortKeys;
  
  	MemoryContextSwitchTo(oldcontext);
--- 650,667 ----
  		sortKey->ssup_collation = sortCollations[i];
  		sortKey->ssup_nulls_first = nullsFirstFlags[i];
  		sortKey->ssup_attno = attNums[i];
+ 		sortKey->leading = (i == 0);
  
  		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
  	}
! 	/*
! 	 * The "onlyKey" optimization cannot be used when a tie-breaker for an
! 	 * unreliable poor man's normalized key comparison is required.  Typically,
! 	 * the optimization is only of significant value to pass-by-value types
! 	 * anyway, whereas poor man's normalized keys are typically used by
! 	 * pass-by-reference types.
! 	 */
! 	if (nkeys == 1 && !state->sortKeys->converter)
  		state->onlyKey = state->sortKeys;
  
  	MemoryContextSwitchTo(oldcontext);
*************** comparetup_heap(const SortTuple *a, cons
*** 2870,2875 ****
--- 2880,2908 ----
  	rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
  	rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
  	tupDesc = state->tupDesc;
+ 	/* If leading comparison was unreliable, do reliable comparison */
+ 	if (state->sortKeys->converter)
+ 	{
+ 		AttrNumber	attno = sortKey->ssup_attno;
+ 		Datum		datum1,
+ 					datum2;
+ 		bool		isnull1,
+ 					isnull2;
+ 
+ 		Assert(attno == sortKey->proper->ssup_attno);
+ 		Assert(sortKey->proper->true_leading);
+ 		Assert(sortKey->leading);
+ 
+ 		datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
+ 		datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
+ 
+ 		compare = ApplySortComparator(datum1, isnull1,
+ 									  datum2, isnull2,
+ 									  sortKey->proper);
+ 		if (compare != 0)
+ 			return compare;
+ 	}
+ 
  	sortKey++;
  	for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
  	{
*************** copytup_heap(Tuplesortstate *state, Sort
*** 2910,2919 ****
  	/* set up first-column key value */
  	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
  	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
! 	stup->datum1 = heap_getattr(&htup,
! 								state->sortKeys[0].ssup_attno,
! 								state->tupDesc,
! 								&stup->isnull1);
  }
  
  static void
--- 2943,2972 ----
  	/* set up first-column key value */
  	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
  	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
! 	if (!state->sortKeys->converter)
! 	{
! 		/* Store ordinary Datum representation */
! 		stup->datum1 = heap_getattr(&htup,
! 									state->sortKeys[0].ssup_attno,
! 									state->tupDesc,
! 									&stup->isnull1);
! 	}
! 	else
! 	{
! 		Datum		temp;
! 
! 		/*
! 		 * Store "poor man's normalized key", which cannot indicate equality in
! 		 * a trustworthy manner, and may require a tie-breaker
! 		 */
! 		Assert(state->sortKeys[0].leading);
! 		temp = heap_getattr(&htup,
! 							state->sortKeys[0].ssup_attno,
! 							state->tupDesc,
! 							&stup->isnull1);
! 
! 		stup->datum1 = state->sortKeys->converter(temp, state->sortKeys);
! 	}
  }
  
  static void
*** a/src/include/catalog/pg_amproc.h
--- b/src/include/catalog/pg_amproc.h
*************** DATA(insert (	1989   26 26 1 356 ));
*** 122,127 ****
--- 122,128 ----
  DATA(insert (	1989   26 26 2 3134 ));
  DATA(insert (	1991   30 30 1 404 ));
  DATA(insert (	1994   25 25 1 360 ));
+ DATA(insert (	1994   25 25 2 3492 ));
  DATA(insert (	1996   1083 1083 1 1107 ));
  DATA(insert (	2000   1266 1266 1 1358 ));
  DATA(insert (	2002   1562 1562 1 1672 ));
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 3135 ( btnamesortsuppo
*** 610,615 ****
--- 610,617 ----
  DESCR("sort support");
  DATA(insert OID = 360 (  bttextcmp		   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "25 25" _null_ _null_ _null_ _null_ bttextcmp _null_ _null_ _null_ ));
  DESCR("less-equal-greater");
+ DATA(insert OID = 3492 ( bttextsortsupport PGNSP PGUID 12 1 0 0 0 f f f f t f i 1 0 2278 "2281" _null_ _null_ _null_ _null_ bttextsortsupport _null_ _null_ _null_ ));
+ DESCR("sort support");
  DATA(insert OID = 377 (  cash_cmp		   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "790 790" _null_ _null_ _null_ _null_ cash_cmp _null_ _null_ _null_ ));
  DESCR("less-equal-greater");
  DATA(insert OID = 380 (  btreltimecmp	   PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 23 "703 703" _null_ _null_ _null_ _null_ btreltimecmp _null_ _null_ _null_ ));
*** a/src/include/pg_config_manual.h
--- b/src/include/pg_config_manual.h
***************
*** 238,244 ****
   * You should normally use MEMORY_CONTEXT_CHECKING with USE_VALGRIND;
   * instrumentation of repalloc() is inferior without it.
   */
! /* #define USE_VALGRIND */
  
  /*
   * Define this to cause pfree()'d memory to be cleared immediately, to
--- 238,244 ----
   * You should normally use MEMORY_CONTEXT_CHECKING with USE_VALGRIND;
   * instrumentation of repalloc() is inferior without it.
   */
! #define USE_VALGRIND
  
  /*
   * Define this to cause pfree()'d memory to be cleared immediately, to
*** a/src/include/utils/builtins.h
--- b/src/include/utils/builtins.h
*************** extern Datum bttintervalcmp(PG_FUNCTION_
*** 312,317 ****
--- 312,318 ----
  extern Datum btcharcmp(PG_FUNCTION_ARGS);
  extern Datum btnamecmp(PG_FUNCTION_ARGS);
  extern Datum bttextcmp(PG_FUNCTION_ARGS);
+ extern Datum bttextsortsupport(PG_FUNCTION_ARGS);
  
  /*
   *		Per-opclass sort support functions for new btrees.	Like the
*** a/src/include/utils/sortsupport.h
--- b/src/include/utils/sortsupport.h
*************** typedef struct SortSupportData
*** 59,64 ****
--- 59,80 ----
  	 */
  	MemoryContext ssup_cxt;		/* Context containing sort info */
  	Oid			ssup_collation; /* Collation to use, or InvalidOid */
+ 	bool		leading;		/* Leading (poor man-applicable) key? */
+ 	bool		true_leading;	/* "True" (non-poorman's) leading key? */
+ 
+ 	/*
+ 	 * Alternative "proper" SortSupport state for leading key, used only when
+ 	 * an unreliable poor man's normalized key comparator returns 0, and the
+ 	 * core system must use this separate state to perform a trustworthy
+ 	 * comparison.  By convention this relates to the same attribute as our
+ 	 * ssup_attno.  When set initially by a SortSupport function, the core
+ 	 * system expects to be able to check if this is set, and independently
+ 	 * make reliable non-poorman's comparisons if and when this SortSupport
+ 	 * state returns an uncertain 0 result (the implication is that when this
+ 	 * applies, leading = true here, and this state relates to a poorman's
+ 	 * normalized key implementation).
+ 	 */
+ 	struct SortSupportData *proper;
  
  	/*
  	 * Additional sorting parameters; but unlike ssup_collation, these can be
*************** typedef struct SortSupportData
*** 69,75 ****
  	bool		ssup_nulls_first;		/* sort nulls first? */
  
  	/*
! 	 * These fields are workspace for callers, and should not be touched by
  	 * opclass-specific functions.
  	 */
  	AttrNumber	ssup_attno;		/* column number to sort */
--- 85,91 ----
  	bool		ssup_nulls_first;		/* sort nulls first? */
  
  	/*
! 	 * These fields are workspace for callers, and should only be touched by
  	 * opclass-specific functions.
  	 */
  	AttrNumber	ssup_attno;		/* column number to sort */
*************** typedef struct SortSupportData
*** 96,103 ****
  	int			(*comparator) (Datum x, Datum y, SortSupport ssup);
  
  	/*
! 	 * Additional sort-acceleration functions might be added here later.
  	 */
  } SortSupportData;
  
  
--- 112,125 ----
  	int			(*comparator) (Datum x, Datum y, SortSupport ssup);
  
  	/*
! 	 * Given a datum involving an opclass with relevant support, use this
! 	 * callback to convert to a pass-by-value untrustworthy Datum, or poor
! 	 * man's normalized key.  Our sort support comparator expects this.  When
+ 	 * this is set, tuplesort knows not to trust a comparison result of 0.
! 	 * However, -1 or 1 can be trusted, and taken as a perfect proxy for what
! 	 * the full leading key would indicate.
  	 */
+ 	Datum		(*converter) (Datum d, SortSupport ssup);
  } SortSupportData;
  
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to