On Mon, May 25, 2020 at 04:10:45AM +0200, Tomas Vondra wrote:
...

parallel queries
================

And now the fun begins ...

1) small one (SSD, max_parallel_workers_per_gather = 2)

    algorithm    master    tlist    prealloc    prealloc+tlist
    -----------------------------------------------------------
    hash            693      390         177               128
    sort            103       99         101                99

This looks pretty nice - the patches have the expected effect, it got
faster than with just a single CPU etc.

2) big one (SATA, max_parallel_workers_per_gather = 16)

    algorithm    master    tlist    prealloc    prealloc+tlist
    -----------------------------------------------------------
    hash              ?    25000           ?              3132
    sort            248      234         216               200

Well, not that nice :-( The hash queries take so much time that I've
decided not to wait for them and the two numbers are actually just
estimates (after processing just a couple of logical tapes).

Plus it actually gets slower than with serial execution, so what's the
problem here? Especially considering it worked OK on the small machine?

At first I thought it's something about SSD vs. SATA, but it seems to be
more about how we construct the plans, because the plans between the two
machines are very different.
And it seems to depend on the number of workers per gather - for a low
number of workers the plan looks like this (the plans are attached in
plans.txt in case the formatting gets broken by your client):

                                                  QUERY PLAN
---------------------------------------------------------------------------------------------------------------
 Limit
   ->  Aggregate
         ->  Hash Join
               Hash Cond: (part.p_partkey = lineitem_1.l_partkey)
               Join Filter: (lineitem.l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
               ->  Gather
                     Workers Planned: 2
                     ->  Nested Loop
                           ->  Parallel Seq Scan on part
                                 Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                           ->  Index Scan using idx_lineitem_part_supp on lineitem
                                 Index Cond: (l_partkey = part.p_partkey)
               ->  Hash
                     ->  Finalize HashAggregate
                           Group Key: lineitem_1.l_partkey
                           ->  Gather
                                 Workers Planned: 2
                                 ->  Partial HashAggregate
                                       Group Key: lineitem_1.l_partkey
                                       ->  Parallel Seq Scan on lineitem lineitem_1
(20 rows)

but then if I crank the number of workers up, it switches to this:

                                                     QUERY PLAN
---------------------------------------------------------------------------------------------------------------------
 Limit
   ->  Finalize Aggregate
         ->  Gather
               Workers Planned: 5
               ->  Partial Aggregate
                     ->  Nested Loop
                           Join Filter: (part.p_partkey = lineitem.l_partkey)
                           ->  Hash Join
                                 Hash Cond: (part.p_partkey = lineitem_1.l_partkey)
                                 ->  Parallel Seq Scan on part
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                                 ->  Hash
                                       ->  HashAggregate
                                             Group Key: lineitem_1.l_partkey
                                             ->  Seq Scan on lineitem lineitem_1
                           ->  Index Scan using idx_lineitem_part_supp on lineitem
                                 Index Cond: (l_partkey = lineitem_1.l_partkey)
                                 Filter: (l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
(18 rows)

Notice that in the first plan, the hashagg is on top of a parallel-aware
path - so each worker builds the hashagg only on a subset of the data,
and also spills only a fraction of the input rows (so that all workers
combined spill roughly the "whole" table).
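To put rough numbers on the difference between the two plan shapes, here
is a small sketch based on the planner's row estimate for lineitem from
the attached plans. Two things in it are assumptions rather than
something stated above: the divisor formula (workers plus a leader share
that shrinks by 0.3 per worker), which does reproduce the Parallel Seq
Scan row estimates in the attached plans, and the idea that in the
second plan every participating process (workers plus the leader) runs
the non-parallel HashAggregate over the full table.

```python
# Sketch: per-process input to the hash aggregate under the two plan shapes.
# LINEITEM_ROWS is the planner's row estimate taken from the attached plans.
# The divisor formula and the "every process aggregates everything" model
# are assumptions made for illustration only.

LINEITEM_ROWS = 450_017_184


def parallel_divisor(workers: int) -> float:
    """Workers plus the leader's share, which shrinks by 0.3 per worker."""
    leader_share = max(0.0, 1.0 - 0.3 * workers)
    return workers + leader_share


def rows_per_process_good(workers: int) -> int:
    """'Good' plan: Partial HashAggregate on top of a Parallel Seq Scan."""
    return round(LINEITEM_ROWS / parallel_divisor(workers))


def rows_total_bad(workers: int, leader_participates: bool = True) -> int:
    """'Bad' plan: each process is assumed to aggregate the whole table."""
    processes = workers + (1 if leader_participates else 0)
    return processes * LINEITEM_ROWS


if __name__ == "__main__":
    for w in (1, 2, 5):
        print(f"good plan, {w} workers: {rows_per_process_good(w)} rows per process")
    for w in (6, 10, 12):
        print(f"bad plan, {w} workers: {rows_total_bad(w)} rows aggregated in total")
```

Under these assumptions the total amount of data going through hashagg
stays around one table's worth in the first plan, but is multiplied by
the number of processes in the second one.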
OK, I've done an experiment and re-ran the test with

    max_parallel_workers_per_gather = 5

which is the highest value still giving the "good" plan, and the results
look like this:

    algorithm    master    tlist    prealloc    prealloc+tlist
    -----------------------------------------------------------
    hash          10535     1044        1723               407
    sort            198      196         192               219

which is obviously *way* better than the numbers with more workers:

    algorithm    master    tlist    prealloc    prealloc+tlist
    -----------------------------------------------------------
    hash              ?    25000           ?              3132
    sort            248      234         216               200
It's still ~2x slower than the sort, so presumably we'll need to tweak
the costing somehow. I do believe this is still due to differences in I/O
patterns, with parallel hashagg probably being a bit more random (I'm
deducing that from SSD not being affected by this).

I'd imagine this is because given the same work_mem value, sort tends to
create "sorted chunks" that are then merged into larger runs, making it
more sequential. OTOH hashagg likely makes it more random with smaller
work_mem values - more batches making it more interleaved / random.

This does not explain why we end up with the "bad" plans, though.

Attached are two files showing how the plan changes for different numbers
of workers per gather, both for groupagg and hashagg. For groupagg the
plan shape does not change at all, for hashagg it starts as "good" and
then between 5 and 6 switches to the "bad" one.

There's another interesting thing I just noticed - as we increase the
number of workers, the cost estimate actually starts to grow at some
point:

    workers | plan cost
    --------+-----------
          0 |  23594267
          1 |  20155545
          2 |  19785306
          5 |  22718176   <-
          6 |  23063639
         10 |  22990594
         12 |  22972363

AFAIK this happens because we pick the number of workers simply based on
the size of the input relation, which ignores the cost of sending data
from workers to the leader (parallel_tuple_cost). Which in this case is
quite significant, because each worker produces a large number of groups.

I don't think this is causing the issue, though, because the sort plans
behave the same way. (I wonder if we could/should consider different
numbers of workers, somehow.)

We probably can't see these plans on 12 simply because hashagg would need
more memory than work_mem (especially in parallel mode), so we simply
reject them.


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
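
The weight of parallel_tuple_cost can be read off the Gather nodes in the
attached hashagg plans. The sketch below assumes the default settings
(parallel_setup_cost = 1000, parallel_tuple_cost = 0.1) and a simple
model where the Gather total is the subplan total plus the setup cost
plus the per-tuple cost times the rows passed to the leader. With the
inputs copied from the plans for 1, 2 and 5 workers, this model
reproduces the Gather totals shown there.

```python
# Sketch: how much of the Gather cost comes from parallel_tuple_cost.
# The two cost constants are PostgreSQL defaults and are ASSUMED to be
# unchanged on the test machine; the plan numbers come from the attached
# hashagg plans.

PARALLEL_SETUP_COST = 1000.0
PARALLEL_TUPLE_COST = 0.1

# (workers, Partial HashAggregate total cost, rows passed through Gather)
PLANS = [
    (1, 16471662.54, 14949762),
    (2, 14182541.34, 29899524),
    (5, 11291708.30, 74748810),
]


def transfer_cost(rows: int) -> float:
    """Cost charged for shipping `rows` tuples from workers to the leader."""
    return PARALLEL_TUPLE_COST * rows


def gather_total_cost(subplan_total: float, rows: int) -> float:
    """Assumed model: subplan total + setup cost + per-tuple transfer cost."""
    return subplan_total + PARALLEL_SETUP_COST + transfer_cost(rows)


if __name__ == "__main__":
    for workers, subplan_total, rows in PLANS:
        total = gather_total_cost(subplan_total, rows)
        print(f"workers={workers} gather_total={total:.2f} "
              f"transfer={transfer_cost(rows):.2f}")
```

The per-tuple term grows linearly with the number of workers here (about
1.5M cost units with 1 worker, about 7.5M with 5), because every worker
is expected to emit roughly the full set of 14949762 groups, while the
subplan cost shrinks more slowly.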
test=# set min_parallel_table_scan_size = '1kB';
SET
test=# set min_parallel_index_scan_size = '1kB';
SET
test=# set work_mem = '128MB';
SET
test=# set max_parallel_workers_per_gather = 0;
SET
test=# set enable_hashagg = off;
SET
test=# \i explain/17.sql
                                                       QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=88495966.59..88495966.60 rows=1 width=32)
   ->  Aggregate  (cost=88495966.59..88495966.60 rows=1 width=32)
         ->  Merge Join  (cost=83109553.57..88495602.94 rows=145457 width=8)
               Merge Cond: (lineitem_1.l_partkey = part.p_partkey)
               Join Filter: (lineitem.l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
               ->  GroupAggregate  (cost=83109552.56..86708927.87 rows=14949762 width=36)
                     Group Key: lineitem_1.l_partkey
                     ->  Sort  (cost=83109552.56..84234595.52 rows=450017184 width=9)
                           Sort Key: lineitem_1.l_partkey
                           ->  Seq Scan on lineitem lineitem_1  (cost=0.00..12936546.84 rows=450017184 width=9)
               ->  Materialize  (cost=1.01..1593253.80 rows=437838 width=21)
                     ->  Nested Loop  (cost=1.01..1592159.21 rows=437838 width=21)
                           ->  Index Scan using part_pkey on part  (cost=0.43..689458.90 rows=14594 width=4)
                                 Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                           ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..61.55 rows=30 width=17)
                                 Index Cond: (l_partkey = part.p_partkey)
(16 rows)

test=# set max_parallel_workers_per_gather = 1;
SET
test=# \i explain/17.sql
                                                             QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=56912954.14..56912954.15 rows=1 width=32)
   ->  Aggregate  (cost=56912954.14..56912954.15 rows=1 width=32)
         ->  Merge Join  (cost=51350528.65..56912590.49 rows=145457 width=8)
               Merge Cond: (lineitem_1.l_partkey = part.p_partkey)
               Join Filter: (lineitem.l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
               ->  Finalize GroupAggregate  (cost=51349527.63..55539987.46 rows=14949762 width=36)
                     Group Key: lineitem_1.l_partkey
                     ->  Gather Merge  (cost=51349527.63..55203617.82 rows=14949762 width=36)
                           Workers Planned: 1
                           ->  Partial GroupAggregate  (cost=51348527.62..53520769.58 rows=14949762 width=36)
                                 Group Key: lineitem_1.l_partkey
                                 ->  Sort  (cost=51348527.62..52010317.60 rows=264715991 width=9)
                                       Sort Key: lineitem_1.l_partkey
                                       ->  Parallel Seq Scan on lineitem lineitem_1  (cost=0.00..11083534.91 rows=264715991 width=9)
               ->  Materialize  (cost=1001.02..1179181.76 rows=437838 width=21)
                     ->  Gather Merge  (cost=1001.02..1178087.17 rows=437838 width=21)
                           Workers Planned: 1
                           ->  Nested Loop  (cost=1.01..1127830.38 rows=257552 width=21)
                                 ->  Parallel Index Scan using part_pkey on part  (cost=0.43..596812.00 rows=8585 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                                 ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..61.55 rows=30 width=17)
                                       Index Cond: (l_partkey = part.p_partkey)
(22 rows)

test=# set max_parallel_workers_per_gather = 5;
SET
test=# \i explain/17.sql
                                                            QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=33914092.61..33914092.62 rows=1 width=32)
   ->  Aggregate  (cost=33914092.61..33914092.62 rows=1 width=32)
         ->  Merge Join  (cost=22328104.16..33913728.97 rows=145457 width=8)
               Merge Cond: (lineitem_1.l_partkey = part.p_partkey)
               Join Filter: (lineitem.l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
               ->  Finalize GroupAggregate  (cost=22327103.07..32975474.81 rows=14949762 width=36)
                     Group Key: lineitem_1.l_partkey
                     ->  Gather Merge  (cost=22327103.07..32190612.31 rows=74748810 width=36)
                           Workers Planned: 5
                           ->  Partial GroupAggregate  (cost=22326103.00..23188000.80 rows=14949762 width=36)
                                 Group Key: lineitem_1.l_partkey
                                 ->  Sort  (cost=22326103.00..22551111.59 rows=90003437 width=9)
                                       Sort Key: lineitem_1.l_partkey
                                       ->  Parallel Seq Scan on lineitem lineitem_1  (cost=0.00..9336409.37 rows=90003437 width=9)
               ->  Materialize  (cost=1001.09..744832.88 rows=437838 width=21)
                     ->  Gather Merge  (cost=1001.09..743738.29 rows=437838 width=21)
                           Workers Planned: 5
                           ->  Nested Loop  (cost=1.01..690011.65 rows=87568 width=21)
                                 ->  Parallel Index Scan using part_pkey on part  (cost=0.43..509459.22 rows=2919 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                                 ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..61.55 rows=30 width=17)
                                       Index Cond: (l_partkey = part.p_partkey)
(22 rows)

test=# set max_parallel_workers_per_gather = 15;
SET
test=# \i explain/17.sql
                                                            QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=44566651.53..44566651.54 rows=1 width=32)
   ->  Aggregate  (cost=44566651.53..44566651.54 rows=1 width=32)
         ->  Merge Join  (cost=12830532.44..44566287.88 rows=145457 width=8)
               Merge Cond: (lineitem_1.l_partkey = part.p_partkey)
               Join Filter: (lineitem.l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
               ->  Finalize GroupAggregate  (cost=12829531.24..43738926.34 rows=14949762 width=36)
                     Group Key: lineitem_1.l_partkey
                     ->  Gather Merge  (cost=12829531.24..41832831.69 rows=224246430 width=36)
                           Workers Planned: 15
                           ->  Partial GroupAggregate  (cost=12828530.92..13240411.54 rows=14949762 width=36)
                                 Group Key: lineitem_1.l_partkey
                                 ->  Sort  (cost=12828530.92..12903533.79 rows=30001146 width=9)
                                       Sort Key: lineitem_1.l_partkey
                                       ->  Parallel Seq Scan on lineitem lineitem_1  (cost=0.00..8736386.46 rows=30001146 width=9)
               ->  Materialize  (cost=1001.20..633940.27 rows=437838 width=21)
                     ->  Gather Merge  (cost=1001.20..632845.68 rows=437838 width=21)
                           Workers Planned: 10
                           ->  Nested Loop  (cost=1.01..577204.55 rows=43784 width=21)
                                 ->  Parallel Index Scan using part_pkey on part  (cost=0.43..486959.26 rows=1459 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                                 ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..61.55 rows=30 width=17)
                                       Index Cond: (l_partkey = part.p_partkey)
(22 rows)

test=# set min_parallel_table_scan_size = '1kB';
SET
test=# set min_parallel_index_scan_size = '1kB';
SET
test=# set work_mem = '128MB';
SET
test=# set max_parallel_workers_per_gather = 0;
SET
test=# \i explain/17.sql
                                                   QUERY PLAN
----------------------------------------------------------------------------------------------------------------
 Limit  (cost=23594267.60..23594267.61 rows=1 width=32)
   ->  Aggregate  (cost=23594267.60..23594267.61 rows=1 width=32)
         ->  Nested Loop  (cost=20113710.42..23593903.95 rows=145457 width=8)
               Join Filter: (part.p_partkey = lineitem.l_partkey)
               ->  Hash Join  (cost=20113709.84..22724046.56 rows=14545 width=40)
                     Hash Cond: (lineitem_1.l_partkey = part.p_partkey)
                     ->  HashAggregate  (cost=19581331.82..22002927.78 rows=14949762 width=36)
                           Group Key: lineitem_1.l_partkey
                           Planned Partitions: 64
                           ->  Seq Scan on lineitem lineitem_1  (cost=0.00..12936546.84 rows=450017184 width=9)
                     ->  Hash  (cost=532195.59..532195.59 rows=14594 width=4)
                           ->  Seq Scan on part  (cost=0.00..532195.59 rows=14594 width=4)
                                 Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
               ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..59.68 rows=10 width=17)
                     Index Cond: (l_partkey = lineitem_1.l_partkey)
                     Filter: (l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
(16 rows)

test=# set max_parallel_workers_per_gather = 1;
SET
test=# \i explain/17.sql
                                                             QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=20155545.75..20155545.76 rows=1 width=32)
   ->  Aggregate  (cost=20155545.75..20155545.76 rows=1 width=32)
         ->  Nested Loop  (cost=18755543.08..20155182.10 rows=145457 width=8)
               Join Filter: (part.p_partkey = lineitem.l_partkey)
               ->  Hash Join  (cost=18755542.51..19285324.71 rows=14545 width=40)
                     Hash Cond: (lineitem_1.l_partkey = part.p_partkey)
                     ->  Finalize HashAggregate  (cost=18313351.98..18654393.43 rows=14949762 width=36)
                           Group Key: lineitem_1.l_partkey
                           Planned Partitions: 64
                           ->  Gather  (cost=14993231.96..17967638.74 rows=14949762 width=36)
                                 Workers Planned: 1
                                 ->  Partial HashAggregate  (cost=14992231.96..16471662.54 rows=14949762 width=36)
                                       Group Key: lineitem_1.l_partkey
                                       Planned Partitions: 64
                                       ->  Parallel Seq Scan on lineitem lineitem_1  (cost=0.00..11083534.91 rows=264715991 width=9)
                     ->  Hash  (cost=442008.10..442008.10 rows=14594 width=4)
                           ->  Gather  (cost=1000.00..442008.10 rows=14594 width=4)
                                 Workers Planned: 1
                                 ->  Parallel Seq Scan on part  (cost=0.00..439548.70 rows=8585 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
               ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..59.68 rows=10 width=17)
                     Index Cond: (l_partkey = lineitem_1.l_partkey)
                     Filter: (l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
(23 rows)

test=# set max_parallel_workers_per_gather = 2;
SET
test=# \i explain/17.sql
                                                             QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=19785306.14..19785306.15 rows=1 width=32)
   ->  Aggregate  (cost=19785306.14..19785306.15 rows=1 width=32)
         ->  Nested Loop  (cost=18268508.46..19784942.50 rows=145457 width=8)
               Join Filter: (part.p_partkey = lineitem.l_partkey)
               ->  Hash Join  (cost=18268507.89..18915085.11 rows=14545 width=40)
                     Hash Cond: (lineitem_1.l_partkey = part.p_partkey)
                     ->  Finalize HashAggregate  (cost=17864920.23..18322756.69 rows=14949762 width=36)
                           Group Key: lineitem_1.l_partkey
                           Planned Partitions: 64
                           ->  Gather  (cost=13081107.01..17173493.74 rows=29899524 width=36)
                                 Workers Planned: 2
                                 ->  Partial HashAggregate  (cost=13080107.01..14182541.34 rows=14949762 width=36)
                                       Group Key: lineitem_1.l_partkey
                                       Planned Partitions: 64
                                       ->  Parallel Seq Scan on lineitem lineitem_1  (cost=0.00..10311446.60 rows=187507160 width=9)
                     ->  Hash  (cost=403405.23..403405.23 rows=14594 width=4)
                           ->  Gather  (cost=1000.00..403405.23 rows=14594 width=4)
                                 Workers Planned: 2
                                 ->  Parallel Seq Scan on part  (cost=0.00..400945.83 rows=6081 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
               ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..59.68 rows=10 width=17)
                     Index Cond: (l_partkey = lineitem_1.l_partkey)
                     Filter: (l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
(23 rows)

test=# set max_parallel_workers_per_gather = 5;
SET
test=# \i explain/17.sql
                                                            QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=22718176.57..22718176.59 rows=1 width=32)
   ->  Aggregate  (cost=22718176.57..22718176.59 rows=1 width=32)
         ->  Nested Loop  (cost=20850993.85..22717812.93 rows=145457 width=8)
               Join Filter: (part.p_partkey = lineitem.l_partkey)
               ->  Hash Join  (cost=20850993.27..21847955.54 rows=14545 width=40)
                     Hash Cond: (lineitem_1.l_partkey = part.p_partkey)
                     ->  Finalize HashAggregate  (cost=20496155.53..21304377.04 rows=14949762 width=36)
                           Group Key: lineitem_1.l_partkey
                           Planned Partitions: 64
                           ->  Gather  (cost=10666366.37..18767589.30 rows=74748810 width=36)
                                 Workers Planned: 5
                                 ->  Partial HashAggregate  (cost=10665366.37..11291708.30 rows=14949762 width=36)
                                       Group Key: lineitem_1.l_partkey
                                       Planned Partitions: 64
                                       ->  Parallel Seq Scan on lineitem lineitem_1  (cost=0.00..9336409.37 rows=90003437 width=9)
                     ->  Hash  (cost=354655.32..354655.32 rows=14594 width=4)
                           ->  Gather  (cost=1000.00..354655.32 rows=14594 width=4)
                                 Workers Planned: 5
                                 ->  Parallel Seq Scan on part  (cost=0.00..352195.92 rows=2919 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
               ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..59.68 rows=10 width=17)
                     Index Cond: (l_partkey = lineitem_1.l_partkey)
                     Filter: (l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
(23 rows)

test=# set max_parallel_workers_per_gather = 6;
SET
test=# \i explain/17.sql
                                                            QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=23063639.26..23063639.27 rows=1 width=32)
   ->  Finalize Aggregate  (cost=23063639.26..23063639.27 rows=1 width=32)
         ->  Gather  (cost=23063638.62..23063639.23 rows=6 width=32)
               Workers Planned: 6
               ->  Partial Aggregate  (cost=23062638.62..23062638.63 rows=1 width=32)
                     ->  Nested Loop  (cost=22456094.00..23062578.01 rows=24243 width=8)
                           Join Filter: (part.p_partkey = lineitem.l_partkey)
                           ->  Hash Join  (cost=22456093.43..22917611.75 rows=2424 width=40)
                                 Hash Cond: (part.p_partkey = lineitem_1.l_partkey)
                                 ->  Parallel Seq Scan on part  (cost=0.00..344695.93 rows=2432 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                                 ->  Hash  (cost=22152425.40..22152425.40 rows=14949762 width=36)
                                       ->  HashAggregate  (cost=19581331.82..22002927.78 rows=14949762 width=36)
                                             Group Key: lineitem_1.l_partkey
                                             Planned Partitions: 64
                                             ->  Seq Scan on lineitem lineitem_1  (cost=0.00..12936546.84 rows=450017184 width=9)
                           ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..59.68 rows=10 width=17)
                                 Index Cond: (l_partkey = lineitem_1.l_partkey)
                                 Filter: (l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
(19 rows)

test=# set max_parallel_workers_per_gather = 10;
SET
test=# \i explain/17.sql
                                                            QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=22990594.48..22990594.50 rows=1 width=32)
   ->  Finalize Aggregate  (cost=22990594.48..22990594.50 rows=1 width=32)
         ->  Gather  (cost=22990593.42..22990594.43 rows=10 width=32)
               Workers Planned: 10
               ->  Partial Aggregate  (cost=22989593.42..22989593.43 rows=1 width=32)
                     ->  Nested Loop  (cost=22456094.00..22989557.05 rows=14546 width=8)
                           Join Filter: (part.p_partkey = lineitem.l_partkey)
                           ->  Hash Join  (cost=22456093.43..22902601.22 rows=1454 width=40)
                                 Hash Cond: (part.p_partkey = lineitem_1.l_partkey)
                                 ->  Parallel Seq Scan on part  (cost=0.00..329695.96 rows=1459 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                                 ->  Hash  (cost=22152425.40..22152425.40 rows=14949762 width=36)
                                       ->  HashAggregate  (cost=19581331.82..22002927.78 rows=14949762 width=36)
                                             Group Key: lineitem_1.l_partkey
                                             Planned Partitions: 64
                                             ->  Seq Scan on lineitem lineitem_1  (cost=0.00..12936546.84 rows=450017184 width=9)
                           ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..59.68 rows=10 width=17)
                                 Index Cond: (l_partkey = lineitem_1.l_partkey)
                                 Filter: (l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
(19 rows)

test=# set max_parallel_workers_per_gather = 32;
SET
test=# \i explain/17.sql
                                                            QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=22972363.30..22972363.31 rows=1 width=32)
   ->  Finalize Aggregate  (cost=22972363.30..22972363.31 rows=1 width=32)
         ->  Gather  (cost=22972362.02..22972363.23 rows=12 width=32)
               Workers Planned: 12
               ->  Partial Aggregate  (cost=22971362.02..22971362.03 rows=1 width=32)
                     ->  Nested Loop  (cost=22456094.00..22971331.72 rows=12121 width=8)
                           Join Filter: (part.p_partkey = lineitem.l_partkey)
                           ->  Hash Join  (cost=22456093.43..22898848.59 rows=1212 width=40)
                                 Hash Cond: (part.p_partkey = lineitem_1.l_partkey)
                                 ->  Parallel Seq Scan on part  (cost=0.00..325945.97 rows=1216 width=4)
                                       Filter: ((p_brand = 'Brand#22'::bpchar) AND (p_container = 'LG BOX'::bpchar))
                                 ->  Hash  (cost=22152425.40..22152425.40 rows=14949762 width=36)
                                       ->  HashAggregate  (cost=19581331.82..22002927.78 rows=14949762 width=36)
                                             Group Key: lineitem_1.l_partkey
                                             Planned Partitions: 64
                                             ->  Seq Scan on lineitem lineitem_1  (cost=0.00..12936546.84 rows=450017184 width=9)
                           ->  Index Scan using idx_lineitem_part_supp on lineitem  (cost=0.57..59.68 rows=10 width=17)
                                 Index Cond: (l_partkey = lineitem_1.l_partkey)
                                 Filter: (l_quantity < ((0.2 * avg(lineitem_1.l_quantity))))
(19 rows)