Hi,

I've been once again reminded of the batch explosion issue in hashjoin, due to how it enforces the memory limit. This resurfaces every now and then, when a user gets strange OOM issues - see for example these threads from ~2019, and even some patches: [1] [2] [3]
Let me restart the discussion, resubmit some of the older patches, and present a plan for what to do about this ...

Just to remind everyone of the basic details, a brief summary: the hashjoin does not account for the spill files when enforcing the memory limit. When the hash table gets full, it decides to double the number of batches, which cuts the hash table size in half. But with enough batches the doubling can actually make the situation much worse - the new batches simply use more memory than was saved.

This can happen for various reasons. A simple example is that we underestimate the size of the input relation, so the hash table needs to be built on many more tuples. This is bad, but usually not disastrous.

It's much worse when there's a batch that is not "divisible", i.e. adding more batches does not split it roughly in half. This can happen due to hash collisions (in the part of the hash that determines the batch), or duplicate values that didn't make it into the MCV list (so the skew optimization does not kick in). This is fairly rare, but when it happens it can easily lead to batch explosion, i.e. a rapidly increasing number of batches. We add batches, but the problematic batch does not split, so we promptly hit the limit again, triggering another increase. It often stops only when we exhaust the 32-bit hash space, ending with hundreds of thousands of batches.

Attached is a SQL script that reproduces something like this. It builds a table of values whose hashes have 0s in the upper bits, and the hash join then spirals into a batch explosion. Note: the script is a bit dumb and needs a lot of temp space (~50GB) when generating the values with duplicate hashes.
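To make the math concrete, here is a toy model of the accounting problem (standalone C, not PostgreSQL code; it assumes the default 8kB BLCKSZ, a hash table pinned at a 4MB limit, and approximates each BufFile by just its BLCKSZ buffer):

#include <stdio.h>

#define BLCKSZ	8192		/* PostgreSQL default block size */

int
main(void)
{
	long long	space_used = 4LL * 1024 * 1024; /* hash table at a 4MB limit */
	long long	nbatch;

	for (nbatch = 2; nbatch <= 1024 * 1024; nbatch *= 2)
	{
		/* one inner + one outer BufFile per batch, each with a BLCKSZ buffer */
		long long	file_mem = 2 * nbatch * BLCKSZ;

		/*
		 * Doubling nbatch can save at most half of the hash table, but it
		 * also adds another file_mem worth of BufFile buffers.
		 */
		printf("nbatch %7lld: file buffers %8lld kB, doubling saves <= %lld kB, costs %8lld kB\n",
			   nbatch, file_mem / 1024, space_used / 2 / 1024, file_mem / 1024);
	}

	return 0;
}

With these numbers, doubling stops paying off at 128 batches, where the extra file buffers cost exactly as much as the split can possibly save - yet the current code keeps doubling, and at ~1M batches the buffers alone need 16GB.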
In 2019 I shared a bunch of patches [4] improving this, but then I got distracted and the discussion stalled. There were proposals to fix this by introducing a special hash join "mode" to address these issues [5], but we never got past a prototype, and there are a lot of outstanding questions.

So I decided to revisit the three patches from 2019. Attached are rebased and cleaned-up versions. A couple of comments on each one:

1) v20241231-adjust-limit-0001-Account-for-batch-files-in-ha.patch

I believe this is the way to go, for now. The basic idea is to keep the overall behavior, but "relax" the memory limit as the number of batches increases, so as to minimize the total memory use. This may seem a bit weird, but as the number of batches grows there's no way not to violate the limit - and the current code simply ignores this and allocates arbitrary amounts of memory.

2) v20241231-single-spill-0001-Hash-join-with-a-single-spill.patch

The basic idea is that we keep only a small "slice" of batches in memory, and data for later batches is spilled into a single file. This means that even if the number of batches increases, the memory use does not change, so the memory limit is enforced very strictly. The problem is that this performs *terribly*, because it shuffles data many times, always just to the next slice. So if we happen to have 128 batches in memory and the number explodes to ~128k batches, we end up reading/writing each tuple ~500x.

3) v20241231-multi-spill-0001-Hash-join-with-a-multiple-spil.patch

This is an improvement on the "single spill" patch: we keep one spill file per slice, which greatly reduces the amount of temporary traffic. The trouble is that we can no longer enforce the memory limit quite as strictly, because the number of files does grow with the number of batches, although not 1:1 - with a slice of 128 batches we get only 1 file per 128 batches, which is a nice reduction.

This means that ultimately it's either (1) or (3), and the more I've been looking into this, the more I prefer (1), for a couple of reasons:

* It's much simpler (it doesn't really change the basic behavior, and doesn't introduce any new files or anything like that).

* There doesn't seem to be a major difference in total memory consumption between the two approaches - both end up allowing allocating more memory.

* It actually helps with the "indivisible batch" case - it relaxes the limit, so there's a chance the batch eventually fits and we stop adding more and more batches. With spill files that's not the case - we still keep the original limit, and we end up with the batch explosion anyway (we just handle it much more efficiently).

Unless there are objections, my plan is to get (1) cleaned up and try to get it into 18, possibly in the January CF. It's not a particularly complex patch, and it already passes check-world (it only affected three plans in join_hash, and those make sense I think).

One thing I'm not sure about yet is whether this needs to tweak the hashjoin costing to also consider the files when deciding how many batches to use. Maybe it should?

regards

[1] https://www.postgresql.org/message-id/20190504003414.bulcbnge3rhwhcsh%40development
[2] https://www.postgresql.org/message-id/20230228190643.1e368315%40karst
[3] https://www.postgresql.org/message-id/bc138e9f-c89e-9147-5395-61d51a757b3b%40gusw.net
[4] https://www.postgresql.org/message-id/20190428141901.5dsbge2ka3rxmpk6%40development
[5] https://www.postgresql.org/message-id/caakru_yswm7gc_b2nbgwfpe6wuhdolfc1lbz786duzacpud...@mail.gmail.com

--
Tomas Vondra
From edcaac0705193acea33c4423bf0d59128219a46f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 31 Dec 2024 17:09:40 +0100
Subject: [PATCH v20241231-adjust-limit] Account for batch files in hash join
 spilling

Hash joins try to limit the amount of memory used by the Hash node by
keeping only a single batch in memory and spilling future batches to
disk. Unfortunately, the implementation does not account for the files
used for spilling, which can lead to issues with many batches. Each
file keeps a BLCKSZ buffer in memory, and we need 2*nbatches of those
files, so with many batches this may use substantial amounts of memory.

The hash join code however assumes adding batches is virtually free (in
terms of memory needed), ignoring this issue. It increases the number
of batches, possibly keeping the current batch within the limit, but
ends up using much more memory for the files.

This can be particularly painful with adversarial data sets, with a
batch that can't be split. This may happen due to hash collisions
(overlaps in the part of the hash used to calculate the batch), or a
value with many duplicates that didn't make it to the MCV list (and
thus the skew table can't help). In these cases the hash join can get
into a cycle of increasing the number of batches, often reaching 256k
or 512k batches before exhausting the available hash space (32 bits).

If this happens, there's not much point in enforcing the original
memory limit. That's simply not feasible - especially in the case of a
single batch exceeding the allowed space. Instead, the best we can do
is relax the limit, and focus on using as little total memory as
possible. By allowing the batches to be larger, we reduce the number of
batch files.

The adjustment formula is based on the observation that doubling the
number of batches doubles the amount of memory needed for the files,
while cutting the batch size in half. This defines the "break even"
point for the next batch increase.
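For example, with the default 8kB BLCKSZ, the break-even condition

	spaceUsed == (4 * nbatch * BLCKSZ)

is reached under a 4MB limit at nbatch = 4MB / 32kB = 128: doubling
from 128 to 256 batches adds 2 * 128 * 8kB = 2MB of file buffers,
which is exactly as much as halving the hash table can save.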
---
 src/backend/executor/nodeHash.c         | 153 +++++++++++++++++++++---
 src/test/regress/expected/join_hash.out |   4 +-
 src/test/regress/sql/join_hash.sql      |   4 +-
 3 files changed, 142 insertions(+), 19 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..680d2897738 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -54,6 +54,9 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
 									uint32 hashvalue,
 									int bucketNumber);
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
+static bool ExecHashExceededMemoryLimit(HashJoinTable hashtable);
+static void ExecHashAdjustMemoryLimit(HashJoinTable hashtable);
 
 static void *dense_alloc(HashJoinTable hashtable, Size size);
 
 static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
@@ -199,10 +202,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* update info about peak memory usage */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -1036,6 +1037,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		   hashtable, nfreed, ninmemory, hashtable->spaceUsed);
 #endif
 
+	/* adjust the memory limit for the new nbatches etc. */
+	ExecHashAdjustMemoryLimit(hashtable);
+
 	/*
 	 * If we dumped out either all or none of the tuples in the table, disable
 	 * further expansion of nbatch.  This situation implies that we have
@@ -1673,11 +1677,12 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
-		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
-			> hashtable->spaceAllowed)
+
+		/* update info about peak memory usage */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/* Should we add more batches, to enforce the memory limit? */
+		if (ExecHashExceededMemoryLimit(hashtable))
 			ExecHashIncreaseNumBatches(hashtable);
 	}
 	else
@@ -1843,6 +1848,120 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+/*
+ * ExecHashUpdateSpacePeak
+ *		Update information about peak memory usage.
+ *
+ * This considers tuples added to the hash table, buckets of the hash table
+ * itself, and also the buffered batch files on both the inner and outer
+ * side. Each file has a BLCKSZ buffer, so with enough batches this may
+ * actually represent most of the memory used by the hash join node.
+ */
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size		spaceUsed = hashtable->spaceUsed;
+
+	/* buckets of the hash table */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* buffered batch files (inner + outer), each has a BLCKSZ buffer */
+	spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+	/* if we exceeded the current peak, remember the new one */
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
+/*
+ * ExecHashExceededMemoryLimit
+ *		Check if the amount of memory used exceeds spaceAllowed.
+ *
+ * Check if the total amount of space used by the hash join exceeds the
+ * current value of spaceAllowed, in which case we should try to increase
+ * the number of batches.
+ *
+ * We need to consider both the data added to the hash table and the hash
+ * table itself (i.e. buckets), but also the files used for future batches.
+ * Each batch needs a file for the inner/outer side, so we need (2*nbatch)
+ * files in total, and each BufFile has a BLCKSZ buffer. If we ignored the
+ * files and simply doubled the number of batches, we could easily increase
+ * the total amount of memory, because while we expect to cut the batch
+ * size in half, doubling the number of batches also doubles the amount of
+ * memory allocated by BufFile.
+ *
+ * That means doubling the number of batches is pointless when
+ *
+ *		(spaceUsed / 2) < 2 * (nbatches * sizeof(BufFile))
+ *
+ * because it would result in allocating more memory than it saves.
+ *
+ * This is a temporary decision - we can't stop adding batches entirely,
+ * just until the hash table grows enough to make it a win again.
+ */
+static bool
+ExecHashExceededMemoryLimit(HashJoinTable hashtable)
+{
+	return (hashtable->spaceUsed +
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			hashtable->nbatch * sizeof(PGAlignedBlock) * 2
+			> hashtable->spaceAllowed);
+}
+
+/*
+ * ExecHashAdjustMemoryLimit
+ *		Adjust the memory limit after increasing the number of batches.
+ *
+ * We can't keep the same spaceAllowed value, because as we keep adding
+ * batches we're guaranteed to exceed the older values simply thanks to
+ * the BufFile allocations.
+ *
+ * Instead, we consider the "break even" threshold for the current number
+ * of batches, add a bit of slack (so that we don't get into a cycle of
+ * incrementing the number of batches), and calculate the new limit from
+ * that.
+ *
+ * For well estimated cases this should do nothing, as the batches are
+ * expected to account only for a small fraction of work_mem. But if we
+ * significantly underestimate the number of batches, or if one batch
+ * happens to be very large, this will relax the limit a bit.
+ *
+ * This means we won't enforce the work_mem limit strictly - but without
+ * adjusting the limit that wouldn't be the case either, we'd just use a
+ * lot of memory for the BufFiles without accounting for that. This way
+ * we do our best to minimize the amount of memory used.
+ */
+static void
+ExecHashAdjustMemoryLimit(HashJoinTable hashtable)
+{
+	Size		newSpaceAllowed;
+
+	/*
+	 * The next time increasing the number of batches "breaks even" is when
+	 *
+	 *		(spaceUsed / 2) == (2 * nbatches * BLCKSZ)
+	 *
+	 * which means
+	 *
+	 *		spaceUsed == (4 * nbatches * BLCKSZ)
+	 *
+	 * However, this is a "break even" threshold, when we shrink the hash
+	 * table just enough to compensate for the new batches, and we'd hit
+	 * the new threshold almost immediately again. In practice we want to
+	 * free more memory to allow new data before having to increase the
+	 * number of batches again. So we allow 25% more space.
+	 */
+	newSpaceAllowed
+		= 1.25 * (4 * hashtable->nbatch * sizeof(PGAlignedBlock));
+
+	/* but also account for the buckets, and the current batch files */
+	newSpaceAllowed += hashtable->nbuckets_optimal * sizeof(HashJoinTuple);
+	newSpaceAllowed += (2 * hashtable->nbatch * sizeof(PGAlignedBlock));
+
+	/* shouldn't go down, but use Max() to make sure */
+	hashtable->spaceAllowed = Max(hashtable->spaceAllowed,
+								  newSpaceAllowed);
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2349,8 +2468,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak memory usage */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2399,8 +2519,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak memory usage */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2489,8 +2610,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak memory usage */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 4fc34a0e72a..8d54822eb8c 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -198,7 +198,7 @@ rollback to settings;
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
 set local hash_mem_multiplier = 1.0;
 explain (costs off)
   select count(*) from simple r join simple s using (id);
@@ -232,7 +232,7 @@ rollback to settings;
 -- parallel with parallel-oblivious hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
 set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = off;
 explain (costs off)
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 6b0688ab0a6..ca8758900aa 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -145,7 +145,7 @@ rollback to settings;
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
 set local hash_mem_multiplier = 1.0;
 explain (costs off)
   select count(*) from simple r join simple s using (id);
@@ -160,7 +160,7 @@ rollback to settings;
 -- parallel with parallel-oblivious hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
 set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = off;
 explain (costs off)
-- 2.47.1
From 030fca4e56132dd516b8aba23ed36c47311b33f5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 31 Dec 2024 17:08:17 +0100
Subject: [PATCH v20241231-multi-spill] Hash join with multiple spill files

We can't use an arbitrary number of batches in the hash join, because
that can use substantial amounts of memory, ignored by the memory
limit. Instead, decide how many batches we can keep in memory, and open
files only for this "slice" of batches. For future batches we keep one
spill file per slice, and we use these spill files after "switching" to
the first batch in each of those not-in-memory slices.

NB: Compared to the "single-spill" patch, this is less strict, as we
need one spill file per slice, and the number of slices may grow during
execution. It's better than the current behavior, because even with
modest work_mem values we can fit 32-128 batches into memory, thus
reducing the number of files to 1/32 - 1/128. But with many batches (as
with batch explosion) this can still use substantial amounts of memory
(certainly more than work_mem).

Ultimately, it behaves similarly to the "adjustment" patch, but with
more complexity. And it can't really handle the "oversized batch" case,
because the limit is not adjusted.
---
 src/backend/commands/explain.c        |   6 +-
 src/backend/executor/nodeHash.c       | 256 +++++++++++++++++++++++---
 src/backend/executor/nodeHashjoin.c   |  75 +++++---
 src/backend/optimizer/path/costsize.c |   2 +
 src/include/executor/hashjoin.h       |   4 +
 src/include/executor/nodeHash.h       |   8 +
 src/include/nodes/execnodes.h         |   1 +
 7 files changed, 300 insertions(+), 52 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index a201ed30824..aa3fbeda3dd 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3466,19 +3466,23 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 								   hinstrument.nbatch, es);
 			ExplainPropertyInteger("Original Hash Batches", NULL,
 								   hinstrument.nbatch_original, es);
+			ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+								   hinstrument.nbatch_inmemory, es);
 			ExplainPropertyUInteger("Peak Memory Usage", "kB",
 									spacePeakKb, es);
 		}
 		else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+				 hinstrument.nbatch_inmemory != hinstrument.nbatch ||
 				 hinstrument.nbuckets_original != hinstrument.nbuckets)
 		{
 			ExplainIndentText(es);
 			appendStringInfo(es->str,
-							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: " UINT64_FORMAT "kB\n",
+							 "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: " UINT64_FORMAT "kB\n",
 							 hinstrument.nbuckets,
 							 hinstrument.nbuckets_original,
 							 hinstrument.nbatch,
 							 hinstrument.nbatch_original,
+							 hinstrument.nbatch_inmemory,
 							 spacePeakKb);
 		}
 		else
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..e9d482577ac 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -199,10 +200,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-
hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); hashtable->partialTuples = hashtable->totalTuples; } @@ -451,6 +450,7 @@ ExecHashTableCreate(HashState *state) size_t space_allowed; int nbuckets; int nbatch; + int nbatch_inmemory; double rows; int num_skew_mcvs; int log2_nbuckets; @@ -477,7 +477,8 @@ ExecHashTableCreate(HashState *state) state->parallel_state != NULL ? state->parallel_state->nparticipants - 1 : 0, &space_allowed, - &nbuckets, &nbatch, &num_skew_mcvs); + &nbuckets, &nbatch, &nbatch_inmemory, + &num_skew_mcvs); /* nbuckets must be a power of 2 */ log2_nbuckets = my_log2(nbuckets); @@ -503,6 +504,7 @@ ExecHashTableCreate(HashState *state) hashtable->nSkewBuckets = 0; hashtable->skewBucketNums = NULL; hashtable->nbatch = nbatch; + hashtable->nbatch_inmemory = nbatch_inmemory; hashtable->curbatch = 0; hashtable->nbatch_original = nbatch; hashtable->nbatch_outstart = nbatch; @@ -512,6 +514,8 @@ ExecHashTableCreate(HashState *state) hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; + hashtable->innerOverflowFiles = NULL; + hashtable->outerOverflowFiles = NULL; hashtable->spaceUsed = 0; hashtable->spacePeak = 0; hashtable->spaceAllowed = space_allowed; @@ -552,6 +556,7 @@ ExecHashTableCreate(HashState *state) if (nbatch > 1 && hashtable->parallel_state == NULL) { MemoryContext oldctx; + int cnt = Min(nbatch, nbatch_inmemory); /* * allocate and initialize the file arrays in hashCxt (not needed for @@ -559,8 +564,19 @@ ExecHashTableCreate(HashState *state) */ oldctx = MemoryContextSwitchTo(hashtable->spillCxt); - hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); - hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); + hashtable->innerBatchFile = palloc0_array(BufFile *, cnt); + hashtable->outerBatchFile = palloc0_array(BufFile *, cnt); + + /* also allocate files for overflow batches */ + if (nbatch > nbatch_inmemory) + { + int nslices = (nbatch / nbatch_inmemory); + + Assert(nslices % 2 == 0); + + hashtable->innerOverflowFiles = palloc0_array(BufFile *, nslices + 1); + hashtable->outerOverflowFiles = palloc0_array(BufFile *, nslices + 1); + } MemoryContextSwitchTo(oldctx); @@ -661,6 +677,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t *space_allowed, int *numbuckets, int *numbatches, + int *numbatches_inmemory, int *num_skew_mcvs) { int tupsize; @@ -669,6 +686,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t bucket_bytes; size_t max_pointers; int nbatch = 1; + int nbatch_inmemory = 1; int nbuckets; double dbuckets; @@ -811,6 +829,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, space_allowed, numbuckets, numbatches, + numbatches_inmemory, num_skew_mcvs); return; } @@ -848,11 +867,24 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, nbatch = pg_nextpower2_32(Max(2, minbatch)); } + /* + * See how many batches we can fit into memory (driven mostly by size + * of BufFile, with PGAlignedBlock being the largest part of that). + * We need one BufFile for inner and outer side, so we count it twice + * for each batch, and we stop once we exceed (work_mem/2). 
+ */ + while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2 + <= (work_mem * 1024L / 2)) + nbatch_inmemory *= 2; + + // nbatch_inmemory = nbatch; + Assert(nbuckets > 0); Assert(nbatch > 0); *numbuckets = nbuckets; *numbatches = nbatch; + *numbatches_inmemory = nbatch_inmemory; } @@ -874,13 +906,27 @@ ExecHashTableDestroy(HashJoinTable hashtable) */ if (hashtable->innerBatchFile != NULL) { - for (i = 1; i < hashtable->nbatch; i++) + int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory); + + for (i = 1; i < n; i++) { if (hashtable->innerBatchFile[i]) BufFileClose(hashtable->innerBatchFile[i]); if (hashtable->outerBatchFile[i]) BufFileClose(hashtable->outerBatchFile[i]); } + + /* number of batch slices */ + n = (hashtable->nbatch / hashtable->nbatch_inmemory) + 1; + + for (i = 1; i < n; i++) + { + if (hashtable->innerOverflowFiles[i]) + BufFileClose(hashtable->innerOverflowFiles[i]); + + if (hashtable->outerOverflowFiles[i]) + BufFileClose(hashtable->outerOverflowFiles[i]); + } } /* Release working memory (batchCxt is a child, so it goes away too) */ @@ -923,11 +969,14 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) if (hashtable->innerBatchFile == NULL) { + /* XXX nbatch=1, no need to deal with nbatch_inmemory here */ + int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory); + MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt); /* we had no file arrays before */ - hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); - hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); + hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch_tmp); + hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch_tmp); MemoryContextSwitchTo(oldcxt); @@ -936,9 +985,35 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory); + int oldnbatch_tmp = Min(oldnbatch, hashtable->nbatch_inmemory); + /* enlarge arrays and zero out added entries */ - hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch); - hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch); + hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp); + hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp); + + if (nbatch > hashtable->nbatch_inmemory) + { + int nslices = (nbatch / hashtable->nbatch_inmemory); + int oldnslices = (oldnbatch / hashtable->nbatch_inmemory); + + Assert(nslices > 1); + Assert(nslices % 2 == 0); + Assert((oldnslices == 1) || (oldnslices % 2 == 0)); + Assert(oldnslices <= nslices); + + if (hashtable->innerOverflowFiles == NULL) + { + hashtable->innerOverflowFiles = palloc0_array(BufFile *, nslices + 1); + hashtable->outerOverflowFiles = palloc0_array(BufFile *, nslices + 1); + } + else + { + hashtable->innerOverflowFiles = repalloc0_array(hashtable->innerOverflowFiles, BufFile *, oldnslices + 1, nslices + 1); + hashtable->outerOverflowFiles = repalloc0_array(hashtable->outerOverflowFiles, BufFile *, oldnslices + 1, nslices + 1); + } + } + } hashtable->nbatch = nbatch; @@ -1008,11 +1083,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + BufFile **batchFile; + /* dump it out */ Assert(batchno > curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + hashtable->innerOverflowFiles); + ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, 
- &hashtable->innerBatchFile[batchno], + batchFile, hashtable); hashtable->spaceUsed -= hashTupleSize; @@ -1673,22 +1755,33 @@ ExecHashTableInsert(HashJoinTable hashtable, /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); + + /* Consider increasing number of batches if we filled work_mem. */ if (hashtable->spaceUsed + - hashtable->nbuckets_optimal * sizeof(HashJoinTuple) + hashtable->nbuckets_optimal * sizeof(HashJoinTuple) + + Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */ > hashtable->spaceAllowed) ExecHashIncreaseNumBatches(hashtable); } else { + BufFile **batchFile; + /* * put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + hashtable->innerOverflowFiles); + ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno], + batchFile, hashtable); } @@ -1843,6 +1936,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable, } } +int +ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno) +{ + int slice, + curslice; + + if (hashtable->nbatch < hashtable->nbatch_inmemory) + return batchno; + + slice = batchno / hashtable->nbatch_inmemory; + curslice = hashtable->curbatch / hashtable->nbatch_inmemory; + + /* slices can't go backwards */ + Assert(slice >= curslice); + + /* overflow slice */ + if (slice > curslice) + return -1; + + /* current slice, compute index in the current array */ + return (batchno % hashtable->nbatch_inmemory); +} + +BufFile ** +ExecHashGetBatchFile(HashJoinTable hashtable, int batchno, + BufFile **batchFiles, BufFile **overflowFiles) +{ + int idx = ExecHashGetBatchIndex(hashtable, batchno); + + /* get the right overflow file */ + if (idx == -1) + { + int slice = (batchno / hashtable->nbatch_inmemory); + + return &overflowFiles[slice]; + } + + /* batch file in the current slice */ + return &batchFiles[idx]; +} + +void +ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable) +{ + int slice = (hashtable->curbatch / hashtable->nbatch_inmemory); + + memset(hashtable->innerBatchFile, 0, + hashtable->nbatch_inmemory * sizeof(BufFile *)); + + hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice]; + hashtable->innerOverflowFiles[slice] = NULL; + + memset(hashtable->outerBatchFile, 0, + hashtable->nbatch_inmemory * sizeof(BufFile *)); + + hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice]; + hashtable->outerOverflowFiles[slice] = NULL; +} + +int +ExecHashSwitchToNextBatch(HashJoinTable hashtable) +{ + int batchidx; + + hashtable->curbatch++; + + /* see if we skipped to the next batch slice */ + batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch); + + /* Can't be -1, current batch is in the current slice by definition. */ + Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory); + + /* + * If we skipped to the next slice of batches, reset the array of files + * and use the overflow file as the first batch. 
+ */ + if (batchidx == 0) + ExecHashSwitchToNextBatchSlice(hashtable); + + return hashtable->curbatch; +} + +static void +ExecHashUpdateSpacePeak(HashJoinTable hashtable) +{ + Size spaceUsed = hashtable->spaceUsed; + + /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */ + spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple); + + /* Account for memory used for batch files (inner + outer) */ + spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) * + sizeof(PGAlignedBlock) * 2; + + /* Account for slice files (inner + outer) */ + spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) * + sizeof(PGAlignedBlock) * 2; + + if (spaceUsed > hashtable->spacePeak) + hashtable->spacePeak = spaceUsed; +} + /* * ExecScanHashBucket * scan a hash bucket for matches to the current outer tuple @@ -2349,8 +2544,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable, + mcvsToUse * sizeof(int); hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) + mcvsToUse * sizeof(int); - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); /* * Create a skew bucket for each MCV hash value. @@ -2399,8 +2595,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable, hashtable->nSkewBuckets++; hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); } free_attstatsslot(&sslot); @@ -2489,8 +2686,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable, /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; hashtable->spaceUsedSkew += hashTupleSize; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); + while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) ExecHashRemoveNextSkewBucket(hashtable); @@ -2569,10 +2768,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } else { + BufFile **batchFile; + /* Put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + hashtable->innerOverflowFiles); + ExecHashJoinSaveTuple(tuple, hashvalue, - &hashtable->innerBatchFile[batchno], + batchFile, hashtable); pfree(hashTuple); hashtable->spaceUsed -= tupleSize; @@ -2750,6 +2956,8 @@ ExecHashAccumInstrumentation(HashInstrumentation *instrument, hashtable->nbatch); instrument->nbatch_original = Max(instrument->nbatch_original, hashtable->nbatch_original); + instrument->nbatch_inmemory = Min(hashtable->nbatch, + hashtable->nbatch_inmemory); instrument->space_peak = Max(instrument->space_peak, hashtable->spacePeak); } diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index ea0045bc0f3..580b5da93bd 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -481,10 +481,15 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) if (batchno != hashtable->curbatch && node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO) { + BufFile **batchFile; bool shouldFree; MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot, &shouldFree); + batchFile 
= ExecHashGetBatchFile(hashtable, batchno, + hashtable->outerBatchFile, + hashtable->outerOverflowFiles); + /* * Need to postpone this outer tuple to a later batch. * Save it in the corresponding outer-batch file. @@ -492,7 +497,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); ExecHashJoinSaveTuple(mintuple, hashvalue, - &hashtable->outerBatchFile[batchno], + batchFile, hashtable); if (shouldFree) @@ -1030,17 +1035,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, } else if (curbatch < hashtable->nbatch) { - BufFile *file = hashtable->outerBatchFile[curbatch]; + BufFile **file = ExecHashGetBatchFile(hashtable, curbatch, + hashtable->outerBatchFile, + hashtable->outerOverflowFiles); /* * In outer-join cases, we could get here even though the batch file * is empty. */ - if (file == NULL) + if (*file == NULL) return NULL; slot = ExecHashJoinGetSavedTuple(hjstate, - file, + *file, hashvalue, hjstate->hj_OuterTupleSlot); if (!TupIsNull(slot)) @@ -1135,9 +1142,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) BufFile *innerFile; TupleTableSlot *slot; uint32 hashvalue; + int batchidx; + int curbatch_old; nbatch = hashtable->nbatch; curbatch = hashtable->curbatch; + curbatch_old = curbatch; + + /* index of the old batch */ + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); + + /* has to be in the current slice of batches */ + Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory); if (curbatch > 0) { @@ -1145,9 +1161,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * We no longer need the previous outer batch file; close it right * away to free disk space. */ - if (hashtable->outerBatchFile[curbatch]) - BufFileClose(hashtable->outerBatchFile[curbatch]); - hashtable->outerBatchFile[curbatch] = NULL; + if (hashtable->outerBatchFile[batchidx]) + BufFileClose(hashtable->outerBatchFile[batchidx]); + hashtable->outerBatchFile[batchidx] = NULL; } else /* we just finished the first batch */ { @@ -1182,45 +1198,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * scan, we have to rescan outer batches in case they contain tuples that * need to be reassigned. */ - curbatch++; + curbatch = ExecHashSwitchToNextBatch(hashtable); + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); + while (curbatch < nbatch && - (hashtable->outerBatchFile[curbatch] == NULL || - hashtable->innerBatchFile[curbatch] == NULL)) + (hashtable->outerBatchFile[batchidx] == NULL || + hashtable->innerBatchFile[batchidx] == NULL)) { - if (hashtable->outerBatchFile[curbatch] && + if (hashtable->outerBatchFile[batchidx] && HJ_FILL_OUTER(hjstate)) break; /* must process due to rule 1 */ - if (hashtable->innerBatchFile[curbatch] && + if (hashtable->innerBatchFile[batchidx] && HJ_FILL_INNER(hjstate)) break; /* must process due to rule 1 */ - if (hashtable->innerBatchFile[curbatch] && + if (hashtable->innerBatchFile[batchidx] && nbatch != hashtable->nbatch_original) break; /* must process due to rule 2 */ - if (hashtable->outerBatchFile[curbatch] && + if (hashtable->outerBatchFile[batchidx] && nbatch != hashtable->nbatch_outstart) break; /* must process due to rule 3 */ /* We can ignore this batch. */ /* Release associated temp files right away. 
*/ - if (hashtable->innerBatchFile[curbatch]) - BufFileClose(hashtable->innerBatchFile[curbatch]); - hashtable->innerBatchFile[curbatch] = NULL; - if (hashtable->outerBatchFile[curbatch]) - BufFileClose(hashtable->outerBatchFile[curbatch]); - hashtable->outerBatchFile[curbatch] = NULL; - curbatch++; + if (hashtable->innerBatchFile[batchidx]) + BufFileClose(hashtable->innerBatchFile[batchidx]); + hashtable->innerBatchFile[batchidx] = NULL; + if (hashtable->outerBatchFile[batchidx]) + BufFileClose(hashtable->outerBatchFile[batchidx]); + hashtable->outerBatchFile[batchidx] = NULL; + + curbatch = ExecHashSwitchToNextBatch(hashtable); + batchidx = ExecHashGetBatchIndex(hashtable, curbatch); } if (curbatch >= nbatch) + { + hashtable->curbatch = curbatch_old; return false; /* no more batches */ - - hashtable->curbatch = curbatch; + } /* * Reload the hash table with the new inner batch (which could be empty) */ ExecHashTableReset(hashtable); - innerFile = hashtable->innerBatchFile[curbatch]; + innerFile = hashtable->innerBatchFile[batchidx]; if (innerFile != NULL) { @@ -1246,15 +1267,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) * needed */ BufFileClose(innerFile); - hashtable->innerBatchFile[curbatch] = NULL; + hashtable->innerBatchFile[batchidx] = NULL; } /* * Rewind outer batch file (if present), so that we can start reading it. */ - if (hashtable->outerBatchFile[curbatch] != NULL) + if (hashtable->outerBatchFile[batchidx] != NULL) { - if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET)) + if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0, SEEK_SET)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not rewind hash-join temporary file"))); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index c36687aa4df..6f40600d10b 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -4173,6 +4173,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, int num_hashclauses = list_length(hashclauses); int numbuckets; int numbatches; + int numbatches_inmemory; int num_skew_mcvs; size_t space_allowed; /* unused */ @@ -4227,6 +4228,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, &space_allowed, &numbuckets, &numbatches, + &numbatches_inmemory, &num_skew_mcvs); /* diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 2d8ed8688cd..76c983dbd4d 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -320,6 +320,7 @@ typedef struct HashJoinTableData int *skewBucketNums; /* array indexes of active skew buckets */ int nbatch; /* number of batches */ + int nbatch_inmemory; /* max number of in-memory batches */ int curbatch; /* current batch #; 0 during 1st pass */ int nbatch_original; /* nbatch when we started inner scan */ @@ -331,6 +332,9 @@ typedef struct HashJoinTableData double partialTuples; /* # tuples obtained from inner plan by me */ double skewTuples; /* # tuples inserted into skew tuples */ + BufFile **innerOverflowFiles; /* temp file for inner overflow batches */ + BufFile **outerOverflowFiles; /* temp file for outer overflow batches */ + /* * These arrays are allocated for the life of the hash join, but only if * nbatch > 1. 
A file is opened only when we first write a tuple into it diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index e4eb7bc6359..ef1d2bec15a 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -16,6 +16,7 @@ #include "access/parallel.h" #include "nodes/execnodes.h" +#include "storage/buffile.h" struct SharedHashJoinBatch; @@ -46,6 +47,12 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno); +extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno); +extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno, + BufFile **batchFiles, + BufFile **overflowFiles); +extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable); +extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable); extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); @@ -62,6 +69,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t *space_allowed, int *numbuckets, int *numbatches, + int *numbatches_inmemory, int *num_skew_mcvs); extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue); extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 1590b643920..eb8eaea89ab 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2755,6 +2755,7 @@ typedef struct HashInstrumentation int nbuckets_original; /* planned number of buckets */ int nbatch; /* number of batches at end of execution */ int nbatch_original; /* planned number of batches */ + int nbatch_inmemory; /* number of batches kept in memory */ Size space_peak; /* peak memory usage in bytes */ } HashInstrumentation; -- 2.47.1
From 55142837eaf439652fd91e165f3d4c3607ea9987 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 31 Dec 2024 17:34:59 +0100
Subject: [PATCH v20241231-single-spill] Hash join with a single spill file

We can't use an arbitrary number of batches in the hash join, because
that can use substantial amounts of memory, ignored by the memory
limit. Instead, decide how many batches we can keep in memory, open
files only for those batches, and keep one "overflow" file for all
future batches. Then use this spill file after "switching" to the first
batch that is not in memory.

NB: This allows enforcing the memory limit very strictly, but it's very
inefficient and causes a lot of temporary file traffic.
---
 src/backend/commands/explain.c        |   6 +-
 src/backend/executor/nodeHash.c       | 206 +++++++++++++++++++++++---
 src/backend/executor/nodeHashjoin.c   |  76 ++++++----
 src/backend/optimizer/path/costsize.c |   2 +
 src/include/executor/hashjoin.h       |   4 +
 src/include/executor/nodeHash.h       |   7 +
 src/include/nodes/execnodes.h         |   1 +
 7 files changed, 250 insertions(+), 52 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index a201ed30824..aa3fbeda3dd 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3466,19 +3466,23 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 								   hinstrument.nbatch, es);
 			ExplainPropertyInteger("Original Hash Batches", NULL,
 								   hinstrument.nbatch_original, es);
+			ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+								   hinstrument.nbatch_inmemory, es);
 			ExplainPropertyUInteger("Peak Memory Usage", "kB",
 									spacePeakKb, es);
 		}
 		else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+				 hinstrument.nbatch_inmemory != hinstrument.nbatch ||
 				 hinstrument.nbuckets_original != hinstrument.nbuckets)
 		{
 			ExplainIndentText(es);
 			appendStringInfo(es->str,
-							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: " UINT64_FORMAT "kB\n",
+							 "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: " UINT64_FORMAT "kB\n",
 							 hinstrument.nbuckets,
 							 hinstrument.nbuckets_original,
 							 hinstrument.nbatch,
 							 hinstrument.nbatch_original,
+							 hinstrument.nbatch_inmemory,
 							 spacePeakKb);
 		}
 		else
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..58062a8b256 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -199,10 +200,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -451,6 +450,7 @@ ExecHashTableCreate(HashState *state)
 	size_t		space_allowed;
 	int			nbuckets;
 	int			nbatch;
+	int			nbatch_inmemory;
 	double		rows;
 	int			num_skew_mcvs;
 	int			log2_nbuckets;
@@ -477,7 +477,8 @@ ExecHashTableCreate(HashState *state)
state->parallel_state != NULL ? state->parallel_state->nparticipants - 1 : 0, &space_allowed, - &nbuckets, &nbatch, &num_skew_mcvs); + &nbuckets, &nbatch, &nbatch_inmemory, + &num_skew_mcvs); /* nbuckets must be a power of 2 */ log2_nbuckets = my_log2(nbuckets); @@ -503,6 +504,7 @@ ExecHashTableCreate(HashState *state) hashtable->nSkewBuckets = 0; hashtable->skewBucketNums = NULL; hashtable->nbatch = nbatch; + hashtable->nbatch_inmemory = nbatch_inmemory; hashtable->curbatch = 0; hashtable->nbatch_original = nbatch; hashtable->nbatch_outstart = nbatch; @@ -512,6 +514,8 @@ ExecHashTableCreate(HashState *state) hashtable->skewTuples = 0; hashtable->innerBatchFile = NULL; hashtable->outerBatchFile = NULL; + hashtable->innerOverflowFile = NULL; + hashtable->outerOverflowFile = NULL; hashtable->spaceUsed = 0; hashtable->spacePeak = 0; hashtable->spaceAllowed = space_allowed; @@ -553,14 +557,16 @@ ExecHashTableCreate(HashState *state) { MemoryContext oldctx; + int cnt = Min(nbatch, nbatch_inmemory); + /* * allocate and initialize the file arrays in hashCxt (not needed for * parallel case which uses shared tuplestores instead of raw files) */ oldctx = MemoryContextSwitchTo(hashtable->spillCxt); - hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); - hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); + hashtable->innerBatchFile = palloc0_array(BufFile *, cnt); + hashtable->outerBatchFile = palloc0_array(BufFile *, cnt); MemoryContextSwitchTo(oldctx); @@ -661,6 +667,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t *space_allowed, int *numbuckets, int *numbatches, + int *numbatches_inmemory, int *num_skew_mcvs) { int tupsize; @@ -669,6 +676,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, size_t bucket_bytes; size_t max_pointers; int nbatch = 1; + int nbatch_inmemory = 1; int nbuckets; double dbuckets; @@ -811,6 +819,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, space_allowed, numbuckets, numbatches, + numbatches_inmemory, num_skew_mcvs); return; } @@ -848,11 +857,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, nbatch = pg_nextpower2_32(Max(2, minbatch)); } + /* + * See how many batches we can fit into memory (driven mostly by size + * of BufFile, with PGAlignedBlock being the largest part of that). + * We need one BufFile for inner and outer side, so we count it twice + * for each batch, and we stop once we exceed (work_mem/2). 
+ */ + while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2 + <= (work_mem * 1024L / 2)) + nbatch_inmemory *= 2; + Assert(nbuckets > 0); Assert(nbatch > 0); *numbuckets = nbuckets; *numbatches = nbatch; + *numbatches_inmemory = nbatch_inmemory; } @@ -874,13 +894,21 @@ ExecHashTableDestroy(HashJoinTable hashtable) */ if (hashtable->innerBatchFile != NULL) { - for (i = 1; i < hashtable->nbatch; i++) + int nbatch = Min(hashtable->nbatch, hashtable->nbatch_inmemory); + + for (i = 1; i < nbatch; i++) { if (hashtable->innerBatchFile[i]) BufFileClose(hashtable->innerBatchFile[i]); if (hashtable->outerBatchFile[i]) BufFileClose(hashtable->outerBatchFile[i]); } + + if (hashtable->innerOverflowFile) + BufFileClose(hashtable->innerOverflowFile); + + if (hashtable->outerOverflowFile) + BufFileClose(hashtable->outerOverflowFile); } /* Release working memory (batchCxt is a child, so it goes away too) */ @@ -923,11 +951,13 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) if (hashtable->innerBatchFile == NULL) { + int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory); + MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt); /* we had no file arrays before */ - hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); - hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); + hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch_tmp); + hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch_tmp); MemoryContextSwitchTo(oldcxt); @@ -936,9 +966,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory); + int oldnbatch_tmp = Min(oldnbatch, hashtable->nbatch_inmemory); + /* enlarge arrays and zero out added entries */ - hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch); - hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch); + hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp); + hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp); } hashtable->nbatch = nbatch; @@ -1008,11 +1041,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + BufFile **batchFile; + /* dump it out */ Assert(batchno > curbatch); + + batchFile = ExecHashGetBatchFile(hashtable, batchno, + hashtable->innerBatchFile, + &hashtable->innerOverflowFile); + ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, - &hashtable->innerBatchFile[batchno], + batchFile, hashtable); hashtable->spaceUsed -= hashTupleSize; @@ -1673,22 +1713,33 @@ ExecHashTableInsert(HashJoinTable hashtable, /* Account for space used, and back off if we've used too much */ hashtable->spaceUsed += hashTupleSize; - if (hashtable->spaceUsed > hashtable->spacePeak) - hashtable->spacePeak = hashtable->spaceUsed; + + /* refresh info about peak used memory */ + ExecHashUpdateSpacePeak(hashtable); + + /* Consider increasing number of batches if we filled work_mem. 
 		 */
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+			sizeof(PGAlignedBlock) * 2	/* inner + outer */
 			> hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
 	}
 	else
 	{
+		BufFile   **batchFile;
+
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
+		batchFile = ExecHashGetBatchFile(hashtable, batchno,
+										 hashtable->innerBatchFile,
+										 &hashtable->innerOverflowFile);
+
 		ExecHashJoinSaveTuple(tuple, hashvalue,
-							  &hashtable->innerBatchFile[batchno],
+							  batchFile,
 							  hashtable);
 	}
@@ -1843,6 +1894,100 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+	int			slice,
+				curslice;
+
+	if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+		return batchno;
+
+	slice = batchno / hashtable->nbatch_inmemory;
+	curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+	/* slices can't go backwards */
+	Assert(slice >= curslice);
+
+	/* overflow slice */
+	if (slice > curslice)
+		return -1;
+
+	/* current slice, compute index in the current array */
+	return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **batchFiles, BufFile **overflowFile)
+{
+	int			idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+	if (idx == -1)
+		return overflowFile;
+
+	return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+	memset(hashtable->innerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->innerBatchFile[0] = hashtable->innerOverflowFile;
+	hashtable->innerOverflowFile = NULL;
+
+	memset(hashtable->outerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->outerBatchFile[0] = hashtable->outerOverflowFile;
+	hashtable->outerOverflowFile = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+	int			batchidx;
+
+	hashtable->curbatch++;
+
+	/* see if we skipped to the next batch slice */
+	batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+	/* Can't be -1, current batch is in the current slice by definition. */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+	/*
+	 * If we skipped to the next slice of batches, reset the array of files
+	 * and use the overflow file as the first batch.
+	 */
+	if (batchidx == 0)
+		ExecHashSwitchToNextBatchSlice(hashtable);
+
+	return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size		spaceUsed = hashtable->spaceUsed;
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* Account for memory used for batch files (inner + outer) */
+	spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+		sizeof(PGAlignedBlock) * 2;
+
+	/* Account for slice files (inner + outer) */
+	spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+		sizeof(PGAlignedBlock) * 2;
+
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2349,8 +2494,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2399,8 +2545,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak used memory */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2489,8 +2636,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2569,10 +2718,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 	}
 	else
 	{
+		BufFile   **batchFile;
+
 		/* Put the tuple into a temp file for later batches */
 		Assert(batchno > hashtable->curbatch);
+
+		batchFile = ExecHashGetBatchFile(hashtable, batchno,
+										 hashtable->innerBatchFile,
+										 &hashtable->innerOverflowFile);
+
 		ExecHashJoinSaveTuple(tuple, hashvalue,
-							  &hashtable->innerBatchFile[batchno],
+							  batchFile,
 							  hashtable);
 		pfree(hashTuple);
 		hashtable->spaceUsed -= tupleSize;
@@ -2750,6 +2906,8 @@ ExecHashAccumInstrumentation(HashInstrumentation *instrument,
 									hashtable->nbatch);
 	instrument->nbatch_original = Max(instrument->nbatch_original,
 									  hashtable->nbatch_original);
+	instrument->nbatch_inmemory = Min(hashtable->nbatch,
+									  hashtable->nbatch_inmemory);
 	instrument->space_peak = Max(instrument->space_peak,
 								 hashtable->spacePeak);
 }
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ea0045bc0f3..2f5d94653e8 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -481,6 +481,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 			if (batchno != hashtable->curbatch &&
 				node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
 			{
+				BufFile   **batchFile;
 				bool		shouldFree;
 				MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
 																  &shouldFree);
@@ -491,8 +492,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 */
 				Assert(parallel_state == NULL);
 				Assert(batchno > hashtable->curbatch);
+
+				batchFile = ExecHashGetBatchFile(hashtable, batchno,
+												 hashtable->outerBatchFile,
+												 &hashtable->outerOverflowFile);
+
 				ExecHashJoinSaveTuple(mintuple, hashvalue,
-									  &hashtable->outerBatchFile[batchno],
+									  batchFile,
 									  hashtable);
 
 				if (shouldFree)
@@ -1030,17 +1036,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 	else if (curbatch < hashtable->nbatch)
 	{
-		BufFile    *file = hashtable->outerBatchFile[curbatch];
+		BufFile   **file = ExecHashGetBatchFile(hashtable, curbatch,
+												hashtable->outerBatchFile,
+												&hashtable->outerOverflowFile);
 
 		/*
 		 * In outer-join cases, we could get here even though the batch file
 		 * is empty.
 		 */
-		if (file == NULL)
+		if (*file == NULL)
 			return NULL;
 
 		slot = ExecHashJoinGetSavedTuple(hjstate,
-										 file,
+										 *file,
 										 hashvalue,
 										 hjstate->hj_OuterTupleSlot);
 		if (!TupIsNull(slot))
@@ -1135,9 +1143,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	BufFile    *innerFile;
 	TupleTableSlot *slot;
 	uint32		hashvalue;
+	int			batchidx;
+	int			curbatch_old;
 
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
+	curbatch_old = curbatch;
+
+	/* index of the old batch */
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+	/* has to be in the current slice of batches */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
 	if (curbatch > 0)
 	{
@@ -1145,9 +1162,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * We no longer need the previous outer batch file; close it right
 		 * away to free disk space.
 		 */
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
 	}
 	else						/* we just finished the first batch */
 	{
@@ -1182,45 +1199,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	 * scan, we have to rescan outer batches in case they contain tuples that
 	 * need to be reassigned.
 	 */
-	curbatch++;
+	curbatch = ExecHashSwitchToNextBatch(hashtable);
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
 	while (curbatch < nbatch &&
-		   (hashtable->outerBatchFile[curbatch] == NULL ||
-			hashtable->innerBatchFile[curbatch] == NULL))
+		   (hashtable->outerBatchFile[batchidx] == NULL ||
+			hashtable->innerBatchFile[batchidx] == NULL))
 	{
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			HJ_FILL_OUTER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			HJ_FILL_INNER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_original)
 			break;				/* must process due to rule 2 */
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_outstart)
 			break;				/* must process due to rule 3 */
 		/* We can ignore this batch. */
 		/* Release associated temp files right away. */
-		if (hashtable->innerBatchFile[curbatch])
-			BufFileClose(hashtable->innerBatchFile[curbatch]);
-		hashtable->innerBatchFile[curbatch] = NULL;
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
-		curbatch++;
+		if (hashtable->innerBatchFile[batchidx])
+			BufFileClose(hashtable->innerBatchFile[batchidx]);
+		hashtable->innerBatchFile[batchidx] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
+
+		curbatch = ExecHashSwitchToNextBatch(hashtable);
+		batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
 	}
 
 	if (curbatch >= nbatch)
+	{
+		hashtable->curbatch = curbatch_old;
 		return false;			/* no more batches */
-
-	hashtable->curbatch = curbatch;
+	}
 
 	/*
 	 * Reload the hash table with the new inner batch (which could be empty)
 	 */
 	ExecHashTableReset(hashtable);
 
-	innerFile = hashtable->innerBatchFile[curbatch];
+	innerFile = hashtable->innerBatchFile[batchidx];
 
 	if (innerFile != NULL)
 	{
@@ -1246,15 +1268,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * needed
 		 */
 		BufFileClose(innerFile);
-		hashtable->innerBatchFile[curbatch] = NULL;
+		hashtable->innerBatchFile[batchidx] = NULL;
 	}
 
 	/*
 	 * Rewind outer batch file (if present), so that we can start reading it.
 	 */
-	if (hashtable->outerBatchFile[curbatch] != NULL)
+	if (hashtable->outerBatchFile[batchidx] != NULL)
 	{
-		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
+		if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0, SEEK_SET))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not rewind hash-join temporary file")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c36687aa4df..6f40600d10b 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -4173,6 +4173,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	int			num_hashclauses = list_length(hashclauses);
 	int			numbuckets;
 	int			numbatches;
+	int			numbatches_inmemory;
 	int			num_skew_mcvs;
 	size_t		space_allowed;	/* unused */
@@ -4227,6 +4228,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 							&space_allowed,
 							&numbuckets,
 							&numbatches,
+							&numbatches_inmemory,
 							&num_skew_mcvs);
 
 	/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2d8ed8688cd..c612ae98117 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -320,6 +320,7 @@ typedef struct HashJoinTableData
 	int		   *skewBucketNums; /* array indexes of active skew buckets */
 
 	int			nbatch;			/* number of batches */
+	int			nbatch_inmemory;	/* max number of in-memory batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
 
 	int			nbatch_original;	/* nbatch when we started inner scan */
@@ -331,6 +332,9 @@ typedef struct HashJoinTableData
 	double		partialTuples;	/* # tuples obtained from inner plan by me */
 	double		skewTuples;		/* # tuples inserted into skew hashtable */
 
+	BufFile    *innerOverflowFile;	/* temp file for overflow batches */
+	BufFile    *outerOverflowFile;	/* temp file for overflow batches */
+
 	/*
 	 * These arrays are allocated for the life of the hash join, but only if
 	 * nbatch > 1.  A file is opened only when we first write a tuple into it
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index e4eb7bc6359..fc82ea43c3b 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -46,6 +47,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 									  uint32 hashvalue,
 									  int *bucketno,
 									  int *batchno);
+extern int	ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+									  BufFile **files, BufFile **overflow);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int	ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -62,6 +68,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									size_t *space_allowed,
 									int *numbuckets,
 									int *numbatches,
+									int *numbatches_inmemory,
 									int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1590b643920..eb8eaea89ab 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2755,6 +2755,7 @@ typedef struct HashInstrumentation
 	int			nbuckets_original;	/* planned number of buckets */
 	int			nbatch;			/* number of batches at end of execution */
 	int			nbatch_original;	/* planned number of batches */
+	int			nbatch_inmemory;	/* number of batches kept in memory */
 	Size		space_peak;		/* peak memory usage in bytes */
 } HashInstrumentation;
 
-- 
2.47.1
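A note on the slice mapping, since that's the core of the patch: a batch number maps either to a slot in the in-memory file arrays (if it falls into the current slice) or to the shared overflow file (if it falls into a later slice). Here's a minimal standalone sketch of that arithmetic -- batch_index() is a made-up stand-in for ExecHashGetBatchIndex(), for illustration only:

#include <assert.h>

/* simplified stand-in for ExecHashGetBatchIndex(), illustration only */
static int
batch_index(int nbatch, int nbatch_inmemory, int curbatch, int batchno)
{
	/* all batches fit in memory, no slices needed */
	if (nbatch <= nbatch_inmemory)
		return batchno;

	/* a later slice? then the tuple goes to the overflow file */
	if (batchno / nbatch_inmemory > curbatch / nbatch_inmemory)
		return -1;

	/* current slice, index into the in-memory file arrays */
	return batchno % nbatch_inmemory;
}

int
main(void)
{
	/* 16 batches, slices of 4, currently processing batch 5 */
	assert(batch_index(16, 4, 5, 5) == 1);	/* current slice, slot 1 */
	assert(batch_index(16, 4, 5, 7) == 3);	/* current slice, slot 3 */
	assert(batch_index(16, 4, 5, 9) == -1); /* later slice -> overflow */
	return 0;
}

This is also why ExecHashSwitchToNextBatch() only has to reset the arrays when batchidx wraps to 0 -- that's exactly the point where we cross into the next slice.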
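And to give an idea what the accounting in ExecHashUpdateSpacePeak() works out to, a back-of-the-envelope calculation using the same formula as the patch. The numbers are an assumed scenario (default 8kB BLCKSZ, 128 batches per slice, 128k batches after an explosion), not measurements:

#include <stdio.h>

int
main(void)
{
	long	block = 8192;			/* sizeof(PGAlignedBlock) w/ default BLCKSZ */
	long	nbatch = 131072;		/* total batches after an explosion */
	long	nbatch_inmemory = 128;	/* batches kept in the current slice */

	/* per-batch buffers, Min(nbatch, nbatch_inmemory), inner + outer */
	long	batch_files = nbatch_inmemory * block * 2;

	/* one spill file per slice, inner + outer */
	long	slice_files = (nbatch / nbatch_inmemory) * block * 2;

	printf("batch files: %ld kB\n", batch_files / 1024);	/* 2048 kB */
	printf("slice files: %ld kB\n", slice_files / 1024);	/* 16384 kB */

	/*
	 * For comparison, master keeps a buffer for every batch, i.e.
	 * nbatch * block * 2 = 2GB of BufFile buffers alone.
	 */
	return 0;
}

So even in the explosion case the file buffers stay in the tens of MBs rather than GBs, which is the "relaxed but bounded" behavior described above.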
hash.sql
Description: application/sql