On Mon, 2022-01-24 at 16:24 +0300, Yura Sokolov wrote:
> On Sun, 2022-01-23 at 14:56 +0300, Yura Sokolov wrote:
> > On Thu, 2022-01-20 at 09:32 +1300, David Rowley wrote:
> > > On Fri, 31 Dec 2021 at 00:14, Yura Sokolov <y.soko...@postgrespro.ru> wrote:
> > > > A quick (and valid) fix is suggested in the attached patch:
> > > > - If an Append has a single child, then copy its parallel awareness.
> > > 
> > > I've been looking at this and I've gone through changing my mind about
> > > what's the right fix quite a number of times.
> > > 
> > > My current thoughts are that I don't really like the fact that we can
> > > have plans in the following shape:
> > > 
> > >  Finalize Aggregate
> > >    ->  Gather
> > >          Workers Planned: 1
> > >          ->  Partial Aggregate
> > >                ->  Parallel Hash Left Join
> > >                      Hash Cond: (gather_append_1.fk = gather_append_2.fk)
> > >                      ->  Index Scan using gather_append_1_ix on gather_append_1
> > >                            Index Cond: (f = true)
> > >                      ->  Parallel Hash
> > >                            ->  Parallel Seq Scan on gather_append_2
> > > 
> > > It's only made safe by the fact that Gather will only use 1 worker.
> > > To me, it just seems too fragile to assume that's always going to be
> > > the case.  I feel like this fix just relies on the fact that
> > > create_gather_path() and create_gather_merge_path() do
> > > "pathnode->num_workers = subpath->parallel_workers;".  If someone
> > > decided that was to work a different way, then we risk this breaking
> > > again.  Additionally, today we have Gather and GatherMerge, but we may
> > > one day end up with more node types that gather results from parallel
> > > workers, or even a completely different way of executing plans.
> > 
> > It seems strange that the parallel_aware and parallel_safe flags neither
> > affect execution nor are properly checked.
> > 
> > Well, except that parallel_safe is checked in ExecSerializePlan, which is
> > called from ExecInitParallelPlan, which in turn is called from ExecGather
> > and ExecGatherMerge.  But it looks like this check doesn't affect
> > execution either.
> > 
> > > I think a safer way to fix this is to just not remove the
> > > Append/MergeAppend node if the parallel_aware flag of the only-child
> > > and the Append/MergeAppend don't match.  I've done that in the
> > > attached.
> > > 
> > > I believe the code at the end of add_paths_to_append_rel() can remain
> > > as is.
> > 
> > I found that clean_up_removed_plan_level is also called from
> > set_subqueryscan_references.  Is there a need to patch it there as well?
> > 
> > And there is a strange state:
> > - in the loop over subpaths, pathnode->node.parallel_safe is set to the
> >   AND of all its subpaths' parallel_safe flags (which is why it had to
> >   be copied in my patch version),
> > - that means our AppendPath can be parallel_aware but not parallel_safe.
> >   That is a bit ridiculous.
> > 
> > And it is strange that an AppendPath could have more parallel_workers
> > than the sum of its children's parallel_workers.
> > 
> > So it looks like the whole machinery around parallel_aware/parallel_safe
> > is not consistent enough.
> > 
> > Either way, I attach your version of the fix with my tests as a new
> > patch version.
> 
> Looks like the volatile "Memory Usage:" output in EXPLAIN sporadically
> breaks 'make check'.
> 
> I applied a replacement in the style of the memoize.sql test.
> 
> Why is there no way to disable the "Buckets: %d  Batches: %d  Memory
> Usage: %dkB" output in show_hash_info?
And here is another attempt to fix the test volatility.
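For anyone reproducing the original failure by hand, the whole issue can be
seen with just the queries from the attached test (a sketch; the tables and
GUC settings are exactly those created in gather_removed_append.sql below):

  -- Force a cheap parallel plan, as in the attached regression test.
  SET parallel_setup_cost = 0;
  SET parallel_tuple_cost = 0.1;
  SET min_parallel_table_scan_size = 0;
  SET max_parallel_workers_per_gather = 2;

  -- Expected result: 200 (the count obtained with parallelism disabled).
  -- Before the fix, the Gather above the removed Append could run the
  -- non-parallel-aware Index Scan once per participating process, so the
  -- count could come out as a multiple of 200.
  SELECT count(1)
  FROM (
    SELECT fk FROM gather_append_1 WHERE f
    UNION ALL
    SELECT fk FROM gather_append_1 WHERE false
  ) as t
  LEFT OUTER JOIN gather_append_2
  USING (fk);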
From fb09491a401f0df828faf6088158f431b2a69381 Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Sun, 23 Jan 2022 14:53:21 +0300
Subject: [PATCH v4] Fix duplicate result rows after Append path removal.

It could happen that an Append path was created with the "parallel_aware"
flag set while its single child was not parallel aware.  The Append path's
parent (Gather or Gather Merge) thinks its child is parallel_aware, but
after Append path removal the Gather's child is no longer parallel_aware.
Then, when Gather/Gather Merge decides to run the child in several workers
or with worker + leader participation, it gathers duplicate result rows
from the several invocations of the child path.

To fix this, don't remove the Append/MergeAppend node if its
parallel_aware flag does not match that of its single child.

Authors: David Rowley, Sokolov Yura.
---
 src/backend/optimizer/plan/setrefs.c          |  24 +++-
 .../expected/gather_removed_append.out        | 126 ++++++++++++++++++
 src/test/regress/parallel_schedule            |   1 +
 .../regress/sql/gather_removed_append.sql     |  94 +++++++++++++
 4 files changed, 241 insertions(+), 4 deletions(-)
 create mode 100644 src/test/regress/expected/gather_removed_append.out
 create mode 100644 src/test/regress/sql/gather_removed_append.sql

diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index e44ae971b4b..a7b11b7f03a 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1512,8 +1512,16 @@ set_append_references(PlannerInfo *root,
 		lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
 	}
 
-	/* Now, if there's just one, forget the Append and return that child */
-	if (list_length(aplan->appendplans) == 1)
+	/*
+	 * See if it's safe to get rid of the Append entirely.  For this to be
+	 * safe, there must be only one child plan and that child plan's parallel
+	 * awareness must match that of the Append's.  The reason for the latter
+	 * is that if the Append is parallel aware and the child is not, then
+	 * the calling plan may execute the non-parallel aware child multiple
+	 * times.
+	 */
+	if (list_length(aplan->appendplans) == 1 &&
+		((Plan *) linitial(aplan->appendplans))->parallel_aware == aplan->plan.parallel_aware)
 		return clean_up_removed_plan_level((Plan *) aplan,
 										   (Plan *) linitial(aplan->appendplans));
 
@@ -1576,8 +1584,16 @@ set_mergeappend_references(PlannerInfo *root,
 		lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
 	}
 
-	/* Now, if there's just one, forget the MergeAppend and return that child */
-	if (list_length(mplan->mergeplans) == 1)
+	/*
+	 * See if it's safe to get rid of the MergeAppend entirely.  For this to
+	 * be safe, there must be only one child plan and that child plan's
+	 * parallel awareness must match that of the MergeAppend's.  The reason
+	 * for the latter is that if the MergeAppend is parallel aware and the
+	 * child is not, then the calling plan may execute the non-parallel aware
+	 * child multiple times.
+	 */
+	if (list_length(mplan->mergeplans) == 1 &&
+		((Plan *) linitial(mplan->mergeplans))->parallel_aware == mplan->plan.parallel_aware)
 		return clean_up_removed_plan_level((Plan *) mplan,
 										   (Plan *) linitial(mplan->mergeplans));
 
diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out
new file mode 100644
index 00000000000..849cf0ae97e
--- /dev/null
+++ b/src/test/regress/expected/gather_removed_append.out
@@ -0,0 +1,126 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+NOTICE:  table "gather_append_1" does not exist, skipping
+NOTICE:  table "gather_append_2" does not exist, skipping
+CREATE TABLE gather_append_1 (
+  fk int,
+  f bool
+);
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+CREATE TABLE gather_append_2 (
+  fk int,
+  val serial
+);
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+ANALYZE gather_append_1, gather_append_2;
+SET max_parallel_workers_per_gather = 0;
+-- Find correct rows count
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+   200
+(1 row)
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count
+-------
+   200
+(1 row)
+
+-- The buckets/batches/memory values from the Parallel Hash node can vary between
+-- machines.  Let's just replace the number with an 'N'.
+create function explain_gather(query text) returns setof text
+language plpgsql as
+$$
+declare
+  ln text;
+begin
+  for ln in
+    execute format('explain (analyze, costs off, summary off, timing off, verbose off, buffers off) %s',
+      query)
+  loop
+    if not ln like '%Gather%' then
+      ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=R loops=L');
+    end if;
+    ln := regexp_replace(ln, 'Buckets: \d+', 'Buckets: N');
+    ln := regexp_replace(ln, 'Batches: \d+', 'Batches: N');
+    ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N');
+    return next ln;
+  end loop;
+end;
+$$;
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);');
+                                          explain_gather
+--------------------------------------------------------------------------------------------------
+ Gather (actual rows=200 loops=1)
+   Workers Planned: 1
+   Workers Launched: 1
+   ->  Parallel Hash Left Join (actual rows=R loops=L)
+         Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+         ->  Parallel Append (actual rows=R loops=L)
+               ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=R loops=L)
+                     Index Cond: (f = true)
+         ->  Parallel Hash (actual rows=R loops=L)
+               Buckets: N Batches: N Memory Usage: NkB
+               ->  Parallel Seq Scan on gather_append_2 (actual rows=R loops=L)
+(11 rows)
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
+                                             explain_gather
+--------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+   Workers Planned: 1
+   Workers Launched: 1
+   ->  Sort (actual rows=R loops=L)
+         Sort Key: gather_append_2.val
+         Sort Method: quicksort  Memory: 25kB
+         Worker 0:  Sort Method: quicksort  Memory: 25kB
+         ->  Parallel Hash Left Join (actual rows=R loops=L)
+               Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+               ->  Parallel Append (actual rows=R loops=L)
+                     ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=R loops=L)
+                           Index Cond: (f = true)
+               ->  Parallel Hash (actual rows=R loops=L)
+                     Buckets: N Batches: N Memory Usage: NkB
+                     ->  Parallel Seq Scan on gather_append_2 (actual rows=R loops=L)
+(15 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5b0c73d7e37..84f2f81255d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
 test: select_parallel
 test: write_parallel
 test: vacuum_parallel
+test: gather_removed_append
 
 # no relation related tests can be put in this group
 test: publication subscription
diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql
new file mode 100644
index 00000000000..3af88b29f0a
--- /dev/null
+++ b/src/test/regress/sql/gather_removed_append.sql
@@ -0,0 +1,94 @@
+-- Test correctness of parallel query execution after removal
+-- of Append path due to single non-trivial child.
+
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+
+CREATE TABLE gather_append_1 (
+  fk int,
+  f bool
+);
+
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+
+CREATE TABLE gather_append_2 (
+  fk int,
+  val serial
+);
+
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+
+ANALYZE gather_append_1, gather_append_2;
+
+SET max_parallel_workers_per_gather = 0;
+
+-- Find correct rows count
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+-- The buckets/batches/memory values from the Parallel Hash node can vary between
+-- machines.  Let's just replace the number with an 'N'.
+create function explain_gather(query text) returns setof text
+language plpgsql as
+$$
+declare
+  ln text;
+begin
+  for ln in
+    execute format('explain (analyze, costs off, summary off, timing off, verbose off, buffers off) %s',
+      query)
+  loop
+    if not ln like '%Gather%' then
+      ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=R loops=L');
+    end if;
+    ln := regexp_replace(ln, 'Buckets: \d+', 'Buckets: N');
+    ln := regexp_replace(ln, 'Batches: \d+', 'Batches: N');
+    ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N');
+    return next ln;
+  end loop;
+end;
+$$;
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);');
+
+-- Result rows in root node should be equal to non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
-- 
2.34.1