On Wed, Apr 29, 2020 at 4:44 PM David Kimura <david.g.kim...@gmail.com> wrote:
>
> The following patch adds logic to create a batch 0 file for serial hash join
> so that even in the pathological case we do not need to exceed work_mem.

Updated the patch to spill batch 0 tuples once the batch is marked as fallback.

A couple of questions from looking more at the serial code:

1) Does the current pattern of repartitioning batches *after* a hashtable
   insert has already exceeded work_mem still make sense?

   As it stands we allow ourselves to exceed work_mem by one tuple. If that
   no longer seems correct, then I think we can move the space-exceeded check
   in ExecHashTableInsert() *before* the actual hashtable insert, as in the
   sketch below.
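
   Something like this (only a sketch, not part of the attached patch; it
   reuses the existing HashJoinTableData fields and nodeHash.c helpers):

       /* inside ExecHashTableInsert(), before copying the tuple */
       int  hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;

       if (batchno == hashtable->curbatch &&
           hashtable->spaceUsed + hashTupleSize > hashtable->spaceAllowed)
       {
           /* repartition first, so the insert below cannot overflow */
           ExecHashIncreaseNumBatches(hashtable);

           /* the tuple may now belong to a later batch, so recompute */
           ExecHashGetBucketAndBatch(hashtable, hashvalue,
                                     &bucketno, &batchno);
       }
       /* ... then insert into the hashtable or save to a batch file ... */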

2) After batch 0 is marked fallback, does the logic to insert into its batch
   file fit better in MultiExecPrivateHash() or in ExecHashTableInsert()?

   The latter already has logic to decide whether to insert into the
   hashtable or into a batch file; see the sketch below.
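
   For instance (again just a sketch, not the attached patch), that existing
   branch could absorb the batch 0 fallback case:

       ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);

       if (batchno == hashtable->curbatch &&
           !(batchno == 0 &&
             hashtable->hashloop_fallback &&
             hashtable->hashloop_fallback[0]))
       {
           /* insert into the in-memory hashtable, as today */
       }
       else
       {
           /* tuple belongs to a later batch, or batch 0 has fallen back */
           ExecHashJoinSaveTuple(tuple, hashvalue,
                                 &hashtable->innerBatchFile[batchno]);
       }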

Thanks,
David
From f0a3bbed9c80ad304f6cea9ace33534be4f4c3cd Mon Sep 17 00:00:00 2001
From: David Kimura <dkimura@pivotal.io>
Date: Wed, 29 Apr 2020 16:54:36 +0000
Subject: [PATCH v6 2/2] Implement fallback of batch 0 for serial adaptive hash
 join

There is some fuzziness around the concerns of different functions,
specifically ExecHashTableInsert() and ExecHashIncreaseNumBatches().  The
existing model allows the insert to succeed and only afterwards adjusts the
number of batches or falls back, so exceeding work_mem is not addressed until
after the fact. Instead, this change decides whether to insert into the
hashtable or into the batch file while relocating tuples between batches
inside ExecHashIncreaseNumBatches().
---
 src/backend/executor/nodeHash.c     | 43 +++++++++++++++++++++--------
 src/backend/executor/nodeHashjoin.c | 17 ++++++++++++
 2 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ecbc76ab5..9340db9fb7 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -183,12 +183,36 @@ MultiExecPrivateHash(HashState *node)
 			else
 			{
 				/* Not subject to skew optimization, so insert normally */
-				ExecHashTableInsert(hashtable, slot, hashvalue);
+				int			bucketno;
+				int			batchno;
+				bool		shouldFree;
+				MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+				ExecHashGetBucketAndBatch(hashtable, hashvalue,
+										  &bucketno, &batchno);
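+				/*
+				 * Once batch 0 has fallen back, spill tuples to their batch
+				 * files instead of inserting into the in-memory hashtable.
+				 */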
+				if (hashtable->hashloop_fallback && hashtable->hashloop_fallback[0])
+					ExecHashJoinSaveTuple(tuple,
+										  hashvalue,
+										  &hashtable->innerBatchFile[batchno]);
+				else
+					ExecHashTableInsert(hashtable, slot, hashvalue);
+
+				if (shouldFree)
+					heap_free_minimal_tuple(tuple);
 			}
 			hashtable->totalTuples += 1;
 		}
 	}
 
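+	/*
+	 * If batch 0 fell back, some tuples were spilled to its batch file
+	 * above; rewind the file so it can be read back when probing.
+	 */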
+	if (hashtable->innerBatchFile && hashtable->innerBatchFile[0])
+	{
+		if (BufFileSeek(hashtable->innerBatchFile[0], 0, 0L, SEEK_SET))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not rewind hash-join temporary file: %m")));
+	}
+
 	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
@@ -925,6 +949,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	int			childbatch_outgoing_tuples;
 	int			target_batch;
 	FallbackBatchStats *fallback_batch_stats;
+	size_t		currentBatchSize = 0;
 
 	if (hashtable->hashloop_fallback && hashtable->hashloop_fallback[curbatch])
 		return;
@@ -1029,7 +1054,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
 									  &bucketno, &batchno);
 
-			if (batchno == curbatch)
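+			/*
+			 * For batch 0, keep the tuple in memory only while the rebuilt
+			 * hashtable stays within spaceAllowed; otherwise spill it.
+			 */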
+			if (batchno == curbatch &&
+				(curbatch != 0 ||
+				 currentBatchSize + hashTupleSize < hashtable->spaceAllowed))
 			{
 				/* keep tuple in memory - copy it into the new chunk */
 				HashJoinTuple copyTuple;
@@ -1041,11 +1066,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 				copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
 				hashtable->buckets.unshared[bucketno] = copyTuple;
 				curbatch_outgoing_tuples++;
+				currentBatchSize += hashTupleSize;
 			}
 			else
 			{
 				/* dump it out */
-				Assert(batchno > curbatch);
+				Assert(batchno > curbatch || currentBatchSize + hashTupleSize >= hashtable->spaceAllowed);
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
 									  &hashtable->innerBatchFile[batchno]);
@@ -1081,13 +1107,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		   hashtable, nfreed, ninmemory, hashtable->spaceUsed);
 #endif
 
-	/*
-	 * For now we do not support fallback in batch 0 as it is a special case
-	 * and assumed to fit in hashtable.
-	 */
-	if (curbatch == 0)
-		return;
-
 	/*
 	 * The same batch should not be marked to fall back more than once
 	 */
@@ -1097,9 +1116,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	if ((curbatch_outgoing_tuples / (float) ninmemory) >= 0.8)
 		printf("curbatch %i targeted to fallback.", curbatch);
 #endif
-	if ((childbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION && childbatch > 0)
+	if ((childbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION)
 		target_batch = childbatch;
-	else if ((curbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION && curbatch > 0)
+	else if ((curbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION)
 		target_batch = curbatch;
 	else
 		return;
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 516067f176..8f3f4d4b44 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -507,6 +507,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
 					continue;
 				}
+				if (batchno == 0 && hashtable->curstripe == 0 &&
+					IsHashloopFallback(hashtable))
+				{
+					bool		shouldFree;
+					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
+																	  &shouldFree);
+
+					/*
+					 * Need to save this outer tuple to a batch since batch 0
+					 * is fallback and we must later rewind.
+					 */
+					Assert(parallel_state == NULL);
+					ExecHashJoinSaveTuple(mintuple, hashvalue,
+										  &hashtable->outerBatchFile[batchno]);
+
+					if (shouldFree)
+						heap_free_minimal_tuple(mintuple);
+				}
 
 				/*
 				 * While probing the phantom stripe, don't increment
-- 
2.17.0
