On Mon, 24/01/2022 at 16:24 +0300, Yura Sokolov wrote:
> On Sun, 23/01/2022 at 14:56 +0300, Yura Sokolov wrote:
> > On Thu, 20/01/2022 at 09:32 +1300, David Rowley wrote:
> > > On Fri, 31 Dec 2021 at 00:14, Yura Sokolov <y.soko...@postgrespro.ru> 
> > > wrote:
> > > > Suggested quick (and valid) fix in the patch attached:
> > > > - If Append has single child, then copy its parallel awareness.
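> > > > 
> > > > A minimal sketch of that idea (hypothetical; field names follow
> > > > pathnode.c, and the attached patch may place this differently):
> > > > 
> > > >     /* Sketch of the quick fix: if the AppendPath has exactly one
> > > >      * subpath, mirror the child's parallel fields so that removing
> > > >      * the Append later in setrefs.c cannot change what the parent
> > > >      * Gather/Gather Merge was built to expect. */
> > > >     if (list_length(pathnode->subpaths) == 1)
> > > >     {
> > > >         Path       *child = (Path *) linitial(pathnode->subpaths);
> > > > 
> > > >         pathnode->path.parallel_aware = child->parallel_aware;
> > > >         pathnode->path.parallel_workers = child->parallel_workers;
> > > >     }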
> > > 
> > > I've been looking at this and I've gone through changing my mind about
> > > what's the right fix quite a number of times.
> > > 
> > > My current thoughts are that I don't really like the fact that we can
> > > have plans in the following shape:
> > > 
> > >  Finalize Aggregate
> > >    ->  Gather
> > >          Workers Planned: 1
> > >          ->  Partial Aggregate
> > >                ->  Parallel Hash Left Join
> > >                      Hash Cond: (gather_append_1.fk = gather_append_2.fk)
> > >                      ->  Index Scan using gather_append_1_ix on 
> > > gather_append_1
> > >                            Index Cond: (f = true)
> > >                      ->  Parallel Hash
> > >                            ->  Parallel Seq Scan on gather_append_2
> > > 
> > > It's only made safe by the fact that Gather will only use 1 worker.
> > > To me, it just seems too fragile to assume that's always going to be
> > > the case. I feel like this fix just relies on the fact that
> > > create_gather_path() and create_gather_merge_path() do
> > > "pathnode->num_workers = subpath->parallel_workers;". If someone
> > > decided that was to work a different way, then we risk this breaking
> > > again. Additionally, today we have Gather and GatherMerge, but we may
> > > one day end up with more node types that gather results from parallel
> > > workers, or even a completely different way of executing plans.
> > 
> > It seems strange that the parallel_aware and parallel_safe flags neither
> > affect execution nor are properly checked.
> > 
> > The exception is that parallel_safe is checked in ExecSerializePlan, which
> > is called from ExecInitParallelPlan, which in turn is called from
> > ExecGather and ExecGatherMerge. But it looks like this check doesn't
> > affect execution either.
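> > 
> > For reference, the parallel_safe handling in ExecSerializePlan that I mean
> > looks roughly like this (paraphrased from execParallel.c, not an exact
> > quote):
> > 
> >     /* Build the dummy PlannedStmt that is shipped to the workers.
> >      * Parallel-unsafe subplans are replaced with a NULL "hole" (so the
> >      * list indexes of the other subplans stay valid) rather than raising
> >      * an error, so the flag never stops execution here. */
> >     pstmt->subplans = NIL;
> >     foreach(lc, estate->es_plannedstmt->subplans)
> >     {
> >         Plan       *subplan = (Plan *) lfirst(lc);
> > 
> >         if (subplan && !subplan->parallel_safe)
> >             subplan = NULL;     /* a worker must never run this one */
> >         pstmt->subplans = lappend(pstmt->subplans, subplan);
> >     }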
> > 
> > > I think a safer way to fix this is to just not remove the
> > > Append/MergeAppend node if the parallel_aware flag of the only-child
> > > and the Append/MergeAppend don't match. I've done that in the
> > > attached.
> > > 
> > > I believe the code at the end of add_paths_to_append_rel() can remain as 
> > > is.
> > 
> > I found that clean_up_removed_plan_level is also called from
> > set_subqueryscan_references.
> > Is there a need to patch that call site as well?
> > 
> > And there is a strange state:
> > - in the loop over the subpaths, pathnode->path.parallel_safe is set to
> >   the AND of all the subpaths' parallel_safe flags, as sketched below
> >   (which is why my patch version had to copy it),
> > - that means our AppendPath can be parallel_aware but not parallel_safe.
> > That is a bit ridiculous.
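> > 
> > The loop I mean is in create_append_path() in pathnode.c; roughly
> > (paraphrased, not an exact quote):
> > 
> >     foreach(l, pathnode->subpaths)
> >     {
> >         Path       *subpath = (Path *) lfirst(l);
> > 
> >         /* A single parallel-unsafe child makes the whole Append
> >          * parallel-unsafe, while parallel_aware was set earlier from
> >          * the caller's flag, independently of the children. */
> >         pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
> >             subpath->parallel_safe;
> >     }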
> > 
> > And it is strange that an AppendPath can have more parallel_workers than
> > the sum of its children's parallel_workers.
> > 
> > So it looks like the whole machinery around parallel_aware/parallel_safe
> > is not consistent enough.
> > 
> > Either way, I attach your version of the fix together with my tests as the
> > new patch version.
> 
> Looks like the volatile "Memory Usage:" output in EXPLAIN breaks 'make check'
> sporadically.
> 
> I applied a replacement in the style of the memoize.sql test.
> 
> Why is there no way to disable the "Buckets: %d Batches: %d Memory Usage:
> %dkB" output in show_hash_info?

And here is another attempt to fix the test volatility.
From fb09491a401f0df828faf6088158f431b2a69381 Mon Sep 17 00:00:00 2001
From: Yura Sokolov <y.soko...@postgrespro.ru>
Date: Sun, 23 Jan 2022 14:53:21 +0300
Subject: [PATCH v4] Fix duplicate result rows after Append path removal.

It can happen that an Append path is created with the "parallel_aware" flag
set while its single child is not parallel aware. The Append path's parent
(Gather or Gather Merge) then thinks its child is parallel_aware, but after
the Append path is removed the Gather's child is no longer parallel_aware.
When Gather/Gather Merge decides to run the child in several workers, or in
a worker plus the leader, it gathers duplicate result rows from the several
child path invocations.

To fix this, don't remove the Append/MergeAppend node if its parallel_aware
flag does not match that of its single child.

Authors: David Rowley, Sokolov Yura.
---
 src/backend/optimizer/plan/setrefs.c          |  24 +++-
 .../expected/gather_removed_append.out        | 126 ++++++++++++++++++
 src/test/regress/parallel_schedule            |   1 +
 .../regress/sql/gather_removed_append.sql     |  94 +++++++++++++
 4 files changed, 241 insertions(+), 4 deletions(-)
 create mode 100644 src/test/regress/expected/gather_removed_append.out
 create mode 100644 src/test/regress/sql/gather_removed_append.sql

diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index e44ae971b4b..a7b11b7f03a 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -1512,8 +1512,16 @@ set_append_references(PlannerInfo *root,
 		lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
 	}
 
-	/* Now, if there's just one, forget the Append and return that child */
-	if (list_length(aplan->appendplans) == 1)
+	/*
+	 * See if it's safe to get rid of the Append entirely.  For this to be
+	 * safe, there must be only one child plan and that child plan's parallel
+	 * awareness must match that of the Append.  The reason for the latter
+	 * is that if the Append is parallel aware and the child is not, then
+	 * the calling plan may execute the non-parallel aware child multiple
+	 * times.
+	 */
+	if (list_length(aplan->appendplans) == 1 &&
+		((Plan *) linitial(aplan->appendplans))->parallel_aware == aplan->plan.parallel_aware)
 		return clean_up_removed_plan_level((Plan *) aplan,
 										   (Plan *) linitial(aplan->appendplans));
 
@@ -1576,8 +1584,16 @@ set_mergeappend_references(PlannerInfo *root,
 		lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset);
 	}
 
-	/* Now, if there's just one, forget the MergeAppend and return that child */
-	if (list_length(mplan->mergeplans) == 1)
+	/*
+	 * See if it's safe to get rid of the MergeAppend entirely.  For this to
+	 * be safe, there must be only one child plan and that child plan's
+	 * parallel awareness must match that of the MergeAppend.  The reason
+	 * for the latter is that if the MergeAppend is parallel aware and the
+	 * child is not, then the calling plan may execute the non-parallel
+	 * aware child multiple times.
+	 */
+	if (list_length(mplan->mergeplans) == 1 &&
+		((Plan *) linitial(mplan->mergeplans))->parallel_aware == mplan->plan.parallel_aware)
 		return clean_up_removed_plan_level((Plan *) mplan,
 										   (Plan *) linitial(mplan->mergeplans));
 
diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out
new file mode 100644
index 00000000000..849cf0ae97e
--- /dev/null
+++ b/src/test/regress/expected/gather_removed_append.out
@@ -0,0 +1,126 @@
+-- Test correctness of parallel query execution after removal
+-- of an Append path due to a single non-trivial child.
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+NOTICE:  table "gather_append_1" does not exist, skipping
+NOTICE:  table "gather_append_2" does not exist, skipping
+CREATE TABLE gather_append_1 (
+    fk int,
+    f bool
+);
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+CREATE TABLE gather_append_2 (
+    fk int,
+    val serial
+);
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+ANALYZE gather_append_1, gather_append_2;
+SET max_parallel_workers_per_gather = 0;
+-- Find the correct row count
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count 
+-------
+   200
+(1 row)
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+ count 
+-------
+   200
+(1 row)
+
+-- The buckets/batches/memory values from the Parallel Hash node can vary between
+-- machines.  Let's just replace the number with an 'N'.
+create function explain_gather(query text) returns setof text
+language plpgsql as
+$$
+declare
+    ln text;
+begin
+    for ln in
+        execute format('explain (analyze, costs off, summary off, timing off, verbose off, buffers off) %s',
+            query)
+    loop
+        if not ln like '%Gather%' then
+            ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=R loops=L');
+        end if;
+        ln := regexp_replace(ln, 'Buckets: \d+', 'Buckets: N');
+        ln := regexp_replace(ln, 'Batches: \d+', 'Batches: N');
+        ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N');
+        return next ln;
+    end loop;
+end;
+$$;
+-- Result rows in the root node should be equal to the non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);');
+                                          explain_gather                                          
+--------------------------------------------------------------------------------------------------
+ Gather (actual rows=200 loops=1)
+   Workers Planned: 1
+   Workers Launched: 1
+   ->  Parallel Hash Left Join (actual rows=R loops=L)
+         Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+         ->  Parallel Append (actual rows=R loops=L)
+               ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=R loops=L)
+                     Index Cond: (f = true)
+         ->  Parallel Hash (actual rows=R loops=L)
+               Buckets: N  Batches: N  Memory Usage: NkB
+               ->  Parallel Seq Scan on gather_append_2 (actual rows=R loops=L)
+(11 rows)
+
+-- Result rows in the root node should be equal to the non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
+                                             explain_gather                                             
+--------------------------------------------------------------------------------------------------------
+ Gather Merge (actual rows=200 loops=1)
+   Workers Planned: 1
+   Workers Launched: 1
+   ->  Sort (actual rows=R loops=L)
+         Sort Key: gather_append_2.val
+         Sort Method: quicksort  Memory: 25kB
+         Worker 0:  Sort Method: quicksort  Memory: 25kB
+         ->  Parallel Hash Left Join (actual rows=R loops=L)
+               Hash Cond: (gather_append_1.fk = gather_append_2.fk)
+               ->  Parallel Append (actual rows=R loops=L)
+                     ->  Index Scan using gather_append_1_ix on gather_append_1 (actual rows=R loops=L)
+                           Index Cond: (f = true)
+               ->  Parallel Hash (actual rows=R loops=L)
+                     Buckets: N  Batches: N  Memory Usage: NkB
+                     ->  Parallel Seq Scan on gather_append_2 (actual rows=R loops=L)
+(15 rows)
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5b0c73d7e37..84f2f81255d 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
 test: select_parallel
 test: write_parallel
 test: vacuum_parallel
+test: gather_removed_append
 
 # no relation related tests can be put in this group
 test: publication subscription
diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql
new file mode 100644
index 00000000000..3af88b29f0a
--- /dev/null
+++ b/src/test/regress/sql/gather_removed_append.sql
@@ -0,0 +1,94 @@
+-- Test correctness of parallel query execution after removal
+-- of an Append path due to a single non-trivial child.
+
+DROP TABLE IF EXISTS gather_append_1, gather_append_2;
+
+CREATE TABLE gather_append_1 (
+    fk int,
+    f bool
+);
+
+INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i;
+
+CREATE INDEX gather_append_1_ix on gather_append_1 (f);
+
+CREATE TABLE gather_append_2 (
+    fk int,
+    val serial
+);
+
+INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i;
+
+ANALYZE gather_append_1, gather_append_2;
+
+SET max_parallel_workers_per_gather = 0;
+
+-- Find the correct row count
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0.1;
+SET min_parallel_table_scan_size = 0;
+SET max_parallel_workers_per_gather = 2;
+
+SELECT count(1)
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);
+
+-- The buckets/batches/memory values from the Parallel Hash node can vary between
+-- machines.  Let's just replace the number with an 'N'.
+create function explain_gather(query text) returns setof text
+language plpgsql as
+$$
+declare
+    ln text;
+begin
+    for ln in
+        execute format('explain (analyze, costs off, summary off, timing off, verbose off, buffers off) %s',
+            query)
+    loop
+        if not ln like '%Gather%' then
+            ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=R loops=L');
+        end if;
+        ln := regexp_replace(ln, 'Buckets: \d+', 'Buckets: N');
+        ln := regexp_replace(ln, 'Batches: \d+', 'Batches: N');
+        ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N');
+        return next ln;
+    end loop;
+end;
+$$;
+
+-- Result rows in the root node should be equal to the non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk);');
+
+-- Result rows in the root node should be equal to the non-parallel count
+SELECT explain_gather('
+SELECT val
+FROM (
+  SELECT fk FROM gather_append_1 WHERE f
+  UNION ALL
+  SELECT fk FROM gather_append_1 WHERE false
+) as t
+LEFT OUTER JOIN gather_append_2
+USING (fk)
+ORDER BY val;');
-- 
2.34.1
