On Sat, 2020-07-25 at 13:27 -0700, Peter Geoghegan wrote:
> > It's not clear to me that overpartitioning is a real problem in this
> > case -- but I think the fact that it's causing confusion is enough
> > reason to see if we can fix it.
> 
> I'm not sure about that either.
> 
> FWIW I notice that when I reduce work_mem a little further (to 3MB)
> with the same query, the number of partitions is still 128, while the
> number of run time batches is 16,512 (an increase from 11,456 at 6MB
> work_mem). I notice that 16512/128 is 129, which hints at the nature
> of what's going on with the recursion. I guess it would be ideal if
> the growth in batches was more gradual as I subtract memory.

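If I'm reading those numbers right, 16512 is exactly 128 + 128*128, which
would be consistent with each of the 128 initial partitions being spilled
again into another 128 partitions when its batch is processed -- a whole
extra level of recursion at once rather than the gradual growth you'd want.
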
I wrote a quick patch to use HyperLogLog to estimate the number of groups
contained in a spill file. It seems to reduce the overpartitioning effect,
and is a more principled approach than what I was doing before.

It does seem to hurt the runtime slightly when spilling to disk in some
cases. I haven't narrowed down whether this is because we end up recursing
multiple times, or if it's just more efficient to overpartition, or if the
cost of doing the HLL itself is significant.

Regards,
	Jeff Davis

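P.S. For anyone skimming the patch, here is the shape of the idea pulled out
of the diff. The SpillSketch struct and the sketch_* functions below are
invented for this illustration only; in the patch itself the HLL state lives
directly in HashAggSpill and the resulting estimate is carried on
HashAggBatch:

#include "postgres.h"
#include "common/hashfn.h"      /* hash_bytes_uint32() */
#include "lib/hyperloglog.h"    /* initHyperLogLog() and friends */

/* 5-bit bucket index: 2^5 = 32 one-byte registers per partition */
#define SKETCH_HLL_BIT_WIDTH 5

typedef struct SpillSketch
{
    int                 npartitions;
    hyperLogLogState   *hll_card;   /* one estimator per spill partition */
} SpillSketch;

static void
sketch_init(SpillSketch *sketch, int npartitions)
{
    sketch->npartitions = npartitions;
    sketch->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
    for (int i = 0; i < npartitions; i++)
        initHyperLogLog(&sketch->hll_card[i], SKETCH_HLL_BIT_WIDTH);
}

/*
 * Called once per spilled tuple. The tuple hash is hashed again because
 * every hash routed to a given partition shares some bits, which would
 * otherwise skew the HLL estimate.
 */
static void
sketch_add(SpillSketch *sketch, int partition, uint32 hash)
{
    addHyperLogLog(&sketch->hll_card[partition], hash_bytes_uint32(hash));
}

/*
 * Called when a partition becomes a batch: the estimate replaces the old
 * worst-case assumption of "one group per input tuple" when sizing the
 * next recursion level via hash_agg_set_limits().
 */
static double
sketch_estimate(SpillSketch *sketch, int partition)
{
    double      cardinality;

    cardinality = estimateHyperLogLog(&sketch->hll_card[partition]);
    freeHyperLogLog(&sketch->hll_card[partition]);
    return cardinality;
}

The size/error numbers in the patch comment follow from the usual HLL
figures: 2^5 = 32 one-byte registers is about 32 bytes per partition, and a
relative error of about 1.04/sqrt(32) is roughly 18%.
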
From 424f611f442e36a91747fd39cd28acba42eb958b Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sat, 25 Jul 2020 14:29:59 -0700
Subject: [PATCH] HLL

---
 src/backend/executor/nodeAgg.c | 64 ++++++++++++++++++++++------------
 src/include/executor/nodeAgg.h |  2 +-
 2 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index b79c845a6b7..25771cb4869 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -245,9 +245,11 @@
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "common/hashfn.h"
 #include "executor/execExpr.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
+#include "lib/hyperloglog.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
@@ -295,6 +297,14 @@
 #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
 #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
 
+/*
+ * HyperLogLog is used for estimating the cardinality of the spilled tuples in
+ * a given partition. 5 bits corresponds to a size of about 32 bytes and a
+ * worst-case error of around 18%. That's effective enough to choose a
+ * reasonable number of partitions when recursing.
+ */
+#define HASHAGG_HLL_BIT_WIDTH 5
+
 /*
  * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
  * improved?
@@ -339,6 +349,7 @@ typedef struct HashAggSpill
     int64      *ntuples;        /* number of tuples in each partition */
     uint32      mask;           /* mask to find partition from hash value */
     int         shift;          /* after masking, shift by this amount */
+    hyperLogLogState *hll_card; /* cardinality estimate for contents */
 } HashAggSpill;
 
 /*
@@ -357,6 +368,7 @@ typedef struct HashAggBatch
     LogicalTapeSet *tapeset;    /* borrowed reference to tape set */
     int         input_tapenum;  /* input partition tape */
     int64       input_tuples;   /* number of tuples in this batch */
+    double      input_card;     /* estimated group cardinality */
 } HashAggBatch;
 
 /* used to find referenced colnos */
@@ -409,7 +421,7 @@ static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
 static long hash_choose_num_buckets(double hashentrysize,
                                     long estimated_nbuckets,
                                     Size memory);
-static int  hash_choose_num_partitions(uint64 input_groups,
+static int  hash_choose_num_partitions(double input_groups,
                                        double hashentrysize,
                                        int used_bits,
                                        int *log2_npartittions);
@@ -429,10 +441,11 @@ static void hashagg_finish_initial_spills(AggState *aggstate);
 static void hashagg_reset_spill_state(AggState *aggstate);
 static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
                                        int input_tapenum, int setno,
-                                       int64 input_tuples, int used_bits);
+                                       int64 input_tuples, double input_card,
+                                       int used_bits);
 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
-                               int used_bits, uint64 input_tuples,
+                               int used_bits, double input_groups,
                                double hashentrysize);
 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
                                 TupleTableSlot *slot, uint32 hash);
@@ -1775,7 +1788,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
  * substantially larger than the initial value.
  */
 void
-hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
+hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
                     Size *mem_limit, uint64 *ngroups_limit,
                     int *num_partitions)
 {
@@ -1967,7 +1980,7 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
  * *log2_npartitions to the log2() of the number of partitions.
  */
 static int
-hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
+hash_choose_num_partitions(double input_groups, double hashentrysize,
                            int used_bits, int *log2_npartitions)
 {
     Size        mem_wanted;
@@ -2590,7 +2603,6 @@ agg_refill_hash_table(AggState *aggstate)
     HashAggBatch *batch;
     HashAggSpill spill;
     HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-    uint64      ngroups_estimate;
     bool        spill_initialized = false;
 
     if (aggstate->hash_batches == NIL)
@@ -2599,16 +2611,7 @@ agg_refill_hash_table(AggState *aggstate)
     batch = linitial(aggstate->hash_batches);
     aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
 
-    /*
-     * Estimate the number of groups for this batch as the total number of
-     * tuples in its input file. Although that's a worst case, it's not bad
-     * here for two reasons: (1) overestimating is better than
-     * underestimating; and (2) we've already scanned the relation once, so
-     * it's likely that we've already finalized many of the common values.
-     */
-    ngroups_estimate = batch->input_tuples;
-
-    hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
+    hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
                         batch->used_bits, &aggstate->hash_mem_limit,
                         &aggstate->hash_ngroups_limit, NULL);
 
@@ -2685,7 +2688,7 @@ agg_refill_hash_table(AggState *aggstate)
                  */
                 spill_initialized = true;
                 hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
-                                   ngroups_estimate, aggstate->hashentrysize);
+                                   batch->input_card, aggstate->hashentrysize);
             }
             /* no memory for a new group, spill */
             hashagg_spill_tuple(aggstate, &spill, slot, hash);
@@ -2941,7 +2944,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
  */
 static void
 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
-                   uint64 input_groups, double hashentrysize)
+                   double input_groups, double hashentrysize)
 {
     int         npartitions;
     int         partition_bits;
@@ -2951,6 +2954,7 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
 
     spill->partitions = palloc0(sizeof(int) * npartitions);
     spill->ntuples = palloc0(sizeof(int64) * npartitions);
+    spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
 
     hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
 
@@ -2958,6 +2962,9 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
     spill->shift = 32 - used_bits - partition_bits;
     spill->mask = (npartitions - 1) << spill->shift;
     spill->npartitions = npartitions;
+
+    for (int i = 0; i < npartitions; i++)
+        initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
 }
 
 /*
@@ -3006,6 +3013,13 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
     partition = (hash & spill->mask) >> spill->shift;
     spill->ntuples[partition]++;
 
+    /*
+     * All hash values destined for a given partition have some bits in
+     * common, which causes bad HLL cardinality estimates. Hash the hash to
+     * get a more uniform distribution.
+     */
+    addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
+
     tapenum = spill->partitions[partition];
 
     LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
@@ -3028,7 +3042,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
  */
 static HashAggBatch *
 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
-                  int64 input_tuples, int used_bits)
+                  int64 input_tuples, double input_card, int used_bits)
 {
     HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
@@ -3037,6 +3051,7 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
     batch->tapeset = tapeset;
     batch->input_tapenum = tapenum;
     batch->input_tuples = input_tuples;
+    batch->input_card = input_card;
 
     return batch;
 }
@@ -3140,21 +3155,26 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 
     for (i = 0; i < spill->npartitions; i++)
     {
-        int         tapenum = spill->partitions[i];
-        HashAggBatch *new_batch;
+        int             tapenum = spill->partitions[i];
+        HashAggBatch   *new_batch;
+        double          cardinality;
 
         /* if the partition is empty, don't create a new batch of work */
         if (spill->ntuples[i] == 0)
             continue;
 
+        cardinality = estimateHyperLogLog(&spill->hll_card[i]);
+        freeHyperLogLog(&spill->hll_card[i]);
+
         new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
                                       tapenum, setno, spill->ntuples[i],
-                                      used_bits);
+                                      cardinality, used_bits);
         aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
         aggstate->hash_batches_used++;
     }
 
     pfree(spill->ntuples);
+    pfree(spill->hll_card);
     pfree(spill->partitions);
 }
 
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index bb0805abe09..b9551695384 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -320,7 +320,7 @@ extern void ExecReScanAgg(AggState *node);
 extern Size hash_agg_entry_size(int numTrans, Size tupleWidth,
                                 Size transitionSpace);
 
-extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups,
+extern void hash_agg_set_limits(double hashentrysize, double input_groups,
                                 int used_bits, Size *mem_limit,
                                 uint64 *ngroups_limit,
                                 int *num_partitions);
-- 
2.17.1