On Mon, Sep 21, 2020 at 8:34 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> On Tue, Sep 22, 2020 at 8:49 AM Melanie Plageman > <melanieplage...@gmail.com> wrote: > > On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro <thomas.mu...@gmail.com> > wrote: > > I took it for a very quick spin and saw simple cases working nicely, > but TPC-DS queries 51 and 97 (which contain full joins) couldn't be > convinced to use it. Hmm. > Thanks for taking a look, Thomas! Both query 51 and query 97 have full outer joins of two CTEs, each of which are aggregate queries. During planning when constructing the joinrel and choosing paths, in hash_inner_and_outer(), we don't consider parallel hash parallel hash join paths because the outerrel and innerrel do not have partial_pathlists. This code if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && outerrel->partial_pathlist != NIL && bms_is_empty(joinrel->lateral_relids)) gates the code to generate partial paths for hash join. My understanding of this is that if the inner and outerrel don't have partial paths, then they can't be executed in parallel, so the join could not be executed in parallel. For the two TPC-DS queries, even if they use parallel aggs, the finalize agg will have to be done by a single worker, so I don't think they could be joined with a parallel hash join. I added some logging inside the "if" statement and ran join_hash.sql in regress to see what nodes were typically in the pathlist and partial pathlist. All of them had basically just sequential scans as the outer and inner rel paths. regress examples are definitely meant to be minimal, so this probably wasn't the best place to look for examples of more complex rels that can be joined with a parallel hash join. > > >> Some other notes on the patch: > > From a quick peek: > > +/* > + * Upon arriving at the barrier, if this worker is not the last > worker attached, > + * detach from the barrier and return false. If this worker is the last > worker, > + * remain attached and advance the phase of the barrier, return true > to indicate > + * you are the last or "elected" worker who is still attached to the > barrier. > + * Another name I considered was BarrierUniqueify or BarrierSoloAssign > + */ > +bool > +BarrierDetachOrElect(Barrier *barrier) > > I tried to find some existing naming in writing about > barriers/phasers, but nothing is jumping out at me. I think a lot of > this stuff comes from super computing where I guess "make all of the > threads give up except one" isn't a primitive they'd be too excited > about :-) > > BarrierArriveAndElectOrDetach()... gah, no. > You're right that Arrive should be in there. So, I went with BarrierArriveAndDetachExceptLast() It's specific, if not clever. > > + last = BarrierDetachOrElect(&batch->batch_barrier); > > I'd be nice to add some assertions after that, in the 'last' path, > that there's only one participant and that the phase is as expected, > just to make it even clearer to the reader, and a comment in the other > path that we are no longer attached. > Assert and comment added to the single worker path. The other path is just back to HJ_NEED_NEW_BATCH and workers will detach there as before, so I'm not sure where we could add the comment about the other workers detaching. > > + hjstate->hj_AllocatedBucketRange = 0; > ... > + pg_atomic_uint32 bucket; /* bucket allocator for unmatched inner > scan */ > ... > + //volatile int mybp = 0; while (mybp == 0) > > Some leftover fragments of the bucket-scan version and debugging stuff. > cleaned up (and rebased). I also changed ExecScanHashTableForUnmatched() to scan HashMemoryChunks in the hashtable instead of using the buckets to align parallel and serial hash join code. Originally, I had that code freeing the chunks of the hashtable after finishing scanning them, however, I noticed this query from regress failing: select * from (values (1, array[10,20]), (2, array[20,30])) as v1(v1x,v1ys) left join (values (1, 10), (2, 20)) as v2(v2x,v2y) on v2x = v1x left join unnest(v1ys) as u1(u1y) on u1y = v2y; It is because the hash join gets rescanned and because there is only one batch, ExecReScanHashJoin reuses the same hashtable. QUERY PLAN ------------------------------------------------------------- Nested Loop Left Join -> Values Scan on "*VALUES*" -> Hash Right Join Hash Cond: (u1.u1y = "*VALUES*_1".column2) Filter: ("*VALUES*_1".column1 = "*VALUES*".column1) -> Function Scan on unnest u1 -> Hash -> Values Scan on "*VALUES*_1" I was freeing the hashtable as I scanned each chunk, which clearly doesn't work for a single batch hash join which gets rescanned. I don't see anything specific to parallel hash join in ExecReScanHashJoin(), so, it seems like the same rules apply to parallel hash join. So, I will have to remove the logic that frees the hash table after scanning each chunk from the parallel function as well. In addition, I still need to go through the patch with a fine tooth comb (refine the comments and variable names and such) but just wanted to check that these changes were in line with what you were thinking first. Regards, Melanie (Microsoft)
From 6d34cbee84b06aa27d6c73426f29ef0d50dadb3a Mon Sep 17 00:00:00 2001 From: Melanie Plageman <melanieplage...@gmail.com> Date: Tue, 29 Sep 2020 13:47:57 -0700 Subject: [PATCH v2] Support Parallel FOJ and ROJ To support parallel FOJ and ROJ, - re-enable setting match bit for tuples in the hash table - a single worker preps for unmatched inner tuple scan in HJ_NEED_NEW_OUTER and transitions to HJ_FILL_INNER to avoid deadlock. ExecParallelScanHashTableForUnmatched() is no longer executed by multiple workers. A single worker will scan each HashMemoryChunk in the hash table, freeing it after finishing with it. - To align parallel and serial hash join, change ExecScanHashTableForUnmatched() to also scan HashMemoryChunks for the unmatched tuple scan instead of using the buckets --- src/backend/executor/nodeHash.c | 195 ++++++++++++++++++------ src/backend/executor/nodeHashjoin.c | 61 ++++---- src/backend/optimizer/path/joinpath.c | 14 +- src/backend/postmaster/pgstat.c | 3 + src/backend/storage/ipc/barrier.c | 22 ++- src/include/executor/hashjoin.h | 13 +- src/include/executor/nodeHash.h | 3 + src/include/pgstat.h | 1 + src/include/storage/barrier.h | 1 + src/test/regress/expected/join_hash.out | 56 ++++++- src/test/regress/sql/join_hash.sql | 23 ++- 11 files changed, 302 insertions(+), 90 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index ea69eeb2a1..42cb8514d3 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100; hashtable->chunks = NULL; hashtable->current_chunk = NULL; + hashtable->current_chunk_idx = 0; hashtable->parallel_state = state->parallel_state; hashtable->area = state->ps.state->es_query_dsa; hashtable->batches = NULL; @@ -2053,9 +2054,56 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate) * hj_CurTuple: last tuple returned, or NULL to start next bucket *---------- */ + HashJoinTable hashtable = hjstate->hj_HashTable; + + hjstate->hj_CurBucketNo = 0; + hjstate->hj_CurSkewBucketNo = 0; + hjstate->hj_CurTuple = NULL; + hashtable->current_chunk = hashtable->chunks; + hashtable->current_chunk_idx = 0; +} + +/* + * ExecPrepHashTableForUnmatched + * set up for a series of ExecScanHashTableForUnmatched calls + * return true if this worker is elected to do the unmatched inner scan + */ +bool +ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + ParallelHashJoinBatchAccessor *batch_accessor = &hashtable->batches[curbatch]; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + bool last = false; hjstate->hj_CurBucketNo = 0; hjstate->hj_CurSkewBucketNo = 0; hjstate->hj_CurTuple = NULL; + if (curbatch < 0) + return false; + last = BarrierArriveAndDetachExceptLast(&batch->batch_barrier); + if (!last) + { + hashtable->batches[hashtable->curbatch].done = true; + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + /* + * Track the largest batch we've been attached to. Though each + * backend might see a different subset of batches, explain.c will + * scan the results from all backends to find the largest value. + */ + hashtable->spacePeak = + Max(hashtable->spacePeak,batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); + hashtable->curbatch = -1; + } + else + { + batch_accessor->shared_chunk = batch->chunks; + batch_accessor->current_chunk = dsa_get_address(hashtable->area, batch_accessor->shared_chunk); + batch_accessor->current_chunk_idx = 0; + } + return last; } /* @@ -2069,63 +2117,118 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate) bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) { - HashJoinTable hashtable = hjstate->hj_HashTable; - HashJoinTuple hashTuple = hjstate->hj_CurTuple; + HashMemoryChunk next; + HashJoinTable hashtable = hjstate->hj_HashTable; - for (;;) + while (hashtable->current_chunk) { - /* - * hj_CurTuple is the address of the tuple last returned from the - * current bucket, or NULL if it's time to start scanning a new - * bucket. - */ - if (hashTuple != NULL) - hashTuple = hashTuple->next.unshared; - else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + while (hashtable->current_chunk_idx < hashtable->current_chunk->used) { - hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo]; - hjstate->hj_CurBucketNo++; - } - else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets) - { - int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo]; + HashJoinTuple hashTuple = (HashJoinTuple) ( + HASH_CHUNK_DATA(hashtable->current_chunk) + + hashtable->current_chunk_idx + ); + MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple); + int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len); + + /* next tuple in this chunk */ + hashtable->current_chunk_idx += MAXALIGN(hashTupleSize); + + if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) + continue; - hashTuple = hashtable->skewBucket[j]->tuples; - hjstate->hj_CurSkewBucketNo++; + /* insert hashtable's tuple into exec slot */ + econtext->ecxt_innertuple = + ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); + + /* + * Reset temp memory each time; although this function doesn't + * do any qual eval, the caller will, so let's keep it + * parallel to ExecScanHashBucket. + */ + ResetExprContext(econtext); + + hjstate->hj_CurTuple = hashTuple; + return true; } - else - break; /* finished all buckets */ - while (hashTuple != NULL) + next = hashtable->current_chunk->next.unshared; + hashtable->current_chunk = next; + hashtable->current_chunk_idx = 0; + + CHECK_FOR_INTERRUPTS(); + } + + /* + * no more unmatched tuples + */ + return false; +} + +/* + * ExecParallelScanHashTableForUnmatched + * scan the hash table for unmatched inner tuples, in parallel + * + * On success, the inner tuple is stored into hjstate->hj_CurTuple and + * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot + * for the latter. + */ +bool +ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext) +{ + dsa_pointer next; + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[curbatch]; + + /* + * Only one worker should execute this function. + * Since tuples have already been emitted, it is hazardous for workers + * to wait at the batch_barrier again. Instead, all workers except the last + * will detach and the last will conduct this unmatched inner tuple scan. + */ + Assert(BarrierParticipants(&accessor->shared->batch_barrier) == 1); + while (accessor->current_chunk) + { + while (accessor->current_chunk_idx < accessor->current_chunk->used) { - if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) - { - TupleTableSlot *inntuple; + HashJoinTuple hashTuple = (HashJoinTuple) ( + HASH_CHUNK_DATA(accessor->current_chunk) + accessor->current_chunk_idx + ); + accessor->current_chunk_idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len); - /* insert hashtable's tuple into exec slot */ - inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), - hjstate->hj_HashTupleSlot, - false); /* do not pfree */ - econtext->ecxt_innertuple = inntuple; + if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) + continue; - /* - * Reset temp memory each time; although this function doesn't - * do any qual eval, the caller will, so let's keep it - * parallel to ExecScanHashBucket. - */ - ResetExprContext(econtext); + /* insert hashtable's tuple into exec slot */ + econtext->ecxt_innertuple = ExecStoreMinimalTuple( + HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot,false); - hjstate->hj_CurTuple = hashTuple; - return true; - } + /* + * Reset temp memory each time; although this function doesn't + * do any qual eval, the caller will, so let's keep it parallel to + * ExecScanHashBucket. + */ + ResetExprContext(econtext); - hashTuple = hashTuple->next.unshared; + hjstate->hj_CurTuple = hashTuple; + return true; } - /* allow this loop to be cancellable */ + next = accessor->current_chunk->next.shared; + dsa_free(hashtable->area, accessor->shared_chunk); + accessor->shared_chunk = next; + accessor->current_chunk = dsa_get_address(hashtable->area, accessor->shared_chunk); + accessor->current_chunk_idx = 0; + CHECK_FOR_INTERRUPTS(); } + accessor->shared->chunks = InvalidDsaPointer; /* * no more unmatched tuples */ @@ -3131,13 +3234,6 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) /* Detach from the batch we were last working on. */ if (BarrierArriveAndDetach(&batch->batch_barrier)) { - /* - * Technically we shouldn't access the barrier because we're no - * longer attached, but since there is no way it's moving after - * this point it seems safe to make the following assertion. - */ - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); - /* Free shared chunks and buckets. */ while (DsaPointerIsValid(batch->chunks)) { @@ -3271,6 +3367,9 @@ ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno) hashtable->current_chunk = NULL; hashtable->current_chunk_shared = InvalidDsaPointer; hashtable->batches[batchno].at_least_one_chunk = false; + hashtable->batches[batchno].shared_chunk = InvalidDsaPointer; + hashtable->batches[batchno].current_chunk = NULL; + hashtable->batches[batchno].current_chunk_idx = 0; } /* diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 5532b91a71..7a2b5275bb 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -82,7 +82,9 @@ * PHJ_BATCH_ALLOCATING -- one allocates buckets * PHJ_BATCH_LOADING -- all load the hash table from disk * PHJ_BATCH_PROBING -- all probe - * PHJ_BATCH_DONE -- end + + * PHJ_BATCH_DONE -- queries not requiring inner fill done + * PHJ_BATCH_FILL_INNER_DONE -- inner fill completed, all queries done * * Batch 0 is a special case, because it starts out in phase * PHJ_BATCH_PROBING; populating batch 0's hash table is done during @@ -99,7 +101,9 @@ * while attached to a barrier, unless the barrier has reached its final * state. In the slightly special case of the per-batch barrier, we return * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use - * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE or + * PHJ_BATCH_DONE_INNER, depending on whether or not the join requires + * a scan for unmatched inner tuples, without waiting. * *------------------------------------------------------------------------- */ @@ -360,9 +364,19 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) /* end of batch, or maybe whole join */ if (HJ_FILL_INNER(node)) { - /* set up to scan for unmatched inner tuples */ - ExecPrepHashTableForUnmatched(node); - node->hj_JoinState = HJ_FILL_INNER_TUPLES; + if (parallel) + { + if (ExecParallelPrepHashTableForUnmatched(node)) + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + else + node->hj_JoinState = HJ_NEED_NEW_BATCH; + } + else + { + /* set up to scan for unmatched inner tuples */ + ExecPrepHashTableForUnmatched(node); + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + } } else node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -455,25 +469,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) { node->hj_MatchedOuter = true; - if (parallel) - { - /* - * Full/right outer joins are currently not supported - * for parallel joins, so we don't need to set the - * match bit. Experiments show that it's worth - * avoiding the shared memory traffic on large - * systems. - */ - Assert(!HJ_FILL_INNER(node)); - } - else - { - /* - * This is really only needed if HJ_FILL_INNER(node), - * but we'll avoid the branch and just set it always. - */ + + /* + * This is really only needed if HJ_FILL_INNER(node), + * but we'll avoid the branch and just set it always. + */ + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple))) HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); - } /* In an antijoin, we never return a matched tuple */ if (node->js.jointype == JOIN_ANTI) @@ -531,7 +533,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) * so any unmatched inner tuples in the hashtable have to be * emitted before we continue to the next batch. */ - if (!ExecScanHashTableForUnmatched(node, econtext)) + if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext) + : ExecScanHashTableForUnmatched(node, econtext))) { /* no more unmatched tuples */ node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -1173,15 +1176,17 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * hash table stays alive until everyone's finished * probing it, but no participant is allowed to wait at * this barrier again (or else a deadlock could occur). - * All attached participants must eventually call - * BarrierArriveAndDetach() so that the final phase - * PHJ_BATCH_DONE can be reached. + * All attached participants must eventually detach from + * the barrier and one worker must advance the phase + * so that the final phase is reached. */ ExecParallelHashTableSetCurrentBatch(hashtable, batchno); sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); return true; - case PHJ_BATCH_DONE: + /* Fall through. */ + + case PHJ_BATCH_FILL_INNER_DONE: /* * Already done. Detach and go around again (if any diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index db54a6ba2e..cbc8c2ad83 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -1845,15 +1845,9 @@ hash_inner_and_outer(PlannerInfo *root, * able to properly guarantee uniqueness. Similarly, we can't handle * JOIN_FULL and JOIN_RIGHT, because they can produce false null * extended rows. Also, the resulting path must not be parameterized. - * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel - * Hash, since in that case we're back to a single hash table with a - * single set of match bits for each batch, but that will require - * figuring out a deadlock-free way to wait for the probe to finish. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && - save_jointype != JOIN_FULL && - save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && bms_is_empty(joinrel->lateral_relids)) { @@ -1887,9 +1881,13 @@ hash_inner_and_outer(PlannerInfo *root, * total inner path will also be parallel-safe, but if not, we'll * have to search for the cheapest safe, unparameterized inner * path. If doing JOIN_UNIQUE_INNER, we can't use any alternative - * inner path. + * inner path. If full or right join, we can't use parallelism + * (building the hash table in each backend) because no one process + * has all the match bits. */ - if (cheapest_total_inner->parallel_safe) + if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT) + cheapest_safe_inner = NULL; + else if (cheapest_total_inner->parallel_safe) cheapest_safe_inner = cheapest_total_inner; else if (save_jointype != JOIN_UNIQUE_INNER) cheapest_safe_inner = diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index e6be2b7836..f6f242d806 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3782,6 +3782,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_HASH_BATCH_LOAD: event_name = "HashBatchLoad"; break; + case WAIT_EVENT_HASH_BATCH_PROBE: + event_name = "HashBatchProbe"; + break; case WAIT_EVENT_HASH_BUILD_ALLOCATE: event_name = "HashBuildAllocate"; break; diff --git a/src/backend/storage/ipc/barrier.c b/src/backend/storage/ipc/barrier.c index 3e200e02cc..2e7b0687ef 100644 --- a/src/backend/storage/ipc/barrier.c +++ b/src/backend/storage/ipc/barrier.c @@ -204,6 +204,27 @@ BarrierArriveAndDetach(Barrier *barrier) { return BarrierDetachImpl(barrier, true); } +/* + * Upon arriving at the barrier, if this worker is not the last worker attached, + * detach from the barrier and return false. If this worker is the last worker, + * remain attached and advance the phase of the barrier, return true to indicate + * you are the last or "elected" worker who is still attached to the barrier. + */ +bool +BarrierArriveAndDetachExceptLast(Barrier *barrier) +{ + SpinLockAcquire(&barrier->mutex); + if (barrier->participants > 1) + { + --barrier->participants; + SpinLockRelease(&barrier->mutex); + return false; + } + Assert(barrier->participants == 1); + ++barrier->phase; + SpinLockRelease(&barrier->mutex); + return true; +} /* * Attach to a barrier. All waiting participants will now wait for this @@ -221,7 +242,6 @@ BarrierAttach(Barrier *barrier) ++barrier->participants; phase = barrier->phase; SpinLockRelease(&barrier->mutex); - return phase; } diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index eb5daba36b..2af7228ef0 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -205,6 +205,14 @@ typedef struct ParallelHashJoinBatchAccessor bool at_least_one_chunk; /* has this backend allocated a chunk? */ bool done; /* flag to remember that a batch is done */ + /* + * While doing the unmatched inner scan, the assigned worker may emit + * tuples. Thus, we must keep track of where it was in the hashtable + * so it can return to the correct offset within the correct chunk. + */ + dsa_pointer shared_chunk; + HashMemoryChunk current_chunk; + size_t current_chunk_idx; SharedTuplestoreAccessor *inner_tuples; SharedTuplestoreAccessor *outer_tuples; } ParallelHashJoinBatchAccessor; @@ -265,7 +273,8 @@ typedef struct ParallelHashJoinState #define PHJ_BATCH_ALLOCATING 1 #define PHJ_BATCH_LOADING 2 #define PHJ_BATCH_PROBING 3 -#define PHJ_BATCH_DONE 4 +#define PHJ_BATCH_DONE 4 +#define PHJ_BATCH_FILL_INNER_DONE 5 /* The phases of batch growth while hashing, for grow_batches_barrier. */ #define PHJ_GROW_BATCHES_ELECTING 0 @@ -351,6 +360,8 @@ typedef struct HashJoinTableData /* used for dense allocation of tuples (into linked chunks) */ HashMemoryChunk chunks; /* one list for the whole batch */ + size_t current_chunk_idx; /* index of tuple within current chunk for serial unmatched inner scan */ + /* Shared and private state for Parallel Hash. */ HashMemoryChunk current_chunk; /* this backend's current chunk */ dsa_area *area; /* DSA area to allocate memory from */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 2db4e2f672..a642736d54 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -56,8 +56,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); +extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate); extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext); +extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext); extern void ExecHashTableReset(HashJoinTable hashtable); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 0dfbac46b4..62d5f1d16b 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -856,6 +856,7 @@ typedef enum WAIT_EVENT_HASH_BATCH_ALLOCATE, WAIT_EVENT_HASH_BATCH_ELECT, WAIT_EVENT_HASH_BATCH_LOAD, + WAIT_EVENT_HASH_BATCH_PROBE, WAIT_EVENT_HASH_BUILD_ALLOCATE, WAIT_EVENT_HASH_BUILD_ELECT, WAIT_EVENT_HASH_BUILD_HASH_INNER, diff --git a/src/include/storage/barrier.h b/src/include/storage/barrier.h index d71927cc2f..e0de24378b 100644 --- a/src/include/storage/barrier.h +++ b/src/include/storage/barrier.h @@ -37,6 +37,7 @@ typedef struct Barrier extern void BarrierInit(Barrier *barrier, int num_workers); extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info); extern bool BarrierArriveAndDetach(Barrier *barrier); +extern bool BarrierArriveAndDetachExceptLast(Barrier *barrier); extern int BarrierAttach(Barrier *barrier); extern bool BarrierDetach(Barrier *barrier); extern int BarrierPhase(Barrier *barrier); diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index 3a91c144a2..4ca0e01756 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -767,8 +767,9 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); @@ -788,6 +789,31 @@ select count(*) from simple r full outer join simple s using (id); 20000 (1 row) +rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Full Join + Hash Cond: (r.id = s.id) + -> Parallel Seq Scan on simple r + -> Parallel Hash + -> Parallel Seq Scan on simple s +(9 rows) + +select count(*) from simple r full outer join simple s using (id); + count +------- + 20000 +(1 row) + rollback to settings; -- An full outer join where every record is not matched. -- non-parallel @@ -812,8 +838,9 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); @@ -833,6 +860,31 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); 40000 (1 row) +rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + QUERY PLAN +------------------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 2 + -> Partial Aggregate + -> Parallel Hash Full Join + Hash Cond: ((0 - s.id) = r.id) + -> Parallel Seq Scan on simple s + -> Parallel Hash + -> Parallel Seq Scan on simple r +(9 rows) + +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + count +------- + 40000 +(1 row) + rollback to settings; -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 68c1a8c7b6..504b3611ca 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -418,7 +418,16 @@ explain (costs off) select count(*) from simple r full outer join simple s using (id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); +select count(*) from simple r full outer join simple s using (id); +rollback to settings; + +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -436,14 +445,24 @@ explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; +-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +rollback to settings; + + -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into -- the hash table) -- 2.25.1