On Tue, May 07, 2019 at 05:30:27PM -0700, Melanie Plageman wrote:
> On Mon, May 6, 2019 at 8:15 PM Tomas Vondra <tomas.von...@2ndquadrant.com>
> wrote:
>> Nope, that's not how it works. It's the array of batches that gets
>> sliced, not the batches themselves.
>>
>> It does slightly increase the amount of data we need to shuffle between
>> the temp files, because we can't write the data directly to batches in
>> "future" slices. But that amplification is capped to ~2.2x (compared to
>> the ~1.4x in master) - I've shared some measurements in [1].
>>
>> [1]
>> https://www.postgresql.org/message-id/20190428141901.5dsbge2ka3rxmpk6%40development
>
> Cool, I misunderstood. I looked at the code again today, and at the email
> thread where you measured "amplification".

Oh! I hope you're not too disgusted by the code in that PoC patch ;-)

> In terms of how many times you write each tuple, is it accurate to
> say that a tuple can now be spilled three times (in the worst case)
> whereas, before, it could be spilled only twice?
>
> 1 - when building the inner side hashtable, tuple is spilled to a "slice"
> file
> 2 - (assuming the number of batches was increased) during execution, when
> a tuple belonging to a later slice's spill file is found, it is re-spilled
> to that slice's spill file
> 3 - during execution, when reading from its slice file, it is re-spilled
> (again) to its batch's spill file

Yes, that's a mostly accurate understanding. Essentially this might add
one extra step of "reshuffling" from the per-slice to the per-batch files.
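
To make the routing concrete, here is a minimal standalone sketch of the
slice logic (simplified from the patch; the fixed slice size and the names
are illustrative, not the actual implementation):

    #include <stdio.h>

    #define NBATCH_INMEMORY 4           /* batches per slice */

    /*
     * Decide where a tuple belonging to 'batchno' goes while 'curbatch'
     * is being processed: an index into the per-batch file array of the
     * current slice, or -1 meaning "the overflow file of a future slice".
     */
    static int
    batch_file_index(int curbatch, int batchno)
    {
        int     slice = batchno / NBATCH_INMEMORY;
        int     curslice = curbatch / NBATCH_INMEMORY;

        if (slice > curslice)
            return -1;      /* future slice -> per-slice overflow file */

        return batchno % NBATCH_INMEMORY;   /* within the current slice */
    }

    int
    main(void)
    {
        /*
         * While processing batch 5 (slice 1), batch 6 still maps to a
         * per-batch file, but batch 11 (slice 2) goes to an overflow file
         * and gets re-spilled once slice 2 becomes current.
         */
        printf("%d\n", batch_file_index(5, 6));     /* prints 2 */
        printf("%d\n", batch_file_index(5, 11));    /* prints -1 */
        return 0;
    }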

> Is it correct that the max number of BufFile structs you will have
> is equal to the number of slices + number of batches in a slice
> because that is the max number of open BufFiles you would have at a
> time?

Yes. With the caveat that we need twice that number of BufFile structs,
because we need them on both sides of the join.
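
As a worked example (with illustrative numbers): for nbatch = 1024 and
nbatch_inmemory = 64 there are 1024 / 64 = 16 slices, so the peak is
roughly 2 * (64 + 16) = 160 BufFile structs, instead of the
2 * 1024 = 2048 that master would need for the same number of batches.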

> By the way, applying v4 patch on master, in an assert build, I am
> tripping some asserts -- starting with
>
>     Assert(!file->readOnly);
>
> in BufFileWrite

Whoooops :-/

> One thing I was a little confused by was the nbatch_inmemory member
> of the hashtable. The comment in ExecChooseHashTableSize says that
> it is determining the number of batches we can fit in memory. I
> thought that the problem was the amount of space taken up by the
> BufFile data structure itself--which is related to the number of
> open BufFiles you need at a time. This comment in
> ExecChooseHashTableSize makes it sound like you are talking about
> fitting more than one batch of tuples into memory at a time. I was
> under the impression that you could only fit one batch of tuples in
> memory at a time.

I suppose you mean this chunk:

    /*
     * See how many batches we can fit into memory (driven mostly by size
     * of BufFile, with PGAlignedBlock being the largest part of that).
     * We need one BufFile for inner and outer side, so we count it twice
     * for each batch, and we stop once we exceed (work_mem/2).
     */
    while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
           <= (work_mem * 1024L / 2))
        nbatch_inmemory *= 2;

Yeah, that comment is a bit confusing. What the code actually does is
compute the largest "slice" of batches for which we can keep the
BufFile structs in memory, without exceeding work_mem/2.
Maybe the nbatch_inmemory should be renamed to nbatch_slice, not sure.
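
For concreteness, with the default work_mem of 4MB and the default 8kB
block size (sizeof(PGAlignedBlock) == 8192), the loop doubles
nbatch_inmemory while (nbatch_inmemory * 2) * 8192 * 2 is at most
4096 * 1024 / 2 = 2097152 bytes, i.e. while nbatch_inmemory <= 64, so it
exits with nbatch_inmemory = 128.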

> So, I was stepping through the code with work_mem set to the lower
> bound, and in ExecHashIncreaseNumBatches, I got confused.
> hashtable->nbatch_inmemory was 2 for me, and thus nbatch_tmp was 2, so
> I didn't meet the condition
>
>     if (nbatch_tmp > hashtable->nbatch_inmemory)
>
> since I had just set nbatch_tmp using hashtable->nbatch_inmemory. So I
> didn't increase the number of slices, which is what I was expecting.
> What happens when hashtable->nbatch_inmemory is equal to nbatch_tmp?

Ah, good catch. The condition you're referring to

    if (nbatch_tmp > hashtable->nbatch_inmemory)

should actually be

    if (nbatch > hashtable->nbatch_inmemory)

because the point is to initialize BufFile structs for the overflow
files, and we need to do that once we cross nbatch_inmemory.

And it turns out this actually causes the assert failures in the
regression tests you reported earlier. It failed to initialize the
overflow files in some cases, so the readOnly flag ended up being set.

Attached is an updated patch, fixing this. I tried to clarify some of
the comments too, and I fixed another bug I found while running the
regression tests. It's still very much crappy PoC code, though.

regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index a6c6de78f1..105989baee 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2689,6 +2689,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                hinstrument.nbatch, es);
         ExplainPropertyInteger("Original Hash Batches", NULL,
                                hinstrument.nbatch_original, es);
+        ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+                               hinstrument.nbatch_inmemory, es);
         ExplainPropertyInteger("Peak Memory Usage", "kB", spacePeakKb,
                                es);
     }
@@ -2696,21 +2698,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
              hinstrument.nbuckets_original != hinstrument.nbuckets)
     {
         appendStringInfoSpaces(es->str, es->indent * 2);
-        appendStringInfo(es->str,
-                         "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                         hinstrument.nbuckets,
-                         hinstrument.nbuckets_original,
-                         hinstrument.nbatch,
-                         hinstrument.nbatch_original,
-                         spacePeakKb);
+        if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+            appendStringInfo(es->str,
+                             "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: %ldkB\n",
+                             hinstrument.nbuckets,
+                             hinstrument.nbuckets_original,
+                             hinstrument.nbatch,
+                             hinstrument.nbatch_original,
+                             hinstrument.nbatch_inmemory,
+                             spacePeakKb);
+        else
+            appendStringInfo(es->str,
+                             "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+                             hinstrument.nbuckets,
+                             hinstrument.nbuckets_original,
+                             hinstrument.nbatch,
+                             hinstrument.nbatch_original,
+                             spacePeakKb);
     }
     else
     {
         appendStringInfoSpaces(es->str, es->indent * 2);
-        appendStringInfo(es->str,
-                         "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-                         hinstrument.nbuckets, hinstrument.nbatch,
-                         spacePeakKb);
+        if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+            appendStringInfo(es->str,
+                             "Buckets: %d  Batches: %d (in-memory: %d)  Memory Usage: %ldkB\n",
+                             hinstrument.nbuckets, hinstrument.nbatch,
+                             hinstrument.nbatch_inmemory,
+                             spacePeakKb);
+        else
+            appendStringInfo(es->str,
+                             "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
+                             hinstrument.nbuckets, hinstrument.nbatch,
+                             spacePeakKb);
         }
     }
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 64eec91f8b..b23f4ea802 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *        ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
     if (hashtable->nbuckets != hashtable->nbuckets_optimal)
         ExecHashIncreaseNumBuckets(hashtable);
 
-    /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-    hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-    if (hashtable->spaceUsed > hashtable->spacePeak)
-        hashtable->spacePeak = hashtable->spaceUsed;
+    /* refresh info about peak used memory */
+    ExecHashUpdateSpacePeak(hashtable);
 
     hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
     size_t      space_allowed;
     int         nbuckets;
     int         nbatch;
+    int         nbatch_inmemory;
     double      rows;
     int         num_skew_mcvs;
     int         log2_nbuckets;
@@ -463,7 +463,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
                             state->parallel_state != NULL ?
                             state->parallel_state->nparticipants - 1 : 0,
                             &space_allowed,
-                            &nbuckets, &nbatch, &num_skew_mcvs);
+                            &nbuckets, &nbatch, &nbatch_inmemory,
+                            &num_skew_mcvs);
 
     /* nbuckets must be a power of 2 */
     log2_nbuckets = my_log2(nbuckets);
@@ -490,6 +491,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
     hashtable->nSkewBuckets = 0;
     hashtable->skewBucketNums = NULL;
     hashtable->nbatch = nbatch;
+    hashtable->nbatch_inmemory = nbatch_inmemory;
     hashtable->curbatch = 0;
     hashtable->nbatch_original = nbatch;
     hashtable->nbatch_outstart = nbatch;
@@ -499,6 +501,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
     hashtable->skewTuples = 0;
     hashtable->innerBatchFile = NULL;
     hashtable->outerBatchFile = NULL;
+    hashtable->innerOverflowFiles = NULL;
+    hashtable->outerOverflowFiles = NULL;
     hashtable->spaceUsed = 0;
     hashtable->spacePeak = 0;
     hashtable->spaceAllowed = space_allowed;
@@ -562,16 +566,30 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 
     if (nbatch > 1 && hashtable->parallel_state == NULL)
     {
+        int         cnt = Min(nbatch, nbatch_inmemory);
+
         /*
          * allocate and initialize the file arrays in hashCxt (not needed for
          * parallel case which uses shared tuplestores instead of raw files)
          */
         hashtable->innerBatchFile = (BufFile **)
-            palloc0(nbatch * sizeof(BufFile *));
+            palloc0(cnt * sizeof(BufFile *));
         hashtable->outerBatchFile = (BufFile **)
-            palloc0(nbatch * sizeof(BufFile *));
+            palloc0(cnt * sizeof(BufFile *));
         /* The files will not be opened until needed... */
         /* ... but make sure we have temp tablespaces established for them */
+
+        /* also allocate files for overflow batches */
+        if (nbatch > nbatch_inmemory)
+        {
+            int         nslices = (nbatch / nbatch_inmemory);
+
+            hashtable->innerOverflowFiles = (BufFile **)
+                palloc0(nslices * sizeof(BufFile *));
+            hashtable->outerOverflowFiles = (BufFile **)
+                palloc0(nslices * sizeof(BufFile *));
+        }
+
         PrepareTempTablespaces();
     }
@@ -668,6 +686,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                         size_t *space_allowed,
                         int *numbuckets,
                         int *numbatches,
+                        int *numbatches_inmemory,
                         int *num_skew_mcvs)
 {
     int         tupsize;
@@ -678,6 +697,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
     long        max_pointers;
     long        mppow2;
     int         nbatch = 1;
+    int         nbatch_inmemory = 1;
     int         nbuckets;
     double      dbuckets;
 
@@ -798,6 +818,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                 space_allowed,
                                 numbuckets,
                                 numbatches,
+                                numbatches_inmemory,
                                 num_skew_mcvs);
         return;
     }
@@ -834,11 +855,33 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
         nbatch <<= 1;
     }
 
+    /*
+     * See what's the largest slice of batches for which we can keep the
+     * BufFile structs in memory (PGAlignedBlock being the largest part
+     * of that struct). We need two BufFiles per batch (one for inner, one
+     * for outer side), and we stop once we'd need more than (work_mem/2),
+     * because we need to leave some memory for the hash table.
+     *
+     * This only accounts for per-batch BufFiles. We ignore (per-slice)
+     * overflow files, because those serve as "damage control" for cases
+     * when per-batch BufFiles would exceed work_mem. Given enough batches
+     * it's impossible to enforce work_mem strictly, because the overflow
+     * files alone will consume more memory. But the larger the slice the
+     * slower the memory growth, so we want the largest slice possible.
+     *
+     * Also, in case of an underestimate we won't even know how many
+     * slices we'll actually need until execution time.
+     */
+    while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+           <= (work_mem * 1024L / 2))
+        nbatch_inmemory *= 2;
+
     Assert(nbuckets > 0);
     Assert(nbatch > 0);
 
     *numbuckets = nbuckets;
     *numbatches = nbatch;
+    *numbatches_inmemory = nbatch_inmemory;
 }
@@ -860,13 +903,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
      */
     if (hashtable->innerBatchFile != NULL)
     {
-        for (i = 1; i < hashtable->nbatch; i++)
+        int         n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+        for (i = 1; i < n; i++)
         {
             if (hashtable->innerBatchFile[i])
                 BufFileClose(hashtable->innerBatchFile[i]);
             if (hashtable->outerBatchFile[i])
                 BufFileClose(hashtable->outerBatchFile[i]);
         }
+
+        /* number of batch slices */
+        n = hashtable->nbatch / hashtable->nbatch_inmemory;
+
+        for (i = 1; i < n; i++)
+        {
+            if (hashtable->innerOverflowFiles[i])
+                BufFileClose(hashtable->innerOverflowFiles[i]);
+
+            if (hashtable->outerOverflowFiles[i])
+                BufFileClose(hashtable->outerOverflowFiles[i]);
+        }
     }
 
     /* Release working memory (batchCxt is a child, so it goes away too) */
@@ -912,6 +969,10 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
     if (hashtable->innerBatchFile == NULL)
     {
+        /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+        Assert(nbatch == 2);
+        Assert(hashtable->nbatch_inmemory >= nbatch);
+
         /* we had no file arrays before */
         hashtable->innerBatchFile = (BufFile **)
             palloc0(nbatch * sizeof(BufFile *));
@@ -922,15 +983,51 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
     }
     else
     {
+        int         nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
         /* enlarge arrays and zero out added entries */
         hashtable->innerBatchFile = (BufFile **)
-            repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
+            repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *));
         hashtable->outerBatchFile = (BufFile **)
-            repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
-        MemSet(hashtable->innerBatchFile + oldnbatch, 0,
-               (nbatch - oldnbatch) * sizeof(BufFile *));
-        MemSet(hashtable->outerBatchFile + oldnbatch, 0,
-               (nbatch - oldnbatch) * sizeof(BufFile *));
+            repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *));
+
+        if (oldnbatch < nbatch_tmp)
+        {
+            MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+                   (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+            MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+                   (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+        }
+
+        /* add overflow files if needed (once we exceed nbatch_inmemory) */
+        if (nbatch > hashtable->nbatch_inmemory)
+        {
+            int         nslices = (nbatch / hashtable->nbatch_inmemory);
+
+            if (hashtable->innerOverflowFiles == NULL)
+            {
+                hashtable->innerOverflowFiles = (BufFile **)
+                    palloc0(nslices * sizeof(BufFile *));
+                hashtable->outerOverflowFiles = (BufFile **)
+                    palloc0(nslices * sizeof(BufFile *));
+            }
+            else
+            {
+                hashtable->innerOverflowFiles = (BufFile **)
+                    repalloc(hashtable->innerOverflowFiles,
+                             nslices * sizeof(BufFile *));
+                hashtable->outerOverflowFiles = (BufFile **)
+                    repalloc(hashtable->outerOverflowFiles,
+                             nslices * sizeof(BufFile *));
+
+                /* we double the number of batches, so we know the old
+                 * value was nslices/2 exactly */
+                memset(hashtable->innerOverflowFiles + nslices/2, 0,
+                       (nslices/2) * sizeof(BufFile *));
+                memset(hashtable->outerOverflowFiles + nslices/2, 0,
+                       (nslices/2) * sizeof(BufFile *));
+            }
+        }
     }
 
     MemoryContextSwitchTo(oldcxt);
@@ -1002,11 +1099,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
             }
             else
             {
+                BufFile   **batchFile;
+
                 /* dump it out */
                 Assert(batchno > curbatch);
+
+                batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                 hashtable->innerBatchFile,
+                                                 hashtable->innerOverflowFiles);
+
                 ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                       hashTuple->hashvalue,
-                                      &hashtable->innerBatchFile[batchno]);
+                                      batchFile);
 
                 hashtable->spaceUsed -= hashTupleSize;
                 nfreed++;
@@ -1651,22 +1755,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
         /* Account for space used, and back off if we've used too much */
         hashtable->spaceUsed += hashTupleSize;
-        if (hashtable->spaceUsed > hashtable->spacePeak)
-            hashtable->spacePeak = hashtable->spaceUsed;
+
+        /* refresh info about peak used memory */
+        ExecHashUpdateSpacePeak(hashtable);
+
+        /* Consider increasing number of batches if we filled work_mem. */
         if (hashtable->spaceUsed +
-            hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+            hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+            Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */
             > hashtable->spaceAllowed)
             ExecHashIncreaseNumBatches(hashtable);
     }
     else
     {
+        BufFile   **batchFile;
+
         /*
          * put the tuple into a temp file for later batches
          */
         Assert(batchno > hashtable->curbatch);
+
+        batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                         hashtable->innerBatchFile,
+                                         hashtable->innerOverflowFiles);
+
         ExecHashJoinSaveTuple(tuple,
                               hashvalue,
-                              &hashtable->innerBatchFile[batchno]);
+                              batchFile);
     }
 
     if (shouldFree)
@@ -1908,6 +2023,116 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
     }
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+    int         slice,
+                curslice;
+
+    if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+        return batchno;
+
+    slice = batchno / hashtable->nbatch_inmemory;
+    curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+    /* slices can't go backwards */
+    Assert(slice >= curslice);
+
+    /* overflow slice */
+    if (slice > curslice)
+        return -1;
+
+    /* current slice, compute index in the current array */
+    return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                     BufFile **batchFiles, BufFile **overflowFiles)
+{
+    int         idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+    /* get the right overflow file */
+    if (idx == -1)
+    {
+        int         slice = (batchno / hashtable->nbatch_inmemory);
+
+        return &overflowFiles[slice];
+    }
+
+    /* batch file in the current slice */
+    return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+    int         slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+    /* this should only happen when batches are sliced */
+    Assert(hashtable->nbatch_inmemory < hashtable->nbatch);
+    Assert(hashtable->curbatch < hashtable->nbatch);
+
+    memset(hashtable->innerBatchFile, 0,
+           hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+    hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+    hashtable->innerOverflowFiles[slice] = NULL;
+
+    memset(hashtable->outerBatchFile, 0,
+           hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+    hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+    hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+    int         batchidx;
+
+    hashtable->curbatch++;
+
+    /* we've processed all batches, we're done */
+    if (hashtable->curbatch >= hashtable->nbatch)
+        return hashtable->curbatch;
+
+    /* see if we skipped to the next batch slice */
+    batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+    /* Can't be -1, current batch is in the current slice by definition. */
+    Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+    /*
+     * If we skipped to the next slice of batches, reset the array of files
+     * and use the overflow file as the first batch.
+     */
+    if (batchidx == 0)
+        ExecHashSwitchToNextBatchSlice(hashtable);
+
+    return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+    Size        spaceUsed = hashtable->spaceUsed;
+
+    /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+    spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+    /* Account for memory used for batch files (inner + outer) */
+    spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+        sizeof(PGAlignedBlock) * 2;
+
+    /* Account for slice files (inner + outer) */
+    spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+        sizeof(PGAlignedBlock) * 2;
+
+    if (spaceUsed > hashtable->spacePeak)
+        hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *        scan a hash bucket for matches to the current outer tuple
@@ -2287,8 +2512,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
             + mcvsToUse * sizeof(int);
         hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
             + mcvsToUse * sizeof(int);
-        if (hashtable->spaceUsed > hashtable->spacePeak)
-            hashtable->spacePeak = hashtable->spaceUsed;
+
+        /* refresh info about peak used memory */
+        ExecHashUpdateSpacePeak(hashtable);
 
         /*
          * Create a skew bucket for each MCV hash value.
@@ -2338,8 +2564,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
             hashtable->nSkewBuckets++;
             hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
             hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-            if (hashtable->spaceUsed > hashtable->spacePeak)
-                hashtable->spacePeak = hashtable->spaceUsed;
+
+            /* refresh info about peak used memory */
+            ExecHashUpdateSpacePeak(hashtable);
         }
 
         free_attstatsslot(&sslot);
@@ -2428,8 +2655,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
     /* Account for space used, and back off if we've used too much */
     hashtable->spaceUsed += hashTupleSize;
     hashtable->spaceUsedSkew += hashTupleSize;
-    if (hashtable->spaceUsed > hashtable->spacePeak)
-        hashtable->spacePeak = hashtable->spaceUsed;
+
+    /* refresh info about peak used memory */
+    ExecHashUpdateSpacePeak(hashtable);
+
     while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
         ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2508,11 +2737,19 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
         }
         else
         {
+            BufFile   **batchFile;
+
             /* Put the tuple into a temp file for later batches */
             Assert(batchno > hashtable->curbatch);
+
+            batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                             hashtable->innerBatchFile,
+                                             hashtable->innerOverflowFiles);
+
             ExecHashJoinSaveTuple(tuple, hashvalue,
-                                  &hashtable->innerBatchFile[batchno]);
+                                  batchFile);
             pfree(hashTuple);
+
             hashtable->spaceUsed -= tupleSize;
             hashtable->spaceUsedSkew -= tupleSize;
         }
@@ -2660,6 +2897,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
     instrument->nbuckets = hashtable->nbuckets;
     instrument->nbuckets_original = hashtable->nbuckets_original;
     instrument->nbatch = hashtable->nbatch;
     instrument->nbatch_original = hashtable->nbatch_original;
+    instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
     instrument->space_peak = hashtable->spacePeak;
 }
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index aa43296e26..ac27e8c738 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -391,6 +391,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                          node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                 {
                     bool        shouldFree;
+                    BufFile   **batchFile;
+
                     MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
                                                                       &shouldFree);
 
@@ -400,8 +402,12 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                      */
                     Assert(parallel_state == NULL);
                     Assert(batchno > hashtable->curbatch);
-                    ExecHashJoinSaveTuple(mintuple, hashvalue,
-                                          &hashtable->outerBatchFile[batchno]);
+
+                    batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                     hashtable->outerBatchFile,
+                                                     hashtable->outerOverflowFiles);
+
+                    ExecHashJoinSaveTuple(mintuple, hashvalue, batchFile);
 
                     if (shouldFree)
                         heap_free_minimal_tuple(mintuple);
@@ -867,17 +873,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
     }
     else if (curbatch < hashtable->nbatch)
     {
-        BufFile    *file = hashtable->outerBatchFile[curbatch];
+        BufFile   **file = ExecHashGetBatchFile(hashtable, curbatch,
+                                                hashtable->outerBatchFile,
+                                                hashtable->outerOverflowFiles);
 
         /*
         * In outer-join cases, we could get here even though the batch file
         * is empty.
         */
-        if (file == NULL)
+        if (*file == NULL)
             return NULL;
 
         slot = ExecHashJoinGetSavedTuple(hjstate,
-                                         file,
+                                         *file,
                                          &hashvalue,
                                          hjstate->hj_OuterTupleSlot);
         if (!TupIsNull(slot))
@@ -965,9 +973,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
     BufFile    *innerFile;
     TupleTableSlot *slot;
     uint32      hashvalue;
+    int         batchidx;
+    int         curbatch_old;
 
     nbatch = hashtable->nbatch;
     curbatch = hashtable->curbatch;
+    curbatch_old = curbatch;
+
+    /* index of the old batch */
+    batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+    /* has to be in the current slice of batches */
+    Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
     if (curbatch > 0)
     {
@@ -975,9 +992,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
          * We no longer need the previous outer batch file; close it right
          * away to free disk space.
          */
-        if (hashtable->outerBatchFile[curbatch])
-            BufFileClose(hashtable->outerBatchFile[curbatch]);
-        hashtable->outerBatchFile[curbatch] = NULL;
+        if (hashtable->outerBatchFile[batchidx])
+            BufFileClose(hashtable->outerBatchFile[batchidx]);
+        hashtable->outerBatchFile[batchidx] = NULL;
     }
     else                        /* we just finished the first batch */
     {
@@ -1011,45 +1028,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
      * scan, we have to rescan outer batches in case they contain tuples that
      * need to be reassigned.
      */
-    curbatch++;
+    curbatch = ExecHashSwitchToNextBatch(hashtable);
+    batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
     while (curbatch < nbatch &&
-           (hashtable->outerBatchFile[curbatch] == NULL ||
-            hashtable->innerBatchFile[curbatch] == NULL))
+           (hashtable->outerBatchFile[batchidx] == NULL ||
+            hashtable->innerBatchFile[batchidx] == NULL))
     {
-        if (hashtable->outerBatchFile[curbatch] &&
+        if (hashtable->outerBatchFile[batchidx] &&
             HJ_FILL_OUTER(hjstate))
             break;              /* must process due to rule 1 */
-        if (hashtable->innerBatchFile[curbatch] &&
+        if (hashtable->innerBatchFile[batchidx] &&
             HJ_FILL_INNER(hjstate))
            break;              /* must process due to rule 1 */
-        if (hashtable->innerBatchFile[curbatch] &&
+        if (hashtable->innerBatchFile[batchidx] &&
             nbatch != hashtable->nbatch_original)
            break;              /* must process due to rule 2 */
-        if (hashtable->outerBatchFile[curbatch] &&
+        if (hashtable->outerBatchFile[batchidx] &&
            nbatch != hashtable->nbatch_outstart)
            break;              /* must process due to rule 3 */
         /* We can ignore this batch. */
         /* Release associated temp files right away. */
-        if (hashtable->innerBatchFile[curbatch])
-            BufFileClose(hashtable->innerBatchFile[curbatch]);
-        hashtable->innerBatchFile[curbatch] = NULL;
-        if (hashtable->outerBatchFile[curbatch])
-            BufFileClose(hashtable->outerBatchFile[curbatch]);
-        hashtable->outerBatchFile[curbatch] = NULL;
-        curbatch++;
+        if (hashtable->innerBatchFile[batchidx])
+            BufFileClose(hashtable->innerBatchFile[batchidx]);
+        hashtable->innerBatchFile[batchidx] = NULL;
+        if (hashtable->outerBatchFile[batchidx])
+            BufFileClose(hashtable->outerBatchFile[batchidx]);
+        hashtable->outerBatchFile[batchidx] = NULL;
+
+        curbatch = ExecHashSwitchToNextBatch(hashtable);
+        batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
     }
 
     if (curbatch >= nbatch)
+    {
+        hashtable->curbatch = curbatch_old;
         return false;           /* no more batches */
-
-    hashtable->curbatch = curbatch;
+    }
 
     /*
      * Reload the hash table with the new inner batch (which could be empty)
      */
     ExecHashTableReset(hashtable);
 
-    innerFile = hashtable->innerBatchFile[curbatch];
+    innerFile = hashtable->innerBatchFile[batchidx];
 
     if (innerFile != NULL)
     {
@@ -1075,15 +1097,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
          * needed
          */
         BufFileClose(innerFile);
-        hashtable->innerBatchFile[curbatch] = NULL;
+        hashtable->innerBatchFile[batchidx] = NULL;
     }
 
     /*
      * Rewind outer batch file (if present), so that we can start reading it.
      */
-    if (hashtable->outerBatchFile[curbatch] != NULL)
+    if (hashtable->outerBatchFile[batchidx] != NULL)
     {
-        if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+        if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET))
             ereport(ERROR,
                     (errcode_for_file_access(),
                      errmsg("could not rewind hash-join temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index afd32884a2..0128fc91ed 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3268,6 +3268,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
     int         num_hashclauses = list_length(hashclauses);
     int         numbuckets;
     int         numbatches;
+    int         numbatches_inmemory;
     int         num_skew_mcvs;
     size_t      space_allowed;  /* unused */
 
@@ -3317,6 +3318,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
                             &space_allowed,
                             &numbuckets,
                             &numbatches,
+                            &numbatches_inmemory,
                             &num_skew_mcvs);
 
     /*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2c94b926d3..58c4b9d67b 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
     int        *skewBucketNums; /* array indexes of active skew buckets */
 
     int         nbatch;         /* number of batches */
+    int         nbatch_inmemory;    /* max number of in-memory batches */
     int         curbatch;       /* current batch #; 0 during 1st pass */
 
     int         nbatch_original;    /* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
     BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
     BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+    BufFile   **innerOverflowFiles; /* temp file for inner overflow batches */
+    BufFile   **outerOverflowFiles; /* temp file for outer overflow batches */
+
     /*
      * Info about the datatype-specific hash functions for the datatypes being
      * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 1233766023..ab1f21e06b 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                           uint32 hashvalue,
                           int *bucketno,
                           int *batchno);
+extern int  ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                     BufFile **batchFiles, BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int  ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                         size_t *space_allowed,
                         int *numbuckets,
                         int *numbatches,
+                        int *numbatches_inmemory,
                         int *num_skew_mcvs);
 extern int  ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ff3328752e..3f8b4f7e33 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2198,6 +2198,7 @@ typedef struct HashInstrumentation
     int         nbuckets;       /* number of buckets at end of execution */
     int         nbuckets_original;  /* planned number of buckets */
     int         nbatch;         /* number of batches at end of execution */
     int         nbatch_original;    /* planned number of batches */
+    int         nbatch_inmemory;    /* number of batches kept in memory */
     size_t      space_peak;     /* peak memory usage in bytes */
 } HashInstrumentation;