Hi,
I'm starting this thread mostly to keep track of patches developed in
response to issue [1] reported on pgsql-performance. The symptoms are
very simple - a query performing a hash join ends up using much more
memory than expected (pretty much ignoring work_mem), possibly ending
with an OOM.
The root cause is that hash join treats batches as pretty much free, but
that's not really true - we allocate two BufFile structs per batch, and
each BufFile is ~8kB as it includes a PGAlignedBlock buffer.
This is not ideal even if we happen to estimate everything correctly,
because for example with work_mem=4MB and nbatch=1024, it means we'll
use about 16MB (2*8kB*1024) for the BufFile structures alone, plus the
work_mem for the hash table itself.
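
To spell out that arithmetic, here's a small standalone sketch (not
executor code - the 8kB block size is hard-coded and the rest of the
BufFile struct is ignored):

    #include <stdio.h>

    /* Back-of-the-envelope sketch of the BufFile overhead: each batch
     * needs an inner and an outer BufFile, and each BufFile carries a
     * PGAlignedBlock buffer (BLCKSZ, 8kB by default). */
    int
    main(void)
    {
        long    work_mem = 4 * 1024L * 1024L;   /* 4MB */
        long    nbatch = 1024;
        long    buffile_size = 8192;            /* ~sizeof(PGAlignedBlock) */
        long    overhead = 2 * nbatch * buffile_size;

        /* prints: BufFiles: 16MB, work_mem: 4MB */
        printf("BufFiles: %ldMB, work_mem: %ldMB\n",
               overhead / (1024 * 1024), work_mem / (1024 * 1024));
        return 0;
    }
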
But it can easily explode when we under-estimate the hash side. In the
pgsql-performance report, the hash side (with the patches applied,
allowing the query to complete) looks like this:
Hash (cost=2823846.37..2823846.37 rows=34619 width=930)
(actual time=252946.367..252946.367 rows=113478127 loops=1)
So it's 3277x under-estimated. It starts with 16 batches, and ends up
adding more and more batches until it fails with 524288 of them (it gets
to that many batches because some of the values are very common and we
don't disable the growth earlier).
The OOM is not very surprising, because with 524288 batches it'd need
about 8GB of memory, and the system only has 8GB RAM installed.
The two attached patches both account for the BufFile memory, but then
use very different strategies when the work_mem limit is reached.
The first patch recognizes it's impossible to keep adding batches without
breaking the work_mem limit, because at some point the BufFiles alone
will need more memory than that. But it does not make sense to stop
adding batches entirely, because then the hash table could grow
indefinitely.
So the patch abandons the idea of enforcing work_mem in this situation,
and instead attempts to minimize memory usage over time - it increases
the spaceAllowed in a way that ensures doubling the number of batches
actually reduces memory usage in the long run.
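
In other words, doubling only pays off while the expected savings exceed
the added BufFile memory. A simplified sketch of that condition (the
helper name is made up, and 8kB stands in for sizeof(PGAlignedBlock)):

    #include <stdbool.h>

    /* Doubling nbatch saves roughly half of spaceUsed (the data), but
     * adds one inner and one outer BufFile per existing batch. */
    bool
    doubling_reduces_memory(long spaceUsed, long nbatch)
    {
        long    buffile_bytes = nbatch * 8192 * 2;  /* inner + outer */

        /* pointless when (spaceUsed / 2) < added BufFile memory */
        return (spaceUsed / 2) >= buffile_bytes;
    }
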
The second patch tries to enforce work_mem more strictly. That would be
impossible if we were to keep all the BufFile structs in memory, so
instead it slices the batches into chunks that fit into work_mem, and
then uses a single "overflow" file per slice currently not in memory.
These overflow files can't be counted into work_mem, but we should need
only very few of them. For example, with work_mem=4MB each slice covers
128 batches, so we need 128x fewer files (compared to one per batch).
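
The batch-to-slice mapping is plain division and modulo - a simplified
version of what ExecHashGetBatchIndex() in the attached patch does:

    #include <stdio.h>

    int
    main(void)
    {
        int     nbatch_inmemory = 128;  /* batches per slice at work_mem=4MB */
        int     batchno = 300;

        int     slice = batchno / nbatch_inmemory;  /* which overflow file */
        int     index = batchno % nbatch_inmemory;  /* index within the slice */

        /* prints: batch 300 -> slice 2, index 44 */
        printf("batch %d -> slice %d, index %d\n", batchno, slice, index);
        return 0;
    }
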
Neither of those patches tweaks ExecChooseHashTableSize() to consider
memory needed for BufFiles while deciding how many batches will be
needed. That's something that probably needs to happen, but it would not
help with the underestimate issue.
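
For illustration, one hypothetical way to do that (not part of either
attached patch) would be to stop doubling nbatch once the BufFiles
themselves would eat a significant fraction of work_mem:

    /* Hypothetical sketch only - neither attached patch does this. */
    int
    choose_nbatch(double inner_bytes, long work_mem_bytes)
    {
        int     nbatch = 1;

        while (inner_bytes / nbatch > work_mem_bytes)
        {
            /* stop once inner+outer BufFiles would exceed work_mem/2 */
            if ((long) nbatch * 2 * 8192 * 2 > work_mem_bytes / 2)
                break;
            nbatch <<= 1;
        }
        return nbatch;
    }
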
I'm not entirely sure which of those approaches is the right one. The
first one is clearly just "damage control" for cases where the hash
side turned out to be much larger than we expected. With good estimates
we probably would not have picked a hash join for those cases (that is,
the planner should have realized it can't stay within work_mem and
avoided the hash join).
The second patch, however, makes hash join viable for some of those
cases, and it seems to work pretty well (there are some numbers in the
message posted to the pgsql-performance thread). So I kinda like this
second one.
It's all just PoC quality at this point, far from a committable state.
[1]
https://www.postgresql.org/message-id/flat/bc138e9f-c89e-9147-5395-61d51a757b3b%40gusw.net
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4d5a6872cc 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
/* ----------------------------------------------------------------
* ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
ExecHashIncreaseNumBuckets(hashtable);
- /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
- hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
hashtable->partialTuples = hashtable->totalTuples;
}
@@ -1647,12 +1646,56 @@ ExecHashTableInsert(HashJoinTable hashtable,
        /* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
+ /*
+ * Consider increasing number of batches.
+ *
+        * Each batch requires a non-trivial amount of memory, because BufFile
+        * includes a PGAlignedBlock (typically an 8kB buffer). So when
+        * doubling the number of batches, we need to be careful and only
+        * allow that if it actually has a chance of reducing memory usage.
+        *
+        * In particular, doubling the number of batches is pointless when
+        *
+        *    (spaceUsed / 2) < (nbatches * sizeof(BufFile))
+        *
+        * because we expect to save roughly 1/2 of the memory currently used
+        * for data (rows) at the price of doubling the memory used for
+        * BufFile.
+        *
+        * We can't stop adding batches entirely, because that would just mean
+        * the batches would need more and more memory. So we need to increase
+        * the number of batches, even if we can't enforce work_mem properly.
+        * The goal is to minimize the overall memory usage of the hash join.
+        *
+        * Note: This applies mostly to cases of significant underestimates,
+        * resulting in an explosion of the number of batches. The properly
+        * estimated cases should generally end up using merge join, based on
+        * the high cost of the batched hash join.
+ */
if (hashtable->spaceUsed +
- hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+ hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+ hashtable->nbatch * sizeof(PGAlignedBlock) * 2
> hashtable->spaceAllowed)
+ {
ExecHashIncreaseNumBatches(hashtable);
+
+           /*
+            * Consider increasing the resize threshold.
+            *
+            * For well estimated cases this does nothing, because batches are
+            * expected to account for only a small fraction of work_mem. But
+            * if we significantly underestimate the number of batches, we may
+            * end up in a situation where the BufFiles alone exceed work_mem.
+            * So move the threshold a bit, until the next point where it'll
+            * make sense to consider adding batches again.
+            */
+           hashtable->spaceAllowed
+               = Max(hashtable->spaceAllowed,
+                     hashtable->nbatch * sizeof(PGAlignedBlock) * 3);
+ }
}
else
{
@@ -1893,6 +1936,21 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
}
}
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+ Size spaceUsed = hashtable->spaceUsed;
+
+ /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+ spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+ /* Account for memory used for batch files (inner + outer) */
+ spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+ if (spaceUsed > hashtable->spacePeak)
+ hashtable->spacePeak = spaceUsed;
+}
+
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2330,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
/*
* Create a skew bucket for each MCV hash value.
@@ -2322,8 +2381,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
}
free_attstatsslot(&sslot);
@@ -2411,8 +2471,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
hinstrument.nbatch, es);
ExplainPropertyInteger("Original Hash Batches", NULL,
hinstrument.nbatch_original, es);
+ ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+
hinstrument.nbatch_original, es);
ExplainPropertyInteger("Peak Memory Usage", "kB",
spacePeakKb,
es);
}
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
        hinstrument.nbuckets_original != hinstrument.nbuckets)
{
appendStringInfoSpaces(es->str, es->indent * 2);
- appendStringInfo(es->str,
- "Buckets: %d
(originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
- hinstrument.nbuckets,
-
hinstrument.nbuckets_original,
- hinstrument.nbatch,
-
hinstrument.nbatch_original,
- spacePeakKb);
+ if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+ appendStringInfo(es->str,
+ "Buckets: %d
(originally %d) Batches: %d (originally %d, in-memory %d) Memory Usage:
%ldkB\n",
+
hinstrument.nbuckets,
+
hinstrument.nbuckets_original,
+
hinstrument.nbatch,
+
hinstrument.nbatch_original,
+
hinstrument.nbatch_inmemory,
+ spacePeakKb);
+ else
+ appendStringInfo(es->str,
+ "Buckets: %d
(originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
+
hinstrument.nbuckets,
+
hinstrument.nbuckets_original,
+
hinstrument.nbatch,
+
hinstrument.nbatch_original,
+ spacePeakKb);
}
else
{
appendStringInfoSpaces(es->str, es->indent * 2);
- appendStringInfo(es->str,
- "Buckets: %d Batches:
%d Memory Usage: %ldkB\n",
- hinstrument.nbuckets,
hinstrument.nbatch,
- spacePeakKb);
+ if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+ appendStringInfo(es->str,
+ "Buckets: %d
Batches: %d (in-memory: %d) Memory Usage: %ldkB\n",
+
hinstrument.nbuckets, hinstrument.nbatch,
+
hinstrument.nbatch_inmemory,
+ spacePeakKb);
+ else
+ appendStringInfo(es->str,
+ "Buckets: %d
Batches: %d Memory Usage: %ldkB\n",
+
hinstrument.nbuckets, hinstrument.nbatch,
+ spacePeakKb);
}
}
}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4364eb7cdd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
/* ----------------------------------------------------------------
* ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
ExecHashIncreaseNumBuckets(hashtable);
- /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
- hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
hashtable->partialTuples = hashtable->totalTuples;
}
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
size_t space_allowed;
int nbuckets;
int nbatch;
+ int nbatch_inmemory;
double rows;
int num_skew_mcvs;
int log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
state->parallel_state
!= NULL ?
state->parallel_state->nparticipants - 1 : 0,
&space_allowed,
-                                       &nbuckets, &nbatch, &num_skew_mcvs);
+                                       &nbuckets, &nbatch, &nbatch_inmemory,
+                                       &num_skew_mcvs);
/* nbuckets must be a power of 2 */
log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
hashtable->nSkewBuckets = 0;
hashtable->skewBucketNums = NULL;
hashtable->nbatch = nbatch;
+ hashtable->nbatch_inmemory = nbatch_inmemory;
hashtable->curbatch = 0;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
+ hashtable->innerOverflowFiles = NULL;
+ hashtable->outerOverflowFiles = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
hashtable->spaceAllowed = space_allowed;
@@ -559,16 +563,30 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
if (nbatch > 1 && hashtable->parallel_state == NULL)
{
+ int cnt = Min(nbatch, nbatch_inmemory);
+
/*
         * allocate and initialize the file arrays in hashCxt (not needed for
         * parallel case which uses shared tuplestores instead of raw files)
*/
hashtable->innerBatchFile = (BufFile **)
- palloc0(nbatch * sizeof(BufFile *));
+ palloc0(cnt * sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
- palloc0(nbatch * sizeof(BufFile *));
+ palloc0(cnt * sizeof(BufFile *));
/* The files will not be opened until needed... */
        /* ... but make sure we have temp tablespaces established for them */
+
+ /* also allocate files for overflow batches */
+ if (nbatch > nbatch_inmemory)
+ {
+ int nslices = (nbatch / nbatch_inmemory);
+
+ hashtable->innerOverflowFiles = (BufFile **)
+ palloc0(nslices * sizeof(BufFile *));
+ hashtable->outerOverflowFiles = (BufFile **)
+ palloc0(nslices * sizeof(BufFile *));
+ }
+
PrepareTempTablespaces();
}
@@ -665,6 +683,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs)
{
int tupsize;
@@ -675,6 +694,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
long max_pointers;
long mppow2;
int nbatch = 1;
+ int nbatch_inmemory = 1;
int nbuckets;
double dbuckets;
@@ -795,6 +815,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
space_allowed,
numbuckets,
numbatches,
+                               numbatches_inmemory,
num_skew_mcvs);
return;
}
@@ -831,11 +852,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
nbatch <<= 1;
}
+ /*
+ * See how many batches we can fit into memory (driven mostly by size
+ * of BufFile, with PGAlignedBlock being the largest part of that).
+ * We need one BufFile for inner and outer side, so we count it twice
+ * for each batch, and we stop once we exceed (work_mem/2).
+ */
+ while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+ <= (work_mem * 1024L / 2))
+ nbatch_inmemory *= 2;
+
Assert(nbuckets > 0);
Assert(nbatch > 0);
*numbuckets = nbuckets;
*numbatches = nbatch;
+ *numbatches_inmemory = nbatch_inmemory;
}
@@ -857,13 +889,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
*/
if (hashtable->innerBatchFile != NULL)
{
- for (i = 1; i < hashtable->nbatch; i++)
+ int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+ for (i = 1; i < n; i++)
{
if (hashtable->innerBatchFile[i])
BufFileClose(hashtable->innerBatchFile[i]);
if (hashtable->outerBatchFile[i])
BufFileClose(hashtable->outerBatchFile[i]);
}
+
+ /* number of batch slices */
+ n = hashtable->nbatch / hashtable->nbatch_inmemory;
+
+ for (i = 1; i < n; i++)
+ {
+ if (hashtable->innerOverflowFiles[i])
+ BufFileClose(hashtable->innerOverflowFiles[i]);
+
+ if (hashtable->outerOverflowFiles[i])
+ BufFileClose(hashtable->outerOverflowFiles[i]);
+ }
}
/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +955,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
if (hashtable->innerBatchFile == NULL)
{
+ /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
/* we had no file arrays before */
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +967,50 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
/* enlarge arrays and zero out added entries */
hashtable->innerBatchFile = (BufFile **)
- repalloc(hashtable->innerBatchFile, nbatch *
sizeof(BufFile *));
+ repalloc(hashtable->innerBatchFile, nbatch_tmp *
sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
- repalloc(hashtable->outerBatchFile, nbatch *
sizeof(BufFile *));
- MemSet(hashtable->innerBatchFile + oldnbatch, 0,
- (nbatch - oldnbatch) * sizeof(BufFile *));
- MemSet(hashtable->outerBatchFile + oldnbatch, 0,
- (nbatch - oldnbatch) * sizeof(BufFile *));
+ repalloc(hashtable->outerBatchFile, nbatch_tmp *
sizeof(BufFile *));
+
+ if (oldnbatch < nbatch_tmp)
+ {
+ MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+ (nbatch_tmp - oldnbatch) * sizeof(BufFile
*));
+ MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+ (nbatch_tmp - oldnbatch) * sizeof(BufFile
*));
+ }
+
+ if (nbatch_tmp > hashtable->nbatch_inmemory)
+ {
+ int nslices = (nbatch / hashtable->nbatch_inmemory);
+
+ if (hashtable->innerOverflowFiles == NULL)
+ {
+ hashtable->innerOverflowFiles = (BufFile **)
+ palloc0(nslices * sizeof(BufFile *));
+ hashtable->outerOverflowFiles = (BufFile **)
+ palloc0(nslices * sizeof(BufFile *));
+ }
+ else
+ {
+ hashtable->innerOverflowFiles = (BufFile **)
+ repalloc(hashtable->innerOverflowFiles,
+ nslices *
sizeof(BufFile *));
+ hashtable->outerOverflowFiles = (BufFile **)
+ repalloc(hashtable->outerOverflowFiles,
+ nslices *
sizeof(BufFile *));
+
+ /* we double the number of batches, so we know
the old
+ * value was nslices/2 exactly */
+ memset(hashtable->innerOverflowFiles +
nslices/2, 0,
+ (nslices/2) * sizeof(BufFile *));
+ memset(hashtable->outerOverflowFiles +
nslices/2, 0,
+ (nslices/2) * sizeof(BufFile *));
+ }
+ }
}
MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1082,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* dump it out */
Assert(batchno > curbatch);
+
+               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                hashtable->innerBatchFile,
+                                                hashtable->innerOverflowFiles);
+
                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                      hashTuple->hashvalue,
-                                     &hashtable->innerBatchFile[batchno]);
+                                     batchFile);
hashtable->spaceUsed -= hashTupleSize;
nfreed++;
@@ -1647,22 +1737,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
        /* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
+       /* Consider increasing number of batches if we filled work_mem. */
if (hashtable->spaceUsed +
- hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+ hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+ Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
sizeof(PGAlignedBlock) * 2 /* inner + outer */
> hashtable->spaceAllowed)
ExecHashIncreaseNumBatches(hashtable);
}
else
{
+ BufFile **batchFile;
+
/*
* put the tuple into a temp file for later batches
*/
Assert(batchno > hashtable->curbatch);
+
+       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                        hashtable->innerBatchFile,
+                                        hashtable->innerOverflowFiles);
+
        ExecHashJoinSaveTuple(tuple,
                              hashvalue,
-                             &hashtable->innerBatchFile[batchno]);
+                             batchFile);
}
}
@@ -1893,6 +1994,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
}
}
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+ int slice,
+ curslice;
+
+ if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+ return batchno;
+
+ slice = batchno / hashtable->nbatch_inmemory;
+ curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+ /* slices can't go backwards */
+ Assert(slice >= curslice);
+
+ /* overflow slice */
+ if (slice > curslice)
+ return -1;
+
+ /* current slice, compute index in the current array */
+ return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                    BufFile **batchFiles, BufFile **overflowFiles)
+{
+ int idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+ /* get the right overflow file */
+ if (idx == -1)
+ {
+ int slice = (batchno / hashtable->nbatch_inmemory);
+
+ return &overflowFiles[slice];
+ }
+
+ /* batch file in the current slice */
+ return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+ int slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+ memset(hashtable->innerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+ hashtable->innerOverflowFiles[slice] = NULL;
+
+ memset(hashtable->outerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+ hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+ int batchidx;
+
+ hashtable->curbatch++;
+
+ /* see if we skipped to the next batch slice */
+ batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+ /* Can't be -1, current batch is in the current slice by definition. */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+ /*
+ * If we skipped to the next slice of batches, reset the array of files
+ * and use the overflow file as the first batch.
+ */
+ if (batchidx == 0)
+ ExecHashSwitchToNextBatchSlice(hashtable);
+
+ return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+ Size spaceUsed = hashtable->spaceUsed;
+
+ /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+ spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+ /* Account for memory used for batch files (inner + outer) */
+ spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ /* Account for slice files (inner + outer) */
+ spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ if (spaceUsed > hashtable->spacePeak)
+ hashtable->spacePeak = spaceUsed;
+}
+
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2475,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
/*
* Create a skew bucket for each MCV hash value.
@@ -2322,8 +2526,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
}
free_attstatsslot(&sslot);
@@ -2411,8 +2616,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
@@ -2488,10 +2695,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* Put the tuple into a temp file for later batches */
Assert(batchno > hashtable->curbatch);
+
+       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                        hashtable->innerBatchFile,
+                                        hashtable->innerOverflowFiles);
+
        ExecHashJoinSaveTuple(tuple, hashvalue,
-                             &hashtable->innerBatchFile[batchno]);
+                             batchFile);
pfree(hashTuple);
hashtable->spaceUsed -= tupleSize;
hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2854,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
instrument->nbuckets_original = hashtable->nbuckets_original;
instrument->nbatch = hashtable->nbatch;
instrument->nbatch_original = hashtable->nbatch_original;
+ instrument->nbatch_inmemory = Min(hashtable->nbatch,
hashtable->nbatch_inmemory);
instrument->space_peak = hashtable->spacePeak;
}
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..a8db71925b 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (batchno != hashtable->curbatch &&
node->hj_CurSkewBucketNo ==
INVALID_SKEW_BUCKET_NO)
{
+ BufFile **batchFile;
+
                /*
                 * Need to postpone this outer tuple to a later batch.
                 * Save it in the corresponding outer-batch file.
                 */
Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
+
+               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                hashtable->outerBatchFile,
+                                                hashtable->outerOverflowFiles);
+
                ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
                                      hashvalue,
-                                     &hashtable->outerBatchFile[batchno]);
+                                     batchFile);
                /* Loop around, staying in HJ_NEED_NEW_OUTER state */
continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
}
else if (curbatch < hashtable->nbatch)
{
- BufFile *file = hashtable->outerBatchFile[curbatch];
+ BufFile **file = ExecHashGetBatchFile(hashtable, curbatch,
+
hashtable->outerBatchFile,
+
hashtable->outerOverflowFiles);
/*
         * In outer-join cases, we could get here even though the batch file
* is empty.
*/
- if (file == NULL)
+ if (*file == NULL)
return NULL;
        slot = ExecHashJoinGetSavedTuple(hjstate,
-                                        file,
+                                        *file,
                                         hashvalue,
                                         hjstate->hj_OuterTupleSlot);
if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
BufFile *innerFile;
TupleTableSlot *slot;
uint32 hashvalue;
+ int batchidx;
+ int curbatch_old;
nbatch = hashtable->nbatch;
curbatch = hashtable->curbatch;
+ curbatch_old = curbatch;
+
+ /* index of the old batch */
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+ /* has to be in the current slice of batches */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
if (curbatch > 0)
{
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
         * We no longer need the previous outer batch file; close it right
* away to free disk space.
*/
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
}
    else                        /* we just finished the first batch */
{
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
     * scan, we have to rescan outer batches in case they contain tuples that
* need to be reassigned.
*/
- curbatch++;
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
while (curbatch < nbatch &&
- (hashtable->outerBatchFile[curbatch] == NULL ||
- hashtable->innerBatchFile[curbatch] == NULL))
+ (hashtable->outerBatchFile[batchidx] == NULL ||
+ hashtable->innerBatchFile[batchidx] == NULL))
{
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
HJ_FILL_OUTER(hjstate))
            break;              /* must process due to rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
HJ_FILL_INNER(hjstate))
            break;              /* must process due to rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_original)
            break;              /* must process due to rule 2 */
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_outstart)
            break;              /* must process due to rule 3 */
/* We can ignore this batch. */
/* Release associated temp files right away. */
- if (hashtable->innerBatchFile[curbatch])
- BufFileClose(hashtable->innerBatchFile[curbatch]);
- hashtable->innerBatchFile[curbatch] = NULL;
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
- curbatch++;
+ if (hashtable->innerBatchFile[batchidx])
+ BufFileClose(hashtable->innerBatchFile[batchidx]);
+ hashtable->innerBatchFile[batchidx] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
+
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
}
if (curbatch >= nbatch)
+ {
+ hashtable->curbatch = curbatch_old;
return false; /* no more batches */
-
- hashtable->curbatch = curbatch;
+ }
/*
* Reload the hash table with the new inner batch (which could be empty)
*/
ExecHashTableReset(hashtable);
- innerFile = hashtable->innerBatchFile[curbatch];
+ innerFile = hashtable->innerBatchFile[batchidx];
if (innerFile != NULL)
{
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* needed
*/
BufFileClose(innerFile);
- hashtable->innerBatchFile[curbatch] = NULL;
+ hashtable->innerBatchFile[batchidx] = NULL;
}
/*
     * Rewind outer batch file (if present), so that we can start reading it.
*/
- if (hashtable->outerBatchFile[curbatch] != NULL)
+ if (hashtable->outerBatchFile[batchidx] != NULL)
{
- if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L,
SEEK_SET))
+ if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L,
SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join
temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
int num_hashclauses = list_length(hashclauses);
int numbuckets;
int numbatches;
+ int numbatches_inmemory;
int num_skew_mcvs;
size_t space_allowed; /* unused */
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
&space_allowed,
&numbuckets,
&numbatches,
+ &numbatches_inmemory,
&num_skew_mcvs);
/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..311a0980ee 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
    int        *skewBucketNums; /* array indexes of active skew buckets */
    int         nbatch;         /* number of batches */
+   int         nbatch_inmemory;    /* max number of in-memory batches */
    int         curbatch;       /* current batch #; 0 during 1st pass */
    int         nbatch_original;    /* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
BufFile **innerBatchFile; /* buffered virtual temp file per batch */
BufFile **outerBatchFile; /* buffered virtual temp file per batch */
+   BufFile   **innerOverflowFiles; /* temp file for inner overflow batches */
+   BufFile   **outerOverflowFiles; /* temp file for outer overflow batches */
+
/*
     * Info about the datatype-specific hash functions for the datatypes being
     * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..bb6b24a1b4 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
#include "access/parallel.h"
#include "nodes/execnodes.h"
+#include "storage/buffile.h"
struct SharedHashJoinBatch;
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
uint32 hashvalue,
int *bucketno,
int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                    BufFile **batchFiles, BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs);
extern int  ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
    int         nbuckets_original;  /* planned number of buckets */
    int         nbatch;         /* number of batches at end of execution */
    int         nbatch_original;    /* planned number of batches */
+   int         nbatch_inmemory;    /* number of batches kept in memory */
    size_t      space_peak;     /* peak memory usage in bytes */
} HashInstrumentation;