Here's a new rebased and debugged patch set.

On Tue, Aug 1, 2017 at 1:11 PM, Andres Freund <and...@anarazel.de> wrote:
> - Echoing concerns from other threads (Robert: ping): I'm doubtful that
>   it makes sense to size the number of parallel workers solely based on
>   the parallel scan node's size. I don't think it's this patch's job to
>   change that, but to me it seriously amplifys that - I'd bet there's a
>   lot of cases with nontrivial joins where the benefit from parallelism
>   on the join level is bigger than on the scan level itself. And the
>   number of rows in the upper nodes might also be bigger than on the
>   scan node level, making it more important to have higher number of
>   nodes.
Agreed that this is bogus.  The number of workers is really determined by
the outer path (the probe side), except that if the inner path (the build
side) is not big enough to warrant parallel workers at all then parallelism
is inhibited on that side.  That prevents small tables from being loaded by
Parallel Hash.  That is something we want, but it's probably not doing it
for the right reasons with the right threshold -- about which more below.

> - If I understand the code in initial_cost_hashjoin() correctly, we
>   count the synchronization overhead once, independent of the number of
>   workers. But on the other hand we calculate the throughput by
>   dividing by the number of workers. Do you think that's right?

It's how long you think the average participant will have to wait for the
last participant to arrive, and I think that's mainly determined by the
parallel grain, not the number of workers.  If you're a worker that has
reached the end of a scan, the best case is that every other worker has
already reached the end too, and the worst case is that another worker read
the last granule (currently a page) just before you hit the end, so you'll
have to wait for it to process a granule's worth of work.  To show this I
used dtrace to measure the number of microseconds spent waiting at the
barrier before probing while running a 5 million row self-join 100 times,
and got the following histograms:

1 worker:

 value  ------------- Distribution ------------- count
   < 0 |                                          0
     0 |@@@@@@@@@@@@@@@@@@@@@@                    110
    20 |                                          1
    40 |@                                         5
    60 |@@@@@                                     24
    80 |@@@                                       14
   100 |@                                         5
   120 |@@@                                       16
   140 |@@@                                       17
   160 |@@                                        8
   180 |                                          0

2 workers:

 value  ------------- Distribution ------------- count
   < 0 |                                          0
     0 |@@@@@@@@@@@@@@                            107
    20 |                                          1
    40 |@@@                                       21
    60 |@@@                                       25
    80 |@@                                        16
   100 |@@                                        14
   120 |@@@@@                                     38
   140 |@@@@@@@                                   51
   160 |@@@                                       20
   180 |                                          3
   200 |                                          1
   220 |                                          3
   240 |                                          0

3 workers:

 value  ------------- Distribution ------------- count
   < 0 |                                          0
     0 |@@@@@@@@@@@                               113
    20 |@@                                        15
    40 |@@@                                       29
    60 |@@@@                                      35
    80 |@@@@                                      37
   100 |@@@@@@                                    56
   120 |@@@@@                                     51
   140 |@@@                                       31
   160 |@@                                        21
   180 |@                                         6

4 workers:

 value  ------------- Distribution ------------- count
   < 0 |                                          0
     0 |@@@@@@@@@@                                121
    20 |                                          4
    40 |@@@                                       39
    60 |@@                                        29
    80 |@@                                        24
   100 |@@@@@@@                                   88
   120 |@@@@@@@                                   82
   140 |@@@@@                                     58
   160 |@@                                        26
   180 |@                                         15
   200 |@                                         9
   220 |                                          4
   240 |                                          1
   260 |                                          0

I didn't know what to expect above my machine's core count of 4, but this
is for 8:

 value  ------------- Distribution ------------- count
   < 0 |                                          0
     0 |@@@@@                                     116
    20 |                                          2
    40 |@@                                        36
    60 |@@@                                       69
    80 |@@@@                                      95
   100 |@@@@@                                     113
   120 |@@@                                       74
   140 |@@@                                       71
   160 |@@                                        44
   180 |@@                                        36
   200 |@                                         30
   220 |@                                         14
   240 |@                                         18
   260 |                                          8
   280 |                                          3
   300 |                                          4

It's true that the fraction of waits that go into the 0-20us bucket
(because the last to arrive at a barrier doesn't have to wait at all)
decreases as you add more workers, but above 1 worker the main story is the
bell curve (?) we see clustered around 100-120us, and it doesn't seem to be
moving.  If we call the fraction of samples outside the 0-20us bucket
"wait_probability" and call their average wait time "expected_wait_cost",
then one way to estimate this is something like:

  wait_probability * expected_wait_cost
    = (1 - 1 / participants) * (tuples_per_grain * cost_per_tuple * 0.5)

I don't think we can do that today, because we don't have access to
tuples_per_grain from the subplan.  That would in theory come ultimately
from the scan, adjusted as appropriate by selectivity estimates.  The grain
could in future be more than one page at a time as proposed by David Rowley
and others, or "it's complicated" for a Parallel Append.
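To make that arithmetic concrete, here is a small stand-alone sketch of the
estimate.  This is illustration only, not code from the patch; the function
name and the example inputs are made up:

/*
 * Sketch of the wait cost estimate described above.  For illustration
 * only; not code from the patch.
 */
#include <stdio.h>

static double
estimated_sync_wait_cost(int participants, double tuples_per_grain,
                         double cost_per_tuple)
{
    /* Chance that we are not the last to arrive and therefore must wait. */
    double wait_probability = 1.0 - 1.0 / participants;

    /* On average the last to arrive is halfway through its final granule. */
    double expected_wait_cost = tuples_per_grain * cost_per_tuple * 0.5;

    return wait_probability * expected_wait_cost;
}

int
main(void)
{
    /* e.g. 4 participants, ~100 tuples per page-sized granule, 0.01 per tuple */
    printf("%f\n", estimated_sync_wait_cost(4, 100.0, 0.01));
    return 0;
}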
But I'm not sure if that's correct, doable or worth doing, hence my attempt
to provide a single knob to model this for now.

I did some experiments to find a value of parallel_synchronization_cost
that avoids Parallel Hash when it won't pay off, like this:

 * a "big" table with 1 million rows to be the outer relation
 * a "small" table with a range of sizes from 5k to 100k rows to hash
 * both tables have a unique integer key "a" and a 60 byte text column "b"
 * query (a): SELECT COUNT(*) FROM big JOIN small USING (a)
 * query (b): ... WHERE length(small.b) * 2 - length(small.b) = length(small.b)
 * work_mem set high enough that we never have multiple batches
 * one warmup run and then the median of 3 measurements
 * all default except min_parallel_table_scan_size = 0
 * 4 core developer machine
 * -O2, no asserts

Just to be clear: the following numbers aren't supposed to be impressive,
and the queries are way shorter than the ones the Parallel Hash feature is
really intended to help with.  That's because we're searching for the
threshold below which Parallel Hash *doesn't* help, and that involves
running queries where there isn't much to hash.

The times are for the complete query (i.e. they include probing too, not
just the hash table build), and show
"parallel-oblivious-hash-join-time -> parallel-aware-hash-join-time" for
queries "a" and "b" on patched master.  I also compared with unpatched
master to confirm that the parallel-oblivious times on the left of the
arrows match unpatched master's, modulo a bit of noise.

1 worker:
    5,000 rows hashed: (a) 157ms -> 166ms, (b) 166ms -> 183ms
    7,500 rows hashed: (a) 162ms -> 174ms, (b) 176ms -> 182ms
   10,000 rows hashed: (a) 161ms -> 170ms, (b) 181ms -> 210ms
   12,500 rows hashed: (a) 169ms -> 175ms, (b) 194ms -> 188ms
   15,000 rows hashed: (a) 175ms -> 181ms, (b) 199ms -> 195ms
   17,500 rows hashed: (a) 173ms -> 175ms, (b) 201ms -> 202ms
   20,000 rows hashed: (a) 179ms -> 179ms, (b) 210ms -> 195ms  <== a & b threshold
   30,000 rows hashed: (a) 196ms -> 192ms, (b) 244ms -> 218ms
   40,000 rows hashed: (a) 201ms -> 197ms, (b) 265ms -> 228ms
   50,000 rows hashed: (a) 217ms -> 251ms, (b) 294ms -> 249ms
   60,000 rows hashed: (a) 228ms -> 222ms, (b) 324ms -> 268ms
   70,000 rows hashed: (a) 230ms -> 214ms, (b) 338ms -> 275ms
   80,000 rows hashed: (a) 243ms -> 229ms, (b) 366ms -> 291ms
   90,000 rows hashed: (a) 256ms -> 239ms, (b) 391ms -> 311ms
  100,000 rows hashed: (a) 266ms -> 248ms, (b) 420ms -> 326ms

2 workers:
    5,000 rows hashed: (a) 110ms -> 115ms, (b) 118ms -> 127ms
    7,500 rows hashed: (a) 115ms -> 128ms, (b) 131ms -> 128ms
   10,000 rows hashed: (a) 114ms -> 116ms, (b) 135ms -> 148ms
   12,500 rows hashed: (a) 126ms -> 126ms, (b) 145ms -> 131ms
   15,000 rows hashed: (a) 134ms -> 142ms, (b) 151ms -> 134ms
   17,500 rows hashed: (a) 125ms -> 122ms, (b) 153ms -> 147ms  <== a & b threshold
   20,000 rows hashed: (a) 126ms -> 124ms, (b) 160ms -> 136ms
   30,000 rows hashed: (a) 144ms -> 132ms, (b) 191ms -> 152ms
   40,000 rows hashed: (a) 165ms -> 151ms, (b) 213ms -> 158ms
   50,000 rows hashed: (a) 161ms -> 143ms, (b) 240ms -> 171ms
   60,000 rows hashed: (a) 171ms -> 150ms, (b) 266ms -> 186ms
   70,000 rows hashed: (a) 176ms -> 151ms, (b) 283ms -> 190ms
   80,000 rows hashed: (a) 181ms -> 156ms, (b) 315ms -> 204ms
   90,000 rows hashed: (a) 189ms -> 164ms, (b) 338ms -> 214ms
  100,000 rows hashed: (a) 207ms -> 177ms, (b) 362ms -> 232ms

3 workers:
    5,000 rows hashed: (a) 90ms -> 103ms, (b) 107ms -> 118ms
    7,500 rows hashed: (a) 106ms -> 104ms, (b) 115ms -> 118ms
   10,000 rows hashed: (a) 100ms -> 95ms, (b) 121ms -> 110ms  <== b threshold
   12,500 rows hashed: (a) 103ms -> 120ms, (b) 134ms -> 113ms
   15,000 rows hashed: (a) 134ms -> 110ms, (b) 142ms -> 116ms  <== a threshold
   17,500 rows hashed: (a) 110ms -> 104ms, (b) 146ms -> 123ms
   20,000 rows hashed: (a) 107ms -> 103ms, (b) 151ms -> 120ms
   30,000 rows hashed: (a) 124ms -> 110ms, (b) 183ms -> 135ms
   40,000 rows hashed: (a) 125ms -> 108ms, (b) 209ms -> 137ms
   50,000 rows hashed: (a) 133ms -> 115ms, (b) 238ms -> 150ms
   60,000 rows hashed: (a) 143ms -> 119ms, (b) 266ms -> 159ms
   70,000 rows hashed: (a) 146ms -> 120ms, (b) 288ms -> 165ms
   80,000 rows hashed: (a) 150ms -> 129ms, (b) 316ms -> 176ms
   90,000 rows hashed: (a) 159ms -> 126ms, (b) 343ms -> 187ms
  100,000 rows hashed: (a) 176ms -> 136ms, (b) 370ms -> 195ms

4 workers:
    5,000 rows hashed: (a) 93ms -> 103ms, (b) 109ms -> 117ms
    7,500 rows hashed: (a) 106ms -> 102ms, (b) 121ms -> 115ms  <== b threshold
   10,000 rows hashed: (a) 99ms -> 100ms, (b) 126ms -> 113ms
   12,500 rows hashed: (a) 107ms -> 102ms, (b) 137ms -> 117ms  <== a threshold
   15,000 rows hashed: (a) 111ms -> 107ms, (b) 145ms -> 115ms
   17,500 rows hashed: (a) 110ms -> 10ms, (b) 151ms -> 118ms
   20,000 rows hashed: (a) 108ms -> 103ms, (b) 160ms -> 120ms
   30,000 rows hashed: (a) 120ms -> 108ms, (b) 196ms -> 127ms
   40,000 rows hashed: (a) 129ms -> 109ms, (b) 225ms -> 134ms
   50,000 rows hashed: (a) 140ms -> 121ms, (b) 262ms -> 148ms
   60,000 rows hashed: (a) 152ms -> 123ms, (b) 294ms -> 154ms
   70,000 rows hashed: (a) 157ms -> 122ms, (b) 322ms -> 165ms
   80,000 rows hashed: (a) 154ms -> 138ms, (b) 372ms -> 201ms
   90,000 rows hashed: (a) 186ms -> 122ms, (b) 408ms -> 180ms
  100,000 rows hashed: (a) 170ms -> 124ms, (b) 421ms -> 186ms

I found that a good value of parallel_synchronization_cost that enables
Parallel Hash somewhere around those thresholds is 250 for these test
queries, so I have set that as the default in the new patch set.

All of this might be considered moot, because I still needed to frob
min_parallel_table_scan_size to get a Parallel Hash below 90,000 rows
anyway, due to the policies in compute_parallel_worker().  So really there
is no danger of tables like the TPC-H "nation" and "region" tables being
loaded by Parallel Hash even if you set parallel_synchronization_cost to 0,
and probably no reason to worry too much about its default value for now.
It could probably be argued that we shouldn't have the GUC at all, but at
least it provides a handy way to enable and disable Parallel Hash!

One hidden factor here is that it takes a while for workers to start up and
the leader can scan thousands of rows before they arrive.  This effect will
presumably be exaggerated on systems with slow fork/equivalent (Windows,
some commercial Unices IIRC), and minimised by someone writing a patch to
reuse parallel workers.  I haven't tried to investigate that effect because
it doesn't seem very interesting or likely to persist, but it may
contribute to the experimental thresholds I observed.

> - I haven't really grokked the deadlock issue you address. Could you
>   expand the comments on that? Possibly somewhere central referenced by
>   the various parts.

The central place is leader_gate.c.  What's wrong with the explanation in
there?  Let me restate the problem here, along with the three solutions I
considered:

Problem: The leader must never be allowed to wait for other participants
that have emitted tuples (it doesn't matter whether that waiting takes the
form of latches, condition variables, barriers, shm_queues or anything
else).
Any participant that has emitted tuples might currently be blocked waiting
for the leader to drain the tuple queue, so a deadlock could be created.

Concrete example: once we get past PHJ_PHASE_PROBING we have to allow only
the leader or the workers to continue.  Otherwise some worker might be
trying to finish probing by emitting tuples, while the leader might be in
BarrierWait() waiting for everyone to finish probing.  This problem affects
only outer joins (they have to wait to start PHJ_PHASE_UNMATCHED after
probing) and multibatch joins (they have to wait to be able to load the
next batch).

Solution 1: LeaderGate is a simple mechanism for reaching consensus on
whether the leader or a set of workers will be allowed to run after a
certain point, in this case the end of probing.  Concretely this means that
either the leader or any workers will drop out early at that point, leaving
nothing left to do.  This is made slightly more complicated by the fact
that we don't know up front whether there are any workers yet.

Solution 2: Teach tuple queues to spill to disk instead of blocking when
full.  I think this behaviour should probably only be activated while the
leader is running the plan rather than draining tuple queues; the current
block-when-full behaviour would still be appropriate if the leader is
simply unable to drain queues fast enough.  Then the deadlock risk would go
away.

Solution 3: An asynchronous executor model where you don't actually wait
synchronously at barriers -- instead you detach and go and do something
else, but come back and reattach when there is progress to be made.  I have
some ideas about that, but they are dependent on the async execution
project reaching a fairly advanced state first.

When I wrote it, I figured that leader_gate.c was cheap and would do for
now, but I have to admit that it's quite confusing and it sucks that later
batches lose a core.  I'm now thinking that 2 may be a better idea.  My
first thought is that Gather needs a way to advertise that it's busy while
running the plan, shm_mq needs a slightly different all-or-nothing nowait
mode, and TupleQueue needs to write to a shared tuplestore or other temp
file-backed mechanism when appropriate.  Thoughts?

> - maybe I'm overly paranoid, but it might not be bad to add some extra
>   checks for ExecReScanHashJoin ensuring that it doesn't get called when
>   workers are still doing something.

Check out ExecReScanGather(): it shuts down and waits for all workers to
complete, which makes the assumptions in ExecReScanHashJoin() true.  If a
node below Gather but above Hash Join could initiate a rescan then the
assumptions would not hold, but I am not sure what that would mean, and we
don't generate any such plans today to my knowledge.  It doesn't seem to
make sense for the inner side of a Nested Loop to be partial.  Have I
missed something here?

It looks like some details may have changed here due to 41b0dd98 and nearby
commits, and I may need to implement at least ReInitializeDSM.  I also need
a regression test to hit the rescan code, but I'm not sure how to write one
currently.  In an earlier version of this patch set I could do it by
setting shared_tuple_cost (a GUC I no longer have) to a negative number,
which essentially turned our optimiser into a pessimiser capable of
producing a nested loop that rescans a gather node, forking workers for
every row...
> - seems like you're dereffing tuple unnecessarily here:
>
> +       tuple = (HashJoinTuple)
> +               dsa_get_address(hashtable->area, detached_chain_shared);
> +       ExecHashTransferSkewTuples(hashtable, detached_chain,

Yes, here lurked a bug, fixed.

> - The names here could probably improved some:
> +               case WAIT_EVENT_HASH_SHRINKING1:
> +                       event_name = "Hash/Shrinking1";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING2:
> +                       event_name = "Hash/Shrinking2";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING3:
> +                       event_name = "Hash/Shrinking3";
> +                       break;
> +               case WAIT_EVENT_HASH_SHRINKING4:
> +                       event_name = "Hash/Shrinking4";

Fixed.

> - why are we restricting rows_total bit to parallel aware?
>
> +       /*
> +        * If parallel-aware, the executor will also need an estimate of the total
> +        * number of rows expected from all participants so that it can size the
> +        * shared hash table.
> +        */
> +       if (best_path->jpath.path.parallel_aware)
> +       {
> +               hash_plan->plan.parallel_aware = true;
> +               hash_plan->rows_total = best_path->inner_rows_total;
> +       }
> +

I could set it unconditionally and then skip this bit that receives the
number:

    rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;

Do you think it would be better to push plan_rows_total into Plan instead?

> - seems we need a few more test - I don't think the existing tests are
>   properly going to exercise the skew stuff, multiple batches, etc?
>   This is nontrivial code, I'd really like to see a high test coverage
>   of the new code.

I've added some regression tests in a patch to apply before making any
changes.  You would have to change the "explain (costs off)" to "explain
analyze" to verify the claims I put in comments about the number of batches
and peak memory usage in the work_mem management tests.  I chose to put
them into join.sql, and then a later patch adds parallel-aware versions.
(An alternative would be to put them into select.sql and
select_parallel.sql, but it seemed better to keep the non-parallel,
parallel-with-parallel-oblivious-join and parallel-aware cases next to each
other.)

While testing I found a timing bug that could produce incorrect query
results because the empty hash table optimisation saw an incorrect value of
hashtable->totalTuples == 0.  Fixed (see the code in the "finish:" case in
MultiExecHash()).

Last week I finally figured out a way to test different startup timings,
considering the complexity created by the "flash mob" problem I described
when I first proposed dynamic barriers[1].  If you build with
-DBARRIER_DEBUG in the attached patch set you get a new GUC
"barrier_attach_sequence" which you can set like this:

  SET barrier_attach_phases = 'HashJoin.barrier:2,7,0';

That list of numbers tells it which phase each participant should simulate
attaching at.  In that example the leader will attach at phase 2
(PHJ_PHASE_BUILDING), worker 0 will attach at 7
(PHJ_PHASE_RESETTING_BATCH(1)) and worker 1 will attach at 0
(PHJ_PHASE_BEGINNING).  Note that *someone* has to start at 0 or bad things
will happen.  Using this technique I can now use simple scripts to test
every case in the switch statements that appear in three places in the
patch.  See the attached file parallel-hash-attach-phases.sql.  I'm not
sure whether, and if so how, to package any such tests for the regression
suite, since they require a special debug build.
Ideally I would also like to find a way to tell Gather not to run the plan
in the leader (a bit like single_copy mode, except allowing multiple
workers to run the plan, and raising an error if no workers could be
launched).

> - might not hurt to reindent before the final submission

Will do.

> - Unsurprisingly, please implement the FIXME ;)

This must refer to a note about cleaning up skew buckets after they're not
needed, which I've now done.

Some other things:

Previously I failed to initialise the atomics in the shared skew hash table
correctly, and I also used memset to overwrite atomics when loading a new
batch.  This worked on modern systems but would of course fail when using
emulated atomics.  Fixed in the attached.

In the process I discovered that initialising and clearing large hash
tables this way is quite a lot slower than memset on my machine under
simple test conditions.  I think it might be worth experimenting with
array-oriented atomic operations that have a specialisation for 0 that just
uses memset if it can (something like pg_atomic_init_u64_array(base,
stride, n, 0)).  I also think it may be interesting to parallelise the
initialisation and reset of the hash table, since I've seen cases where I
have 7 backends waiting on a barrier while one initialises a couple of GB
of memory for several seconds.  Those are just small optimisations though,
and I'm not planning to investigate them until after the basic patch is in
committable form.
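To illustrate the array-init idea, here's a rough sketch of the kind of
helper I mean -- hypothetical only, not something in the attached patches,
and the name, signature and macro test are just assumptions:

/*
 * Hypothetical helper (not in the attached patches): initialise an array
 * of 64 bit atomics laid out with a fixed stride, specialising the common
 * all-zeroes case to a single memset when atomics are not emulated.
 */
#include "postgres.h"

#include <string.h>

#include "port/atomics.h"

static void
pg_atomic_init_u64_array(void *base, size_t stride, size_t n, uint64 value)
{
    size_t      i;

#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
    /* With native 64 bit atomics, zero-initialisation can be one memset. */
    if (value == 0)
    {
        memset(base, 0, stride * n);
        return;
    }
#endif

    /* Otherwise initialise each atomic individually. */
    for (i = 0; i < n; i++)
        pg_atomic_init_u64((pg_atomic_uint64 *) ((char *) base + i * stride),
                           value);
}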
[1] https://www.postgresql.org/message-id/CAEepm=3yj65sqzuahff3s7ufev83x_rnh5a4-jxmqxgqrq+...@mail.gmail.com

--
Thomas Munro
http://www.enterprisedb.com

parallel-hash-v19.patchset.tgz
Description: GNU Zip compressed data
parallel-hash-attach-phases.sql
Description: Binary data