Hi,

I'm starting this thread mostly to keep track of patches developed in
response to issue [1] reported on pgsql-performance. The symptoms are
very simple - a query performing a hash join ends up using much more
memory than expected (pretty much ignoring work_mem), possibly
ending up with OOM.

The root cause is that hash join treats batches as pretty much free, but
that's not really true - we do allocate two BufFile structs per batch,
and each BufFile is ~8kB as it includes a PGAlignedBlock buffer.

This is not ideal even if we happen to estimate everything correctly,
because for example with work_mem=4MB and nbatch=1024, it means we'll
use about 16MB (2*8kB*1024) for the BufFile structures alone, plus the
work_mem for the hash table itself.
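To make that arithmetic concrete, here is a small sketch (the `BUFFILE_SIZE` constant and the helper name are mine, not from the patch; the real `sizeof(BufFile)` is approximately 8kB because of the included buffer):

```c
#include <assert.h>
#include <stddef.h>

/* Approximate sizeof(BufFile); the real struct is dominated by its 8kB
 * PGAlignedBlock buffer. (Constant and helper are illustrative.) */
#define BUFFILE_SIZE 8192

/* Memory used by the batch files alone: one inner + one outer BufFile
 * per batch. */
static size_t
buffile_overhead(int nbatch)
{
	return (size_t) nbatch * BUFFILE_SIZE * 2;
}
```

With nbatch=1024 this yields 16MB, i.e. four times the work_mem itself.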

But it can easily explode when we under-estimate the hash side. In the
pgsql-performance case, the hash side (with the patches applied,
allowing the query to complete) looks like this:

 Hash (cost=2823846.37..2823846.37 rows=34619 width=930)
      (actual time=252946.367..252946.367 rows=113478127 loops=1)

So it's 3277x under-estimated. It starts with 16 batches, and ends up
adding more and more batches until it fails with 524288 of them (it gets
to that many batches because some of the values are very common and we
don't disable the growth earlier).

The OOM is not very surprising, because with 524288 batches it'd need
about 8GB of memory, and the system only has 8GB RAM installed.
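Plugging in the numbers from the EXPLAIN output above (the helper name and the ~8kB BufFile size are my shorthand, not code from the patch):

```c
#include <assert.h>
#include <stdint.h>

/* Row counts from the EXPLAIN ANALYZE output quoted above */
static const uint64_t est_rows    = 34619;
static const uint64_t actual_rows = 113478127;

/* Memory needed for batch files, assuming ~8kB per BufFile and two
 * BufFiles (inner + outer) per batch. */
static uint64_t
batch_file_bytes(uint64_t nbatch)
{
	return nbatch * 8192 * 2;
}
```

524288 batches thus need 2^33 bytes = 8GB for the batch files alone, matching the OOM on the 8GB machine.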

The two attached patches both account for the BufFile memory, but then
use very different strategies when the work_mem limit is reached.

The first patch realizes it's impossible to keep adding batches without
breaking the work_mem limit, because at some point the BufFile will need
more memory than that. But it does not make sense to stop adding batches
entirely, because then the hash table could grow indefinitely.

So the patch abandons the idea of enforcing work_mem in this situation,
and instead attempts to minimize memory usage over time - it increases
the spaceAllowed in a way that ensures doubling the number of batches
actually reduces memory usage in the long run.
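As a rough sketch of that strategy (a simplified model; the `HashSim` struct and helper are mine, not PostgreSQL code): doubling nbatch roughly halves spaceUsed but doubles the BufFile memory, and after each doubling the patch bumps spaceAllowed to `nbatch * sizeof(PGAlignedBlock) * 3` so the batch files alone cannot immediately force another, pointless doubling.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified model of the first patch's logic; HashSim and the 8kB
 * constant are illustrative, not actual PostgreSQL structures. */
#define BUFFILE_SIZE 8192

typedef struct HashSim
{
	size_t	nbatch;
	size_t	spaceUsed;			/* memory used by tuples */
	size_t	spaceAllowed;		/* work_mem-like limit */
} HashSim;

static void
consider_more_batches(HashSim *ht)
{
	/* batch files (inner + outer) count against the limit too */
	if (ht->spaceUsed + ht->nbatch * BUFFILE_SIZE * 2 > ht->spaceAllowed)
	{
		ht->nbatch *= 2;
		ht->spaceUsed /= 2;		/* assume tuples split evenly */

		/* raise the threshold so the BufFiles alone can't trigger
		 * another doubling before it has a chance to help */
		if (ht->nbatch * BUFFILE_SIZE * 3 > ht->spaceAllowed)
			ht->spaceAllowed = ht->nbatch * BUFFILE_SIZE * 3;
	}
}
```

So the limit is allowed to grow, but only as fast as needed to keep the doubling useful.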

The second patch tries to enforce work_mem more strictly. That would be
impossible if we were to keep all the BufFile structs in memory, so
instead it slices the batches into chunks that fit into work_mem, and
then uses a single "overflow" file for slices currently not in memory.
These overflow files can't be counted into work_mem, but we should need
only very few of them. For example with work_mem=4MB the slice is 128
batches, so we need 128x fewer overflow files (compared to per-batch).
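A sketch of how the slice size falls out of work_mem (the helper names are mine; in the patch the corresponding logic lives in ExecChooseHashTableSize and ExecHashGetBatchIndex): the number of in-memory batches is the largest power of two whose BufFiles (two per batch, ~8kB each) still fit into half of work_mem.

```c
#include <assert.h>

/* ~sizeof(BufFile); dominated by the 8kB PGAlignedBlock buffer */
#define BUFFILE_SIZE 8192

/* Largest power-of-2 batch count whose BufFiles (inner + outer) fit
 * into half of work_mem; mirrors the loop the patch adds to
 * ExecChooseHashTableSize (helper name is mine). */
static int
choose_nbatch_inmemory(long work_mem_kb)
{
	int		nbatch_inmemory = 1;

	while ((nbatch_inmemory * 2) * BUFFILE_SIZE * 2L <= (work_mem_kb * 1024L / 2))
		nbatch_inmemory *= 2;

	return nbatch_inmemory;
}

/* Batches are grouped into slices of nbatch_inmemory; all batches in a
 * not-yet-current slice share that slice's single overflow file. */
static int
batch_slice(int batchno, int nbatch_inmemory)
{
	return batchno / nbatch_inmemory;
}
```

For work_mem=4MB this gives 128 in-memory batches per slice, matching the figure above.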


Neither of those patches tweaks ExecChooseHashTableSize() to consider
memory needed for BufFiles while deciding how many batches will be
needed. That's something that probably needs to happen, but it would not
help with the underestimate issue.

I'm not entirely sure which of those approaches is the right one. The
first one is clearly just "damage control" for cases where the hash
side turned out to be much larger than we expected. With good estimates
we probably would not have picked a hash join for those (that is, we
should have realized we can't stay within work_mem and prohibited the
hash join).

The second patch however makes hash join viable for some of those cases,
and it seems to work pretty well (there are some numbers in the message
posted to the pgsql-performance thread). So I kinda like this second one.

It's all just PoC quality at this point, far from a committable state.


[1] 
https://www.postgresql.org/message-id/flat/bc138e9f-c89e-9147-5395-61d51a757b3b%40gusw.net


--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4d5a6872cc 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *             ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
        if (hashtable->nbuckets != hashtable->nbuckets_optimal)
                ExecHashIncreaseNumBuckets(hashtable);
 
-       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-       hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
 
        hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -1647,12 +1646,56 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
                /* Account for space used, and back off if we've used too much */
                hashtable->spaceUsed += hashTupleSize;
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
+
+               /*
+                * Consider increasing number of batches.
+                *
+                * Each batch requires a non-trivial amount of memory, because BufFile
+                * includes a PGAlignedBlock (typically 8kB buffer). So when doubling
+                * the number of batches, we need to be careful and only allow that if
+                * it actually has a chance of reducing memory usage.
+                *
+                * In particular, doubling the number of batches is pointless when
+                *
+                *              (spaceUsed / 2) < (nbatches * sizeof(BufFile))
+                *
+                * because we expect to save roughly 1/2 of memory currently used for
+                * data (rows) at the price of doubling the memory used for BufFile.
+                *
+                * We can't stop adding batches entirely, because that would just mean
+                * the batches would need more and more memory. So we need to increase
+                * the number of batches, even if we can't enforce work_mem properly.
+                * The goal is to minimize the overall memory usage of the hash join.
+                *
+                * Note: This applies mostly to cases of significant underestimates,
+                * resulting in an explosion of the number of batches. The properly
+                * estimated cases should generally end up using merge join based on
+                * high cost of the batched hash join.
+                */
                if (hashtable->spaceUsed +
-                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+                       hashtable->nbatch * sizeof(PGAlignedBlock) * 2
                        > hashtable->spaceAllowed)
+               {
                        ExecHashIncreaseNumBatches(hashtable);
+
+                       /*
+                        * Consider increasing the resize threshold.
+                        *
+                        * For well estimated cases this does nothing, because batches are
+                        * expected to account only for small fraction of work_mem. But if
+                        * we significantly underestimate the number of batches, we may end
+                        * up in a situation where BufFile alone exceed work_mem. So move
+                        * the threshold a bit, until the next point where it'll make sense
+                        * to consider adding batches again.
+                        */
+                       hashtable->spaceAllowed
+                               = Max(hashtable->spaceAllowed,
+                                         hashtable->nbatch * sizeof(PGAlignedBlock) * 3);
+               }
        }
        else
        {
@@ -1893,6 +1936,21 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
        }
 }
 
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+       Size    spaceUsed = hashtable->spaceUsed;
+
+       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+       spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+       /* Account for memory used for batch files (inner + outer) */
+       spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+       if (spaceUsed > hashtable->spacePeak)
+               hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *             scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2330,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        + mcvsToUse * sizeof(int);
                hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
                        + mcvsToUse * sizeof(int);
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
 
                /*
                 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2381,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        hashtable->nSkewBuckets++;
                        hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
                        hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-                       if (hashtable->spaceUsed > hashtable->spacePeak)
-                               hashtable->spacePeak = hashtable->spaceUsed;
+
+                       /* refresh info about peak used memory */
+                       ExecHashUpdateSpacePeak(hashtable);
                }
 
                free_attstatsslot(&sslot);
@@ -2411,8 +2471,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        hashtable->spaceUsedSkew += hashTupleSize;
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
+
        while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
                ExecHashRemoveNextSkewBucket(hashtable);
 
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                                                   hinstrument.nbatch, es);
                        ExplainPropertyInteger("Original Hash Batches", NULL,
                                                                   hinstrument.nbatch_original, es);
+                       ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+                                                                  hinstrument.nbatch_inmemory, es);
                        ExplainPropertyInteger("Peak Memory Usage", "kB",
                                                                   spacePeakKb, es);
                }
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
                                 hinstrument.nbuckets_original != hinstrument.nbuckets)
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
-                       appendStringInfo(es->str,
-                                                        "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-                                                        hinstrument.nbuckets,
-                                                        hinstrument.nbuckets_original,
-                                                        hinstrument.nbatch,
-                                                        hinstrument.nbatch_original,
-                                                        spacePeakKb);
+                       if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets,
+                                                                hinstrument.nbuckets_original,
+                                                                hinstrument.nbatch,
+                                                                hinstrument.nbatch_original,
+                                                                hinstrument.nbatch_inmemory,
+                                                                spacePeakKb);
+                       else
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets,
+                                                                hinstrument.nbuckets_original,
+                                                                hinstrument.nbatch,
+                                                                hinstrument.nbatch_original,
+                                                                spacePeakKb);
                }
                else
                {
                        appendStringInfoSpaces(es->str, es->indent * 2);
-                       appendStringInfo(es->str,
-                                                        "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-                                                        hinstrument.nbuckets, hinstrument.nbatch,
-                                                        spacePeakKb);
+                       if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d  Batches: %d (in-memory: %d)  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets, hinstrument.nbatch,
+                                                                hinstrument.nbatch_inmemory,
+                                                                spacePeakKb);
+                       else
+                               appendStringInfo(es->str,
+                                                                "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
+                                                                hinstrument.nbuckets, hinstrument.nbatch,
+                                                                spacePeakKb);
                }
        }
 }
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4364eb7cdd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *             ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
        if (hashtable->nbuckets != hashtable->nbuckets_optimal)
                ExecHashIncreaseNumBuckets(hashtable);
 
-       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-       hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
 
        hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        size_t          space_allowed;
        int                     nbuckets;
        int                     nbatch;
+       int                     nbatch_inmemory;
        double          rows;
        int                     num_skew_mcvs;
        int                     log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
                                                        state->parallel_state != NULL ?
                                                        state->parallel_state->nparticipants - 1 : 0,
                                                        &space_allowed,
-                                                       &nbuckets, &nbatch, &num_skew_mcvs);
+                                                       &nbuckets, &nbatch, &nbatch_inmemory,
+                                                       &num_skew_mcvs);
 
        /* nbuckets must be a power of 2 */
        log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        hashtable->nSkewBuckets = 0;
        hashtable->skewBucketNums = NULL;
        hashtable->nbatch = nbatch;
+       hashtable->nbatch_inmemory = nbatch_inmemory;
        hashtable->curbatch = 0;
        hashtable->nbatch_original = nbatch;
        hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        hashtable->skewTuples = 0;
        hashtable->innerBatchFile = NULL;
        hashtable->outerBatchFile = NULL;
+       hashtable->innerOverflowFiles = NULL;
+       hashtable->outerOverflowFiles = NULL;
        hashtable->spaceUsed = 0;
        hashtable->spacePeak = 0;
        hashtable->spaceAllowed = space_allowed;
@@ -559,16 +563,30 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 
        if (nbatch > 1 && hashtable->parallel_state == NULL)
        {
+               int     cnt = Min(nbatch, nbatch_inmemory);
+
                /*
                 * allocate and initialize the file arrays in hashCxt (not needed for
                 * parallel case which uses shared tuplestores instead of raw files)
                 */
                hashtable->innerBatchFile = (BufFile **)
-                       palloc0(nbatch * sizeof(BufFile *));
+                       palloc0(cnt * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
-                       palloc0(nbatch * sizeof(BufFile *));
+                       palloc0(cnt * sizeof(BufFile *));
                /* The files will not be opened until needed... */
                /* ... but make sure we have temp tablespaces established for them */
+
+               /* also allocate files for overflow batches */
+               if (nbatch > nbatch_inmemory)
+               {
+                       int nslices = (nbatch / nbatch_inmemory);
+
+                       hashtable->innerOverflowFiles = (BufFile **)
+                               palloc0(nslices * sizeof(BufFile *));
+                       hashtable->outerOverflowFiles = (BufFile **)
+                               palloc0(nslices * sizeof(BufFile *));
+               }
+
                PrepareTempTablespaces();
        }
 
@@ -665,6 +683,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                size_t *space_allowed,
                                                int *numbuckets,
                                                int *numbatches,
+                                               int *numbatches_inmemory,
                                                int *num_skew_mcvs)
 {
        int                     tupsize;
@@ -675,6 +694,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
        long            max_pointers;
        long            mppow2;
        int                     nbatch = 1;
+       int                     nbatch_inmemory = 1;
        int                     nbuckets;
        double          dbuckets;
 
@@ -795,6 +815,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                                        space_allowed,
                                                                        numbuckets,
                                                                        numbatches,
+                                                                       numbatches_inmemory,
                                                                        num_skew_mcvs);
                        return;
                }
@@ -831,11 +852,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                        nbatch <<= 1;
        }
 
+       /*
+        * See how many batches we can fit into memory (driven mostly by size
+        * of BufFile, with PGAlignedBlock being the largest part of that).
+        * We need one BufFile for inner and outer side, so we count it twice
+        * for each batch, and we stop once we exceed (work_mem/2).
+        */
+       while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+                       <= (work_mem * 1024L / 2))
+               nbatch_inmemory *= 2;
+
        Assert(nbuckets > 0);
        Assert(nbatch > 0);
 
        *numbuckets = nbuckets;
        *numbatches = nbatch;
+       *numbatches_inmemory = nbatch_inmemory;
 }
 
 
@@ -857,13 +889,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
         */
        if (hashtable->innerBatchFile != NULL)
        {
-               for (i = 1; i < hashtable->nbatch; i++)
+               int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+               for (i = 1; i < n; i++)
                {
                        if (hashtable->innerBatchFile[i])
                                BufFileClose(hashtable->innerBatchFile[i]);
                        if (hashtable->outerBatchFile[i])
                                BufFileClose(hashtable->outerBatchFile[i]);
                }
+
+               /* number of batch slices */
+               n = hashtable->nbatch / hashtable->nbatch_inmemory;
+
+               for (i = 1; i < n; i++)
+               {
+                       if (hashtable->innerOverflowFiles[i])
+                               BufFileClose(hashtable->innerOverflowFiles[i]);
+
+                       if (hashtable->outerOverflowFiles[i])
+                               BufFileClose(hashtable->outerOverflowFiles[i]);
+               }
        }
 
        /* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +955,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
        if (hashtable->innerBatchFile == NULL)
        {
+               /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
                /* we had no file arrays before */
                hashtable->innerBatchFile = (BufFile **)
                        palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +967,50 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
        }
        else
        {
+               int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
                /* enlarge arrays and zero out added entries */
                hashtable->innerBatchFile = (BufFile **)
-                       repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
+                       repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *));
                hashtable->outerBatchFile = (BufFile **)
-                       repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
-               MemSet(hashtable->innerBatchFile + oldnbatch, 0,
-                          (nbatch - oldnbatch) * sizeof(BufFile *));
-               MemSet(hashtable->outerBatchFile + oldnbatch, 0,
-                          (nbatch - oldnbatch) * sizeof(BufFile *));
+                       repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *));
+
+               if (oldnbatch < nbatch_tmp)
+               {
+                       MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+                                  (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+                       MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+                                  (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+               }
+
+               if (nbatch > hashtable->nbatch_inmemory)
+               {
+                       int nslices = (nbatch / hashtable->nbatch_inmemory);
+
+                       if (hashtable->innerOverflowFiles == NULL)
+                       {
+                               hashtable->innerOverflowFiles = (BufFile **)
+                                       palloc0(nslices * sizeof(BufFile *));
+                               hashtable->outerOverflowFiles = (BufFile **)
+                                       palloc0(nslices * sizeof(BufFile *));
+                       }
+                       else
+                       {
+                               hashtable->innerOverflowFiles = (BufFile **)
+                                       repalloc(hashtable->innerOverflowFiles,
+                                                        nslices * sizeof(BufFile *));
+                               hashtable->outerOverflowFiles = (BufFile **)
+                                       repalloc(hashtable->outerOverflowFiles,
+                                                        nslices * sizeof(BufFile *));
+
+                               /* we double the number of batches, so we know the old
+                                * value was nslices/2 exactly */
+                               memset(hashtable->innerOverflowFiles + nslices/2, 0,
+                                          (nslices/2) * sizeof(BufFile *));
+                               memset(hashtable->outerOverflowFiles + nslices/2, 0,
+                                          (nslices/2) * sizeof(BufFile *));
+                       }
+               }
        }
 
        MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1082,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
                        }
                        else
                        {
+                               BufFile **batchFile;
+
                                /* dump it out */
                                Assert(batchno > curbatch);
+
+                               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                                hashtable->innerBatchFile,
+                                                                                                hashtable->innerOverflowFiles);
+
                                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                                          hashTuple->hashvalue,
-                                                                         &hashtable->innerBatchFile[batchno]);
+                                                                         batchFile);
 
                                hashtable->spaceUsed -= hashTupleSize;
                                nfreed++;
@@ -1647,22 +1737,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
                /* Account for space used, and back off if we've used too much */
                hashtable->spaceUsed += hashTupleSize;
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
+
+               /* Consider increasing number of batches if we filled work_mem. */
                if (hashtable->spaceUsed +
-                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+                       hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+                       Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */
                        > hashtable->spaceAllowed)
                        ExecHashIncreaseNumBatches(hashtable);
        }
        else
        {
+               BufFile **batchFile;
+
                /*
                 * put the tuple into a temp file for later batches
                 */
                Assert(batchno > hashtable->curbatch);
+
+               batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                hashtable->innerBatchFile,
+                                                                                hashtable->innerOverflowFiles);
+
                ExecHashJoinSaveTuple(tuple,
                                                          hashvalue,
-                                                         &hashtable->innerBatchFile[batchno]);
+                                                         batchFile);
        }
 }
 
@@ -1893,6 +1994,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
        }
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+       int     slice,
+               curslice;
+
+       if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+               return batchno;
+
+       slice = batchno / hashtable->nbatch_inmemory;
+       curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+       /* slices can't go backwards */
+       Assert(slice >= curslice);
+
+       /* overflow slice */
+       if (slice > curslice)
+               return -1;
+
+       /* current slice, compute index in the current array */
+       return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                                        BufFile **batchFiles, BufFile **overflowFiles)
+{
+       int             idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+       /* get the right overflow file */
+       if (idx == -1)
+       {
+               int slice = (batchno / hashtable->nbatch_inmemory);
+
+               return &overflowFiles[slice];
+       }
+
+       /* batch file in the current slice */
+       return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+       int     slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+       memset(hashtable->innerBatchFile, 0,
+                  hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+       hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+       hashtable->innerOverflowFiles[slice] = NULL;
+
+       memset(hashtable->outerBatchFile, 0,
+                  hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+       hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+       hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+       int             batchidx;
+
+       hashtable->curbatch++;
+
+       /* see if we skipped to the next batch slice */
+       batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+       /* Can't be -1, current batch is in the current slice by definition. */
+       Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+       /*
+        * If we skipped to the next slice of batches, reset the array of files
+        * and use the overflow file as the first batch.
+        */
+       if (batchidx == 0)
+               ExecHashSwitchToNextBatchSlice(hashtable);
+
+       return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+       Size    spaceUsed = hashtable->spaceUsed;
+
+       /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+       spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+       /* Account for memory used for batch files (inner + outer) */
+       spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+                                sizeof(PGAlignedBlock) * 2;
+
+       /* Account for slice files (inner + outer) */
+       spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+                                sizeof(PGAlignedBlock) * 2;
+
+       if (spaceUsed > hashtable->spacePeak)
+               hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *             scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2475,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        + mcvsToUse * sizeof(int);
                hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
                        + mcvsToUse * sizeof(int);
-               if (hashtable->spaceUsed > hashtable->spacePeak)
-                       hashtable->spacePeak = hashtable->spaceUsed;
+
+               /* refresh info about peak used memory */
+               ExecHashUpdateSpacePeak(hashtable);
 
                /*
                 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2526,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
                        hashtable->nSkewBuckets++;
                        hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
                        hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-                       if (hashtable->spaceUsed > hashtable->spacePeak)
-                               hashtable->spacePeak = hashtable->spaceUsed;
+
+                       /* refresh info about peak used memory */
+                       ExecHashUpdateSpacePeak(hashtable);
                }
 
                free_attstatsslot(&sslot);
@@ -2411,8 +2616,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        hashtable->spaceUsedSkew += hashTupleSize;
-       if (hashtable->spaceUsed > hashtable->spacePeak)
-               hashtable->spacePeak = hashtable->spaceUsed;
+
+       /* refresh info about peak used memory */
+       ExecHashUpdateSpacePeak(hashtable);
+
        while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
                ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2488,10 +2695,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
                }
                else
                {
+                       BufFile **batchFile;
+
                        /* Put the tuple into a temp file for later batches */
                        Assert(batchno > hashtable->curbatch);
+
+                       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                        hashtable->innerBatchFile,
+                                                                                        hashtable->innerOverflowFiles);
+
                        ExecHashJoinSaveTuple(tuple, hashvalue,
-                                                                 &hashtable->innerBatchFile[batchno]);
+                                                                 batchFile);
                        pfree(hashTuple);
                        hashtable->spaceUsed -= tupleSize;
                        hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2854,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
        instrument->nbuckets_original = hashtable->nbuckets_original;
        instrument->nbatch = hashtable->nbatch;
        instrument->nbatch_original = hashtable->nbatch_original;
+       instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
        instrument->space_peak = hashtable->spacePeak;
 }
 
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..a8db71925b 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                if (batchno != hashtable->curbatch &&
                                        node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
                                {
+                                       BufFile   **batchFile;
+
                                        /*
                                         * Need to postpone this outer tuple to a later batch.
                                         * Save it in the corresponding outer-batch file.
                                         */
                                        Assert(parallel_state == NULL);
                                        Assert(batchno > hashtable->curbatch);
+
+                                       batchFile = ExecHashGetBatchFile(hashtable, batchno,
+                                                                                                        hashtable->outerBatchFile,
+                                                                                                        hashtable->outerOverflowFiles);
+
                                        ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
                                                                                  hashvalue,
-                                                                                 &hashtable->outerBatchFile[batchno]);
+                                                                                 batchFile);
 
                                        /* Loop around, staying in HJ_NEED_NEW_OUTER state */
                                        continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
        }
        else if (curbatch < hashtable->nbatch)
        {
-               BufFile    *file = hashtable->outerBatchFile[curbatch];
+               BufFile    **file = ExecHashGetBatchFile(hashtable, curbatch,
+                                                                                                hashtable->outerBatchFile,
+                                                                                                hashtable->outerOverflowFiles);
 
                /*
                 * In outer-join cases, we could get here even though the batch file
                 * is empty.
                 */
-               if (file == NULL)
+               if (*file == NULL)
                        return NULL;
 
                slot = ExecHashJoinGetSavedTuple(hjstate,
-                                                                                file,
+                                                                                *file,
                                                                                 hashvalue,
                                                                                 hjstate->hj_OuterTupleSlot);
                if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
        BufFile    *innerFile;
        TupleTableSlot *slot;
        uint32          hashvalue;
+       int                     batchidx;
+       int                     curbatch_old;
 
        nbatch = hashtable->nbatch;
        curbatch = hashtable->curbatch;
+       curbatch_old = curbatch;
+
+       /* index of the old batch */
+       batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+       /* has to be in the current slice of batches */
+       Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
        if (curbatch > 0)
        {
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
                 * We no longer need the previous outer batch file; close it right
                 * away to free disk space.
                 */
-               if (hashtable->outerBatchFile[curbatch])
-                       BufFileClose(hashtable->outerBatchFile[curbatch]);
-               hashtable->outerBatchFile[curbatch] = NULL;
+               if (hashtable->outerBatchFile[batchidx])
+                       BufFileClose(hashtable->outerBatchFile[batchidx]);
+               hashtable->outerBatchFile[batchidx] = NULL;
        }
        else                                            /* we just finished the first batch */
        {
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
         * scan, we have to rescan outer batches in case they contain tuples that
         * need to be reassigned.
         */
-       curbatch++;
+       curbatch = ExecHashSwitchToNextBatch(hashtable);
+       batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
        while (curbatch < nbatch &&
-                  (hashtable->outerBatchFile[curbatch] == NULL ||
-                       hashtable->innerBatchFile[curbatch] == NULL))
+                  (hashtable->outerBatchFile[batchidx] == NULL ||
+                       hashtable->innerBatchFile[batchidx] == NULL))
        {
-               if (hashtable->outerBatchFile[curbatch] &&
+               if (hashtable->outerBatchFile[batchidx] &&
                        HJ_FILL_OUTER(hjstate))
                        break;                          /* must process due to rule 1 */
-               if (hashtable->innerBatchFile[curbatch] &&
+               if (hashtable->innerBatchFile[batchidx] &&
                        HJ_FILL_INNER(hjstate))
                        break;                          /* must process due to rule 1 */
-               if (hashtable->innerBatchFile[curbatch] &&
+               if (hashtable->innerBatchFile[batchidx] &&
                        nbatch != hashtable->nbatch_original)
                        break;                          /* must process due to rule 2 */
-               if (hashtable->outerBatchFile[curbatch] &&
+               if (hashtable->outerBatchFile[batchidx] &&
                        nbatch != hashtable->nbatch_outstart)
                        break;                          /* must process due to rule 3 */
                /* We can ignore this batch. */
                /* Release associated temp files right away. */
-               if (hashtable->innerBatchFile[curbatch])
-                       BufFileClose(hashtable->innerBatchFile[curbatch]);
-               hashtable->innerBatchFile[curbatch] = NULL;
-               if (hashtable->outerBatchFile[curbatch])
-                       BufFileClose(hashtable->outerBatchFile[curbatch]);
-               hashtable->outerBatchFile[curbatch] = NULL;
-               curbatch++;
+               if (hashtable->innerBatchFile[batchidx])
+                       BufFileClose(hashtable->innerBatchFile[batchidx]);
+               hashtable->innerBatchFile[batchidx] = NULL;
+               if (hashtable->outerBatchFile[batchidx])
+                       BufFileClose(hashtable->outerBatchFile[batchidx]);
+               hashtable->outerBatchFile[batchidx] = NULL;
+
+               curbatch = ExecHashSwitchToNextBatch(hashtable);
+               batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
        }
 
        if (curbatch >= nbatch)
+       {
+               hashtable->curbatch = curbatch_old;
                return false;                   /* no more batches */
-
-       hashtable->curbatch = curbatch;
+       }
 
        /*
         * Reload the hash table with the new inner batch (which could be empty)
         */
        ExecHashTableReset(hashtable);
 
-       innerFile = hashtable->innerBatchFile[curbatch];
+       innerFile = hashtable->innerBatchFile[batchidx];
 
        if (innerFile != NULL)
        {
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
                 * needed
                 */
                BufFileClose(innerFile);
-               hashtable->innerBatchFile[curbatch] = NULL;
+               hashtable->innerBatchFile[batchidx] = NULL;
        }
 
        /*
         * Rewind outer batch file (if present), so that we can start reading it.
         */
-       if (hashtable->outerBatchFile[curbatch] != NULL)
+       if (hashtable->outerBatchFile[batchidx] != NULL)
        {
-               if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+               if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET))
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not rewind hash-join temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
        int                     num_hashclauses = list_length(hashclauses);
        int                     numbuckets;
        int                     numbatches;
+       int                     numbatches_inmemory;
        int                     num_skew_mcvs;
        size_t          space_allowed;  /* unused */
 
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
                                                        &space_allowed,
                                                        &numbuckets,
                                                        &numbatches,
+                                                       &numbatches_inmemory,
                                                        &num_skew_mcvs);
 
        /*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..311a0980ee 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
        int                *skewBucketNums; /* array indexes of active skew buckets */
 
        int                     nbatch;                 /* number of batches */
+       int                     nbatch_inmemory;        /* max number of in-memory batches */
        int                     curbatch;               /* current batch #; 0 during 1st pass */
 
        int                     nbatch_original;        /* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
        BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
        BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+       BufFile   **innerOverflowFiles; /* temp file for inner overflow batches */
+       BufFile   **outerOverflowFiles; /* temp file for outer overflow batches */
+
        /*
         * Info about the datatype-specific hash functions for the datatypes being
         * hashed. These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..bb6b24a1b4 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                                                  uint32 hashvalue,
                                                  int *bucketno,
                                                  int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+                                        BufFile **batchFiles, BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                                                size_t *space_allowed,
                                                int *numbuckets,
                                                int *numbatches,
+                                               int *numbatches_inmemory,
                                                int *num_skew_mcvs);
extern int     ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
        int                     nbuckets_original;      /* planned number of buckets */
        int                     nbatch;                 /* number of batches at end of execution */
        int                     nbatch_original;        /* planned number of batches */
+       int                     nbatch_inmemory;        /* number of batches kept in memory */
        size_t          space_peak;             /* peak memory usage in bytes */
 } HashInstrumentation;
 
