Hi,

I've been once again reminded of the batch explosion issue in hashjoin,
due to how it enforces the memory limit. This resurfaces every now and
then, when a user gets strange OOM issues - see for example these
threads from ~2019, which even include some patches: [1] [2] [3]

Let me restart the discussion, resubmit some of the older patches, and
present a plan for what to do about this ...

As a brief reminder of the basic details - the hashjoin does not account
for the spill files when enforcing the memory limit. When the hash table
gets full, it doubles the number of batches, which cuts the hash table
size in half. But with enough batches the doubling can actually make the
situation much worse - the new batch files simply use more memory than
was saved.
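
To make the trade-off concrete, here is a minimal sketch of the
arithmetic. It assumes the default 8kB BLCKSZ, one buffered file per
batch on each side (inner + outer), and a hash table that splits
perfectly in half. The function names are made up for illustration, they
are not from the patches.

```c
#include <assert.h>
#include <stddef.h>

#define BLCKSZ 8192				/* assumed default block size */

/* Memory held by batch file buffers: one inner + one outer file per batch. */
static size_t
batch_file_memory(size_t nbatch)
{
	assert(nbatch > 0);
	return 2 * nbatch * BLCKSZ;
}

/*
 * Net change in total memory if nbatch is doubled, assuming the in-memory
 * hash table (hash_bytes) splits exactly in half. A positive result means
 * the doubling uses more memory than it saves.
 */
static long long
doubling_delta(size_t hash_bytes, size_t nbatch)
{
	long long	saved = (long long) (hash_bytes / 2);
	long long	added = (long long) (batch_file_memory(2 * nbatch) -
									 batch_file_memory(nbatch));

	return added - saved;
}
```

With a 4MB hash table, doubling from 64 batches still saves about 1MB,
128 batches is the break-even point, and doubling from 1024 batches costs
about 14MB more than it saves.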

This can happen for various reasons. A simple example is that we
underestimate the size of the input relation, so the hash needs to be
built on many more tuples. This is bad, but usually not disastrous.

It's much worse when there's a batch that is not "divisible", i.e.
adding more batches does not split it roughly in half. This can happen
due to hash collisions (in the part that determines the batch), or due
to duplicate values that didn't make it into the MCV list (and thus the
skew optimization does not kick in).

This is fairly rare, but when it happens it can easily lead to batch
explosion, i.e. rapidly increasing the number of batches. We add
batches, but the batch does not split, so we promptly hit the limit
again, triggering another increase. It often stops only when we exhaust
the 32-bit hash space, ending with hundreds of thousands of batches.

Attached is a SQL script that reproduces something like this. It builds
a table with values with hashes that have 0s in the upper bits. And then
the hash join just spirals into a batch explosion.

Note: The script is a bit dumb and needs a lot of temp space (~50GB)
when generating the values with duplicate hashes.

In 2019 I shared a bunch of patches [4] improving this, but then I got
distracted and the discussion stalled. There were proposals to fix this
by introducing a special hash join "mode" to address these issues [5],
but we never got past a prototype and there are a lot of outstanding
questions.

So I decided to revisit the three patches from 2019. Attached are
rebased and cleaned up versions. A couple comments on each one:


1) v20241231-adjust-limit-0001-Account-for-batch-files-in-ha.patch

I believe this is the way to go, for now. The basic idea is to keep the
overall behavior, but "relax" the memory limit as the number of batches
increases to minimize the total memory use.

This may seem a bit weird, but as the number of batches grows there's no
way to not violate the limit. And the current code simply ignores this
and allocates arbitrary amounts of memory.
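
As a rough illustration of the adjustment, here is a standalone sketch
that mirrors the formula in ExecHashAdjustMemoryLimit from the attached
patch. It is a simplification: it assumes an 8kB BLCKSZ, uses integer
math instead of the 1.25 multiplier, and takes plain arguments instead
of a HashJoinTable. The function name is hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define BLCKSZ 8192				/* assumed default block size */

/*
 * New memory limit after increasing the number of batches to nbatch.
 * bucket_bytes is the size of the bucket array.
 */
static size_t
relaxed_space_allowed(size_t space_allowed, size_t nbatch,
					  size_t bucket_bytes)
{
	size_t		new_limit;

	assert(nbatch > 0);

	/* break-even hash table size is 4 * nbatch * BLCKSZ, plus 25% slack */
	new_limit = (5 * (4 * nbatch * BLCKSZ)) / 4;

	/* also account for the buckets, and the current batch files */
	new_limit += bucket_bytes;
	new_limit += 2 * nbatch * BLCKSZ;

	/* the limit never goes down */
	return (new_limit > space_allowed) ? new_limit : space_allowed;
}
```

With a 4MB limit nothing changes for 16 batches, because the computed
value is well below the current limit. With 1024 batches the limit grows
to 56MB, out of which 16MB is the batch files themselves.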


2) v20241231-single-spill-0001-Hash-join-with-a-single-spill.patch

The basic idea is that we keep only a small "slice" of batches in
memory, and data for later batches is spilled into a single file. This
means that even if the number of batches increases, the memory use does
not change, so the memory limit is enforced very strictly.

The problem is this performs *terribly* because it shuffles data many
times, always just to the next slice. So if we happen to have 128
batches in memory and the number explodes to ~128k batches, we end up
reading/writing each tuple ~500x.
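
The ~500x figure can be checked with a simplified model. Assume tuples
are spread uniformly over the slices, and a tuple belonging to slice k
(counting from 0) gets written to the spill file once for each earlier
slice, i.e. k times. This is only a sketch of the estimate, not code
from the patch, and the function name is made up.

```c
#include <assert.h>

/*
 * Average number of times a tuple is written to the single spill file,
 * under the simplified model described above.
 */
static double
avg_spill_writes(unsigned nbatch, unsigned nbatch_inmemory)
{
	unsigned	nslices;
	unsigned long long total = 0;

	assert(nbatch_inmemory > 0 && nbatch >= nbatch_inmemory);

	nslices = nbatch / nbatch_inmemory;

	/* a tuple in slice k is shuffled through the spill file k times */
	for (unsigned k = 0; k < nslices; k++)
		total += k;

	return (double) total / nslices;
}
```

For 128 batches in memory and 131072 batches in total there are 1024
slices, and the model gives 511.5 writes per tuple on average.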


3) v20241231-multi-spill-0001-Hash-join-with-a-multiple-spil.patch

This is an improvement of the "single spill" approach: we keep one
spill file per slice, which greatly reduces the amount of temporary file
traffic. The trouble is this means we can no longer enforce the memory
limit that strictly, because the number of files does grow with the
number of batches, although not 1:1. But with a slice of 128 batches we
get only 1 file per 128 batches, which is a nice reduction.
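
A small sketch of how the slice size and the file buffers add up. The
loop follows the nbatch_inmemory calculation in the attached patch (stop
once the inner + outer files would exceed half of work_mem), and the
memory formula follows its ExecHashUpdateSpacePeak. The 8kB BLCKSZ and
the standalone function names are assumptions for illustration.

```c
#include <assert.h>
#include <stddef.h>

#define BLCKSZ 8192				/* assumed default block size */

/* How many batches fit into a slice, for work_mem given in kB. */
static unsigned
batches_in_memory(size_t work_mem_kb)
{
	unsigned	n = 1;

	/* keep doubling while inner + outer files fit into work_mem / 2 */
	while ((size_t) (n * 2) * BLCKSZ * 2 <= work_mem_kb * 1024 / 2)
		n *= 2;

	return n;
}

/* Memory used by buffers of batch files and per-slice spill files. */
static size_t
multi_spill_file_memory(unsigned nbatch, unsigned nbatch_inmemory)
{
	unsigned	batch_files;
	unsigned	slice_files;

	assert(nbatch > 0 && nbatch_inmemory > 0);

	batch_files = (nbatch < nbatch_inmemory) ? nbatch : nbatch_inmemory;
	slice_files = nbatch / nbatch_inmemory;

	/* each file exists for both the inner and outer side */
	return (size_t) (batch_files + slice_files) * BLCKSZ * 2;
}
```

With work_mem = 4MB this gives slices of 128 batches. With 131072
batches the file buffers then need 18MB, compared to 2GB with one file
per batch on each side.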


This means that ultimately it's either (1) or (3), and the more I've
been looking into this the more I prefer (1), for a couple reasons:

* It's much simpler (it doesn't really change anything in the basic
behavior, doesn't introduce any new files or anything like that).

* There doesn't seem to be a major difference in total memory consumption
between the two approaches. Both allow allocating more memory.

* It actually helps with the "indivisible batch" case - it relaxes the
limit, so there's a chance the batch eventually fits and we stop adding
more and more batches. With spill files that's not the case - we still
keep the original limit, and we end up with the batch explosion (but
then we handle it much more efficiently).


Unless there are some objections, my plan is to get (1) cleaned up and
try to get it in for 18, possibly in the January CF. It's not a
particularly complex patch, and it already passes check-world (it only
affected three plans in join_hash, and those make sense I think).

One thing I'm not sure about yet is whether this needs to tweak the
hashjoin costing to also consider the files when deciding how many
batches to use. Maybe it should?


regards



[1]
https://www.postgresql.org/message-id/20190504003414.bulcbnge3rhwhcsh%40development

[2] https://www.postgresql.org/message-id/20230228190643.1e368315%40karst

[3]
https://www.postgresql.org/message-id/bc138e9f-c89e-9147-5395-61d51a757b3b%40gusw.net

[4]
https://www.postgresql.org/message-id/20190428141901.5dsbge2ka3rxmpk6%40development

[5]
https://www.postgresql.org/message-id/caakru_yswm7gc_b2nbgwfpe6wuhdolfc1lbz786duzacpud...@mail.gmail.com

-- 
Tomas Vondra
From edcaac0705193acea33c4423bf0d59128219a46f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 31 Dec 2024 17:09:40 +0100
Subject: [PATCH v20241231-adjust-limit] Account for batch files in hash join
 spilling

Hash joins try to limit the amount of memory used by the Hash node by
only keeping a single batch in memory and spilling future batches to
disk. Unfortunately, the implementation does not account for the files
used for spilling, which can lead to issues with many batches. Each file
keeps a BLCKSZ buffer in memory, and we need 2*nbatches of those files,
so with many batches this may use substantial amounts of memory.

The hash join code however assumes adding batches is virtually free (in
terms of memory needed), ignoring this issue. It increases the number of
batches, possibly keeping the current batch within the limit, but ends
up using much more memory for the files.

This can be particularly painful with adversarial data sets, with a batch
that can't be split. This may happen due to hash collisions (overlaps in
the part used to calculate "batch"), or a value with many duplicates
that however didn't make it to the MCV list (and thus the skew table
can't help). In these cases the hash can get into a cycle of increasing
the number of batches, often reaching 256k or 512k batches before
exhausting available hash space (32 bits).

If this happens, there's not much point in enforcing the original memory
limit. That's simply not feasible - especially in the case of a single
batch exceeding the allowed space.

Instead, the best we can do is relax the limit, and focus on using
as little total memory as possible. By allowing the batches to be
larger, we reduce the number of batch files. The adjustment formula is
based on the observation that doubling the number of batches doubles the
amount of memory needed for the files, while cutting the batch size in
half. This defines the "break even" point for the next batch increase.
---
 src/backend/executor/nodeHash.c         | 153 +++++++++++++++++++++---
 src/test/regress/expected/join_hash.out |   4 +-
 src/test/regress/sql/join_hash.sql      |   4 +-
 3 files changed, 142 insertions(+), 19 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..680d2897738 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -54,6 +54,9 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
 									uint32 hashvalue,
 									int bucketNumber);
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
+static bool ExecHashExceededMemoryLimit(HashJoinTable hashtable);
+static void ExecHashAdjustMemoryLimit(HashJoinTable hashtable);
 
 static void *dense_alloc(HashJoinTable hashtable, Size size);
 static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
@@ -199,10 +202,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* update info about peak memory usage */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -1036,6 +1037,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		   hashtable, nfreed, ninmemory, hashtable->spaceUsed);
 #endif
 
+	/* adjust the memory limit for the new nbatches etc. */
+	ExecHashAdjustMemoryLimit(hashtable);
+
 	/*
 	 * If we dumped out either all or none of the tuples in the table, disable
 	 * further expansion of nbatch.  This situation implies that we have
@@ -1673,11 +1677,12 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
-		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
-			> hashtable->spaceAllowed)
+
+		/* update info about peak memory usage */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/* Should we add more batches, to enforce memory limit? */
+		if (ExecHashExceededMemoryLimit(hashtable))
 			ExecHashIncreaseNumBatches(hashtable);
 	}
 	else
@@ -1843,6 +1848,120 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+/*
+ * ExecHashUpdateSpacePeak
+ *		Update information about peak memory usage.
+ *
+ * This considers tuples added to the hash table, buckets of the hash table
+ * itself, and also the buffered batch files on both the inner and outer side.
+ * Each file has a BLCKSZ buffer, so with enough batches this may actually
+ * represent most of the memory used by the hash join node.
+ */
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size	spaceUsed = hashtable->spaceUsed;
+
+	/* buckets of the hash table */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* buffered batch files (inner + outer), each has a BLCKSZ buffer */
+	spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+	/* if we exceeded the current peak, remember the new one */
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
+/*
+ * ExecHashExceededMemoryLimit
+ *		Check if the amount of memory used exceeds spaceAllowed.
+ *
+ * Check if the total amount of space used by the hash join exceeds the
+ * current value of spaceAllowed, and we should try to increase the number
+ * of batches.
+ *
+ * We need to consider both the data added to the hash and the hashtable
+ * itself (i.e. buckets), but also the files used for future batches.
+ * Each batch needs a file for inner/outer side, so we need (2*nbatch)
+ * files in total, and each BufFile has a BLCKSZ buffer. If we ignored
+ * the files and simply doubled the number of batches, we could easily
+ * increase the total amount of memory because while we expect to cut the
+ * batch size in half, doubling the number of batches also doubles the
+ * amount of memory allocated by BufFile.
+ *
+ * That means doubling the number of batches is pointless when
+ *
+ *		(spaceUsed / 2) < 2 * (nbatches * sizeof(BufFile))
+ *
+ * because it would result in allocating more memory than it saves.
+ *
+ * This is a temporary decision - we can't stop adding batches entirely,
+ * just until the hash table grows enough to make it a win again.
+ */
+static bool
+ExecHashExceededMemoryLimit(HashJoinTable hashtable)
+{
+	return (hashtable->spaceUsed +
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			hashtable->nbatch * sizeof(PGAlignedBlock) * 2
+			> hashtable->spaceAllowed);
+}
+
+/*
+ * ExecHashAdjustMemoryLimit
+ *		Adjust the memory limit after increasing the number of batches.
+ *
+ * We can't keep the same spaceAllowed value, because as we keep adding
+ * batches we're guaranteed to exceed the older values simply thanks to
+ * the BufFile allocations.
+ *
+ * Instead, we consider the "break even" threshold for the current number
+ * of batches, add a bit of slack (so that we don't get into a cycle of
+ * incrementing number of batches), and calculate the new limit from that.
+ *
+ * For well estimated cases this should do nothing, as the batches are
+ * expected to account only for a small fraction of work_mem. But if we
+ * significantly underestimate the number of batches, or if one batch
+ * happens to be very large, this will relax the limit a bit.
+ *
+ * This means we won't enforce the work_mem limit strictly - but without
+ * adjusting the limit that wouldn't be the case either, we'd just use
+ * a lot of memory for the BufFiles without accounting for that. This
+ * way we do our best to minimize the amount of memory used.
+ */
+static void
+ExecHashAdjustMemoryLimit(HashJoinTable hashtable)
+{
+	Size	newSpaceAllowed;
+
+	/*
+	 * The next time increasing the number of batches "breaks even" is when
+	 *
+	 * (spaceUsed / 2) == (2 * nbatches * BLCKSZ)
+	 *
+	 * which means
+	 *
+	 * spaceUsed == (4 * nbatches * BLCKSZ)
+	 *
+	 * However, this is a "break even" threshold, when we shrink the hash
+	 * table just enough to compensate the new batches, and we'd hit the
+	 * new threshold almost immediately again. In practice we want to free
+	 * more memory to allow new data before having to increase the number
+	 * of batches again. So we allow 25% more space.
+	 */
+	newSpaceAllowed
+		= 1.25 * (4 * hashtable->nbatch * sizeof(PGAlignedBlock));
+
+	/* but also account for the buckets, and the current batch files */
+	newSpaceAllowed += hashtable->nbuckets_optimal * sizeof(HashJoinTuple);
+	newSpaceAllowed += (2 * hashtable->nbatch * sizeof(PGAlignedBlock));
+
+	/* shouldn't go down, but use Max() to make sure */
+	hashtable->spaceAllowed = Max(hashtable->spaceAllowed,
+								  newSpaceAllowed);
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2349,8 +2468,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak memory usage */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2399,8 +2519,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak memory usage */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2489,8 +2610,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak memory usage */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out
index 4fc34a0e72a..8d54822eb8c 100644
--- a/src/test/regress/expected/join_hash.out
+++ b/src/test/regress/expected/join_hash.out
@@ -198,7 +198,7 @@ rollback to settings;
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
 set local hash_mem_multiplier = 1.0;
 explain (costs off)
   select count(*) from simple r join simple s using (id);
@@ -232,7 +232,7 @@ rollback to settings;
 -- parallel with parallel-oblivious hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
 set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = off;
 explain (costs off)
diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql
index 6b0688ab0a6..ca8758900aa 100644
--- a/src/test/regress/sql/join_hash.sql
+++ b/src/test/regress/sql/join_hash.sql
@@ -145,7 +145,7 @@ rollback to settings;
 -- non-parallel
 savepoint settings;
 set local max_parallel_workers_per_gather = 0;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
 set local hash_mem_multiplier = 1.0;
 explain (costs off)
   select count(*) from simple r join simple s using (id);
@@ -160,7 +160,7 @@ rollback to settings;
 -- parallel with parallel-oblivious hash join
 savepoint settings;
 set local max_parallel_workers_per_gather = 2;
-set local work_mem = '128kB';
+set local work_mem = '512kB';
 set local hash_mem_multiplier = 1.0;
 set local enable_parallel_hash = off;
 explain (costs off)
-- 
2.47.1

From 030fca4e56132dd516b8aba23ed36c47311b33f5 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 31 Dec 2024 17:08:17 +0100
Subject: [PATCH v20241231-multi-spill] Hash join with a multiple spill files

We can't use an arbitrary number of batches in the hash join, because
that can consume substantial memory, which the memory limit ignores.
Instead, decide how many batches we can keep in memory, and open files
only for this "slice" of batches. For future batches we keep one spill
file per slice.

Then use these spill files after "switching" to the first batch in each
of those non-in-memory slices.

NB: Compared to the "single-spill" patch, this is less strict, as we
need one spill file per slice, and the number of slices may grow during
execution. It's better than the current behavior, because even with
modest work_mem values we can fit 32-128 batches into memory, thus
reducing the number of files to 1/32x - 1/128x. But with many
batches (as with batch explosion) this can still use substantial
amounts of memory (certainly more than work_mem).

Ultimately, it behaves similarly to the "adjustment" patch, but with
more complexity. And it can't handle the "oversized batch" case well,
because the limit is not adjusted.
---
 src/backend/commands/explain.c        |   6 +-
 src/backend/executor/nodeHash.c       | 256 +++++++++++++++++++++++---
 src/backend/executor/nodeHashjoin.c   |  75 +++++---
 src/backend/optimizer/path/costsize.c |   2 +
 src/include/executor/hashjoin.h       |   4 +
 src/include/executor/nodeHash.h       |   8 +
 src/include/nodes/execnodes.h         |   1 +
 7 files changed, 300 insertions(+), 52 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index a201ed30824..aa3fbeda3dd 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3466,19 +3466,23 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 								   hinstrument.nbatch, es);
 			ExplainPropertyInteger("Original Hash Batches", NULL,
 								   hinstrument.nbatch_original, es);
+			ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+								   hinstrument.nbatch_inmemory, es);
 			ExplainPropertyUInteger("Peak Memory Usage", "kB",
 									spacePeakKb, es);
 		}
 		else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+				 hinstrument.nbatch_inmemory != hinstrument.nbatch ||
 				 hinstrument.nbuckets_original != hinstrument.nbuckets)
 		{
 			ExplainIndentText(es);
 			appendStringInfo(es->str,
-							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: " UINT64_FORMAT "kB\n",
+							 "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: " UINT64_FORMAT "kB\n",
 							 hinstrument.nbuckets,
 							 hinstrument.nbuckets_original,
 							 hinstrument.nbatch,
 							 hinstrument.nbatch_original,
+							 hinstrument.nbatch_inmemory,
 							 spacePeakKb);
 		}
 		else
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..e9d482577ac 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -199,10 +200,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -451,6 +450,7 @@ ExecHashTableCreate(HashState *state)
 	size_t		space_allowed;
 	int			nbuckets;
 	int			nbatch;
+	int			nbatch_inmemory;
 	double		rows;
 	int			num_skew_mcvs;
 	int			log2_nbuckets;
@@ -477,7 +477,8 @@ ExecHashTableCreate(HashState *state)
 							state->parallel_state != NULL ?
 							state->parallel_state->nparticipants - 1 : 0,
 							&space_allowed,
-							&nbuckets, &nbatch, &num_skew_mcvs);
+							&nbuckets, &nbatch, &nbatch_inmemory,
+							&num_skew_mcvs);
 
 	/* nbuckets must be a power of 2 */
 	log2_nbuckets = my_log2(nbuckets);
@@ -503,6 +504,7 @@ ExecHashTableCreate(HashState *state)
 	hashtable->nSkewBuckets = 0;
 	hashtable->skewBucketNums = NULL;
 	hashtable->nbatch = nbatch;
+	hashtable->nbatch_inmemory = nbatch_inmemory;
 	hashtable->curbatch = 0;
 	hashtable->nbatch_original = nbatch;
 	hashtable->nbatch_outstart = nbatch;
@@ -512,6 +514,8 @@ ExecHashTableCreate(HashState *state)
 	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
+	hashtable->innerOverflowFiles = NULL;
+	hashtable->outerOverflowFiles = NULL;
 	hashtable->spaceUsed = 0;
 	hashtable->spacePeak = 0;
 	hashtable->spaceAllowed = space_allowed;
@@ -552,6 +556,7 @@ ExecHashTableCreate(HashState *state)
 	if (nbatch > 1 && hashtable->parallel_state == NULL)
 	{
 		MemoryContext oldctx;
+		int	cnt = Min(nbatch, nbatch_inmemory);
 
 		/*
 		 * allocate and initialize the file arrays in hashCxt (not needed for
@@ -559,8 +564,19 @@ ExecHashTableCreate(HashState *state)
 		 */
 		oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
-		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+		hashtable->innerBatchFile = palloc0_array(BufFile *, cnt);
+		hashtable->outerBatchFile = palloc0_array(BufFile *, cnt);
+
+		/* also allocate files for overflow batches */
+		if (nbatch > nbatch_inmemory)
+		{
+			int nslices = (nbatch / nbatch_inmemory);
+
+			Assert(nslices % 2 == 0);
+
+			hashtable->innerOverflowFiles = palloc0_array(BufFile *, nslices + 1);
+			hashtable->outerOverflowFiles = palloc0_array(BufFile *, nslices + 1);
+		}
 
 		MemoryContextSwitchTo(oldctx);
 
@@ -661,6 +677,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						size_t *space_allowed,
 						int *numbuckets,
 						int *numbatches,
+						int *numbatches_inmemory,
 						int *num_skew_mcvs)
 {
 	int			tupsize;
@@ -669,6 +686,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	size_t		bucket_bytes;
 	size_t		max_pointers;
 	int			nbatch = 1;
+	int			nbatch_inmemory = 1;
 	int			nbuckets;
 	double		dbuckets;
 
@@ -811,6 +829,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									space_allowed,
 									numbuckets,
 									numbatches,
+									numbatches_inmemory,
 									num_skew_mcvs);
 			return;
 		}
@@ -848,11 +867,24 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 		nbatch = pg_nextpower2_32(Max(2, minbatch));
 	}
 
+	/*
+	 * See how many batches we can fit into memory (driven mostly by size
+	 * of BufFile, with PGAlignedBlock being the largest part of that).
+	 * We need one BufFile for inner and outer side, so we count it twice
+	 * for each batch, and we stop once we exceed (work_mem/2).
+	 */
+	while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+			<= (work_mem * 1024L / 2))
+		nbatch_inmemory *= 2;
+
+	// nbatch_inmemory = nbatch;
+
 	Assert(nbuckets > 0);
 	Assert(nbatch > 0);
 
 	*numbuckets = nbuckets;
 	*numbatches = nbatch;
+	*numbatches_inmemory = nbatch_inmemory;
 }
 
 
@@ -874,13 +906,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	 */
 	if (hashtable->innerBatchFile != NULL)
 	{
-		for (i = 1; i < hashtable->nbatch; i++)
+		int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+		for (i = 1; i < n; i++)
 		{
 			if (hashtable->innerBatchFile[i])
 				BufFileClose(hashtable->innerBatchFile[i]);
 			if (hashtable->outerBatchFile[i])
 				BufFileClose(hashtable->outerBatchFile[i]);
 		}
+
+		/* number of batch slices */
+		n = (hashtable->nbatch / hashtable->nbatch_inmemory) + 1;
+
+		for (i = 1; i < n; i++)
+		{
+			if (hashtable->innerOverflowFiles[i])
+				BufFileClose(hashtable->innerOverflowFiles[i]);
+
+			if (hashtable->outerOverflowFiles[i])
+				BufFileClose(hashtable->outerOverflowFiles[i]);
+		}
 	}
 
 	/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -923,11 +969,14 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
 	if (hashtable->innerBatchFile == NULL)
 	{
+		/* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+		int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
 		MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
 
 		/* we had no file arrays before */
-		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
-		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch_tmp);
+		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch_tmp);
 
 		MemoryContextSwitchTo(oldcxt);
 
@@ -936,9 +985,35 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	}
 	else
 	{
+		int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+		int	oldnbatch_tmp = Min(oldnbatch, hashtable->nbatch_inmemory);
+
 		/* enlarge arrays and zero out added entries */
-		hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch);
-		hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
+		hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp);
+		hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp);
+
+		if (nbatch > hashtable->nbatch_inmemory)
+		{
+			int nslices = (nbatch / hashtable->nbatch_inmemory);
+			int oldnslices = (oldnbatch / hashtable->nbatch_inmemory);
+
+			Assert(nslices > 1);
+			Assert(nslices % 2 == 0);
+			Assert((oldnslices == 1) || (oldnslices % 2 == 0));
+			Assert(oldnslices <= nslices);
+
+			if (hashtable->innerOverflowFiles == NULL)
+			{
+				hashtable->innerOverflowFiles = palloc0_array(BufFile *, nslices + 1);
+				hashtable->outerOverflowFiles = palloc0_array(BufFile *, nslices + 1);
+			}
+			else
+			{
+				hashtable->innerOverflowFiles = repalloc0_array(hashtable->innerOverflowFiles, BufFile *, oldnslices + 1, nslices + 1);
+				hashtable->outerOverflowFiles = repalloc0_array(hashtable->outerOverflowFiles, BufFile *, oldnslices + 1, nslices + 1);
+			}
+		}
+
 	}
 
 	hashtable->nbatch = nbatch;
@@ -1008,11 +1083,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			}
 			else
 			{
+				BufFile **batchFile;
+
 				/* dump it out */
 				Assert(batchno > curbatch);
+
+				batchFile = ExecHashGetBatchFile(hashtable, batchno,
+												 hashtable->innerBatchFile,
+												 hashtable->innerOverflowFiles);
+
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
-									  &hashtable->innerBatchFile[batchno],
+									  batchFile,
 									  hashtable);
 
 				hashtable->spaceUsed -= hashTupleSize;
@@ -1673,22 +1755,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/* Consider increasing number of batches if we filled work_mem. */
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2	/* inner + outer */
 			> hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
 	}
 	else
 	{
+		BufFile **batchFile;
+
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
+		batchFile = ExecHashGetBatchFile(hashtable, batchno,
+										 hashtable->innerBatchFile,
+										 hashtable->innerOverflowFiles);
+
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
-							  &hashtable->innerBatchFile[batchno],
+							  batchFile,
 							  hashtable);
 	}
 
@@ -1843,6 +1936,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+	int	slice,
+		curslice;
+
+	if (hashtable->nbatch < hashtable->nbatch_inmemory)
+		return batchno;
+
+	slice = batchno / hashtable->nbatch_inmemory;
+	curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+	/* slices can't go backwards */
+	Assert(slice >= curslice);
+
+	/* overflow slice */
+	if (slice > curslice)
+		return -1;
+
+	/* current slice, compute index in the current array */
+	return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **batchFiles, BufFile **overflowFiles)
+{
+	int		idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+	/* get the right overflow file */
+	if (idx == -1)
+	{
+		int slice = (batchno / hashtable->nbatch_inmemory);
+
+		return &overflowFiles[slice];
+	}
+
+	/* batch file in the current slice */
+	return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+	int	slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+	memset(hashtable->innerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+	hashtable->innerOverflowFiles[slice] = NULL;
+
+	memset(hashtable->outerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+	hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+	int		batchidx;
+
+	hashtable->curbatch++;
+
+	/* see if we skipped to the next batch slice */
+	batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+	/* Can't be -1, current batch is in the current slice by definition. */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+	/*
+	 * If we skipped to the next slice of batches, reset the array of files
+	 * and use the overflow file as the first batch.
+	 */
+	if (batchidx == 0)
+		ExecHashSwitchToNextBatchSlice(hashtable);
+
+	return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size	spaceUsed = hashtable->spaceUsed;
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* Account for memory used for batch files (inner + outer) */
+	spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+				 sizeof(PGAlignedBlock) * 2;
+
+	/* Account for slice files (inner + outer) */
+	spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+				 sizeof(PGAlignedBlock) * 2;
+
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2349,8 +2544,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2399,8 +2595,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak used memory */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2489,8 +2686,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2569,10 +2768,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 		}
 		else
 		{
+			BufFile **batchFile;
+
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
+
+			batchFile = ExecHashGetBatchFile(hashtable, batchno,
+											 hashtable->innerBatchFile,
+											 hashtable->innerOverflowFiles);
+
 			ExecHashJoinSaveTuple(tuple, hashvalue,
-								  &hashtable->innerBatchFile[batchno],
+								  batchFile,
 								  hashtable);
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
@@ -2750,6 +2956,8 @@ ExecHashAccumInstrumentation(HashInstrumentation *instrument,
 							 hashtable->nbatch);
 	instrument->nbatch_original = Max(instrument->nbatch_original,
 									  hashtable->nbatch_original);
+	instrument->nbatch_inmemory = Min(hashtable->nbatch,
+									  hashtable->nbatch_inmemory);
 	instrument->space_peak = Max(instrument->space_peak,
 								 hashtable->spacePeak);
 }
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ea0045bc0f3..580b5da93bd 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -481,10 +481,15 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (batchno != hashtable->curbatch &&
 					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
 				{
+					BufFile	  **batchFile;
 					bool		shouldFree;
 					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
 																	  &shouldFree);
 
+					batchFile = ExecHashGetBatchFile(hashtable, batchno,
+													 hashtable->outerBatchFile,
+													 hashtable->outerOverflowFiles);
+
 					/*
 					 * Need to postpone this outer tuple to a later batch.
 					 * Save it in the corresponding outer-batch file.
@@ -492,7 +497,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					Assert(parallel_state == NULL);
 					Assert(batchno > hashtable->curbatch);
 					ExecHashJoinSaveTuple(mintuple, hashvalue,
-										  &hashtable->outerBatchFile[batchno],
+										  batchFile,
 										  hashtable);
 
 					if (shouldFree)
@@ -1030,17 +1035,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 	else if (curbatch < hashtable->nbatch)
 	{
-		BufFile    *file = hashtable->outerBatchFile[curbatch];
+		BufFile    **file = ExecHashGetBatchFile(hashtable, curbatch,
+												 hashtable->outerBatchFile,
+												 hashtable->outerOverflowFiles);
 
 		/*
 		 * In outer-join cases, we could get here even though the batch file
 		 * is empty.
 		 */
-		if (file == NULL)
+		if (*file == NULL)
 			return NULL;
 
 		slot = ExecHashJoinGetSavedTuple(hjstate,
-										 file,
+										 *file,
 										 hashvalue,
 										 hjstate->hj_OuterTupleSlot);
 		if (!TupIsNull(slot))
@@ -1135,9 +1142,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	BufFile    *innerFile;
 	TupleTableSlot *slot;
 	uint32		hashvalue;
+	int			batchidx;
+	int			curbatch_old;
 
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
+	curbatch_old = curbatch;
+
+	/* index of the old batch */
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+	/* has to be in the current slice of batches */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
 	if (curbatch > 0)
 	{
@@ -1145,9 +1161,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * We no longer need the previous outer batch file; close it right
 		 * away to free disk space.
 		 */
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
 	}
 	else						/* we just finished the first batch */
 	{
@@ -1182,45 +1198,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	 * scan, we have to rescan outer batches in case they contain tuples that
 	 * need to be reassigned.
 	 */
-	curbatch++;
+	curbatch = ExecHashSwitchToNextBatch(hashtable);
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
 	while (curbatch < nbatch &&
-		   (hashtable->outerBatchFile[curbatch] == NULL ||
-			hashtable->innerBatchFile[curbatch] == NULL))
+		   (hashtable->outerBatchFile[batchidx] == NULL ||
+			hashtable->innerBatchFile[batchidx] == NULL))
 	{
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			HJ_FILL_OUTER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			HJ_FILL_INNER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_original)
 			break;				/* must process due to rule 2 */
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_outstart)
 			break;				/* must process due to rule 3 */
 		/* We can ignore this batch. */
 		/* Release associated temp files right away. */
-		if (hashtable->innerBatchFile[curbatch])
-			BufFileClose(hashtable->innerBatchFile[curbatch]);
-		hashtable->innerBatchFile[curbatch] = NULL;
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
-		curbatch++;
+		if (hashtable->innerBatchFile[batchidx])
+			BufFileClose(hashtable->innerBatchFile[batchidx]);
+		hashtable->innerBatchFile[batchidx] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
+
+		curbatch = ExecHashSwitchToNextBatch(hashtable);
+		batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
 	}
 
 	if (curbatch >= nbatch)
+	{
+		hashtable->curbatch = curbatch_old;
 		return false;			/* no more batches */
-
-	hashtable->curbatch = curbatch;
+	}
 
 	/*
 	 * Reload the hash table with the new inner batch (which could be empty)
 	 */
 	ExecHashTableReset(hashtable);
 
-	innerFile = hashtable->innerBatchFile[curbatch];
+	innerFile = hashtable->innerBatchFile[batchidx];
 
 	if (innerFile != NULL)
 	{
@@ -1246,15 +1267,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * needed
 		 */
 		BufFileClose(innerFile);
-		hashtable->innerBatchFile[curbatch] = NULL;
+		hashtable->innerBatchFile[batchidx] = NULL;
 	}
 
 	/*
 	 * Rewind outer batch file (if present), so that we can start reading it.
 	 */
-	if (hashtable->outerBatchFile[curbatch] != NULL)
+	if (hashtable->outerBatchFile[batchidx] != NULL)
 	{
-		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
+		if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0, SEEK_SET))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not rewind hash-join temporary file")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c36687aa4df..6f40600d10b 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -4173,6 +4173,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	int			num_hashclauses = list_length(hashclauses);
 	int			numbuckets;
 	int			numbatches;
+	int			numbatches_inmemory;
 	int			num_skew_mcvs;
 	size_t		space_allowed;	/* unused */
 
@@ -4227,6 +4228,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 							&space_allowed,
 							&numbuckets,
 							&numbatches,
+							&numbatches_inmemory,
 							&num_skew_mcvs);
 
 	/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2d8ed8688cd..76c983dbd4d 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -320,6 +320,7 @@ typedef struct HashJoinTableData
 	int		   *skewBucketNums; /* array indexes of active skew buckets */
 
 	int			nbatch;			/* number of batches */
+	int			nbatch_inmemory;	/* max number of in-memory batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
 
 	int			nbatch_original;	/* nbatch when we started inner scan */
@@ -331,6 +332,9 @@ typedef struct HashJoinTableData
 	double		partialTuples;	/* # tuples obtained from inner plan by me */
 	double		skewTuples;		/* # tuples inserted into skew tuples */
 
+	BufFile	  **innerOverflowFiles;	/* temp file for inner overflow batches */
+	BufFile	  **outerOverflowFiles;	/* temp file for outer overflow batches */
+
 	/*
 	 * These arrays are allocated for the life of the hash join, but only if
 	 * nbatch > 1.  A file is opened only when we first write a tuple into it
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index e4eb7bc6359..ef1d2bec15a 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -46,6 +47,12 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 									  uint32 hashvalue,
 									  int *bucketno,
 									  int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+									  BufFile **batchFiles,
+									  BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -62,6 +69,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									size_t *space_allowed,
 									int *numbuckets,
 									int *numbatches,
+									int *numbatches_inmemory,
 									int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1590b643920..eb8eaea89ab 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2755,6 +2755,7 @@ typedef struct HashInstrumentation
 	int			nbuckets_original;	/* planned number of buckets */
 	int			nbatch;			/* number of batches at end of execution */
 	int			nbatch_original;	/* planned number of batches */
+	int			nbatch_inmemory;	/* number of batches kept in memory */
 	Size		space_peak;		/* peak memory usage in bytes */
 } HashInstrumentation;
 
-- 
2.47.1

From 55142837eaf439652fd91e165f3d4c3607ea9987 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@vondra.me>
Date: Tue, 31 Dec 2024 17:34:59 +0100
Subject: [PATCH v20241231-single-spill] Hash join with a single spill file

We can't use an arbitrary number of batches in the hash join, because
the batch files can use a substantial amount of memory that the memory
limit does not account for. Instead, decide how many batches we can keep
in memory, open files only for those batches, and use one "overflow"
file for all future batches.

Then use this spill file after "switching" to the first batch that is
not in memory.

NB: This allows enforcing the memory limit very strictly, but it's very
inefficient and causes a lot of temporary file traffic.
---
 src/backend/commands/explain.c        |   6 +-
 src/backend/executor/nodeHash.c       | 206 +++++++++++++++++++++++---
 src/backend/executor/nodeHashjoin.c   |  76 ++++++----
 src/backend/optimizer/path/costsize.c |   2 +
 src/include/executor/hashjoin.h       |   4 +
 src/include/executor/nodeHash.h       |   7 +
 src/include/nodes/execnodes.h         |   1 +
 7 files changed, 250 insertions(+), 52 deletions(-)
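
Note (ignored when applying the patch): to make the commit message a bit
more concrete, here is a minimal standalone sketch of the two calculations
the patch relies on, namely how many batches get their own file slot, and
how a batch number maps to either a slot or the overflow file. It mirrors
the logic of ExecChooseHashTableSize and ExecHashGetBatchIndex in the diff
below, but BatchState, batches_in_memory and batch_index are hypothetical
names used only for illustration, and the 8kB block size is an assumption
(the default BLCKSZ, standing in for sizeof(PGAlignedBlock)).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the few HashJoinTableData fields used here. */
typedef struct BatchState
{
    int nbatch;          /* total number of batches */
    int nbatch_inmemory; /* batches that get their own file slot */
    int curbatch;        /* batch currently being processed */
} BatchState;

/*
 * Double the number of in-memory batches while the buffers for twice as
 * many batches (inner + outer file each) still fit into half of work_mem.
 * block_size stands in for sizeof(PGAlignedBlock); that is an assumption.
 */
int
batches_in_memory(size_t work_mem_kb, size_t block_size)
{
    size_t n = 1;

    while ((n * 2) * block_size * 2 <= work_mem_kb * 1024 / 2)
        n *= 2;

    return (int) n;
}

/*
 * Return the slot in the in-memory file array for batchno, or -1 when the
 * batch belongs to a later slice and its tuples go to the overflow file.
 */
int
batch_index(const BatchState *st, int batchno)
{
    int slice;
    int curslice;

    /* Everything fits, so batch numbers index the array directly. */
    if (st->nbatch <= st->nbatch_inmemory)
        return batchno;

    slice = batchno / st->nbatch_inmemory;
    curslice = st->curbatch / st->nbatch_inmemory;

    /* Tuples are never sent to a slice we already finished. */
    assert(slice >= curslice);

    if (slice > curslice)
        return -1;

    return batchno % st->nbatch_inmemory;
}
```

Under these assumptions, work_mem = 4MB with 8kB blocks gives 128 in-memory
batches, i.e. 128 * 2 * 8kB = 2MB of file buffers, which is work_mem/2. With
nbatch = 16 and nbatch_inmemory = 4, while processing batch 5 a tuple for
batch 6 goes to slot 2, and a tuple for batch 9 goes to the overflow file.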

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index a201ed30824..aa3fbeda3dd 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3466,19 +3466,23 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 								   hinstrument.nbatch, es);
 			ExplainPropertyInteger("Original Hash Batches", NULL,
 								   hinstrument.nbatch_original, es);
+			ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+								   hinstrument.nbatch_inmemory, es);
 			ExplainPropertyUInteger("Peak Memory Usage", "kB",
 									spacePeakKb, es);
 		}
 		else if (hinstrument.nbatch_original != hinstrument.nbatch ||
+				 hinstrument.nbatch_inmemory != hinstrument.nbatch ||
 				 hinstrument.nbuckets_original != hinstrument.nbuckets)
 		{
 			ExplainIndentText(es);
 			appendStringInfo(es->str,
-							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: " UINT64_FORMAT "kB\n",
+							 "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: " UINT64_FORMAT "kB\n",
 							 hinstrument.nbuckets,
 							 hinstrument.nbuckets_original,
 							 hinstrument.nbatch,
 							 hinstrument.nbatch_original,
+							 hinstrument.nbatch_inmemory,
 							 spacePeakKb);
 		}
 		else
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 3e22d50e3a4..58062a8b256 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -199,10 +200,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -451,6 +450,7 @@ ExecHashTableCreate(HashState *state)
 	size_t		space_allowed;
 	int			nbuckets;
 	int			nbatch;
+	int			nbatch_inmemory;
 	double		rows;
 	int			num_skew_mcvs;
 	int			log2_nbuckets;
@@ -477,7 +477,8 @@ ExecHashTableCreate(HashState *state)
 							state->parallel_state != NULL ?
 							state->parallel_state->nparticipants - 1 : 0,
 							&space_allowed,
-							&nbuckets, &nbatch, &num_skew_mcvs);
+							&nbuckets, &nbatch, &nbatch_inmemory,
+							&num_skew_mcvs);
 
 	/* nbuckets must be a power of 2 */
 	log2_nbuckets = my_log2(nbuckets);
@@ -503,6 +504,7 @@ ExecHashTableCreate(HashState *state)
 	hashtable->nSkewBuckets = 0;
 	hashtable->skewBucketNums = NULL;
 	hashtable->nbatch = nbatch;
+	hashtable->nbatch_inmemory = nbatch_inmemory;
 	hashtable->curbatch = 0;
 	hashtable->nbatch_original = nbatch;
 	hashtable->nbatch_outstart = nbatch;
@@ -512,6 +514,8 @@ ExecHashTableCreate(HashState *state)
 	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
+	hashtable->innerOverflowFile = NULL;
+	hashtable->outerOverflowFile = NULL;
 	hashtable->spaceUsed = 0;
 	hashtable->spacePeak = 0;
 	hashtable->spaceAllowed = space_allowed;
@@ -553,14 +557,16 @@ ExecHashTableCreate(HashState *state)
 	{
 		MemoryContext oldctx;
 
+		int	cnt = Min(nbatch, nbatch_inmemory);
+
 		/*
 		 * allocate and initialize the file arrays in hashCxt (not needed for
 		 * parallel case which uses shared tuplestores instead of raw files)
 		 */
 		oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
-		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+		hashtable->innerBatchFile = palloc0_array(BufFile *, cnt);
+		hashtable->outerBatchFile = palloc0_array(BufFile *, cnt);
 
 		MemoryContextSwitchTo(oldctx);
 
@@ -661,6 +667,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						size_t *space_allowed,
 						int *numbuckets,
 						int *numbatches,
+						int *numbatches_inmemory,
 						int *num_skew_mcvs)
 {
 	int			tupsize;
@@ -669,6 +676,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	size_t		bucket_bytes;
 	size_t		max_pointers;
 	int			nbatch = 1;
+	int			nbatch_inmemory = 1;
 	int			nbuckets;
 	double		dbuckets;
 
@@ -811,6 +819,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									space_allowed,
 									numbuckets,
 									numbatches,
+									numbatches_inmemory,
 									num_skew_mcvs);
 			return;
 		}
@@ -848,11 +857,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 		nbatch = pg_nextpower2_32(Max(2, minbatch));
 	}
 
+	/*
+	 * See how many batches we can fit into memory (driven mostly by size
+	 * of BufFile, with PGAlignedBlock being the largest part of that).
+	 * We need one BufFile for inner and outer side, so we count it twice
+	 * for each batch, and we stop once we exceed (work_mem/2).
+	 */
+	while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+			<= (work_mem * 1024L / 2))
+		nbatch_inmemory *= 2;
+
 	Assert(nbuckets > 0);
 	Assert(nbatch > 0);
 
 	*numbuckets = nbuckets;
 	*numbatches = nbatch;
+	*numbatches_inmemory = nbatch_inmemory;
 }
 
 
@@ -874,13 +894,21 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	 */
 	if (hashtable->innerBatchFile != NULL)
 	{
-		for (i = 1; i < hashtable->nbatch; i++)
+		int nbatch = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+		for (i = 1; i < nbatch; i++)
 		{
 			if (hashtable->innerBatchFile[i])
 				BufFileClose(hashtable->innerBatchFile[i]);
 			if (hashtable->outerBatchFile[i])
 				BufFileClose(hashtable->outerBatchFile[i]);
 		}
+
+		if (hashtable->innerOverflowFile)
+			BufFileClose(hashtable->innerOverflowFile);
+
+		if (hashtable->outerOverflowFile)
+			BufFileClose(hashtable->outerOverflowFile);
 	}
 
 	/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -923,11 +951,13 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
 	if (hashtable->innerBatchFile == NULL)
 	{
+		int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
 		MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
 
 		/* we had no file arrays before */
-		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
-		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch_tmp);
+		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch_tmp);
 
 		MemoryContextSwitchTo(oldcxt);
 
@@ -936,9 +966,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	}
 	else
 	{
+		int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+		int oldnbatch_tmp = Min(oldnbatch, hashtable->nbatch_inmemory);
+
 		/* enlarge arrays and zero out added entries */
-		hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch);
-		hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
+		hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp);
+		hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch_tmp, nbatch_tmp);
 	}
 
 	hashtable->nbatch = nbatch;
@@ -1008,11 +1041,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			}
 			else
 			{
+				BufFile **batchFile;
+
 				/* dump it out */
 				Assert(batchno > curbatch);
+
+				batchFile = ExecHashGetBatchFile(hashtable, batchno,
+												 hashtable->innerBatchFile,
+												 &hashtable->innerOverflowFile);
+
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
-									  &hashtable->innerBatchFile[batchno],
+									  batchFile,
 									  hashtable);
 
 				hashtable->spaceUsed -= hashTupleSize;
@@ -1673,22 +1713,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/* Consider increasing number of batches if we filled work_mem. */
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2	/* inner + outer */
 			> hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
 	}
 	else
 	{
+		BufFile **batchFile;
+
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
+		batchFile = ExecHashGetBatchFile(hashtable, batchno,
+										 hashtable->innerBatchFile,
+										 &hashtable->innerOverflowFile);
+
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
-							  &hashtable->innerBatchFile[batchno],
+							  batchFile,
 							  hashtable);
 	}
 
@@ -1843,6 +1894,100 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+	int	slice,
+		curslice;
+
+	if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+		return batchno;
+
+	slice = batchno / hashtable->nbatch_inmemory;
+	curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+	/* slices can't go backwards */
+	Assert(slice >= curslice);
+
+	/* overflow slice */
+	if (slice > curslice)
+		return -1;
+
+	/* current slice, compute index in the current array */
+	return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **batchFiles, BufFile **overflowFile)
+{
+	int		idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+	if (idx == -1)
+		return overflowFile;
+
+	return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+	memset(hashtable->innerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->innerBatchFile[0] = hashtable->innerOverflowFile;
+	hashtable->innerOverflowFile = NULL;
+
+	memset(hashtable->outerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->outerBatchFile[0] = hashtable->outerOverflowFile;
+	hashtable->outerOverflowFile = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+	int		batchidx;
+
+	hashtable->curbatch++;
+
+	/* see if we skipped to the next batch slice */
+	batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+	/* Can't be -1, current batch is in the current slice by definition. */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+	/*
+	 * If we skipped to the next slice of batches, reset the array of files
+	 * and use the overflow file as the first batch.
+	 */
+	if (batchidx == 0)
+		ExecHashSwitchToNextBatchSlice(hashtable);
+
+	return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size	spaceUsed = hashtable->spaceUsed;
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* Account for memory used for batch files (inner + outer) */
+	spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+				 sizeof(PGAlignedBlock) * 2;
+
+	/* Account for slice files (inner + outer) */
+	spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+				 sizeof(PGAlignedBlock) * 2;
+
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2349,8 +2494,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2399,8 +2545,9 @@ ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak used memory */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2489,8 +2636,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
@@ -2569,10 +2718,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 		}
 		else
 		{
+			BufFile **batchFile;
+
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
+
+			batchFile = ExecHashGetBatchFile(hashtable, batchno,
+											 hashtable->innerBatchFile,
+											 &hashtable->innerOverflowFile);
+
 			ExecHashJoinSaveTuple(tuple, hashvalue,
-								  &hashtable->innerBatchFile[batchno],
+								  batchFile,
 								  hashtable);
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
@@ -2750,6 +2906,8 @@ ExecHashAccumInstrumentation(HashInstrumentation *instrument,
 							 hashtable->nbatch);
 	instrument->nbatch_original = Max(instrument->nbatch_original,
 									  hashtable->nbatch_original);
+	instrument->nbatch_inmemory = Min(hashtable->nbatch,
+									  hashtable->nbatch_inmemory);
 	instrument->space_peak = Max(instrument->space_peak,
 								 hashtable->spacePeak);
 }
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index ea0045bc0f3..2f5d94653e8 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -481,6 +481,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (batchno != hashtable->curbatch &&
 					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
 				{
+					BufFile	  **batchFile;
 					bool		shouldFree;
 					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
 																	  &shouldFree);
@@ -491,8 +492,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					 */
 					Assert(parallel_state == NULL);
 					Assert(batchno > hashtable->curbatch);
+
+					batchFile = ExecHashGetBatchFile(hashtable, batchno,
+													 hashtable->outerBatchFile,
+													 &hashtable->outerOverflowFile);
+
 					ExecHashJoinSaveTuple(mintuple, hashvalue,
-										  &hashtable->outerBatchFile[batchno],
+										  batchFile,
 										  hashtable);
 
 					if (shouldFree)
@@ -1030,17 +1036,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 	else if (curbatch < hashtable->nbatch)
 	{
-		BufFile    *file = hashtable->outerBatchFile[curbatch];
+		BufFile    **file = ExecHashGetBatchFile(hashtable, curbatch,
+												 hashtable->outerBatchFile,
+												 &hashtable->outerOverflowFile);
 
 		/*
 		 * In outer-join cases, we could get here even though the batch file
 		 * is empty.
 		 */
-		if (file == NULL)
+		if (*file == NULL)
 			return NULL;
 
 		slot = ExecHashJoinGetSavedTuple(hjstate,
-										 file,
+										 *file,
 										 hashvalue,
 										 hjstate->hj_OuterTupleSlot);
 		if (!TupIsNull(slot))
@@ -1135,9 +1143,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	BufFile    *innerFile;
 	TupleTableSlot *slot;
 	uint32		hashvalue;
+	int			batchidx;
+	int			curbatch_old;
 
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
+	curbatch_old = curbatch;
+
+	/* index of the old batch */
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+	/* has to be in the current slice of batches */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
 	if (curbatch > 0)
 	{
@@ -1145,9 +1162,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * We no longer need the previous outer batch file; close it right
 		 * away to free disk space.
 		 */
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
 	}
 	else						/* we just finished the first batch */
 	{
@@ -1182,45 +1199,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	 * scan, we have to rescan outer batches in case they contain tuples that
 	 * need to be reassigned.
 	 */
-	curbatch++;
+	curbatch = ExecHashSwitchToNextBatch(hashtable);
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
 	while (curbatch < nbatch &&
-		   (hashtable->outerBatchFile[curbatch] == NULL ||
-			hashtable->innerBatchFile[curbatch] == NULL))
+		   (hashtable->outerBatchFile[batchidx] == NULL ||
+			hashtable->innerBatchFile[batchidx] == NULL))
 	{
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			HJ_FILL_OUTER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			HJ_FILL_INNER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_original)
 			break;				/* must process due to rule 2 */
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_outstart)
 			break;				/* must process due to rule 3 */
 		/* We can ignore this batch. */
 		/* Release associated temp files right away. */
-		if (hashtable->innerBatchFile[curbatch])
-			BufFileClose(hashtable->innerBatchFile[curbatch]);
-		hashtable->innerBatchFile[curbatch] = NULL;
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
-		curbatch++;
+		if (hashtable->innerBatchFile[batchidx])
+			BufFileClose(hashtable->innerBatchFile[batchidx]);
+		hashtable->innerBatchFile[batchidx] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
+
+		curbatch = ExecHashSwitchToNextBatch(hashtable);
+		batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
 	}
 
 	if (curbatch >= nbatch)
+	{
+		hashtable->curbatch = curbatch_old;
 		return false;			/* no more batches */
-
-	hashtable->curbatch = curbatch;
+	}
 
 	/*
 	 * Reload the hash table with the new inner batch (which could be empty)
 	 */
 	ExecHashTableReset(hashtable);
 
-	innerFile = hashtable->innerBatchFile[curbatch];
+	innerFile = hashtable->innerBatchFile[batchidx];
 
 	if (innerFile != NULL)
 	{
@@ -1246,15 +1268,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * needed
 		 */
 		BufFileClose(innerFile);
-		hashtable->innerBatchFile[curbatch] = NULL;
+		hashtable->innerBatchFile[batchidx] = NULL;
 	}
 
 	/*
 	 * Rewind outer batch file (if present), so that we can start reading it.
 	 */
-	if (hashtable->outerBatchFile[curbatch] != NULL)
+	if (hashtable->outerBatchFile[batchidx] != NULL)
 	{
-		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
+		if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0, SEEK_SET))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not rewind hash-join temporary file")));
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c36687aa4df..6f40600d10b 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -4173,6 +4173,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	int			num_hashclauses = list_length(hashclauses);
 	int			numbuckets;
 	int			numbatches;
+	int			numbatches_inmemory;
 	int			num_skew_mcvs;
 	size_t		space_allowed;	/* unused */
 
@@ -4227,6 +4228,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 							&space_allowed,
 							&numbuckets,
 							&numbatches,
+							&numbatches_inmemory,
 							&num_skew_mcvs);
 
 	/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 2d8ed8688cd..c612ae98117 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -320,6 +320,7 @@ typedef struct HashJoinTableData
 	int		   *skewBucketNums; /* array indexes of active skew buckets */
 
 	int			nbatch;			/* number of batches */
+	int			nbatch_inmemory;	/* max number of in-memory batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
 
 	int			nbatch_original;	/* nbatch when we started inner scan */
@@ -331,6 +332,9 @@ typedef struct HashJoinTableData
 	double		partialTuples;	/* # tuples obtained from inner plan by me */
 	double		skewTuples;		/* # tuples inserted into skew tuples */
 
+	BufFile	   *innerOverflowFile;	/* temp file for inner overflow batches */
+	BufFile	   *outerOverflowFile;	/* temp file for outer overflow batches */
+
 	/*
 	 * These arrays are allocated for the life of the hash join, but only if
 	 * nbatch > 1.  A file is opened only when we first write a tuple into it
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index e4eb7bc6359..fc82ea43c3b 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -46,6 +47,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 									  uint32 hashvalue,
 									  int *bucketno,
 									  int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **files, BufFile **overflow);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -62,6 +68,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									size_t *space_allowed,
 									int *numbuckets,
 									int *numbatches,
+									int *numbatches_inmemory,
 									int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1590b643920..eb8eaea89ab 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2755,6 +2755,7 @@ typedef struct HashInstrumentation
 	int			nbuckets_original;	/* planned number of buckets */
 	int			nbatch;			/* number of batches at end of execution */
 	int			nbatch_original;	/* planned number of batches */
+	int			nbatch_inmemory;	/* number of batches kept in memory */
 	Size		space_peak;		/* peak memory usage in bytes */
 } HashInstrumentation;
 
-- 
2.47.1
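
To make the batchidx lookups in ExecHashJoinNewBatch easier to follow,
here is a minimal standalone sketch of how a batch number might map to a
slot in the in-memory file arrays, with everything beyond the current
slice redirected to the overflow file. The SketchHashTable struct, the
sketch_* function names and the divide/modulo mapping are assumptions
made for illustration only; the actual ExecHashGetBatchIndex and
ExecHashGetBatchFile in the patch may well differ.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the two HashJoinTableData fields used here. */
typedef struct SketchHashTable
{
	int			nbatch_inmemory;	/* number of batch slots per slice */
	int			curbatch;			/* batch currently being processed */
} SketchHashTable;

/* Stand-in for BufFile; the sketch only passes pointers around. */
typedef struct SketchFile SketchFile;

/*
 * Map a batch number to a slot in the in-memory file array, or return -1
 * if the batch belongs to a later slice.  The mapping is an assumption,
 * chosen to match the Assert in ExecHashJoinNewBatch (0 <= idx <
 * nbatch_inmemory for batches in the current slice).
 */
int
sketch_batch_index(const SketchHashTable *ht, int batchno)
{
	int			slice = batchno / ht->nbatch_inmemory;
	int			curslice = ht->curbatch / ht->nbatch_inmemory;

	/* batches from already-processed slices should never be requested */
	assert(slice >= curslice);

	if (slice > curslice)
		return -1;				/* goes to the overflow file */

	return batchno % ht->nbatch_inmemory;
}

/*
 * Return the address of the file pointer to use for a batch: either a
 * slot in the per-slice array, or the single overflow file.
 */
SketchFile **
sketch_batch_file(const SketchHashTable *ht, int batchno,
				  SketchFile **files, SketchFile **overflow)
{
	int			idx = sketch_batch_index(ht, batchno);

	return (idx < 0) ? overflow : &files[idx];
}
```

With nbatch_inmemory = 4 and curbatch = 5, batches 4..7 would map to
slots 0..3, and any batch >= 8 would be written to the overflow file.
Returning a pointer to the file pointer is what lets callers test
*file == NULL and still create the file lazily on first write.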

Attachment: hash.sql
Description: application/sql
