On Thu, Mar 9, 2017 at 5:28 PM, Thomas Munro <thomas.mu...@enterprisedb.com>
wrote:

> On Wed, Mar 8, 2017 at 12:58 PM, Andres Freund <and...@anarazel.de> wrote:
> > 0002: Check hash join work_mem usage at the point of chunk allocation.
> >
> > Modify the existing hash join code to detect work_mem exhaustion at
> > the point where chunks are allocated, instead of checking after every
> > tuple insertion.  This matches the logic used for estimating, and more
> > importantly allows for some parallelism in later patches.
> >
> > diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
> > index 406c180..af1b66d 100644
> > --- a/src/backend/executor/nodeHash.c
> > +++ b/src/backend/executor/nodeHash.c
> > @@ -48,7 +48,8 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
> >                                                 int bucketNumber);
> >  static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
> >
> > -static void *dense_alloc(HashJoinTable hashtable, Size size);
> > +static void *dense_alloc(HashJoinTable hashtable, Size size,
> > +                                                bool respect_work_mem);
> >
> > I still dislike this, but maybe Robert's point of:
> >
> > On 2017-02-16 08:57:21 -0500, Robert Haas wrote:
> >> On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <and...@anarazel.de> wrote:
> >> > Isn't it kinda weird to do this from within dense_alloc()?  I mean that
> >> > dumps a lot of data to disk, frees a bunch of memory and so on - not
> >> > exactly what "dense_alloc" implies.  Isn't the free()ing part also
> >> > dangerous, because the caller might actually use some of that memory,
> >> > like e.g. in ExecHashRemoveNextSkewBucket() or such.  I haven't looked
> >> > deeply enough to check whether that's an active bug, but it seems like
> >> > inviting one if not.
> >>
> >> I haven't looked at this, but one idea might be to just rename
> >> dense_alloc() to ExecHashBlahBlahSomething().  If there's a real
> >> abstraction layer problem here then we should definitely fix it, but
> >> maybe it's just the angle at which you hold your head.
> >
> > Is enough.
>
> There is a problem here.  dense_alloc() can determine that it needs to
> increase the number of batches, effectively splitting the current batch,
> but then the caller goes on to insert the current tuple anyway, even
> though it may no longer belong in this batch.  I will post a fix for
> that soon.  I will also refactor it so that it doesn't do that work
> inside dense_alloc.  You're right, that's too weird.
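>
> To sketch the direction I have in mind for that fix (this is only a
> sketch, not the final code; the control flow around the existing
> helpers is hypothetical):
>
> /*
>  * Sketch only: after an allocation that may have increased nbatch,
>  * recompute the tuple's batch and route it to a batch file if it no
>  * longer belongs in the batch currently being loaded.
>  */
> ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
> if (batchno != hashtable->curbatch)
> {
>     /* The batch was split; the tuple now belongs to a later batch. */
>     ExecHashJoinSaveTuple(tuple, hashvalue,
>                           &hashtable->innerBatchFile[batchno]);
> }
> else
> {
>     /* It still belongs here; link it into its bucket as usual. */
>     hashTuple->next = hashtable->buckets[bucketno];
>     hashtable->buckets[bucketno] = hashTuple;
> }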
>
> In the meantime, here is a new patch series addressing the other
> things you raised.
>
> > 0003: Scan for unmatched tuples in a hash join one chunk at a time.
> >
> >
> > @@ -1152,8 +1155,65 @@ bool
> >  ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
> >  {
> >         HashJoinTable hashtable = hjstate->hj_HashTable;
> > -       HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> > +       HashJoinTuple hashTuple;
> > +       MinimalTuple tuple;
> > +
> > +       /*
> > +        * First, process the queue of chunks holding tuples that are in regular
> > +        * (non-skew) buckets.
> > +        */
> > +       for (;;)
> > +       {
> > +               /* Do we need a new chunk to scan? */
> > +               if (hashtable->current_chunk == NULL)
> > +               {
> > +                       /* Have we run out of chunks to scan? */
> > +                       if (hashtable->unmatched_chunks == NULL)
> > +                               break;
> > +
> > +                       /* Pop the next chunk from the front of the queue. */
> > +                       hashtable->current_chunk = hashtable->unmatched_chunks;
> > +                       hashtable->unmatched_chunks = hashtable->current_chunk->next;
> > +                       hashtable->current_chunk_index = 0;
> > +               }
> > +
> > +               /* Have we reached the end of this chunk yet? */
> > +               if (hashtable->current_chunk_index >= hashtable->current_chunk->used)
> > +               {
> > +                       /* Go around again to get the next chunk from the queue. */
> > +                       hashtable->current_chunk = NULL;
> > +                       continue;
> > +               }
> > +
> > +               /* Take the next tuple from this chunk. */
> > +               hashTuple = (HashJoinTuple)
> > +                       (hashtable->current_chunk->data + hashtable->current_chunk_index);
> > +               tuple = HJTUPLE_MINTUPLE(hashTuple);
> > +               hashtable->current_chunk_index +=
> > +                       MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
> > +
> > +               /* Is it unmatched? */
> > +               if (!HeapTupleHeaderHasMatch(tuple))
> > +               {
> > +                       TupleTableSlot *inntuple;
> > +
> > +                       /* insert hashtable's tuple into exec slot */
> > +                       inntuple = ExecStoreMinimalTuple(tuple,
> > +                                                        hjstate->hj_HashTupleSlot,
> > +                                                        false); /* do not pfree */
> > +                       econtext->ecxt_innertuple = inntuple;
> > +
> > +                       /* reset context each time (see below for explanation) */
> > +                       ResetExprContext(econtext);
> > +                       return true;
> > +               }
> > +       }
> >
> > I suspect this might actually be slower than the current/old logic,
> > because the current_chunk tests are repeated every loop. I think
> > retaining the two loops the previous code had makes sense, i.e. one to
> > find a relevant chunk, and one to iterate through all tuples in a chunk,
> > checking for an unmatched one.
>
> Ok, I've updated it to use two loops as suggested.  I couldn't measure
> any speedup as a result but it's probably better code that way.
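>
> For reference, the reworked scan now has roughly this shape (a
> condensed sketch, not the exact hunk):
>
> while (hashtable->current_chunk != NULL ||
>        hashtable->unmatched_chunks != NULL)
> {
>     /* Outer loop: make sure we have a chunk to scan. */
>     if (hashtable->current_chunk == NULL)
>     {
>         hashtable->current_chunk = hashtable->unmatched_chunks;
>         hashtable->unmatched_chunks = hashtable->current_chunk->next;
>         hashtable->current_chunk_index = 0;
>     }
>
>     /* Inner loop: walk all tuples in this chunk. */
>     while (hashtable->current_chunk_index < hashtable->current_chunk->used)
>     {
>         HashJoinTuple hashTuple = (HashJoinTuple)
>             (hashtable->current_chunk->data + hashtable->current_chunk_index);
>         MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
>
>         hashtable->current_chunk_index +=
>             MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
>         if (!HeapTupleHeaderHasMatch(tuple))
>         {
>             /* ... store the tuple in hj_HashTupleSlot and return true ... */
>         }
>     }
>
>     /* Chunk exhausted; drop it and go around for the next one. */
>     hashtable->current_chunk = NULL;
> }
> return false;   /* no more unmatched tuples */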
>
> > Have you run a performance comparison pre/post this patch?  I don't
> > think there'd be a lot, but it seems important to verify that.  I'd just
> > run a tpc-h pre/post comparison (prewarmed, fully cache resident,
> > parallelism disabled, hugepages is my personal recipe for the least
> > run-over-run variance).
>
> I haven't been able to measure any difference in TPCH results yet.  I
> tried to contrive a simple test where there is a measurable
> difference.  I created a pair of tables and repeatedly ran two FULL
> OUTER JOIN queries.  In Q1 no unmatched tuples are found in the hash
> table, and in Q2 every tuple in the hash table turns out to be
> unmatched.  I consistently measure just over 10% improvement.
>
> CREATE TABLE t1 AS
> SELECT generate_series(1, 10000000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
>
> CREATE TABLE t2 AS
> SELECT generate_series(10000001, 20000000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
>
> SET work_mem = '1GB';
>
> -- Q1
> SELECT COUNT(*)
> FROM t1 FULL OUTER JOIN t1 other USING (id);
>
> -- Q2
> SELECT COUNT(*)
> FROM t1 FULL OUTER JOIN t2 USING (id);
>
> master:                               Q1 = 9.280s, Q2 = 9.645s
> 0003-hj-refactor-unmatched-v6.patch:  Q1 = 8.341s, Q2 = 8.661s
> 0003-hj-refactor-unmatched-v7.patch:  Q1 = 8.186s, Q2 = 8.642s
>
> > 0004: Add a barrier primitive for synchronizing backends.
> >
> >
> > +/*-------------------------------------------------------------------------
> > + *
> > + * barrier.c
> > + *       Barriers for synchronizing cooperating processes.
> > + *
> > + * Copyright (c) 2017, PostgreSQL Global Development Group
> > + *
> > + * This implementation of barriers allows for static sets of participants
> > + * known up front, or dynamic sets of participants which processes can join
> > + * or leave at any time.  In the dynamic case, a phase number can be used to
> > + * track progress through a parallel algorithm; in the static case it isn't
> > + * needed.
> >
> > Why would a phase id generally not be needed in the static case?
> > There are also further references to it ("Increments the current phase.")
> > that don't quite jive with that.
>
> I've extended that text at the top to explain.
>
> Short version: there is always a phase internally; that comment refers
> to whether client code needs to examine it.  Dynamic barrier users
> probably need to care what the phase is, since progress can be made
> while they're not attached, so they need a way to find out about that
> after they attach.  Static barriers generally don't need to care about
> the phase number, because nothing can happen without explicit action
> from all participants, so they should be in sync automatically.
> Hopefully the new comments explain that better.
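>
> To make the dynamic case a bit more concrete, a participant that joins
> late does something along these lines (a sketch of the intended usage
> pattern only: do_work_for_phase(), TOTAL_PHASES and the wait_event_info
> value are placeholders, and exactly how the current phase is reported
> when attaching is glossed over):
>
> phase = BarrierAttach(&barrier);    /* join, learning the current phase */
> while (phase < TOTAL_PHASES)
> {
>     /* The phase tells a late joiner which step to start at. */
>     do_work_for_phase(phase);
>     BarrierWait(&barrier, 0);       /* 0 stands in for a real wait event */
>     phase++;
> }
> BarrierDetach(&barrier);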
>
> > + * IDENTIFICATION
> > + *       src/backend/storage/ipc/barrier.c
> >
> > This could use a short example usage scenario. Without knowing existing
> > usages of the "pattern", it's probably hard to grasp.
>
> Examples added.
>
> > + *-------------------------------------------------------------------------
> > + */
> > +
> > +#include "storage/barrier.h"
> >
> > Aren't you missing an include of postgres.h here?
>
> Fixed.
>
> > +bool
> > +BarrierWait(Barrier *barrier, uint32 wait_event_info)
> > +{
> > +       bool first;
> > +       bool last;
> > +       int start_phase;
> > +       int next_phase;
> > +
> > +       SpinLockAcquire(&barrier->mutex);
> > +       start_phase = barrier->phase;
> > +       next_phase = start_phase + 1;
> > +       ++barrier->arrived;
> > +       if (barrier->arrived == 1)
> > +               first = true;
> > +       else
> > +               first = false;
> > +       if (barrier->arrived == barrier->participants)
> > +       {
> > +               last = true;
> > +               barrier->arrived = 0;
> > +               barrier->phase = next_phase;
> > +       }
> > +       else
> > +               last = false;
> > +       SpinLockRelease(&barrier->mutex);
> >
> > Hm. So what's the defined concurrency protocol for non-static barriers,
> > when they attach after the spinlock here has been released?  I think the
> > concurrency aspects deserve some commentary.  Afaics it'll correctly
> > just count as the next phase - without any blocking - but that shouldn't
> > have to be inferred.
>
> It may join at start_phase or next_phase depending on what happened
> above.  If we just advanced the phase (by being the last to arrive)
> then another backend that attaches will be joining at phase ==
> next_phase, and if that new backend calls BarrierWait it'll be waiting
> for the phase after that.
>
> > Things might get wonky if that new participant
> > then starts waiting for the new phase, violating the assert below...
>
> > +               Assert(barrier->phase == start_phase || barrier->phase == next_phase);
>
> I've added a comment near that assertion that explains the reason the
> assertion holds.
>
> Short version:  The caller is attached, so there is no way for the
> phase to advance beyond next_phase without the caller's participation;
> the only possibilities to consider in the wait loop are "we're still
> waiting" or "the final participant arrived or detached, advancing the
> phase and releasing me".
>
> Put another way, no waiting backend can ever see phase advance beyond
> next_phase, because in order to do so, the waiting backend would need
> to run BarrierWait again; barrier->arrived can never reach
> barrier->participants a second time while we're in that wait loop.
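>
> Schematically, the wait path that the assertion lives in looks like
> this (condensed; cleanup details elided):
>
> ConditionVariablePrepareToSleep(&barrier->condition_variable);
> for (;;)
> {
>     bool advanced;
>
>     SpinLockAcquire(&barrier->mutex);
>     Assert(barrier->phase == start_phase ||
>            barrier->phase == next_phase);
>     advanced = barrier->phase == next_phase;
>     SpinLockRelease(&barrier->mutex);
>
>     if (advanced)
>         break;      /* the last participant arrived or detached; released */
>     /* Otherwise it was a spurious wakeup; go back to sleep. */
>     ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
> }
> ConditionVariableCancelSleep();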
>
> > +/*
> > + * Detach from a barrier.  This may release other waiters from BarrierWait and
> > + * advance the phase, if they were only waiting for this backend.  Return
> > + * true if this participant was the last to detach.
> > + */
> > +bool
> > +BarrierDetach(Barrier *barrier)
> > +{
> > +       bool release;
> > +       bool last;
> > +
> > +       SpinLockAcquire(&barrier->mutex);
> > +       Assert(barrier->participants > 0);
> > +       --barrier->participants;
> > +
> > +       /*
> > +        * If any other participants are waiting and we were the last participant
> > +        * waited for, release them.
> > +        */
> > +       if (barrier->participants > 0 &&
> > +               barrier->arrived == barrier->participants)
> > +       {
> > +               release = true;
> > +               barrier->arrived = 0;
> > +               barrier->phase++;
> > +       }
> > +       else
> > +               release = false;
> > +
> > +       last = barrier->participants == 0;
> > +       SpinLockRelease(&barrier->mutex);
> > +
> > +       if (release)
> > +               ConditionVariableBroadcast(&barrier->condition_variable);
> > +
> > +       return last;
> > +}
> >
> > Doesn't this, again, run into danger of leading to an assert failure in
> > the loop in BarrierWait?
>
> I believe this code is correct.  The assertion in BarrierWait can't
> fail, because waiters know that there is no way for the phase to get
> any further ahead without their help (because they are attached):
> again, the only possibilities are phase == start_phase (implying that
> they received a spurious condition variable signal) or phase ==
> next_phase (the last backend being waited on has finally arrived or
> detached, allowing other participants to proceed).
>
> I've attached a test module that starts N workers and makes each
> worker attach, call BarrierWait a random number of times, detach, and
> then rinse and repeat, until the phase reaches some large number and
> they all exit.  This exercises every interleaving of attach, wait and
> detach.  Run CREATE EXTENSION test_barrier, then something like SELECT
> test_barrier_reattach_random(4, 1000000), to verify that no assertions
> are thrown and it always completes.
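>
> The heart of each test worker is roughly this (simplified; as above I'm
> assuming that attaching reports the current phase, and 0 stands in for
> a real wait event; goal_phase and max_waits come from the function
> arguments):
>
> for (;;)
> {
>     int     phase;
>     int     i;
>     int     waits;
>
>     phase = BarrierAttach(&barrier);
>     if (phase >= goal_phase)
>     {
>         BarrierDetach(&barrier);
>         break;
>     }
>     waits = random() % max_waits;       /* wait a random number of times */
>     for (i = 0; i < waits; ++i)
>         BarrierWait(&barrier, 0);
>     BarrierDetach(&barrier);
> }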
>
> > +#include "postgres.h"
> >
> > Huh, that normally shouldn't be in a header.  I see you introduced that
> > in a bunch of other places too - that really doesn't look right to me.
>
> Fixed.
>
While testing v7 of this patch on TPC-H at scale factor 20, I found a
few regressions:
Q21: 52 secs on HEAD vs. 400 secs with the patch
Q8: 8 secs on HEAD vs. 14 secs with the patch

However, to avoid being framed as some sort of "jinx" ;-), I'd like to
report a few cases of improvement as well:
Q3: 58 secs on HEAD vs. 44 secs with the patch
Q9: 81 secs on HEAD vs. 48 secs with the patch
Q10: 57 secs on HEAD vs. 47 secs with the patch
Q14: 9 secs on HEAD vs. 5 secs with the patch

The details of the experimental setup are as follows:
scale-factor: 20
work_mem = 1GB
shared_buffers = 10GB

The output plans on HEAD and with the patch are in the attached tar
file. If you require any more information, please let me know.
-- 
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

Attachment: ph.tar.gz
