On Sat, 2020-07-25 at 13:27 -0700, Peter Geoghegan wrote:
> > It's not clear to me that overpartitioning is a real problem in this
> > case -- but I think the fact that it's causing confusion is enough
> > reason to see if we can fix it.
> 
> I'm not sure about that either.
> 
> FWIW I notice that when I reduce work_mem a little further (to 3MB)
> with the same query, the number of partitions is still 128, while the
> number of run time batches is 16,512 (an increase from 11,456 from
> 6MB
> work_mem). I notice that 16512/128 is 129, which hints at the nature
> of what's going on with the recursion. I guess it would be ideal if
> the growth in batches was more gradual as I subtract memory.

I wrote a quick patch to use HyperLogLog to estimate the number of
groups contained in a spill file. It seems to reduce the
overpartitioning effect, and is a more principled approach than what I
was doing before.
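
The core of it, condensed from the attached patch (not standalone
code): each spill partition carries its own HLL state, every spilled
tuple's hash is added to it, and the resulting estimate replaces the
raw tuple count when sizing the recursive batch.

/* one HLL per spill partition, set up in hashagg_spill_init() */
initHyperLogLog(&spill->hll_card[partition], HASHAGG_HLL_BIT_WIDTH);

/* per spilled tuple; re-hash the hash, since all hashes landing in one
 * partition share bits and would otherwise skew the estimate */
addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));

/* when the partition becomes a batch, feed the estimate (rather than
 * the raw tuple count) into hash_agg_set_limits() */
cardinality = estimateHyperLogLog(&spill->hll_card[partition]);
freeHyperLogLog(&spill->hll_card[partition]);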

It does seem to hurt the runtime slightly in some cases when spilling to
disk. I haven't narrowed down whether that's because we now end up
recursing multiple times, because it's simply more efficient to
overpartition, or because the cost of maintaining the HLL itself is
significant.
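
On the memory side, at least, the per-partition HLL state should be
tiny. As a back-of-the-envelope check (this is just the textbook
1.04/sqrt(m) HLL error formula, nothing from the patch itself):

#include <math.h>
#include <stdio.h>

int
main(void)
{
    /* relative error of HLL is about 1.04/sqrt(m), with m = 2^bits
     * registers of roughly a byte each */
    for (int bits = 4; bits <= 8; bits++)
    {
        int     m = 1 << bits;

        printf("bits=%d  ~%d bytes  error=%.1f%%\n",
               bits, m, 100.0 * 1.04 / sqrt((double) m));
    }

    /* bits=5 gives ~32 bytes and ~18%, matching the
     * HASHAGG_HLL_BIT_WIDTH comment in the patch */
    return 0;
}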

Regards,
        Jeff Davis

From 424f611f442e36a91747fd39cd28acba42eb958b Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sat, 25 Jul 2020 14:29:59 -0700
Subject: [PATCH] HLL

---
 src/backend/executor/nodeAgg.c | 64 ++++++++++++++++++++++------------
 src/include/executor/nodeAgg.h |  2 +-
 2 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index b79c845a6b7..25771cb4869 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -245,9 +245,11 @@
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "common/hashfn.h"
 #include "executor/execExpr.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
+#include "lib/hyperloglog.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
@@ -295,6 +297,14 @@
 #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
 #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
 
+/*
+ * HyperLogLog is used for estimating the cardinality of the spilled tuples in
+ * a given partition. 5 bits corresponds to a size of about 32 bytes and a
+ * worst-case error of around 18%. That's effective enough to choose a
+ * reasonable number of partitions when recursing.
+ */
+#define HASHAGG_HLL_BIT_WIDTH 5
+
 /*
  * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
  * improved?
@@ -339,6 +349,7 @@ typedef struct HashAggSpill
 	int64	   *ntuples;		/* number of tuples in each partition */
 	uint32		mask;			/* mask to find partition from hash value */
 	int			shift;			/* after masking, shift by this amount */
+	hyperLogLogState *hll_card;	/* cardinality estimate for contents */
 } HashAggSpill;
 
 /*
@@ -357,6 +368,7 @@ typedef struct HashAggBatch
 	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
 	int			input_tapenum;	/* input partition tape */
 	int64		input_tuples;	/* number of tuples in this batch */
+	double		input_card;		/* estimated group cardinality */
 } HashAggBatch;
 
 /* used to find referenced colnos */
@@ -409,7 +421,7 @@ static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
 static long hash_choose_num_buckets(double hashentrysize,
 									long estimated_nbuckets,
 									Size memory);
-static int	hash_choose_num_partitions(uint64 input_groups,
+static int	hash_choose_num_partitions(double input_groups,
 									   double hashentrysize,
 									   int used_bits,
 									   int *log2_npartittions);
@@ -429,10 +441,11 @@ static void hashagg_finish_initial_spills(AggState *aggstate);
 static void hashagg_reset_spill_state(AggState *aggstate);
 static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
 									   int input_tapenum, int setno,
-									   int64 input_tuples, int used_bits);
+									   int64 input_tuples, double input_card,
+									   int used_bits);
 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
-							   int used_bits, uint64 input_tuples,
+							   int used_bits, double input_groups,
 							   double hashentrysize);
 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 								TupleTableSlot *slot, uint32 hash);
@@ -1775,7 +1788,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
  * substantially larger than the initial value.
  */
 void
-hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
+hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 					Size *mem_limit, uint64 *ngroups_limit,
 					int *num_partitions)
 {
@@ -1967,7 +1980,7 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
  * *log2_npartitions to the log2() of the number of partitions.
  */
 static int
-hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
+hash_choose_num_partitions(double input_groups, double hashentrysize,
 						   int used_bits, int *log2_npartitions)
 {
 	Size		mem_wanted;
@@ -2590,7 +2603,6 @@ agg_refill_hash_table(AggState *aggstate)
 	HashAggBatch *batch;
 	HashAggSpill spill;
 	HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-	uint64		ngroups_estimate;
 	bool		spill_initialized = false;
 
 	if (aggstate->hash_batches == NIL)
@@ -2599,16 +2611,7 @@ agg_refill_hash_table(AggState *aggstate)
 	batch = linitial(aggstate->hash_batches);
 	aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
 
-	/*
-	 * Estimate the number of groups for this batch as the total number of
-	 * tuples in its input file. Although that's a worst case, it's not bad
-	 * here for two reasons: (1) overestimating is better than
-	 * underestimating; and (2) we've already scanned the relation once, so
-	 * it's likely that we've already finalized many of the common values.
-	 */
-	ngroups_estimate = batch->input_tuples;
-
-	hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
+	hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
 						batch->used_bits, &aggstate->hash_mem_limit,
 						&aggstate->hash_ngroups_limit, NULL);
 
@@ -2685,7 +2688,7 @@ agg_refill_hash_table(AggState *aggstate)
 				 */
 				spill_initialized = true;
 				hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
-								   ngroups_estimate, aggstate->hashentrysize);
+								   batch->input_card, aggstate->hashentrysize);
 			}
 			/* no memory for a new group, spill */
 			hashagg_spill_tuple(aggstate, &spill, slot, hash);
@@ -2941,7 +2944,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
  */
 static void
 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
-				   uint64 input_groups, double hashentrysize)
+				   double input_groups, double hashentrysize)
 {
 	int			npartitions;
 	int			partition_bits;
@@ -2951,6 +2954,7 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
 
 	spill->partitions = palloc0(sizeof(int) * npartitions);
 	spill->ntuples = palloc0(sizeof(int64) * npartitions);
+	spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
 
 	hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
 
@@ -2958,6 +2962,9 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
 	spill->shift = 32 - used_bits - partition_bits;
 	spill->mask = (npartitions - 1) << spill->shift;
 	spill->npartitions = npartitions;
+
+	for (int i = 0; i < npartitions; i++)
+		initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
 }
 
 /*
@@ -3006,6 +3013,13 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 	partition = (hash & spill->mask) >> spill->shift;
 	spill->ntuples[partition]++;
 
+	/*
+	 * All hash values destined for a given partition have some bits in
+	 * common, which causes bad HLL cardinality estimates. Hash the hash to
+	 * get a more uniform distribution.
+	 */
+	addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
+
 	tapenum = spill->partitions[partition];
 
 	LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
@@ -3028,7 +3042,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
  */
 static HashAggBatch *
 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
-				  int64 input_tuples, int used_bits)
+				  int64 input_tuples, double input_card, int used_bits)
 {
 	HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
@@ -3037,6 +3051,7 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
 	batch->tapeset = tapeset;
 	batch->input_tapenum = tapenum;
 	batch->input_tuples = input_tuples;
+	batch->input_card = input_card;
 
 	return batch;
 }
@@ -3140,21 +3155,26 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 
 	for (i = 0; i < spill->npartitions; i++)
 	{
-		int			tapenum = spill->partitions[i];
-		HashAggBatch *new_batch;
+		int				 tapenum = spill->partitions[i];
+		HashAggBatch	*new_batch;
+		double			 cardinality;
 
 		/* if the partition is empty, don't create a new batch of work */
 		if (spill->ntuples[i] == 0)
 			continue;
 
+		cardinality = estimateHyperLogLog(&spill->hll_card[i]);
+		freeHyperLogLog(&spill->hll_card[i]);
+
 		new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
 									  tapenum, setno, spill->ntuples[i],
-									  used_bits);
+									  cardinality, used_bits);
 		aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
 		aggstate->hash_batches_used++;
 	}
 
 	pfree(spill->ntuples);
+	pfree(spill->hll_card);
 	pfree(spill->partitions);
 }
 
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index bb0805abe09..b9551695384 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -320,7 +320,7 @@ extern void ExecReScanAgg(AggState *node);
 
 extern Size hash_agg_entry_size(int numTrans, Size tupleWidth,
 								Size transitionSpace);
-extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups,
+extern void hash_agg_set_limits(double hashentrysize, double input_groups,
 								int used_bits, Size *mem_limit,
 								uint64 *ngroups_limit, int *num_partitions);
 
-- 
2.17.1
