On Sat, Mar 25, 2023 at 09:21:34AM +1300, Thomas Munro wrote:
>  * reuse the same unmatched_scan_{chunk,idx} variables as above
>  * rename the list of chunks to scan to work_queue
>  * fix race/memory leak if we see PHJ_BATCH_SCAN when we attach (it
> wasn't OK to just fall through)

ah, good catch.

> I don't love the way that both ExecHashTableDetachBatch() and
> ExecParallelPrepHashTableForUnmatched() duplicate logic relating to
> the _SCAN/_FREE protocol, but I'm struggling to find a better idea.
> Perhaps I just need more coffee.

I'm not sure if I have strong feelings either way.
To confirm I understand, though: in ExecHashTableDetachBatch(), the call
to BarrierArriveAndDetachExceptLast() serves only to advance the barrier
phase through _SCAN, right? It doesn't really matter if this worker is
the last worker since BarrierArriveAndDetach() handles that for us.
There isn't another barrier function that does exactly this (and I think
that's mostly fine), but I did have to think about it for a bit.
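
Just to write down my mental model of the sequence (a sketch of how I'm
reading it, not the patch's actual code):

    PHJ_BATCH_PROBE:
        BarrierArriveAndDetachExceptLast()
            not last worker -> detach; done with this batch
            last worker     -> phase advances to PHJ_BATCH_SCAN; stay attached

    PHJ_BATCH_SCAN:
        BarrierArriveAndDetach()
            not last worker -> detach; done with this batch
            last worker     -> phase advances to PHJ_BATCH_FREE; free resources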

Oh, and, unrelated, but it might be worth updating the BarrierAttach()
function comment to mention BarrierArriveAndDetachExceptLast().
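
Maybe something as simple as adding it to the list of arrive/detach
functions mentioned there, e.g. (rough wording only -- I haven't checked
the exact text of the existing comment):

 * ... or BarrierArriveAndDetachExceptLast(), which detaches unless the
 * caller turns out to be the last participant still attached.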

> I think your idea of opportunistically joining the scan if it's
> already running makes sense to explore for a later step, ie to make
> multi-batch PHFJ fully fair, and I think that should be a fairly easy
> code change, and I put in some comments where changes would be needed.

makes sense.

I have some very minor pieces of feedback, mainly about extraneous
commas that made me uncomfortable ;)

> From 8b526377eb4a4685628624e75743aedf37dd5bfe Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas.mu...@gmail.com>
> Date: Fri, 24 Mar 2023 14:19:07 +1300
> Subject: [PATCH v12 1/2] Scan for unmatched hash join tuples in memory order.
> 
> In a full/right outer join, we need to scan every tuple in the hash
> table to find the ones that were not matched while probing, so that we

Given how you are using the word "so" here, I think that comma before it
is not needed.

> @@ -2083,58 +2079,45 @@ bool
>  ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
>  {
>       HashJoinTable hashtable = hjstate->hj_HashTable;
> -     HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> +     HashMemoryChunk chunk;
>  
> -     for (;;)
> +     while ((chunk = hashtable->unmatched_scan_chunk))
>       {
> -             /*
> -              * hj_CurTuple is the address of the tuple last returned from the
> -              * current bucket, or NULL if it's time to start scanning a new
> -              * bucket.
> -              */
> -             if (hashTuple != NULL)
> -                     hashTuple = hashTuple->next.unshared;
> -             else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
> -             {
> -                     hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
> -                     hjstate->hj_CurBucketNo++;
> -             }
> -             else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
> +             while (hashtable->unmatched_scan_idx < chunk->used)
>               {
> -                     int                     j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
> +                     HashJoinTuple hashTuple = (HashJoinTuple)
> +                     (HASH_CHUNK_DATA(hashtable->unmatched_scan_chunk) +
> +                      hashtable->unmatched_scan_idx);
>  
> -                     hashTuple = hashtable->skewBucket[j]->tuples;
> -                     hjstate->hj_CurSkewBucketNo++;
> -             }
> -             else
> -                     break;                          /* finished all buckets */
> +                     MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
> +                     int                     hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
>  
> -             while (hashTuple != NULL)
> -             {
> -                     if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> -                     {
> -                             TupleTableSlot *inntuple;
> +                     /* next tuple in this chunk */
> +                     hashtable->unmatched_scan_idx += MAXALIGN(hashTupleSize);
>  
> -                             /* insert hashtable's tuple into exec slot */
> -                             inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> -                                                                                              hjstate->hj_HashTupleSlot,
> -                                                                                              false);        /* do not pfree */
> -                             econtext->ecxt_innertuple = inntuple;
> +                     if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> +                             continue;
>  
> -                             /*
> -                              * Reset temp memory each time; although this function doesn't
> -                              * do any qual eval, the caller will, so let's keep it
> -                              * parallel to ExecScanHashBucket.
> -                              */
> -                             ResetExprContext(econtext);

I don't think I had done this before. Good call.

> +                     /* insert hashtable's tuple into exec slot */
> +                     econtext->ecxt_innertuple =
> +                             ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> +                                                                       hjstate->hj_HashTupleSlot,
> +                                                                       false);

> From 6f4e82f0569e5b388440ca0ef268dd307388e8f8 Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas.mu...@gmail.com>
> Date: Fri, 24 Mar 2023 15:23:14 +1300
> Subject: [PATCH v12 2/2] Parallel Hash Full Join.
> 
> Full and right outer joins were not supported in the initial
> implementation of Parallel Hash Join, because of deadlock hazards (see

no comma needed before the "because" here

> discussion).  Therefore FULL JOIN inhibited page-based parallelism,
> as the other join strategies can't do it either.

I actually don't quite understand what this means. It's been a while for
me, so perhaps I'm being dense, but what is page-based parallelism?
Also, I would put a comma after "Therefore" :)

> Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on
> the inner side of one batch's hash table.  For now, sidestep the
> deadlock problem by terminating parallelism there.  The last process to
> arrive at that phase emits the unmatched tuples, while others detach and
> are free to go and work on other batches, if there are any, but
> otherwise they finish the join early.
> 
> That unfairness is considered acceptable for now, because it's better
> than no parallelism at all.  The build and probe phases are run in
> parallel, and the new scan-for-unmatched phase, while serial, is usually
> applied to the smaller of the two relations and is either limited by
> some multiple of work_mem, or it's too big and is partitioned into
> batches and then the situation is improved by batch-level parallelism.
> In future work on deadlock avoidance strategies, we may find a way to
> parallelize the new phase safely.

Is it worth mentioning something about parallel-oblivious parallel hash
join still not being able to do this? Or is that obvious?

>   *
> @@ -2908,6 +3042,12 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
>       chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
>       hashtable->batches[curbatch].shared->chunks = chunk_shared;
>  
> +     /*
> +      * Also make this the head of the work_queue list.  This is used as a
> +      * cursor for scanning all chunks in the batch.
> +      */
> +     hashtable->batches[curbatch].shared->work_queue = chunk_shared;
> +
>       if (size <= HASH_CHUNK_THRESHOLD)
>       {
>               /*
> @@ -3116,18 +3256,31 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
>       {
>               int                     curbatch = hashtable->curbatch;
>               ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
> +             bool            attached = true;
>  
>               /* Make sure any temporary files are closed. */
>               sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
>               sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
>  
> -             /* Detach from the batch we were last working on. */
> -             if (BarrierArriveAndDetach(&batch->batch_barrier))
> +             /* After attaching we always get at least to PHJ_BATCH_PROBE. */
> +             Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
> +                        BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
> +
> +             /*
> +              * Even if we aren't doing a full/right outer join, we'll step through
> +              * the PHJ_BATCH_SCAN phase just to maintain the invariant that freeing
> +              * happens in PHJ_BATCH_FREE, but that'll be wait-free.
> +              */
> +             if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)

full/right joins should never fall into this code path, right?

If so, would we be able to assert that? Maybe it doesn't make
sense, though...
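
Something like the following is what I have in mind, though it relies on
a flag that I don't think exists on HashJoinTable today, so treat it as
purely hypothetical:

    /* hypothetical: assumes we can tell here that this is a full/right join */
    Assert(!hashtable->fill_inner ||
           BarrierPhase(&batch->batch_barrier) != PHJ_BATCH_PROBE);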

> +                     attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
> +             if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
>               {
>                       /*
> -                      * Technically we shouldn't access the barrier because we're no
> -                      * longer attached, but since there is no way it's moving after
> -                      * this point it seems safe to make the following assertion.
> +                      * We are no longer attached to the batch barrier, but we're the
> +                      * process that was chosen to free resources and it's safe to
> +                      * assert the current phase.  The ParallelHashJoinBatch can't go
> +                      * away underneath us while we are attached to the build barrier,
> +                      * making this access safe.
>                        */
>                       Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);

Otherwise, LGTM.

- Melanie

