On 11.9.2014 16:33, Tomas Vondra wrote:
> On 11 September 2014, 15:31, Robert Haas wrote:
>> On Wed, Sep 10, 2014 at 5:09 PM, Tomas Vondra <t...@fuzzy.cz> wrote:
>>> OK. So here's v13 of the patch, reflecting this change.
>>
>> [...] It does three things:
>>
>> (1) It changes NTUP_PER_BUCKET to 1. Although this increases memory
>> utilization, it also improves hash table performance, and the
>> additional memory we use is less than what we saved as a result of
>> commit 45f6240a8fa9d35548eb2ef23dba2c11540aa02a.
>>
>> (2) It changes ExecChooseHashTableSize() to include the effect of the
>> memory consumed by the bucket array. If we care about properly
>> respecting work_mem, this is clearly right for any NTUP_PER_BUCKET
>> setting, but it is more important for a small setting like 1 than for
>> the current value of 10. I'm not sure the logic in this function is
>> as clean and simple as it can be just yet.
>>
>> (3) It allows the number of buckets to increase on the fly while the
>> hash join is in progress. This case arises when we initially estimate
>> that we only need a small hash table, and then it turns out that
>> there are more tuples than we expected. Without this code, the hash
>> table's load factor gets too high and things start to suck.
>>
>> I recommend separating this patch into two patches, the first
>> covering items (1) and (2) and the second covering item (3), which
>> really seems like a separate improvement.
>
> That probably makes sense.
Attached is the patch, split as suggested:

(a) hashjoin-nbuckets-v14a-size.patch

    * NTUP_PER_BUCKET=1
    * counting buckets towards work_mem
    * changes in ExecChooseHashTableSize (due to the other changes)

(b) hashjoin-nbuckets-v14a-resize.patch (the rest of the patch):

    * tracking the optimal number of buckets
    * dynamic resize of the hash table
    * adding info to the EXPLAIN ANALYZE output

regards
Tomas
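PS: To illustrate the memory impact of (1), here's a standalone
back-of-the-envelope sketch (not part of the patch, all numbers made
up): with 100k inner tuples, going from NTUP_PER_BUCKET=10 to 1 grows
the bucket array from 128kB to 1MB of pointers on a 64-bit machine,
which is still modest next to the per-tuple overhead shaved off by
45f6240a8fa. The my_log2 below mimics the one in
src/backend/utils/hash/dynahash.c.

    #include <stdio.h>

    /* ceil(log2(num)), mimicking my_log2 in dynahash.c */
    static int
    my_log2(long num)
    {
        int     i;
        long    limit;

        for (i = 0, limit = 1; limit < num; i++, limit <<= 1)
            ;
        return i;
    }

    int
    main(void)
    {
        double  ntuples = 100000;   /* made-up inner relation size */
        int     ntup;

        for (ntup = 10; ntup >= 1; ntup -= 9)   /* 10, then 1 */
        {
            long    nbuckets = 1L << my_log2((long) (ntuples / ntup));

            /* we never use fewer than 1024 buckets */
            if (nbuckets < 1024)
                nbuckets = 1024;

            printf("NTUP_PER_BUCKET=%2d => %7ld buckets (%ld kB of pointers)\n",
                   ntup, nbuckets,
                   nbuckets * (long) sizeof(void *) / 1024);
        }
        return 0;
    }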
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 781a736..d128e08 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1900,18 +1900,21 @@ show_hash_info(HashState *hashstate, ExplainState *es)
     if (es->format != EXPLAIN_FORMAT_TEXT)
     {
         ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
+        ExplainPropertyLong("Original Hash Buckets",
+                            hashtable->nbuckets_original, es);
         ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
         ExplainPropertyLong("Original Hash Batches",
                             hashtable->nbatch_original, es);
         ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
     }
-    else if (hashtable->nbatch_original != hashtable->nbatch)
+    else if ((hashtable->nbatch_original != hashtable->nbatch) ||
+             (hashtable->nbuckets_original != hashtable->nbuckets))
     {
         appendStringInfoSpaces(es->str, es->indent * 2);
         appendStringInfo(es->str,
-                         "Buckets: %d  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                         hashtable->nbuckets, hashtable->nbatch,
-                         hashtable->nbatch_original, spacePeakKb);
+                         "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+                         hashtable->nbuckets, hashtable->nbuckets_original,
+                         hashtable->nbatch, hashtable->nbatch_original, spacePeakKb);
     }
     else
     {
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index fa0e700a..4f00426 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -39,6 +39,7 @@
 
 static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
 static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
                       int mcvsToUse);
 static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -49,8 +50,8 @@ static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
 static void *dense_alloc(HashJoinTable hashtable, Size size);
 
-/* Memory needed for buckets. */
-#define BUCKETS_SPACE(htab)    ((htab)->nbuckets * sizeof(HashJoinTuple))
+/* Memory needed for the optimal number of buckets. */
+#define BUCKETS_SPACE(htab)    ((htab)->nbuckets_optimal * sizeof(HashJoinTuple))
 
 /* ----------------------------------------------------------------
  *        ExecHash
@@ -120,6 +121,7 @@ MultiExecHash(HashState *node)
             /* It's a skew tuple, so put it into that hash table */
             ExecHashSkewTableInsert(hashtable, slot, hashvalue,
                                     bucketNumber);
+            hashtable->skewTuples += 1;
         }
         else
         {
@@ -130,6 +132,25 @@ MultiExecHash(HashState *node)
         }
     }
 
+    /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
+    if (hashtable->nbuckets != hashtable->nbuckets_optimal)
+    {
+        /* We never decrease the number of buckets. */
+        Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+
+#ifdef HJDEBUG
+        printf("Increasing nbuckets %d => %d\n",
+               hashtable->nbuckets, hashtable->nbuckets_optimal);
+#endif
+
+        ExecHashIncreaseNumBuckets(hashtable);
+    }
+
+    /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+    hashtable->spaceUsed += BUCKETS_SPACE(hashtable);
+    if (hashtable->spaceUsed > hashtable->spacePeak)
+        hashtable->spacePeak = hashtable->spaceUsed;
+
     /* must provide our own instrumentation support */
     if (node->ps.instrument)
         InstrStopNode(node->ps.instrument, hashtable->totalTuples);
@@ -275,7 +296,10 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
      */
     hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
     hashtable->nbuckets = nbuckets;
+    hashtable->nbuckets_original = nbuckets;
+    hashtable->nbuckets_optimal = nbuckets;
     hashtable->log2_nbuckets = log2_nbuckets;
+    hashtable->log2_nbuckets_optimal = log2_nbuckets;
     hashtable->buckets = NULL;
     hashtable->keepNulls = keepNulls;
     hashtable->skewEnabled = false;
@@ -289,6 +313,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
     hashtable->nbatch_outstart = nbatch;
     hashtable->growEnabled = true;
     hashtable->totalTuples = 0;
+    hashtable->skewTuples = 0;
     hashtable->innerBatchFile = NULL;
     hashtable->outerBatchFile = NULL;
     hashtable->spaceUsed = 0;
@@ -543,6 +568,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
          */
         while ((inner_rel_bytes / nbatch) > hash_table_bytes)
             nbatch <<= 1;
+
     }
     else
     {
@@ -672,6 +698,19 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
      */
     ninmemory = nfreed = 0;
 
+    /* If we know we need to resize nbuckets, we can do it while rebatching. */
+    if (hashtable->nbuckets_optimal != hashtable->nbuckets)
+    {
+        /* we never decrease the number of buckets */
+        Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
+
+        hashtable->nbuckets = hashtable->nbuckets_optimal;
+        hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
+
+        hashtable->buckets = repalloc(hashtable->buckets,
+                                sizeof(HashJoinTuple) * hashtable->nbuckets);
+    }
+
     /*
      * We will scan through the chunks directly, so that we can reset the
      * buckets now and not have to keep track which tuples in the buckets have
@@ -756,6 +795,93 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 }
 
 /*
+ * ExecHashIncreaseNumBuckets
+ *        increase the original number of buckets in order to reduce the
+ *        number of tuples per bucket
+ */
+static void
+ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+    HashMemoryChunk chunk;
+
+    /* do nothing if not an increase (it's called increase for a reason) */
+    if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
+        return;
+
+    /*
+     * We already know the optimal number of buckets, so let's just compute
+     * the log2_nbuckets for it.
+     */
+    hashtable->nbuckets = hashtable->nbuckets_optimal;
+    hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal);
+
+    Assert(hashtable->nbuckets > 1);
+    Assert(hashtable->nbuckets <= (INT_MAX / 2));
+    Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
+
+#ifdef HJDEBUG
+    printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
+#endif
+
+    /*
+     * Just reallocate the proper number of buckets - we don't need to walk
+     * through them - we can walk the dense-allocated chunks (just like in
+     * ExecHashIncreaseNumBatches, but without all the copying into new
+     * chunks).
+     */
+    hashtable->buckets = (HashJoinTuple *) repalloc(hashtable->buckets,
+                                hashtable->nbuckets * sizeof(HashJoinTuple));
+
+    memset(hashtable->buckets, 0, sizeof(void *) * hashtable->nbuckets);
+
+    /* scan through all tuples in all chunks, rebuild the hash table */
+    chunk = hashtable->chunks;
+    while (chunk != NULL)
+    {
+        /* position within the buffer (up to chunk->used) */
+        size_t        idx = 0;
+
+        /* process all tuples stored in this chunk (and then free it) */
+        while (idx < chunk->used)
+        {
+            /* get the hashjoin tuple and mintuple stored right after it */
+            HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);
+            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+
+            int            hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+
+            int            bucketno;
+            int            batchno;
+
+            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+                                      &bucketno, &batchno);
+
+            /* just add the tuple to the proper bucket */
+            hashTuple->next = hashtable->buckets[bucketno];
+            hashtable->buckets[bucketno] = hashTuple;
+
+            /* bytes occupied in memory: HJ tuple overhead + actual tuple length */
+            idx += MAXALIGN(hashTupleSize);
+        }
+
+        /* proceed to the next chunk */
+        chunk = chunk->next;
+    }
+
+#ifdef HJDEBUG
+    printf("Nbuckets increased to %d, average items per bucket %.1f\n",
+           hashtable->nbuckets,
+           (hashtable->totalTuples - hashtable->skewTuples) / hashtable->nbuckets);
+#endif
+}
+
+
+/*
  * ExecHashTableInsert
  *        insert a tuple into the hash table depending on the hash value,
  *        it may just go to a temp file for later batches
@@ -788,6 +914,7 @@ ExecHashTableInsert(HashJoinTable hashtable,
          */
         HashJoinTuple hashTuple;
         int            hashTupleSize;
+        double        ntuples = (hashtable->totalTuples - hashtable->skewTuples);
 
         /* Create the HashJoinTuple */
         hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
@@ -808,12 +935,30 @@ ExecHashTableInsert(HashJoinTable hashtable,
         hashTuple->next = hashtable->buckets[bucketno];
         hashtable->buckets[bucketno] = hashTuple;
 
+        /*
+         * Increase the (optimal) number of buckets if we just exceeded
+         * NTUP_PER_BUCKET, but only when there's still a single batch at
+         * this moment.
+         */
+        if ((hashtable->nbatch == 1) &&
+            (hashtable->nbuckets_optimal <= INT_MAX / 2) &&    /* overflow protection */
+            (ntuples >= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET)))
+        {
+            hashtable->nbuckets_optimal *= 2;
+            hashtable->log2_nbuckets_optimal += 1;
+        }
+
         /* Account for space used, and back off if we've used too much */
         hashtable->spaceUsed += hashTupleSize;
         if (hashtable->spaceUsed > hashtable->spacePeak)
             hashtable->spacePeak = hashtable->spaceUsed;
+
+        /*
+         * Do we need to increase the number of batches? Account for the
+         * space required by the buckets (optimal number).
+         */
         if (hashtable->spaceUsed + BUCKETS_SPACE(hashtable) > hashtable->spaceAllowed)
             ExecHashIncreaseNumBatches(hashtable);
+
     }
     else
     {
@@ -936,7 +1081,10 @@ ExecHashGetHashValue(HashJoinTable hashtable,
  * functions are good about randomizing all their output bits, else we are
  * likely to have very skewed bucket or batch occupancy.)
  *
- * nbuckets doesn't change over the course of the join.
+ * The nbuckets/log2_nbuckets may change while (nbatch == 1) because of the
+ * dynamic bucket growth. Once we start batching, the value is fixed and does
+ * not change over the course of the join (making it possible to compute the
+ * batch number the way we do here).
  *
  * nbatch is always a power of 2; we increase it only by doubling it. This
  * effectively adds one more bit to the top of the batchno.
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index c9e61df..0e1e0cd 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -127,6 +127,10 @@ typedef struct HashJoinTableData
     int            nbuckets;        /* # buckets in the in-memory hash table */
     int            log2_nbuckets;   /* its log2 (nbuckets must be a power of 2) */
 
+    int            nbuckets_original;      /* # buckets when starting the first hash */
+    int            nbuckets_optimal;       /* optimal # buckets (per batch) */
+    int            log2_nbuckets_optimal;  /* log2 of nbuckets_optimal */
+
     /* buckets[i] is head of list of tuples in i'th in-memory bucket */
     struct HashJoinTupleData **buckets;
     /* buckets array is per-batch storage, as are all the tuples */
@@ -148,6 +152,7 @@ typedef struct HashJoinTableData
     bool        growEnabled;    /* flag to shut off nbatch increases */
 
     double        totalTuples;    /* # tuples obtained from inner plan */
+    double        skewTuples;     /* # tuples inserted into skew buckets */
 
     /*
      * These arrays are allocated for the life of the hash join, but only if
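For the record, this is roughly how the text-format EXPLAIN ANALYZE line
looks with the resize patch when the bucket count grew during the build
(illustration only, following the format string above, numbers made up):

    Buckets: 131072 (originally 1024)  Batches: 1 (originally 1)  Memory Usage: 7561kB

Note that the resize walks the dense-allocated chunks (just like
ExecHashIncreaseNumBatches does), not the bucket lists, so each tuple is
visited exactly once and no tuples need to be copied - only the bucket
array is reallocated and the pointers rebuilt.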
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3ef7cfb..fa0e700a 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -49,6 +49,9 @@ static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
 static void *dense_alloc(HashJoinTable hashtable, Size size);
 
+/* Memory needed for buckets. */
+#define BUCKETS_SPACE(htab)    ((htab)->nbuckets * sizeof(HashJoinTuple))
+
 /* ----------------------------------------------------------------
  *        ExecHash
  *
@@ -386,7 +389,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
  */
 
 /* Target bucket loading (tuples per bucket) */
-#define NTUP_PER_BUCKET            10
+#define NTUP_PER_BUCKET            1
 
 void
 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
@@ -396,6 +399,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 {
     int            tupsize;
     double        inner_rel_bytes;
+    long        buckets_bytes;
     long        hash_table_bytes;
     long        skew_table_bytes;
     long        max_pointers;
@@ -418,6 +422,16 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
     inner_rel_bytes = ntuples * tupsize;
 
     /*
+     * Compute memory occupied by buckets, assuming all tuples fit into a
+     * single batch (consider NTUP_PER_BUCKET tuples per bucket) - buckets
+     * are just 'HashJoinTuple' elements (pointers to HashJoinTupleData).
+     * Also, we never use fewer than 1024 buckets.
+     */
+    nbuckets = (1 << my_log2(ntuples / NTUP_PER_BUCKET));
+    nbuckets = Max(1024, nbuckets);
+    buckets_bytes = sizeof(HashJoinTuple) * nbuckets;
+
+    /*
      * Target in-memory hashtable size is work_mem kilobytes.
      */
     hash_table_bytes = work_mem * 1024L;
@@ -468,16 +482,13 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
     /* also ensure we avoid integer overflow in nbatch and nbuckets */
     max_pointers = Min(max_pointers, INT_MAX / 2);
 
-    if (inner_rel_bytes > hash_table_bytes)
+    if ((inner_rel_bytes + buckets_bytes) > hash_table_bytes)
     {
         /* We'll need multiple batches */
         long        lbuckets;
         double        dbatch;
         int            minbatch;
-
-        lbuckets = (hash_table_bytes / tupsize) / NTUP_PER_BUCKET;
-        lbuckets = Min(lbuckets, max_pointers);
-        nbuckets = (int) lbuckets;
+        long        bucket_size;
 
         dbatch = ceil(inner_rel_bytes / hash_table_bytes);
         dbatch = Min(dbatch, max_pointers);
@@ -485,6 +496,53 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
         nbatch = 2;
         while (nbatch < minbatch)
             nbatch <<= 1;
+
+        /* memory needed by a single "full" bucket (including tuples) */
+        bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
+
+        /*
+         * When batching, we size the buckets for the full work_mem. We
+         * simply divide work_mem by the memory needed per 'full bucket'
+         * (a pointer and NTUP_PER_BUCKET tuples, each 'tupsize' bytes,
+         * including the overhead for the hash value, pointer to the next
+         * tuple etc.).
+         */
+        lbuckets = 1 << my_log2(hash_table_bytes / bucket_size);
+
+        /* protect against nbucket overflow */
+        lbuckets = Min(lbuckets, max_pointers);
+        nbuckets = (int) lbuckets;
+
+        /* Compute memory needed for buckets. */
+        buckets_bytes = nbuckets * sizeof(HashJoinTuple);
+
+        /*
+         * Buckets are simple pointers to hashjoin tuples, while tupsize is
+         * always >= 24B, plus the actual data. So buckets should never
+         * really exceed 25% of work_mem (even for NTUP_PER_BUCKET=1).
+         * Except maybe for work_mem values that are not 2^N bytes, where
+         * we might get more because of the doubling. So let's look for
+         * 50% here.
+         */
+        Assert(buckets_bytes <= hash_table_bytes / 2);
+
+        /*
+         * Subtract the buckets from work_mem, so we know how much memory
+         * remains for the actual tuples.
+         */
+        hash_table_bytes -= buckets_bytes;
+
+        /*
+         * Increase nbatch until both the tuples and the buckets fit into
+         * work_mem.
+         *
+         * The loop should not execute more than once in most cases,
+         * because tuples are usually much wider than buckets (mere 8B
+         * pointers), so even a tuple budget reduced to
+         * (hash_table_bytes / 2) should get us below work_mem.
+         *
+         * The worst case is (nbuckets == 2 * ntuples - 1), giving us about
+         * twice the number of buckets, i.e. about 2 * sizeof(void *) per
+         * tuple. But that's a consequence of how NTUP_PER_BUCKET is
+         * chosen, and of the work_mem limit.
+         */
+        while ((inner_rel_bytes / nbatch) > hash_table_bytes)
+            nbatch <<= 1;
     }
     else
     {
@@ -754,7 +812,7 @@ ExecHashTableInsert(HashJoinTable hashtable,
         hashtable->spaceUsed += hashTupleSize;
         if (hashtable->spaceUsed > hashtable->spacePeak)
             hashtable->spacePeak = hashtable->spaceUsed;
-        if (hashtable->spaceUsed > hashtable->spaceAllowed)
+        if (hashtable->spaceUsed + BUCKETS_SPACE(hashtable) > hashtable->spaceAllowed)
             ExecHashIncreaseNumBatches(hashtable);
     }
     else
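To sanity-check the batching branch, here is the arithmetic for made-up
numbers (work_mem = 4MB, tupsize = 72B, 64-bit pointers,
NTUP_PER_BUCKET = 1):

    bucket_size      = 72 * 1 + 8                = 80 B
    lbuckets         = 1 << my_log2(4194304 / 80)
                     = 1 << my_log2(52428)       = 65536
    buckets_bytes    = 65536 * 8                 = 512 kB   (12.5% of work_mem)
    hash_table_bytes = 4096 kB - 512 kB          = 3584 kB  (left for the tuples)

So the bucket array stays well below the 25% mentioned in the comment,
and the final while loop only doubles nbatch further when the per-batch
tuples alone don't fit into the remaining 3584 kB.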