Last week, while working on adaptive hash join [1] and trying to get parallel adaptive hash join batch 0 to spill correctly, I noticed what seemed like a problem in the code that repartitions batch 0.
If we run out of space while inserting tuples into the hashtable during the build phase of parallel hash join and proceed to increase the number of batches, we need to repartition all of the tuples from the old generation (when nbatch was x) and move them to their new homes in the new generation (when nbatch is 2x). Before we do this repartitioning, we disable growth in the number of batches. Then we repartition the tuples from the hashtable, inserting them either back into the hashtable or into a batch file. While inserting them into the hashtable, we call ExecParallelHashTupleAlloc(), and, if there is no space for the current tuple in the current chunk and growth in the number of batches is disabled, we go ahead and allocate a new chunk of memory -- regardless of whether or not doing so will exceed the space limit.

Below, I've included a test case which, on master, results in an error while trying to allocate shared memory. It uses a custom data type whose hash function ensures that all of the tuples go to batch 0. With my attached patch, this test case no longer errors out.

I discussed this with Thomas Munro, and it seems this is not the desired behavior. We also discussed how abandoning a repartitioning effort once we know it is doomed is an optimization anyway.

To start with, I've attached a patch which bails out of the ExecParallelHashRepartitionFirst() attempt when allocating a new chunk of memory would exceed the space limit. We skip ExecParallelHashRepartitionRest() and engage in the deciding phase as before. This means that we will disable growth in the number of batches if all of the tuples that we attempted to load back into the hashtable from the evicted tuple queue would stay resident in the hashtable. Otherwise, we will set growth to indicate that we need to try increasing the number of batches again and return, eventually returning NULL to the original allocation function call and indicating that we need to retry repartitioning.

It's important to note that if we disable growth in the deciding phase due to skew, batch 0 -- and any subsequent batches with too many tuples to fit in the space allowed -- will simply exceed the space limit while building the hashtable. This patch does not fix that.

Thomas and I also discussed the potential optimization of bailing out while repartitioning all of the other batches (after batch 0) in ExecParallelHashRepartitionRest(). This would be a good optimization; however, it doesn't address a "bug" in the same way that bailing out in ExecParallelHashRepartitionFirst() does. Also, I hacked on a few versions of this optimization, and it requires more thought. I would like to propose it in a separate patch and thread.

One note about the code of the attached patch: I added a variable to the ParallelHashJoinState structure indicating that repartitioning should be abandoned. Workers only need to check it before allocating a new chunk of memory during repartitioning. I thought about whether it would be better to make it a new ParallelHashGrowth state instead, but I wasn't sure that made sense.
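To make the "return NULL and retry" part concrete: the original allocation call sits in a retry loop in ExecParallelHashTableInsert() in nodeHash.c. Roughly, it has the following shape (an abbreviated sketch of the code on master, with declarations, assertions, memory accounting, and the batch file preallocation logic omitted):

    void
    ExecParallelHashTableInsert(HashJoinTable hashtable,
                                TupleTableSlot *slot,
                                uint32 hashvalue)
    {
        MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
        ...

    retry:
        ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);

        if (batchno == 0)
        {
            /* Try to load the tuple into batch 0's in-memory hashtable. */
            hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                                   HJTUPLE_OVERHEAD + tuple->t_len,
                                                   &shared);
            if (hashTuple == NULL)
                goto retry;     /* batches/buckets may have grown; try again */

            hashTuple->hashvalue = hashvalue;
            memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
            ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                      hashTuple, shared);
        }
        else
        {
            /*
             * The tuple belongs to a later batch: spill it to that batch's
             * shared tuplestore.
             */
            sts_puttuple(hashtable->batches[batchno].inner_tuples,
                         &hashvalue, tuple);
        }
    }

So, when a repartitioning attempt is abandoned and the deciding phase opts to try again with more batches, the original ExecParallelHashTupleAlloc() call returns NULL and the inserting backend loops back through this retry path, kicking off the next repartitioning attempt.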
-------------------------------- Test Case --------------------------------

DROP TYPE stub CASCADE;
CREATE TYPE stub AS (value CHAR(8098));

CREATE FUNCTION stub_hash(item stub) RETURNS INTEGER AS $$
BEGIN
    RETURN 0;
END; $$
LANGUAGE plpgsql IMMUTABLE LEAKPROOF STRICT PARALLEL SAFE;

CREATE FUNCTION stub_eq(item1 stub, item2 stub) RETURNS BOOLEAN AS $$
BEGIN
    RETURN item1.value = item2.value;
END; $$
LANGUAGE plpgsql IMMUTABLE LEAKPROOF STRICT PARALLEL SAFE;

CREATE OPERATOR = (
    FUNCTION = stub_eq,
    LEFTARG = stub,
    RIGHTARG = stub,
    COMMUTATOR = =,
    HASHES, MERGES
);

CREATE OPERATOR CLASS stub_hash_ops
DEFAULT FOR TYPE stub USING hash AS
    OPERATOR 1 =(stub, stub),
    FUNCTION 1 stub_hash(stub);

DROP TABLE IF EXISTS probeside_batch0;
CREATE TABLE probeside_batch0(a stub);
ALTER TABLE probeside_batch0 ALTER COLUMN a SET STORAGE PLAIN;
INSERT INTO probeside_batch0 SELECT '("")' FROM generate_series(1, 13);

DROP TABLE IF EXISTS hashside_wide_batch0;
CREATE TABLE hashside_wide_batch0(a stub, id int);
ALTER TABLE hashside_wide_batch0 ALTER COLUMN a SET STORAGE PLAIN;
INSERT INTO hashside_wide_batch0 SELECT '("")', 22 FROM generate_series(1, 200);

ANALYZE probeside_batch0, hashside_wide_batch0;

set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
set enable_hashjoin = on;
set max_parallel_workers_per_gather = 1;
set enable_parallel_hash = on;
set work_mem = '64kB';

explain (analyze, costs off)
SELECT TRIM((probeside_batch0.a).value),
       hashside_wide_batch0.id,
       hashside_wide_batch0.ctid as innerctid,
       TRIM((hashside_wide_batch0.a).value),
       probeside_batch0.ctid as outerctid
FROM probeside_batch0
LEFT OUTER JOIN hashside_wide_batch0 USING (a);

---------------------------------

[1] https://www.postgresql.org/message-id/flat/CA%2BhUKGJvYFCcF8vTHFSQQB_F8oGRsBp3JdZAPWbORZgfAPk5Sw%40mail.gmail.com#1156516651bb2587da3909cf1db29952

--
Melanie Plageman
From 22b01b7e514cf975bb70d14918dcb6611a09bbd4 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Thu, 2 Jul 2020 17:02:48 -0700
Subject: [PATCH v1] Bail out of repartitioning batch 0 when space is exhausted

Previously, ExecParallelHashTupleAlloc() would allocate a new chunk of
shared memory while repartitioning batch 0 -- even if it would exceed
the space limit. This patch adds a new exception to allow the executor
to bail out of repartitioning when growth is only disabled because we
are in the middle of repartitioning.
---
 src/backend/executor/nodeHash.c     | 30 ++++++++++++++++++++++++++++-
 src/backend/executor/nodeHashjoin.c |  1 +
 src/include/executor/hashjoin.h     |  1 +
 3 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 45b342011f..9c40d010f3 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -1083,6 +1083,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 				int			new_nbatch;
 				int			i;
 
+				pstate->abandon_repartitioning = false;
 				/* Move the old batch out of the way. */
 				old_batch0 = hashtable->batches[0].shared;
 				pstate->old_batches = pstate->batches;
@@ -1195,7 +1196,8 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
 			/* Then partition, flush counters. */
 			ExecParallelHashRepartitionFirst(hashtable);
-			ExecParallelHashRepartitionRest(hashtable);
+			if (!pstate->abandon_repartitioning)
+				ExecParallelHashRepartitionRest(hashtable);
 			ExecParallelHashMergeCounters(hashtable);
 			/* Wait for the above to be finished. */
 			BarrierArriveAndWait(&pstate->grow_batches_barrier,
@@ -1302,6 +1304,11 @@ ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
 				ExecParallelHashTupleAlloc(hashtable,
 										   HJTUPLE_OVERHEAD + tuple->t_len,
 										   &shared);
+			if (!copyTuple)
+			{
+				Assert(hashtable->parallel_state->abandon_repartitioning);
+				return;
+			}
 			copyTuple->hashvalue = hashTuple->hashvalue;
 			memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len);
 			ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
@@ -1759,6 +1766,8 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
 	hashTuple = ExecParallelHashTupleAlloc(hashtable,
 										   HJTUPLE_OVERHEAD + tuple->t_len,
 										   &shared);
+	/* After finishing with the build phase, this function should never fail */
+	Assert(hashTuple);
 	hashTuple->hashvalue = hashvalue;
 	memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 	HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
@@ -2847,6 +2856,25 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
 	else
 		chunk_size = HASH_CHUNK_SIZE;
 
+	/*
+	 * If we are already repartitioning and the space is full, we need to
+	 * abandon this repartitioning attempt and either start a new attempt
+	 * with double the number of batches or disable growth and resign
+	 * ourselves to exceeding the space allowed. We will decide during
+	 * PHJ_GROW_BATCHES_DECIDING phase which of these to do.
+	 */
+	if (pstate->growth == PHJ_GROWTH_DISABLED &&
+		PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)) ==
+		PHJ_GROW_BATCHES_REPARTITIONING &&
+		hashtable->batches[0].at_least_one_chunk &&
+		hashtable->batches[0].shared->size + chunk_size > pstate->space_allowed)
+	{
+		pstate->abandon_repartitioning = true;
+		hashtable->batches[0].shared->space_exhausted = true;
+		LWLockRelease(&pstate->lock);
+		return NULL;
+	}
+
 	/* Check if it's time to grow batches or buckets. */
 	if (pstate->growth != PHJ_GROWTH_DISABLED)
 	{
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 9bb23fef1a..39df3cbbc6 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1480,6 +1480,7 @@ ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
 	pstate->total_tuples = 0;
 	LWLockInitialize(&pstate->lock,
 					 LWTRANCHE_PARALLEL_HASH_JOIN);
+	pstate->abandon_repartitioning = false;
 	BarrierInit(&pstate->build_barrier, 0);
 	BarrierInit(&pstate->grow_batches_barrier, 0);
 	BarrierInit(&pstate->grow_buckets_barrier, 0);
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 79b634e8ed..5bdbd4a3c7 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -244,6 +244,7 @@ typedef struct ParallelHashJoinState
 	size_t		space_allowed;
 	size_t		total_tuples;	/* total number of inner tuples */
 	LWLock		lock;			/* lock protecting the above */
+	bool		abandon_repartitioning;
 
 	Barrier		build_barrier;	/* synchronization for the build phases */
 	Barrier		grow_batches_barrier;
-- 
2.20.1