On Tue, May 07, 2019 at 05:30:27PM -0700, Melanie Plageman wrote:
  On Mon, May 6, 2019 at 8:15 PM Tomas Vondra <tomas.von...@2ndquadrant.com>
  wrote:

    Nope, that's not how it works. It's the array of batches that gets
    sliced, not the batches themselves.

    It does slightly increase the amount of data we need to shuffle between
    the temp files, because we can't write the data directly to batches in
    "future" slices. But that amplification is capped to ~2.2x (compared to
    the ~1.4x in master) - I've shared some measurements in [1].

    [1]
    
https://www.postgresql.org/message-id/20190428141901.5dsbge2ka3rxmpk6%40development

  Cool, I misunderstood. I looked at the code again today, and at the email
  thread where you measured the "amplification".


Oh! I hope you're not too disgusted by the code in that PoC patch ;-)

  In terms of how many times you write each tuple, is it accurate to
  say that a tuple can now be spilled three times (in the worst case)
  whereas, before, it could be spilled only twice?

  1 - when building the inner side hashtable, tuple is spilled to a "slice"
  file
  2 - (assuming the number of batches was increased) during execution, when
  a tuple belonging to a later slice's spill file is found, it is re-spilled
  to that slice's spill file
  3 - during execution, when reading from its slice file, it is re-spilled
  (again) to its batch's spill file


Yes, that's a mostly accurate understanding. Essentially, this might add
one extra step of "reshuffling" tuples from the per-slice files to the
per-batch files.
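
To make the routing a bit more concrete, here is a minimal sketch of the
idea (illustrative-only code; the names mirror ExecHashGetBatchIndex and
ExecHashGetBatchFile in the attached PoC, but this is not the exact patch
code, and it ignores the case where nbatch <= nbatch_inmemory, i.e. no
slicing at all):

    /*
     * Batches are grouped into slices of nbatch_inmemory batches. Only the
     * current slice has per-batch files open; a tuple belonging to a later
     * slice goes into that slice's single overflow file, and gets split into
     * per-batch files only once we start processing that slice.
     */
    static BufFile **
    route_spilled_tuple(HashJoinTable hashtable, int batchno,
                        BufFile **batchFiles, BufFile **overflowFiles)
    {
        int     slice = batchno / hashtable->nbatch_inmemory;
        int     curslice = hashtable->curbatch / hashtable->nbatch_inmemory;

        if (slice > curslice)
            return &overflowFiles[slice];   /* future slice -> overflow file */

        /* current slice -> per-batch file, indexed within the slice */
        return &batchFiles[batchno % hashtable->nbatch_inmemory];
    }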

  Is it correct that the max number of BufFile structs you will have
  is equal to the number of slices + number of batches in a slice
  because that is the max number of open BufFiles you would have at a
  time?

Yes. With the caveat that we need twice that number of BufFile structs,
because we need them on both sides of the join.
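
Just to put illustrative numbers on it (purely an example, not measured):
with nbatch = 256 and nbatch_inmemory = 64 there are 4 slices, so at any
point we keep at most 64 per-batch files plus 4 per-slice overflow files
per side, i.e. 2 * (64 + 4) = 136 BufFile structs, instead of the up to
2 * 256 = 512 we'd need on master.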

  By the way, applying v4 patch on master, in an assert build, I am tripping
  some
  asserts -- starting with
  Assert(!file->readOnly);
  in BufFileWrite

Whoooops :-/

  One thing I was a little confused by was the nbatch_inmemory member
  of the hashtable.  The comment in ExecChooseHashTableSize says that
  it is determining the number of batches we can fit in memory.  I
  thought that the problem was the amount of space taken up by the
  BufFile data structure itself--which is related to the number of
  open BufFiles you need at a time. This comment in
  ExecChooseHashTableSize makes it sound like you are talking about
  fitting more than one batch of tuples into memory at a time. I was
  under the impression that you could only fit one batch of tuples in
  memory at a time.

I suppose you mean this chunk:

   /*
    * See how many batches we can fit into memory (driven mostly by size
    * of BufFile, with PGAlignedBlock being the largest part of that).
    * We need one BufFile for inner and outer side, so we count it twice
    * for each batch, and we stop once we exceed (work_mem/2).
    */
   while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
          <= (work_mem * 1024L / 2))
       nbatch_inmemory *= 2;

Yeah, that comment is a bit confusing. What the code actually does is
compute the largest "slice" of batches for which we can keep the
BufFile structs in memory, without exceeding work_mem/2.
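
To give a concrete example (assuming the default 8kB BLCKSZ, i.e.
sizeof(PGAlignedBlock) = 8192, and work_mem = 4MB), the condition is

   (nbatch_inmemory * 2) * 8192 * 2 <= (4096 * 1024 / 2)

i.e. nbatch_inmemory * 32768 <= 2097152, which holds up to
nbatch_inmemory = 64, so the final doubling leaves nbatch_inmemory = 128.
At that point the per-batch BufFile buffers for one slice (128 batches
x 2 files x 8kB) add up to ~2MB, i.e. roughly work_mem/2.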

Maybe the nbatch_inmemory should be renamed to nbatch_slice, not sure.

  So, I was stepping through the code with work_mem set to the lower
  bound, and in ExecHashIncreaseNumBatches I got confused.
  hashtable->nbatch_inmemory was 2 for me, and thus nbatch_tmp was 2, so
  I didn't meet the condition if (nbatch_tmp >
  hashtable->nbatch_inmemory), since nbatch_tmp had just been set from
  hashtable->nbatch_inmemory. So I didn't increase the number of
  slices, which is what I was expecting.  What happens when
  hashtable->nbatch_inmemory is equal to nbatch_tmp?


Ah, good catch. The condition you're referring to

   if (nbatch_tmp > hashtable->nbatch_inmemory)

should actually be

   if (nbatch > hashtable->nbatch_inmemory)

because the point is to initialize BufFile structs for the overflow
files, and we need to do that once we cross nbatch_inmemory.

And it turns out this is actually what causes the assert failures in the
regression tests that you reported earlier. In some cases the overflow
files were not initialized, which is why the readOnly flag appeared to be set.

Attached is an updated patch fixing this. I tried to clarify some of
the comments too, and I fixed another bug I found while running the
regression tests. It's still very much crappy PoC code, though.


regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index a6c6de78f1..105989baee 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2689,6 +2689,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                                                   hinstrument.nbatch, es);
                        ExplainPropertyInteger("Original Hash Batches", NULL,
                                                                   hinstrument.nbatch_original, es);
+                       ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+                                                                  hinstrument.nbatch_inmemory, es);
                        ExplainPropertyInteger("Peak Memory Usage", "kB",
                                                                   spacePeakKb, es);
                }
@@ -2696,21 +2698,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                 hinstrument.nbuckets_original != hinstrument.nbuckets)
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
-                       appendStringInfo(es->str,
-                                                        "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                                                        hinstrument.nbuckets,
-                                                        hinstrument.nbuckets_original,
-                                                        hinstrument.nbatch,
-                                                        hinstrument.nbatch_original,
-                                                        spacePeakKb);
+                       if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets,
+                                                                hinstrument.nbuckets_original,
+                                                                hinstrument.nbatch,
+                                                                hinstrument.nbatch_original,
+                                                                hinstrument.nbatch_inmemory,
+                                                                spacePeakKb);
+                       else
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets,
+                                                                hinstrument.nbuckets_original,
+                                                                hinstrument.nbatch,
+                                                                hinstrument.nbatch_original,
+                                                                spacePeakKb);
                }
                else
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
-                       appendStringInfo(es->str,
-                                                        "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-                                                        hinstrument.nbuckets, hinstrument.nbatch,
-                                                        spacePeakKb);
+                       if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d  Batches: %d (in-memory: %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets, hinstrument.nbatch,
+                                                                hinstrument.nbatch_inmemory,
+                                                                spacePeakKb);
+                       else
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets, hinstrument.nbatch,
+                                                                spacePeakKb);
                }
        }
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 64eec91f8b..b23f4ea802 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable 
hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *             ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
        if (hashtable->nbuckets != hashtable->nbuckets_optimal)
                ExecHashIncreaseNumBuckets(hashtable);
 
-       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-       hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
 
        hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, 
List *hashCollations,
        size_t          space_allowed;
        int                     nbuckets;
        int                     nbatch;
+       int                     nbatch_inmemory;
        double          rows;
        int                     num_skew_mcvs;
        int                     log2_nbuckets;
@@ -463,7 +463,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
                                                        state->parallel_state != NULL ?
                                                        state->parallel_state->nparticipants - 1 : 0,
                                                        &space_allowed,
-                                                       &nbuckets, &nbatch, &num_skew_mcvs);
+                                                       &nbuckets, &nbatch, &nbatch_inmemory,
+                                                       &num_skew_mcvs);
 
        /* nbuckets must be a power of 2 */
        log2_nbuckets = my_log2(nbuckets);
@@ -490,6 +491,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, 
List *hashCollations,
        hashtable->nSkewBuckets = 0;
        hashtable->skewBucketNums = NULL;
        hashtable->nbatch = nbatch;
+       hashtable->nbatch_inmemory = nbatch_inmemory;
        hashtable->curbatch = 0;
        hashtable->nbatch_original = nbatch;
        hashtable->nbatch_outstart = nbatch;
@@ -499,6 +501,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, 
List *hashCollations,
        hashtable->skewTuples = 0;
        hashtable->innerBatchFile = NULL;
        hashtable->outerBatchFile = NULL;
+       hashtable->innerOverflowFiles = NULL;
+       hashtable->outerOverflowFiles = NULL;
        hashtable->spaceUsed = 0;
        hashtable->spacePeak = 0;
        hashtable->spaceAllowed = space_allowed;
@@ -562,16 +566,30 @@ ExecHashTableCreate(HashState *state, List 
*hashOperators, List *hashCollations,
 
        if (nbatch > 1 && hashtable->parallel_state == NULL)
        {
+               int     cnt = Min(nbatch, nbatch_inmemory);
+
                /*
                 * allocate and initialize the file arrays in hashCxt (not needed for
                 * parallel case which uses shared tuplestores instead of raw files)
                 */
                hashtable->innerBatchFile = (BufFile **)
-                       palloc0(nbatch * sizeof(BufFile *));
+                       palloc0(cnt * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
-                       palloc0(nbatch * sizeof(BufFile *));
+                       palloc0(cnt * sizeof(BufFile *));
                /* The files will not be opened until needed... */
                /* ... but make sure we have temp tablespaces established for them */
+
+               /* also allocate files for overflow batches */
+               if (nbatch > nbatch_inmemory)
+               {
+                       int nslices = (nbatch / nbatch_inmemory);
+
+                       hashtable->innerOverflowFiles = (BufFile **)
+                               palloc0(nslices * sizeof(BufFile *));
+                       hashtable->outerOverflowFiles = (BufFile **)
+                               palloc0(nslices * sizeof(BufFile *));
+               }
+
                PrepareTempTablespaces();
        }
 
@@ -668,6 +686,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool 
useskew,
                                                size_t *space_allowed,
                                                int *numbuckets,
                                                int *numbatches,
+                                               int *numbatches_inmemory,
                                                int *num_skew_mcvs)
 {
        int                     tupsize;
@@ -678,6 +697,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool 
useskew,
        long            max_pointers;
        long            mppow2;
        int                     nbatch = 1;
+       int                     nbatch_inmemory = 1;
        int                     nbuckets;
        double          dbuckets;
 
@@ -798,6 +818,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                                        space_allowed,
                                                                        numbuckets,
                                                                        numbatches,
+                                                                       numbatches_inmemory,
                                                                        num_skew_mcvs);
                        return;
                }
@@ -834,11 +855,33 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, 
bool useskew,
                        nbatch <<= 1;
        }
 
+       /*
+        * See what's the largest slice of batches for which we can keep the
+        * BufFile structs in memory (PGAlignedBlock being the largest part
+        * of that struct). We need two BufFiles per batch (one for inner, one
+        * for outer side), and we stop once we'd need more than (work_mem/2),
+        * because we need to leave some memory for the hash table.
+        *
+        * This only accounts for per-batch BufFiles. We ignore (per-slice)
+        * overflow files, because those serve as "damage control" for cases
+        * when per-batch BufFiles would exceed work_mem. Given enough batches
+        * it's impossible to enforce work_mem strictly, because the overflow
+        * files alone will consume more memory. But the larger the slice the
+        * slower the memory growth, so we want the largest slice possible.
+        *
+        * Also, in case of an underestimate we won't even know how many
+        * slices we'll actually need until execution time.
+        */
+       while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+                       <= (work_mem * 1024L / 2))
+               nbatch_inmemory *= 2;
+
        Assert(nbuckets > 0);
        Assert(nbatch > 0);
 
        *numbuckets = nbuckets;
        *numbatches = nbatch;
+       *numbatches_inmemory = nbatch_inmemory;
 }
 
 
@@ -860,13 +903,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
         */
        if (hashtable->innerBatchFile != NULL)
        {
-               for (i = 1; i < hashtable->nbatch; i++)
+               int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+               for (i = 1; i < n; i++)
                {
                        if (hashtable->innerBatchFile[i])
                                BufFileClose(hashtable->innerBatchFile[i]);
                        if (hashtable->outerBatchFile[i])
                                BufFileClose(hashtable->outerBatchFile[i]);
                }
+
+               /* number of batch slices */
+               n = hashtable->nbatch / hashtable->nbatch_inmemory;
+
+               for (i = 1; i < n; i++)
+               {
+                       if (hashtable->innerOverflowFiles[i])
+                               BufFileClose(hashtable->innerOverflowFiles[i]);
+
+                       if (hashtable->outerOverflowFiles[i])
+                               BufFileClose(hashtable->outerOverflowFiles[i]);
+               }
        }
 
        /* Release working memory (batchCxt is a child, so it goes away too) */
@@ -912,6 +969,10 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
        if (hashtable->innerBatchFile == NULL)
        {
+               /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+               Assert(nbatch == 2);
+               Assert(hashtable->nbatch_inmemory >= nbatch);
+
                /* we had no file arrays before */
                hashtable->innerBatchFile = (BufFile **)
                        palloc0(nbatch * sizeof(BufFile *));
@@ -922,15 +983,51 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
        }
        else
        {
+               int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
                /* enlarge arrays and zero out added entries */
                hashtable->innerBatchFile = (BufFile **)
-                       repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
+                       repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
-                       repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
-               MemSet(hashtable->innerBatchFile + oldnbatch, 0,
-                          (nbatch - oldnbatch) * sizeof(BufFile *));
-               MemSet(hashtable->outerBatchFile + oldnbatch, 0,
-                          (nbatch - oldnbatch) * sizeof(BufFile *));
+                       repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *));
+
+               if (oldnbatch < nbatch_tmp)
+               {
+                       MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+                                  (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+                       MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+                                  (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+               }
+
+               /* add overflow files if needed (once we exceed nbatch_inmemory) */
+               if (nbatch > hashtable->nbatch_inmemory)
+               {
+                       int nslices = (nbatch / hashtable->nbatch_inmemory);
+
+                       if (hashtable->innerOverflowFiles == NULL)
+                       {
+                               hashtable->innerOverflowFiles = (BufFile **)
+                                       palloc0(nslices * sizeof(BufFile *));
+                               hashtable->outerOverflowFiles = (BufFile **)
+                                       palloc0(nslices * sizeof(BufFile *));
+                       }
+                       else
+                       {
+                               hashtable->innerOverflowFiles = (BufFile **)
+                                       repalloc(hashtable->innerOverflowFiles,
+                                                        nslices * sizeof(BufFile *));
+                               hashtable->outerOverflowFiles = (BufFile **)
+                                       repalloc(hashtable->outerOverflowFiles,
+                                                        nslices * sizeof(BufFile *));
+
+                               /* we double the number of batches, so we know the old
+                                * value was nslices/2 exactly */
+                               memset(hashtable->innerOverflowFiles + nslices/2, 0,
+                                          (nslices/2) * sizeof(BufFile *));
+                               memset(hashtable->outerOverflowFiles + nslices/2, 0,
+                                          (nslices/2) * sizeof(BufFile *));
+                       }
+               }
        }
 
        MemoryContextSwitchTo(oldcxt);
@@ -1002,11 +1099,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
                        }
                        else
                        {
+                               BufFile **batchFile;
+
                                /* dump it out */
                                Assert(batchno > curbatch);
+
+                               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                hashtable->innerBatchFile,
+                                                                                hashtable->innerOverflowFiles);
+
                                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                                          hashTuple->hashvalue,
-                                                                         &hashtable->innerBatchFile[batchno]);
+                                                                         batchFile);
 
                                hashtable->spaceUsed -= hashTupleSize;
                                nfreed++;
@@ -1651,22 +1755,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
                /* Account for space used, and back off if we've used too much */
                hashtable->spaceUsed += hashTupleSize;
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
+
+               /* Consider increasing number of batches if we filled work_mem. */
                if (hashtable->spaceUsed +
-                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+                       Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */
                        > hashtable->spaceAllowed)
                        ExecHashIncreaseNumBatches(hashtable);
        }
        else
        {
+               BufFile **batchFile;
+
                /*
                 * put the tuple into a temp file for later batches
                 */
                Assert(batchno > hashtable->curbatch);
+
+               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                hashtable->innerBatchFile,
+                                                                hashtable->innerOverflowFiles);
+
                ExecHashJoinSaveTuple(tuple,
                                                          hashvalue,
-                                                         &hashtable->innerBatchFile[batchno]);
+                                                         batchFile);
        }
 
        if (shouldFree)
@@ -1908,6 +2023,116 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
        }
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+       int     slice,
+               curslice;
+
+       if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+               return batchno;
+
+       slice = batchno / hashtable->nbatch_inmemory;
+       curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+       /* slices can't go backwards */
+       Assert(slice >= curslice);
+
+       /* overflow slice */
+       if (slice > curslice)
+               return -1;
+
+       /* current slice, compute index in the current array */
+       return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                                        BufFile **batchFiles, BufFile **overflowFiles)
+{
+       int             idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+       /* get the right overflow file */
+       if (idx == -1)
+       {
+               int slice = (batchno / hashtable->nbatch_inmemory);
+
+               return &overflowFiles[slice];
+       }
+
+       /* batch file in the current slice */
+       return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+       int     slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+       /* this should only happen when batches are sliced */
+       Assert(hashtable->nbatch_inmemory < hashtable->nbatch);
+       Assert(hashtable->curbatch < hashtable->nbatch);
+
+       memset(hashtable->innerBatchFile, 0,
+                  hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+       hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+       hashtable->innerOverflowFiles[slice] = NULL;
+
+       memset(hashtable->outerBatchFile, 0,
+                  hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+       hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+       hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+       int             batchidx;
+
+       hashtable->curbatch++;
+
+       /* we've processed all batches, we're done */
+       if (hashtable->curbatch >= hashtable->nbatch)
+               return hashtable->curbatch;
+
+       /* see if we skipped to the next batch slice */
+       batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+       /* Can't be -1, current batch is in the current slice by definition. */
+       Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+       /*
+        * If we skipped to the next slice of batches, reset the array of files
+        * and use the overflow file as the first batch.
+        */
+       if (batchidx == 0)
+               ExecHashSwitchToNextBatchSlice(hashtable);
+
+       return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+       Size    spaceUsed = hashtable->spaceUsed;
+
+       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+       spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+       /* Account for memory used for batch files (inner + outer) */
+       spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+                                sizeof(PGAlignedBlock) * 2;
+
+       /* Account for slice files (inner + outer) */
+       spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+                                sizeof(PGAlignedBlock) * 2;
+
+       if (spaceUsed > hashtable->spacePeak)
+               hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *             scan a hash bucket for matches to the current outer tuple
@@ -2287,8 +2512,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash 
*node, int mcvsToUse)
                        + mcvsToUse * sizeof(int);
                hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
                        + mcvsToUse * sizeof(int);
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
 
                /*
                 * Create a skew bucket for each MCV hash value.
@@ -2338,8 +2564,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash 
*node, int mcvsToUse)
                        hashtable->nSkewBuckets++;
                        hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
                        hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-                       if (hashtable->spaceUsed > hashtable->spacePeak)
-                               hashtable->spacePeak = hashtable->spaceUsed;
+
+                       /* refresh info about peak used memory */
+                       ExecHashUpdateSpacePeak(hashtable);
                }
 
                free_attstatsslot(&sslot);
@@ -2428,8 +2655,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        hashtable->spaceUsedSkew += hashTupleSize;
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
+
        while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
                ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2508,11 +2737,19 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
                }
                else
                {
+                       BufFile **batchFile;
+
                        /* Put the tuple into a temp file for later batches */
                        Assert(batchno > hashtable->curbatch);
+
+                       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                        hashtable->innerBatchFile,
+                                                                        hashtable->innerOverflowFiles);
+
                        ExecHashJoinSaveTuple(tuple, hashvalue,
-                                                                 &hashtable->innerBatchFile[batchno]);
+                                                                 batchFile);
                        pfree(hashTuple);
+
                        hashtable->spaceUsed -= tupleSize;
                        hashtable->spaceUsedSkew -= tupleSize;
                }
@@ -2660,6 +2897,7 @@ ExecHashGetInstrumentation(HashInstrumentation 
*instrument,
        instrument->nbuckets_original = hashtable->nbuckets_original;
        instrument->nbatch = hashtable->nbatch;
        instrument->nbatch_original = hashtable->nbatch_original;
+       instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
        instrument->space_peak = hashtable->spacePeak;
 }
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index aa43296e26..ac27e8c738 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -391,6 +391,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                        node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                                {
                                        bool            shouldFree;
+                                       BufFile   **batchFile;
+
                                        MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
                                                                                                                                          &shouldFree);
 
@@ -400,8 +402,12 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                         */
                                        Assert(parallel_state == NULL);
                                        Assert(batchno > hashtable->curbatch);
-                                       ExecHashJoinSaveTuple(mintuple, hashvalue,
-                                                                                 &hashtable->outerBatchFile[batchno]);
+
+                                       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                                        hashtable->outerBatchFile,
+                                                                                                        hashtable->outerOverflowFiles);
+
+                                       ExecHashJoinSaveTuple(mintuple, hashvalue, batchFile);
 
                                        if (shouldFree)
                                                
heap_free_minimal_tuple(mintuple);
@@ -867,17 +873,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
        }
        else if (curbatch < hashtable->nbatch)
        {
-               BufFile    *file = hashtable->outerBatchFile[curbatch];
+               BufFile    **file = ExecHashGetBatchFile(hashtable, curbatch,
+                                                                                                hashtable->outerBatchFile,
+                                                                                                hashtable->outerOverflowFiles);
 
                /*
                 * In outer-join cases, we could get here even though the batch file
                 * is empty.
                 */
-               if (file == NULL)
+               if (*file == NULL)
                        return NULL;
 
                slot = ExecHashJoinGetSavedTuple(hjstate,
-                                                                                file,
+                                                                                *file,
                                                                                 hashvalue,
                                                                                 hjstate->hj_OuterTupleSlot);
                if (!TupIsNull(slot))
@@ -965,9 +973,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
        BufFile    *innerFile;
        TupleTableSlot *slot;
        uint32          hashvalue;
+       int                     batchidx;
+       int                     curbatch_old;
 
        nbatch = hashtable->nbatch;
        curbatch = hashtable->curbatch;
+       curbatch_old = curbatch;
+
+       /* index of the old batch */
+       batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+       /* has to be in the current slice of batches */
+       Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
        if (curbatch > 0)
        {
@@ -975,9 +992,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
                 * We no longer need the previous outer batch file; close it right
                 * away to free disk space.
                 */
-               if (hashtable->outerBatchFile[curbatch])
-                       BufFileClose(hashtable->outerBatchFile[curbatch]);
-               hashtable->outerBatchFile[curbatch] = NULL;
+               if (hashtable->outerBatchFile[batchidx])
+                       BufFileClose(hashtable->outerBatchFile[batchidx]);
+               hashtable->outerBatchFile[batchidx] = NULL;
        }
        else                                            /* we just finished the first batch */
        {
@@ -1011,45 +1028,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
         * scan, we have to rescan outer batches in case they contain tuples that
         * need to be reassigned.
         */
-       curbatch++;
+       curbatch = ExecHashSwitchToNextBatch(hashtable);
+       batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
        while (curbatch < nbatch &&
-                  (hashtable->outerBatchFile[curbatch] == NULL ||
-                       hashtable->innerBatchFile[curbatch] == NULL))
+                  (hashtable->outerBatchFile[batchidx] == NULL ||
+                       hashtable->innerBatchFile[batchidx] == NULL))
        {
-               if (hashtable->outerBatchFile[curbatch] &&
+               if (hashtable->outerBatchFile[batchidx] &&
                        HJ_FILL_OUTER(hjstate))
                        break;                          /* must process due to rule 1 */
-               if (hashtable->innerBatchFile[curbatch] &&
+               if (hashtable->innerBatchFile[batchidx] &&
                        HJ_FILL_INNER(hjstate))
                        break;                          /* must process due to rule 1 */
-               if (hashtable->innerBatchFile[curbatch] &&
+               if (hashtable->innerBatchFile[batchidx] &&
                        nbatch != hashtable->nbatch_original)
                        break;                          /* must process due to rule 2 */
-               if (hashtable->outerBatchFile[curbatch] &&
+               if (hashtable->outerBatchFile[batchidx] &&
                        nbatch != hashtable->nbatch_outstart)
                        break;                          /* must process due to rule 3 */
                /* We can ignore this batch. */
                /* Release associated temp files right away. */
-               if (hashtable->innerBatchFile[curbatch])
-                       BufFileClose(hashtable->innerBatchFile[curbatch]);
-               hashtable->innerBatchFile[curbatch] = NULL;
-               if (hashtable->outerBatchFile[curbatch])
-                       BufFileClose(hashtable->outerBatchFile[curbatch]);
-               hashtable->outerBatchFile[curbatch] = NULL;
-               curbatch++;
+               if (hashtable->innerBatchFile[batchidx])
+                       BufFileClose(hashtable->innerBatchFile[batchidx]);
+               hashtable->innerBatchFile[batchidx] = NULL;
+               if (hashtable->outerBatchFile[batchidx])
+                       BufFileClose(hashtable->outerBatchFile[batchidx]);
+               hashtable->outerBatchFile[batchidx] = NULL;
+
+               curbatch = ExecHashSwitchToNextBatch(hashtable);
+               batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
        }
 
        if (curbatch >= nbatch)
+       {
+               hashtable->curbatch = curbatch_old;
                return false;                   /* no more batches */
-
-       hashtable->curbatch = curbatch;
+       }
 
        /*
         * Reload the hash table with the new inner batch (which could be empty)
         */
        ExecHashTableReset(hashtable);
 
-       innerFile = hashtable->innerBatchFile[curbatch];
+       innerFile = hashtable->innerBatchFile[batchidx];
 
        if (innerFile != NULL)
        {
@@ -1075,15 +1097,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
                 * needed
                 */
                BufFileClose(innerFile);
-               hashtable->innerBatchFile[curbatch] = NULL;
+               hashtable->innerBatchFile[batchidx] = NULL;
        }
 
        /*
         * Rewind outer batch file (if present), so that we can start reading 
it.
         */
-       if (hashtable->outerBatchFile[curbatch] != NULL)
+       if (hashtable->outerBatchFile[batchidx] != NULL)
        {
-               if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, 
SEEK_SET))
+               if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, 
SEEK_SET))
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not rewind hash-join 
temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index afd32884a2..0128fc91ed 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3268,6 +3268,7 @@ initial_cost_hashjoin(PlannerInfo *root, 
JoinCostWorkspace *workspace,
        int                     num_hashclauses = list_length(hashclauses);
        int                     numbuckets;
        int                     numbatches;
+       int                     numbatches_inmemory;
        int                     num_skew_mcvs;
        size_t          space_allowed;  /* unused */
 
@@ -3317,6 +3318,7 @@ initial_cost_hashjoin(PlannerInfo *root, 
JoinCostWorkspace *workspace,
                                                        &space_allowed,
                                                        &numbuckets,
                                                        &numbatches,
+                                                       &numbatches_inmemory,
                                                        &num_skew_mcvs);
 
        /*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2c94b926d3..58c4b9d67b 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
        int                *skewBucketNums; /* array indexes of active skew buckets */
 
        int                     nbatch;                 /* number of batches */
+       int                     nbatch_inmemory;        /* max number of in-memory batches */
        int                     curbatch;               /* current batch #; 0 during 1st pass */
 
        int                     nbatch_original;        /* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
        BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
        BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+       BufFile   **innerOverflowFiles; /* temp file for inner overflow batches */
+       BufFile   **outerOverflowFiles; /* temp file for outer overflow batches */
+
        /*
         * Info about the datatype-specific hash functions for the datatypes 
being
         * hashed. These are arrays of the same length as the number of hash 
join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 1233766023..ab1f21e06b 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable 
hashtable,
                                                  uint32 hashvalue,
                                                  int *bucketno,
                                                  int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                                        BufFile **batchFiles, BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int 
tupwidth, bool useskew,
                                                size_t *space_allowed,
                                                int *numbuckets,
                                                int *numbatches,
+                                               int *numbatches_inmemory,
                                                int *num_skew_mcvs);
 extern int     ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ff3328752e..3f8b4f7e33 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2198,6 +2198,7 @@ typedef struct HashInstrumentation
        int                     nbuckets_original;      /* planned number of buckets */
        int                     nbatch;                 /* number of batches at end of execution */
        int                     nbatch_original;        /* planned number of batches */
+       int                     nbatch_inmemory;        /* number of batches kept in memory */
        size_t          space_peak;             /* speak memory usage in bytes */
 } HashInstrumentation;
 
