On Sat, Mar 13, 2021 at 3:49 PM Thomas Munro <thomas.mu...@gmail.com> wrote: > On Fri, Mar 12, 2021 at 7:58 AM Andres Freund <and...@anarazel.de> wrote: > > I wish we had the same for bsearch... :) > > Glibc already has the definition of the traditional void-based > function in /usr/include/bits/stdlib-bsearch.h, so the generated code > when the compiler can see the comparator definition is already good in > eg lazy_tid_reaped() and eg some nbtree search routines. We could > probably expose more trivial comparators in headers to get more of > that, and we could perhaps put our own bsearch definition in a header > for other platforms that didn't think of that... > > It might be worth doing type-safe macro templates as well, though (as > I already did in an earlier proposal[1]), just to have nice type safe > code though, not sure, I'm thinking about that...
I remembered a very good reason to do this: the ability to do branch-free comparators in more places by introducing optional wider results. That's good for TIDs (which need 49 bits), and places that want to "reverse" a traditional comparator (just doing -result on an int comparator that might theoretically return INT_MIN requires at least 33 bits). So I rebased the relevant parts of my earlier version, and went through and wrote a bunch of examples to demonstrate all this stuff actually working. There are two categories of change in these patches: 0002-0005: Places that sort/unique/search OIDs, BlockNumbers and TIDs, which can reuse a small set of typed functions (a few more could be added, if useful). See sortitemptr.h and sortscalar.h. Mostly this is just a notational improvement, and an excuse to drop a bunch of duplicated code. In a few places this might really speed up something important, like VACUUM's lazy_tid_reaped(). 0006-0009: Places where a specialised function is generated for one special purpose, such as ANALYZE's HeapTuple sort, tidbitmap.c's pagetable sort, some places in nbtree code etc. These may require some case-by-case research on whether the extra executable size is worth the speedup, and there are surely more opportunities like that; I just picked these arbitrarily.
From d0fb306d2720d14be2d46f4ae4198b25a33a95fa Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 13 Mar 2021 17:29:44 +1300 Subject: [PATCH 1/9] Add bsearch and unique templates to sort_template.h. Since binary search and uniquify are so closely related to sorting, let's optionally generate compatible functions at the same time. Also, optionally support comparators with wider return types. This will allow us to do more branchless comparators. Also, adjust the ST_SORT template to cope with pointer types. --- src/include/lib/sort_template.h | 142 ++++++++++++++++++++++++++++---- 1 file changed, 124 insertions(+), 18 deletions(-) diff --git a/src/include/lib/sort_template.h b/src/include/lib/sort_template.h index 771c789ced..a6097e1ac5 100644 --- a/src/include/lib/sort_template.h +++ b/src/include/lib/sort_template.h @@ -3,7 +3,8 @@ * sort_template.h * * A template for a sort algorithm that supports varying degrees of - * specialization. + * specialization. Also related algorithms for binary search and + * unique, on sorted arrays. * * Copyright (c) 2021, PostgreSQL Global Development Group * Portions Copyright (c) 1992-1994, Regents of the University of California @@ -13,7 +14,9 @@ * To generate functions specialized for a type, the following parameter * macros should be #define'd before this file is included. 
* - * - ST_SORT - the name of a sort function to be generated + * - ST_SORT - if defined the name of a sort function + * - ST_UNIQUE - if defined the name of a unique function + * - ST_SEARCH - if defined the name of a search function * - ST_ELEMENT_TYPE - type of the referenced elements * - ST_DECLARE - if defined the functions and types are declared * - ST_DEFINE - if defined the functions and types are defined @@ -40,6 +43,10 @@ * * - ST_COMPARE_ARG_TYPE - type of extra argument * + * To say that the comparator returns a type other than int, use: + * + * - ST_COMPARE_TYPE - an integer type + * * The prototype of the generated sort function is: * * void ST_SORT(ST_ELEMENT_TYPE *data, size_t n, @@ -179,13 +186,35 @@ typedef int (*ST_COMPARATOR_TYPE_NAME) (const ST_ELEMENT_TYPE *, const ST_ELEMENT_TYPE *ST_SORT_PROTO_ARG); #endif +#ifdef ST_SORT /* Declare the sort function. Note optional arguments at end. */ -ST_SCOPE void ST_SORT(ST_ELEMENT_TYPE *first, size_t n +ST_SCOPE void ST_SORT(ST_ELEMENT_TYPE *array, + size_t n ST_SORT_PROTO_ELEMENT_SIZE ST_SORT_PROTO_COMPARE ST_SORT_PROTO_ARG); +#endif /* ST_SORT */ -#endif +#ifdef ST_SEARCH +/* Declare the search function. */ +ST_SCOPE ST_ELEMENT_TYPE *ST_SEARCH(ST_ELEMENT_TYPE *value, + ST_ELEMENT_TYPE *array, + size_t n + ST_SORT_PROTO_ELEMENT_SIZE + ST_SORT_PROTO_COMPARE + ST_SORT_PROTO_ARG); +#endif /* ST_SEARCH */ + +#ifdef ST_UNIQUE +ST_SCOPE size_t +ST_UNIQUE(ST_ELEMENT_TYPE *array, + size_t n + ST_SORT_PROTO_ELEMENT_SIZE + ST_SORT_PROTO_COMPARE + ST_SORT_PROTO_ARG); +#endif /* ST_UNIQUE */ + +#endif /* ST_DECLARE */ #ifdef ST_DEFINE @@ -201,16 +230,20 @@ ST_SCOPE void ST_SORT(ST_ELEMENT_TYPE *first, size_t n #define DO_CHECK_FOR_INTERRUPTS() #endif +#ifndef ST_COMPARE_TYPE +#define ST_COMPARE_TYPE int +#endif + /* * Create wrapper macros that know how to invoke compare, med3 and sort with * the right arguments. 
*/ #ifdef ST_COMPARE_RUNTIME_POINTER -#define DO_COMPARE(a_, b_) ST_COMPARE((a_), (b_) ST_SORT_INVOKE_ARG) +#define DO_COMPARE(a_, b_) ((ST_COMPARE_TYPE) (ST_COMPARE((a_), (b_) ST_SORT_INVOKE_ARG))) #elif defined(ST_COMPARE_ARG_TYPE) -#define DO_COMPARE(a_, b_) ST_COMPARE((a_), (b_), arg) +#define DO_COMPARE(a_, b_) ((ST_COMPARE_TYPE) (ST_COMPARE((a_), (b_), arg))) #else -#define DO_COMPARE(a_, b_) ST_COMPARE((a_), (b_)) +#define DO_COMPARE(a_, b_) ((ST_COMPARE_TYPE) (ST_COMPARE((a_), (b_)))) #endif #define DO_MED3(a_, b_, c_) \ ST_MED3((a_), (b_), (c_) \ @@ -239,6 +272,8 @@ ST_SCOPE void ST_SORT(ST_ELEMENT_TYPE *first, size_t n #define DO_SWAP(a_, b_) DO_SWAPN((a_), (b_), element_size) #endif +#ifdef ST_SORT + /* * Find the median of three values. Currently, performance seems to be best * if the the comparator is inlined here, but the med3 function is not inlined @@ -281,18 +316,18 @@ ST_SORT(ST_ELEMENT_TYPE *data, size_t n ST_SORT_PROTO_COMPARE ST_SORT_PROTO_ARG) { - ST_POINTER_TYPE *a = (ST_POINTER_TYPE *) data, - *pa, - *pb, - *pc, - *pd, - *pl, - *pm, - *pn; + ST_POINTER_TYPE *a = (ST_POINTER_TYPE *) data; + ST_POINTER_TYPE *pa; + ST_POINTER_TYPE *pb; + ST_POINTER_TYPE *pc; + ST_POINTER_TYPE *pd; + ST_POINTER_TYPE *pl; + ST_POINTER_TYPE *pm; + ST_POINTER_TYPE *pn; size_t d1, d2; - int r, - presorted; + int presorted; + ST_COMPARE_TYPE r; loop: DO_CHECK_FOR_INTERRUPTS(); @@ -399,7 +434,75 @@ loop: } } } -#endif + +#endif /* ST_SORT */ + +#ifdef ST_SEARCH + +/* + * Find an element in the array of sorted values that is equal to a given + * value, in a sorted array. Return NULL if there is none. 
+ */ +ST_SCOPE ST_ELEMENT_TYPE * +ST_SEARCH(ST_ELEMENT_TYPE *value, + ST_ELEMENT_TYPE *array, + size_t n + ST_SORT_PROTO_ELEMENT_SIZE + ST_SORT_PROTO_COMPARE + ST_SORT_PROTO_ARG) +{ + ssize_t left = 0, + right = n - 1; + + while (left <= right) + { + size_t mid = (left + right) / 2; + ST_ELEMENT_TYPE *element = &array[mid]; + ST_COMPARE_TYPE cmp = DO_COMPARE(element, value); + + if (cmp < 0) + left = mid + 1; + else if (cmp > 0) + right = mid - 1; + else + return element; + } + + return NULL; +} + +#endif /* ST_SEARCH */ + +#ifdef ST_UNIQUE + +/* + * Remove duplicates from an array. Return the new size. + */ +ST_SCOPE size_t +ST_UNIQUE(ST_ELEMENT_TYPE *array, + size_t n + ST_SORT_PROTO_ELEMENT_SIZE + ST_SORT_PROTO_COMPARE + ST_SORT_PROTO_ARG) +{ + size_t i, + j; + + if (n <= 1) + return n; + + for (i = 1, j = 0; i < n; ++i) + { + if (DO_COMPARE(&array[i], &array[j]) != 0 && ++j != i) + array[j] = array[i]; + } + + return j + 1; +} + +#endif /* ST_UNIQUE */ + +#endif /* ST_DEFINE */ #undef DO_CHECK_FOR_INTERRUPTS #undef DO_COMPARE @@ -411,6 +514,7 @@ loop: #undef ST_COMPARE #undef ST_COMPARE_ARG_TYPE #undef ST_COMPARE_RUNTIME_POINTER +#undef ST_COMPARE_TYPE #undef ST_ELEMENT_TYPE #undef ST_ELEMENT_TYPE_VOID #undef ST_MAKE_NAME @@ -420,6 +524,7 @@ loop: #undef ST_POINTER_STEP #undef ST_POINTER_TYPE #undef ST_SCOPE +#undef ST_SEARCH #undef ST_SORT #undef ST_SORT_INVOKE_ARG #undef ST_SORT_INVOKE_COMPARE @@ -429,3 +534,4 @@ loop: #undef ST_SORT_PROTO_ELEMENT_SIZE #undef ST_SWAP #undef ST_SWAPN +#undef ST_UNIQUE -- 2.30.1
From 9a0cb8be6e3146d9cde39c4d9291bf3990604823 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sun, 14 Mar 2021 10:00:25 +1300 Subject: [PATCH 2/9] Supply sort/search specializations for common scalar types. --- src/backend/utils/sort/Makefile | 1 + src/backend/utils/sort/sortscalar.c | 11 +++++++++++ src/include/utils/sortscalar.h | 26 ++++++++++++++++++++++++++ 3 files changed, 38 insertions(+) create mode 100644 src/backend/utils/sort/sortscalar.c create mode 100644 src/include/utils/sortscalar.h diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile index 26f65fcaf7..b4dce08614 100644 --- a/src/backend/utils/sort/Makefile +++ b/src/backend/utils/sort/Makefile @@ -17,6 +17,7 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) OBJS = \ logtape.o \ sharedtuplestore.o \ + sortscalar.o \ sortsupport.o \ tuplesort.o \ tuplestore.o diff --git a/src/backend/utils/sort/sortscalar.c b/src/backend/utils/sort/sortscalar.c new file mode 100644 index 0000000000..026e472a1d --- /dev/null +++ b/src/backend/utils/sort/sortscalar.c @@ -0,0 +1,11 @@ +#include "postgres.h" + +#include "utils/sortscalar.h" + +#define ST_SORT qsort_uint32 +#define ST_ELEMENT_TYPE uint32 +#define ST_COMPARE(a, b) (((int64) *(a)) - ((int64) *(b))) +#define ST_COMPARE_TYPE int64 +#define ST_SCOPE +#define ST_DEFINE +#include "lib/sort_template.h" diff --git a/src/include/utils/sortscalar.h b/src/include/utils/sortscalar.h new file mode 100644 index 0000000000..932fe29d42 --- /dev/null +++ b/src/include/utils/sortscalar.h @@ -0,0 +1,26 @@ +#ifndef SORTSCALAR_H +#define SORTSCALAR_H + +/* We'll define the sort functions in sortscalar.c (they'd better match). */ +extern void qsort_uint32(uint32 *array, size_t n); + +/* The unique and search functions are defined here so they can be inlined. 
*/ +#define ST_UNIQUE unique_uint32 +#define ST_SEARCH bsearch_uint32 +#define ST_ELEMENT_TYPE uint32 +#define ST_COMPARE(a, b) (((int64) *(a)) - ((int64) *(b))) +#define ST_COMPARE_TYPE int64 +#define ST_SCOPE static inline +#define ST_DEFINE +#include "lib/sort_template.h" + +/* Provide names for other common types that are currently uint32. */ +#define qsort_oid qsort_uint32 +#define unique_oid unique_uint32 +#define bsearch_oid bsearch_uint32 + +#define qsort_blocknum qsort_uint32 +#define unique_blocknum unique_uint32 +#define bsearch_blocknum bsearch_uint32 + +#endif -- 2.30.1
From 441563a698a7af06dfb71d0eca1283a1ca7a7cf1 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sun, 14 Mar 2021 11:40:01 +1300 Subject: [PATCH 3/9] Use qsort_oid() and friends in obvious places. Done mainly for notational advantage over traditional qsort()/bsearch(). There may be some speedup, from inlined branchless comparators. --- src/backend/access/nbtree/nbtinsert.c | 30 +++++-------------------- src/backend/catalog/pg_enum.c | 3 ++- src/backend/catalog/pg_inherits.c | 3 ++- src/backend/commands/subscriptioncmds.c | 15 ++++++------- src/backend/utils/adt/acl.c | 5 +++-- src/backend/utils/cache/syscache.c | 30 +++++-------------------- 6 files changed, 24 insertions(+), 62 deletions(-) diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 0bc86943eb..09abf342de 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -19,11 +19,11 @@ #include "access/nbtxlog.h" #include "access/transam.h" #include "access/xloginsert.h" -#include "lib/qunique.h" #include "miscadmin.h" #include "storage/lmgr.h" #include "storage/predicate.h" #include "storage/smgr.h" +#include "utils/sortscalar.h" /* Minimum tree height for application of fastpath optimization */ #define BTREE_FASTPATH_MIN_LEVEL 2 @@ -70,7 +70,6 @@ static void _bt_simpledel_pass(Relation rel, Buffer buffer, Relation heapRel, static BlockNumber *_bt_deadblocks(Page page, OffsetNumber *deletable, int ndeletable, IndexTuple newitem, int *nblocks); -static inline int _bt_blk_cmp(const void *arg1, const void *arg2); /* * _bt_doinsert() -- Handle insertion of a single index tuple in the tree. 
@@ -2814,8 +2813,7 @@ _bt_simpledel_pass(Relation rel, Buffer buffer, Relation heapRel, if (!BTreeTupleIsPosting(itup)) { tidblock = ItemPointerGetBlockNumber(&itup->t_tid); - match = bsearch(&tidblock, deadblocks, ndeadblocks, - sizeof(BlockNumber), _bt_blk_cmp); + match = bsearch_blocknum(&tidblock, deadblocks, ndeadblocks); if (!match) { @@ -2845,8 +2843,7 @@ _bt_simpledel_pass(Relation rel, Buffer buffer, Relation heapRel, ItemPointer tid = BTreeTupleGetPostingN(itup, p); tidblock = ItemPointerGetBlockNumber(tid); - match = bsearch(&tidblock, deadblocks, ndeadblocks, - sizeof(BlockNumber), _bt_blk_cmp); + match = bsearch_blocknum(&tidblock, deadblocks, ndeadblocks); if (!match) { @@ -2966,25 +2963,8 @@ _bt_deadblocks(Page page, OffsetNumber *deletable, int ndeletable, } } - qsort(tidblocks, ntids, sizeof(BlockNumber), _bt_blk_cmp); - *nblocks = qunique(tidblocks, ntids, sizeof(BlockNumber), _bt_blk_cmp); + qsort_blocknum(tidblocks, ntids); + *nblocks = unique_blocknum(tidblocks, ntids); return tidblocks; } - -/* - * _bt_blk_cmp() -- qsort comparison function for _bt_simpledel_pass - */ -static inline int -_bt_blk_cmp(const void *arg1, const void *arg2) -{ - BlockNumber b1 = *((BlockNumber *) arg1); - BlockNumber b2 = *((BlockNumber *) arg2); - - if (b1 < b2) - return -1; - else if (b1 > b2) - return 1; - - return 0; -} diff --git a/src/backend/catalog/pg_enum.c b/src/backend/catalog/pg_enum.c index f958f1541d..1d9dacb516 100644 --- a/src/backend/catalog/pg_enum.c +++ b/src/backend/catalog/pg_enum.c @@ -30,6 +30,7 @@ #include "utils/fmgroids.h" #include "utils/hsearch.h" #include "utils/memutils.h" +#include "utils/sortscalar.h" #include "utils/syscache.h" /* Potentially set by pg_upgrade_support functions */ @@ -108,7 +109,7 @@ EnumValuesCreate(Oid enumTypeOid, List *vals) } /* sort them, just in case OID counter wrapped from high to low */ - qsort(oids, num_elems, sizeof(Oid), oid_cmp); + qsort_oid(oids, num_elems); /* and make the entries */ memset(nulls, 
false, sizeof(nulls)); diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c index 5ab7902827..5cf76b2cad 100644 --- a/src/backend/catalog/pg_inherits.c +++ b/src/backend/catalog/pg_inherits.c @@ -29,6 +29,7 @@ #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/memutils.h" +#include "utils/sortscalar.h" #include "utils/syscache.h" /* @@ -111,7 +112,7 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode) * lock children in the same order to avoid needless deadlocks. */ if (numoids > 1) - qsort(oidarr, numoids, sizeof(Oid), oid_cmp); + qsort_oid(oidarr, numoids); /* * Acquire locks and build the result list. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index bfd3514546..a24db5dca8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -44,6 +44,7 @@ #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/sortscalar.h" #include "utils/syscache.h" static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -608,8 +609,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) subrel_local_oids[off++] = relstate->relid; } - qsort(subrel_local_oids, list_length(subrel_states), - sizeof(Oid), oid_cmp); + qsort_oid(subrel_local_oids, list_length(subrel_states)); /* * Rels that we want to remove from subscription and drop any slots @@ -640,8 +640,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) pubrel_local_oids[off++] = relid; - if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + if (!bsearch_oid(&relid, subrel_local_oids, + list_length(subrel_states))) { AddSubscriptionRelState(sub->oid, relid, copy_data ? 
SUBREL_STATE_INIT : SUBREL_STATE_READY, @@ -656,16 +656,15 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) * Next remove state for tables we should not care about anymore using * the data we collected above */ - qsort(pubrel_local_oids, list_length(pubrel_names), - sizeof(Oid), oid_cmp); + qsort_oid(pubrel_local_oids, list_length(pubrel_names)); remove_rel_len = 0; for (off = 0; off < list_length(subrel_states); off++) { Oid relid = subrel_local_oids[off]; - if (!bsearch(&relid, pubrel_local_oids, - list_length(pubrel_names), sizeof(Oid), oid_cmp)) + if (!bsearch_oid(&relid, pubrel_local_oids, + list_length(pubrel_names))) { char state; XLogRecPtr statelsn; diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c index c7f029e218..2b90ee4c88 100644 --- a/src/backend/utils/adt/acl.c +++ b/src/backend/utils/adt/acl.c @@ -38,6 +38,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/sortscalar.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -1496,7 +1497,7 @@ aclmembers(const Acl *acl, Oid **roleids) } /* Sort the array */ - qsort(list, j, sizeof(Oid), oid_cmp); + qsort_oid(list, j); /* * We could repalloc the array down to minimum size, but it's hardly worth @@ -1505,7 +1506,7 @@ aclmembers(const Acl *acl, Oid **roleids) *roleids = list; /* Remove duplicates from the array */ - return qunique(list, j, sizeof(Oid), oid_cmp); + return unique_oid(list, j); } diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index e4dc4ee34e..26ef27b50d 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -76,6 +76,7 @@ #include "lib/qunique.h" #include "utils/catcache.h" #include "utils/rel.h" +#include "utils/sortscalar.h" #include "utils/syscache.h" /*--------------------------------------------------------------------------- @@ -1006,8 +1007,6 @@ static int SysCacheRelationOidSize; static Oid 
SysCacheSupportingRelOid[SysCacheSize * 2]; static int SysCacheSupportingRelOidSize; -static int oid_compare(const void *a, const void *b); - /* * InitCatalogCache - initialize the caches @@ -1055,17 +1054,13 @@ InitCatalogCache(void) Assert(SysCacheSupportingRelOidSize <= lengthof(SysCacheSupportingRelOid)); /* Sort and de-dup OID arrays, so we can use binary search. */ - pg_qsort(SysCacheRelationOid, SysCacheRelationOidSize, - sizeof(Oid), oid_compare); + qsort_oid(SysCacheRelationOid, SysCacheRelationOidSize); SysCacheRelationOidSize = - qunique(SysCacheRelationOid, SysCacheRelationOidSize, sizeof(Oid), - oid_compare); + unique_oid(SysCacheRelationOid, SysCacheRelationOidSize); - pg_qsort(SysCacheSupportingRelOid, SysCacheSupportingRelOidSize, - sizeof(Oid), oid_compare); + qsort_oid(SysCacheSupportingRelOid, SysCacheSupportingRelOidSize); SysCacheSupportingRelOidSize = - qunique(SysCacheSupportingRelOid, SysCacheSupportingRelOidSize, - sizeof(Oid), oid_compare); + unique_oid(SysCacheSupportingRelOid, SysCacheSupportingRelOidSize); CacheInitialized = true; } @@ -1548,18 +1543,3 @@ RelationSupportsSysCache(Oid relid) return false; } - - -/* - * OID comparator for pg_qsort - */ -static int -oid_compare(const void *a, const void *b) -{ - Oid oa = *((const Oid *) a); - Oid ob = *((const Oid *) b); - - if (oa == ob) - return 0; - return (oa > ob) ? 1 : -1; -} -- 2.30.1
From 7e6091ee0163e0c49abcf99952c493c05baaaab7 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sun, 14 Mar 2021 15:11:22 +1300 Subject: [PATCH 4/9] Supply specialized sort/search routines for ItemPtrData. --- src/backend/utils/sort/Makefile | 1 + src/backend/utils/sort/sortitemptr.c | 12 ++++++++++++ src/include/utils/sortitemptr.h | 19 +++++++++++++++++++ 3 files changed, 32 insertions(+) create mode 100644 src/backend/utils/sort/sortitemptr.c create mode 100644 src/include/utils/sortitemptr.h diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile index b4dce08614..32acbb7f20 100644 --- a/src/backend/utils/sort/Makefile +++ b/src/backend/utils/sort/Makefile @@ -17,6 +17,7 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) OBJS = \ logtape.o \ sharedtuplestore.o \ + sortitemptr.o \ sortscalar.o \ sortsupport.o \ tuplesort.o \ diff --git a/src/backend/utils/sort/sortitemptr.c b/src/backend/utils/sort/sortitemptr.c new file mode 100644 index 0000000000..a0db6d6d0d --- /dev/null +++ b/src/backend/utils/sort/sortitemptr.c @@ -0,0 +1,12 @@ +#include "postgres.h" + +#include "catalog/index.h" +#include "utils/sortitemptr.h" + +#define ST_SORT qsort_itemptr +#define ST_SCOPE +#define ST_ELEMENT_TYPE ItemPointerData +#define ST_COMPARE(a, b) (itemptr_encode(a) - itemptr_encode(b)) +#define ST_COMPARE_TYPE int64 +#define ST_DEFINE +#include "lib/sort_template.h" diff --git a/src/include/utils/sortitemptr.h b/src/include/utils/sortitemptr.h new file mode 100644 index 0000000000..eb3df25eec --- /dev/null +++ b/src/include/utils/sortitemptr.h @@ -0,0 +1,19 @@ +#ifndef SORTITEMPTR_H +#define SORTITEMPTR_H + +#include "catalog/index.h" + +/* Declare sort function, from .c file. */ +extern void qsort_itemptr(ItemPointerData *tids, size_t n); + +/* Search and unique functions inline in header. 
*/ +#define ST_SEARCH bsearch_itemptr +#define ST_UNIQUE unique_itemptr +#define ST_ELEMENT_TYPE ItemPointerData +#define ST_COMPARE(a, b) (itemptr_encode(a) - itemptr_encode(b)) +#define ST_COMPARE_TYPE int64 +#define ST_SCOPE static inline +#define ST_DEFINE +#include "lib/sort_template.h" + +#endif -- 2.30.1
From eed4a91d7742166124c26dbdadd41aa4695cc5b8 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 13 Mar 2021 18:48:10 +1300 Subject: [PATCH 5/9] Use qsort_itemptr() and friends in various places. --- src/backend/access/heap/vacuumlazy.c | 40 +++------------------------- src/backend/executor/nodeTidscan.c | 32 +++------------------- 2 files changed, 7 insertions(+), 65 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index a65dcbebfa..ef57e88b43 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -79,6 +79,7 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/pg_rusage.h" +#include "utils/sortitemptr.h" #include "utils/timestamp.h" @@ -367,7 +368,6 @@ static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks); static void lazy_record_dead_tuple(LVDeadTuples *dead_tuples, ItemPointer itemptr); static bool lazy_tid_reaped(ItemPointer itemptr, void *state); -static int vac_cmp_itemptr(const void *left, const void *right); static bool heap_page_is_all_visible(Relation rel, Buffer buf, LVRelStats *vacrelstats, TransactionId *visibility_cutoff_xid, bool *all_frozen); @@ -2943,45 +2943,13 @@ lazy_tid_reaped(ItemPointer itemptr, void *state) if (item < litem || item > ritem) return false; - res = (ItemPointer) bsearch((void *) itemptr, - (void *) dead_tuples->itemptrs, - dead_tuples->num_tuples, - sizeof(ItemPointerData), - vac_cmp_itemptr); + res = bsearch_itemptr(itemptr, + dead_tuples->itemptrs, + dead_tuples->num_tuples); return (res != NULL); } -/* - * Comparator routines for use with qsort() and bsearch(). 
- */ -static int -vac_cmp_itemptr(const void *left, const void *right) -{ - BlockNumber lblk, - rblk; - OffsetNumber loff, - roff; - - lblk = ItemPointerGetBlockNumber((ItemPointer) left); - rblk = ItemPointerGetBlockNumber((ItemPointer) right); - - if (lblk < rblk) - return -1; - if (lblk > rblk) - return 1; - - loff = ItemPointerGetOffsetNumber((ItemPointer) left); - roff = ItemPointerGetOffsetNumber((ItemPointer) right); - - if (loff < roff) - return -1; - if (loff > roff) - return 1; - - return 0; -} - /* * Check if every tuple in the given page is visible to all current and future * transactions. Also return the visibility_cutoff_xid which is the highest diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c index 48c3737da2..ee6bc4ee05 100644 --- a/src/backend/executor/nodeTidscan.c +++ b/src/backend/executor/nodeTidscan.c @@ -33,6 +33,7 @@ #include "storage/bufmgr.h" #include "utils/array.h" #include "utils/rel.h" +#include "utils/sortitemptr.h" #define IsCTIDVar(node) \ @@ -51,7 +52,6 @@ typedef struct TidExpr static void TidExprListCreate(TidScanState *tidstate); static void TidListEval(TidScanState *tidstate); -static int itemptr_comparator(const void *a, const void *b); static TupleTableSlot *TidNext(TidScanState *node); @@ -263,10 +263,8 @@ TidListEval(TidScanState *tidstate) /* CurrentOfExpr could never appear OR'd with something else */ Assert(!tidstate->tss_isCurrentOf); - qsort((void *) tidList, numTids, sizeof(ItemPointerData), - itemptr_comparator); - numTids = qunique(tidList, numTids, sizeof(ItemPointerData), - itemptr_comparator); + qsort_itemptr(tidList, numTids); + numTids = unique_itemptr(tidList, numTids); } tidstate->tss_TidList = tidList; @@ -274,30 +272,6 @@ TidListEval(TidScanState *tidstate) tidstate->tss_TidPtr = -1; } -/* - * qsort comparator for ItemPointerData items - */ -static int -itemptr_comparator(const void *a, const void *b) -{ - const ItemPointerData *ipa = (const ItemPointerData *) a; - const 
ItemPointerData *ipb = (const ItemPointerData *) b; - BlockNumber ba = ItemPointerGetBlockNumber(ipa); - BlockNumber bb = ItemPointerGetBlockNumber(ipb); - OffsetNumber oa = ItemPointerGetOffsetNumber(ipa); - OffsetNumber ob = ItemPointerGetOffsetNumber(ipb); - - if (ba < bb) - return -1; - if (ba > bb) - return 1; - if (oa < ob) - return -1; - if (oa > ob) - return 1; - return 0; -} - /* ---------------------------------------------------------------- * TidNext * -- 2.30.1
From 50bb6dfb24fb68559c87ce4e2785c9a65be9a674 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sat, 13 Mar 2021 17:30:56 +1300 Subject: [PATCH 6/9] Specialize the HeapTuple sort routine for ANALYZE. Instead of a branching comparator, use "encoded" format, based on 48 bit integers that can be compared by subtraction. --- src/backend/commands/analyze.c | 36 +++++++++------------------------- 1 file changed, 9 insertions(+), 27 deletions(-) diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 3a9f358dd4..e760821ef5 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -98,7 +98,6 @@ static VacAttrStats *examine_attribute(Relation onerel, int attnum, static int acquire_sample_rows(Relation onerel, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows); -static int compare_rows(const void *a, const void *b); static int acquire_inherited_sample_rows(Relation onerel, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows); @@ -998,6 +997,14 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr) return stats; } +#define ST_SORT sort_heaptuples_by_tid +#define ST_ELEMENT_TYPE HeapTuple +#define ST_COMPARE(a, b) (itemptr_encode(&((*a)->t_self)) - itemptr_encode(&((*b)->t_self))) +#define ST_COMPARE_TYPE int64 +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" + /* * acquire_sample_rows -- acquire a random sample of rows from the table * @@ -1141,7 +1148,7 @@ acquire_sample_rows(Relation onerel, int elevel, * tuples are already sorted. 
*/ if (numrows == targrows) - qsort((void *) rows, numrows, sizeof(HeapTuple), compare_rows); + sort_heaptuples_by_tid(rows, numrows); /* * Estimate total numbers of live and dead rows in relation, extrapolating @@ -1176,31 +1183,6 @@ acquire_sample_rows(Relation onerel, int elevel, return numrows; } -/* - * qsort comparator for sorting rows[] array - */ -static int -compare_rows(const void *a, const void *b) -{ - HeapTuple ha = *(const HeapTuple *) a; - HeapTuple hb = *(const HeapTuple *) b; - BlockNumber ba = ItemPointerGetBlockNumber(&ha->t_self); - OffsetNumber oa = ItemPointerGetOffsetNumber(&ha->t_self); - BlockNumber bb = ItemPointerGetBlockNumber(&hb->t_self); - OffsetNumber ob = ItemPointerGetOffsetNumber(&hb->t_self); - - if (ba < bb) - return -1; - if (ba > bb) - return 1; - if (oa < ob) - return -1; - if (oa > ob) - return 1; - return 0; -} - - /* * acquire_inherited_sample_rows -- acquire sample rows from inheritance tree * -- 2.30.1
From 2352ce2f93fe8b9f9f597ef5ffc6a87468d769ac Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sun, 14 Mar 2021 13:48:14 +1300 Subject: [PATCH 7/9] Specialize pagetable sort routines in tidbitmap.c. Provide slightly more efficient routines for sorting the page table. --- src/backend/nodes/tidbitmap.c | 73 +++++++++++++---------------------- 1 file changed, 27 insertions(+), 46 deletions(-) diff --git a/src/backend/nodes/tidbitmap.c b/src/backend/nodes/tidbitmap.c index c5feacbff4..86d9ff0499 100644 --- a/src/backend/nodes/tidbitmap.c +++ b/src/backend/nodes/tidbitmap.c @@ -234,9 +234,6 @@ static PagetableEntry *tbm_get_pageentry(TIDBitmap *tbm, BlockNumber pageno); static bool tbm_page_is_lossy(const TIDBitmap *tbm, BlockNumber pageno); static void tbm_mark_page_lossy(TIDBitmap *tbm, BlockNumber pageno); static void tbm_lossify(TIDBitmap *tbm); -static int tbm_comparator(const void *left, const void *right); -static int tbm_shared_comparator(const void *left, const void *right, - void *arg); /* define hashtable mapping block numbers to PagetableEntry's */ #define SH_USE_NONDEFAULT_ALLOCATOR @@ -671,6 +668,29 @@ tbm_is_empty(const TIDBitmap *tbm) return (tbm->nentries == 0); } +/* Sort PagetableEntry pointers by blockno, comparing via int64 subtraction. */ +#define ST_SORT qsort_pagetable +#define ST_ELEMENT_TYPE PagetableEntry * +#define ST_COMPARE(a, b) (((int64) (*(a))->blockno) - (((int64) (*(b))->blockno))) +#define ST_COMPARE_TYPE int64 +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" + +/* + * As above, but the elements are int indexes into a PagetableEntry array that + * is passed as the extra sort argument. Each index is resolved to its entry + * before the blockno values are compared. 
+ */ +#define ST_SORT qsort_pagetable_shared +#define ST_ELEMENT_TYPE int +#define ST_COMPARE(a, b, base) ((int64) (base)[*(a)].blockno - ((int64) (base)[*(b)].blockno)) +#define ST_COMPARE_ARG_TYPE PagetableEntry +#define ST_COMPARE_TYPE int64 +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" + /* * tbm_begin_iterate - prepare to iterate through a TIDBitmap * @@ -740,11 +760,9 @@ tbm_begin_iterate(TIDBitmap *tbm) Assert(npages == tbm->npages); Assert(nchunks == tbm->nchunks); if (npages > 1) - qsort(tbm->spages, npages, sizeof(PagetableEntry *), - tbm_comparator); + qsort_pagetable(tbm->spages, npages); if (nchunks > 1) - qsort(tbm->schunks, nchunks, sizeof(PagetableEntry *), - tbm_comparator); + qsort_pagetable(tbm->schunks, nchunks); } tbm->iterating = TBM_ITERATING_PRIVATE; @@ -853,11 +871,9 @@ tbm_prepare_shared_iterate(TIDBitmap *tbm) if (ptbase != NULL) pg_atomic_init_u32(&ptbase->refcount, 0); if (npages > 1) - qsort_arg((void *) (ptpages->index), npages, sizeof(int), - tbm_shared_comparator, (void *) ptbase->ptentry); + qsort_pagetable_shared(ptpages->index, npages, ptbase->ptentry); if (nchunks > 1) - qsort_arg((void *) (ptchunks->index), nchunks, sizeof(int), - tbm_shared_comparator, (void *) ptbase->ptentry); + qsort_pagetable_shared(ptchunks->index, nchunks, ptbase->ptentry); } /* @@ -1416,41 +1432,6 @@ tbm_lossify(TIDBitmap *tbm) tbm->maxentries = Min(tbm->nentries, (INT_MAX - 1) / 2) * 2; } -/* - * qsort comparator to handle PagetableEntry pointers. - */ -static int -tbm_comparator(const void *left, const void *right) -{ - BlockNumber l = (*((PagetableEntry *const *) left))->blockno; - BlockNumber r = (*((PagetableEntry *const *) right))->blockno; - - if (l < r) - return -1; - else if (l > r) - return 1; - return 0; -} - -/* - * As above, but this will get index into PagetableEntry array. Therefore, - * it needs to get actual PagetableEntry using the index before comparing the - * blockno. 
- */ -static int -tbm_shared_comparator(const void *left, const void *right, void *arg) -{ - PagetableEntry *base = (PagetableEntry *) arg; - PagetableEntry *lpage = &base[*(int *) left]; - PagetableEntry *rpage = &base[*(int *) right]; - - if (lpage->blockno < rpage->blockno) - return -1; - else if (lpage->blockno > rpage->blockno) - return 1; - return 0; -} - /* * tbm_attach_shared_iterate * -- 2.30.1
From 2a5676c8b79449bb563d3aa7a489310926e14f25 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sun, 14 Mar 2021 11:02:40 +1300 Subject: [PATCH 8/9] Specialize some sort/search routines in nbtree code. --- src/backend/access/nbtree/nbtpage.c | 29 ++++-------- src/backend/access/nbtree/nbtutils.c | 68 ++++++++++++++++------------ 2 files changed, 48 insertions(+), 49 deletions(-) diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index fc744cf9fd..78e04a3e9a 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -1468,24 +1468,16 @@ _bt_delitems_update(BTVacuumPosting *updatable, int nupdatable, } /* - * Comparator used by _bt_delitems_delete_check() to restore deltids array - * back to its original leaf-page-wise sort order + * Sort used by _bt_delitems_delete_check() to restore deltids array back to + * its original leaf-page-wise sort order. We can safely use a branchless + * int-based comparator expression because id is an int16. */ -static int -_bt_delitems_cmp(const void *a, const void *b) -{ - TM_IndexDelete *indexdelete1 = (TM_IndexDelete *) a; - TM_IndexDelete *indexdelete2 = (TM_IndexDelete *) b; - - if (indexdelete1->id > indexdelete2->id) - return 1; - if (indexdelete1->id < indexdelete2->id) - return -1; - - Assert(false); - - return 0; -} +#define ST_SORT qsort_delitems +#define ST_ELEMENT_TYPE TM_IndexDelete +#define ST_COMPARE(a, b) ((int) (a)->id - (int) (b)->id) +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" /* * Try to delete item(s) from a btree leaf page during single-page cleanup. @@ -1555,8 +1547,7 @@ _bt_delitems_delete_check(Relation rel, Buffer buf, Relation heapRel, * no entries at all (with bottom-up deletion caller), in which case there * is nothing left to do. 
*/ - qsort(delstate->deltids, delstate->ndeltids, sizeof(TM_IndexDelete), - _bt_delitems_cmp); + qsort_delitems(delstate->deltids, delstate->ndeltids); if (delstate->ndeltids == 0) { Assert(delstate->bottomup); diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index d524310723..3012d9f702 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -22,7 +22,6 @@ #include "access/relscan.h" #include "catalog/catalog.h" #include "commands/progress.h" -#include "lib/qunique.h" #include "miscadmin.h" #include "utils/array.h" #include "utils/datum.h" @@ -35,7 +34,6 @@ typedef struct BTSortArrayContext { FmgrInfo flinfo; Oid collation; - bool reverse; } BTSortArrayContext; static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey, @@ -44,7 +42,6 @@ static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey, static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey, bool reverse, Datum *elems, int nelems); -static int _bt_compare_array_elements(const void *a, const void *b, void *arg); static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op, ScanKey leftarg, ScanKey rightarg, bool *result); @@ -429,6 +426,33 @@ _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey, return result; } +/* Define a specialized sort function for _bt_sort_array_elements. */ +#define ST_SORT qsort_bt_array_elements +#define ST_UNIQUE unique_bt_array_elements +#define ST_ELEMENT_TYPE Datum +#define ST_COMPARE(a, b, cxt) \ + DatumGetInt32(FunctionCall2Coll(&(cxt)->flinfo, (cxt)->collation, *(a), *(b))) +#define ST_COMPARE_ARG_TYPE BTSortArrayContext +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" + +/* + * Define a reverse sort function for _bt_sort_array_elements. The int32 + * result is widened to int64 before it is negated, so that even INT_MIN can + * be inverted without overflow. 
+ */ +#define ST_SORT qsort_bt_array_elements_reverse +#define ST_UNIQUE unique_bt_array_elements_reverse +#define ST_ELEMENT_TYPE Datum +#define ST_COMPARE(a, b, cxt) \ + (-((int64) DatumGetInt32(FunctionCall2Coll(&(cxt)->flinfo, (cxt)->collation, *(a), *(b))))) +#define ST_COMPARE_TYPE int64 +#define ST_COMPARE_ARG_TYPE BTSortArrayContext +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" + /* * _bt_sort_array_elements() -- sort and de-dup array elements * @@ -477,35 +501,19 @@ _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey, BTORDER_PROC, elemtype, elemtype, rel->rd_opfamily[skey->sk_attno - 1]); - /* Sort the array elements */ + /* Sort the array elements and remove duplicates */ fmgr_info(cmp_proc, &cxt.flinfo); cxt.collation = skey->sk_collation; - cxt.reverse = reverse; - qsort_arg((void *) elems, nelems, sizeof(Datum), - _bt_compare_array_elements, (void *) &cxt); - - /* Now scan the sorted elements and remove duplicates */ - return qunique_arg(elems, nelems, sizeof(Datum), - _bt_compare_array_elements, &cxt); -} - -/* - * qsort_arg comparator for sorting array elements - */ -static int -_bt_compare_array_elements(const void *a, const void *b, void *arg) -{ - Datum da = *((const Datum *) a); - Datum db = *((const Datum *) b); - BTSortArrayContext *cxt = (BTSortArrayContext *) arg; - int32 compare; - - compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo, - cxt->collation, - da, db)); - if (cxt->reverse) - INVERT_COMPARE_RESULT(compare); - return compare; + if (reverse) + { + qsort_bt_array_elements_reverse(elems, nelems, &cxt); + return unique_bt_array_elements_reverse(elems, nelems, &cxt); + } + else + { + qsort_bt_array_elements(elems, nelems, &cxt); + return unique_bt_array_elements(elems, nelems, &cxt); + } } /* -- 2.30.1
From 2308a8556fecdf3388ec4ac4c2a1588619a62b94 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Sun, 14 Mar 2021 14:53:10 +1300 Subject: [PATCH 9/9] Specialize sort routine used by multixact.c. --- src/backend/access/transam/multixact.c | 43 ++++++++++---------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index 1f9f1a1fa1..97c5120cdd 100644 --- a/src/backend/access/transam/multixact.c +++ b/src/backend/access/transam/multixact.c @@ -344,7 +344,6 @@ static void RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, static MultiXactId GetNewMultiXactId(int nmembers, MultiXactOffset *offset); /* MultiXact cache management */ -static int mxactMemberComparator(const void *arg1, const void *arg2); static MultiXactId mXactCacheGetBySet(int nmembers, MultiXactMember *members); static int mXactCacheGetById(MultiXactId multi, MultiXactMember **members); static void mXactCachePut(MultiXactId multi, int nmembers, @@ -1453,29 +1452,21 @@ retry: return truelength; } -/* - * mxactMemberComparator - * qsort comparison function for MultiXactMember - * - * We can't use wraparound comparison for XIDs because that does not respect - * the triangle inequality! Any old sort order will do. 
- */ -static int -mxactMemberComparator(const void *arg1, const void *arg2) -{ - MultiXactMember member1 = *(const MultiXactMember *) arg1; - MultiXactMember member2 = *(const MultiXactMember *) arg2; - - if (member1.xid > member2.xid) - return 1; - if (member1.xid < member2.xid) - return -1; - if (member1.status > member2.status) - return 1; - if (member1.status < member2.status) - return -1; - return 0; -} +/* + * Specialized sort for MultiXactMember arrays. + * + * We can't use wraparound comparison for XIDs because that does not respect + * the triangle inequality! Any old sort order will do, provided that distinct + * members never compare as equal. The key therefore keeps xid in the high + * bits and status in the low 8 bits (assumes status values fit in 8 bits). + */ +#define ST_SORT qsort_mxact_members +#define ST_ELEMENT_TYPE MultiXactMember +#define ST_COMPARE(a, b) (((((int64) (a)->xid) << 8) | (int64) (a)->status) - ((((int64) (b)->xid) << 8) | (int64) (b)->status)) +#define ST_COMPARE_TYPE int64 +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" /* * mXactCacheGetBySet @@ -1499,7 +1490,7 @@ mXactCacheGetBySet(int nmembers, MultiXactMember *members) mxid_to_string(InvalidMultiXactId, nmembers, members)); /* sort the array so comparison is easy */ - qsort(members, nmembers, sizeof(MultiXactMember), mxactMemberComparator); + qsort_mxact_members(members, nmembers); dlist_foreach(iter, &MXactCache) { @@ -1605,7 +1596,7 @@ mXactCachePut(MultiXactId multi, int nmembers, MultiXactMember *members) memcpy(entry->members, members, nmembers * sizeof(MultiXactMember)); /* mXactCacheGetBySet assumes the entries are sorted, so sort them */ - qsort(entry->members, nmembers, sizeof(MultiXactMember), mxactMemberComparator); + qsort_mxact_members(entry->members, nmembers); dlist_push_head(&MXactCache, &entry->node); if (MXactCacheMembers++ >= MAX_CACHE_ENTRIES) -- 2.30.1