Hi,

This patch hasn't been updated since September, and it got broken by
4a29eabd1d91c5484426bc5836e0a7143b064f5a which the incremental sort
stuff a little bit. But the breakage was rather limited, so I took a
stab at fixing it - attached is the result, hopefully correct.

I also added a couple minor comments about stuff I noticed while
rebasing and skimming the patch, I kept those in separate commits.
There's also a couple pre-existing TODOs.

James, what's your plan with this patch. Do you intend to work on it for
PG16, or are there some issues I missed in the thread?


One of the queries in in incremental_sort changed plans a little bit:

explain (costs off) select distinct
  unique1,
  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
from tenk1 t, generate_series(1, 1000);

switched from

 Unique  (cost=18582710.41..18747375.21 rows=10000 width=8)
   ->  Gather Merge  (cost=18582710.41..18697375.21 rows=10000000 ...)
         Workers Planned: 2
         ->  Sort  (cost=18582710.39..18593127.06 rows=4166667 ...)
               Sort Key: t.unique1, ((SubPlan 1))
             ...

to

 Unique  (cost=18582710.41..18614268.91 rows=10000 ...)
   ->  Gather Merge  (cost=18582710.41..18614168.91 rows=20000 ...)
         Workers Planned: 2
         ->  Unique  (cost=18582710.39..18613960.39 rows=10000 ...)
               ->  Sort  (cost=18582710.39..18593127.06 ...)
                     Sort Key: t.unique1, ((SubPlan 1))
                   ...

which probably makes sense, as the cost estimate decreases a bit.

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 95db15fe16303ed3f4fdea52af3b8d6d05a8d7a6 Mon Sep 17 00:00:00 2001
From: jcoleman <jtc...@gmail.com>
Date: Mon, 26 Sep 2022 20:30:23 -0400
Subject: [PATCH 1/5] Add tests before change

---
 src/test/regress/expected/select_parallel.out | 108 ++++++++++++++++++
 src/test/regress/sql/select_parallel.sql      |  25 ++++
 2 files changed, 133 insertions(+)

diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 91f74fe47a3..9b4d7dd44a4 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -311,6 +311,114 @@ select count(*) from tenk1 where (two, four) not in
  10000
 (1 row)
 
+-- test parallel plans for queries containing correlated subplans
+-- where the subplan only needs params available from the current
+-- worker's scan.
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t, generate_series(1, 10);
+                                 QUERY PLAN                                 
+----------------------------------------------------------------------------
+ Gather
+   Output: (SubPlan 1)
+   Workers Planned: 4
+   ->  Nested Loop
+         Output: t.unique1
+         ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+               Output: t.unique1
+         ->  Function Scan on pg_catalog.generate_series
+               Output: generate_series.generate_series
+               Function Call: generate_series(1, 10)
+   SubPlan 1
+     ->  Index Only Scan using tenk1_unique1 on public.tenk1
+           Output: t.unique1
+           Index Cond: (tenk1.unique1 = t.unique1)
+(14 rows)
+
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
+   Output: (SubPlan 1)
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1
+   SubPlan 1
+     ->  Index Only Scan using tenk1_unique1 on public.tenk1
+           Output: t.unique1
+           Index Cond: (tenk1.unique1 = t.unique1)
+(9 rows)
+
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t
+  limit 1;
+                            QUERY PLAN                             
+-------------------------------------------------------------------
+ Limit
+   Output: ((SubPlan 1))
+   ->  Seq Scan on public.tenk1 t
+         Output: (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(8 rows)
+
+explain (costs off, verbose) select t.unique1
+  from tenk1 t
+  where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Seq Scan on public.tenk1 t
+   Output: t.unique1
+   Filter: (t.unique1 = (SubPlan 1))
+   SubPlan 1
+     ->  Index Only Scan using tenk1_unique1 on public.tenk1
+           Output: t.unique1
+           Index Cond: (tenk1.unique1 = t.unique1)
+(7 rows)
+
+explain (costs off, verbose) select *
+  from tenk1 t
+  order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+                                                                                             QUERY PLAN                                                                                             
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Sort
+   Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
+   Sort Key: ((SubPlan 1))
+   ->  Gather
+         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
+         Workers Planned: 4
+         ->  Parallel Seq Scan on public.tenk1 t
+               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(12 rows)
+
+-- test subplan in join/lateral join
+explain (costs off, verbose, timing off) select t.unique1, l.*
+  from tenk1 t
+  join lateral (
+    select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0)
+  ) l on true;
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
+   Output: t.unique1, (SubPlan 1)
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1
+   SubPlan 1
+     ->  Index Only Scan using tenk1_unique1 on public.tenk1
+           Output: t.unique1
+           Index Cond: (tenk1.unique1 = t.unique1)
+(9 rows)
+
 -- this is not parallel-safe due to use of random() within SubLink's testexpr:
 explain (costs off)
 	select * from tenk1 where (unique1 + random())::integer not in
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 62fb68c7a04..21c2f1c7424 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -111,6 +111,31 @@ explain (costs off)
 	(select hundred, thousand from tenk2 where thousand > 100);
 select count(*) from tenk1 where (two, four) not in
 	(select hundred, thousand from tenk2 where thousand > 100);
+-- test parallel plans for queries containing correlated subplans
+-- where the subplan only needs params available from the current
+-- worker's scan.
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t, generate_series(1, 10);
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t;
+explain (costs off, verbose) select
+  (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
+  from tenk1 t
+  limit 1;
+explain (costs off, verbose) select t.unique1
+  from tenk1 t
+  where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+explain (costs off, verbose) select *
+  from tenk1 t
+  order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
+-- test subplan in join/lateral join
+explain (costs off, verbose, timing off) select t.unique1, l.*
+  from tenk1 t
+  join lateral (
+    select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0)
+  ) l on true;
 -- this is not parallel-safe due to use of random() within SubLink's testexpr:
 explain (costs off)
 	select * from tenk1 where (unique1 + random())::integer not in
-- 
2.39.0

From cd379afbe22a9b41ff2b5dd5d86719a19f15301f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Wed, 18 Jan 2023 19:15:06 +0100
Subject: [PATCH 2/5] Parallelize correlated subqueries

When params are provided at the current query level (i.e., are generated
within a single worker and not shared across workers) we can safely
execute these in parallel.

Alternative approach using just relids subset check
---
 doc/src/sgml/parallel.sgml                    |   3 +-
 src/backend/optimizer/path/allpaths.c         |  16 ++-
 src/backend/optimizer/path/joinpath.c         |  16 ++-
 src/backend/optimizer/util/clauses.c          |   3 +
 src/backend/optimizer/util/pathnode.c         |   2 +
 src/include/nodes/pathnodes.h                 |   2 +-
 .../regress/expected/incremental_sort.out     |  28 ++--
 src/test/regress/expected/partition_prune.out | 104 +++++++-------
 src/test/regress/expected/select_parallel.out | 128 ++++++++++--------
 9 files changed, 168 insertions(+), 134 deletions(-)

diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 5acc9537d6f..fd32572ec8b 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -518,7 +518,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
 
     <listitem>
       <para>
-        Plan nodes that reference a correlated <literal>SubPlan</literal>.
+        Plan nodes that reference a correlated <literal>SubPlan</literal> where
+        the result is shared between workers.
       </para>
     </listitem>
   </itemizedlist>
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index c2fc568dc8a..7108501b9a7 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -558,7 +558,8 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	 * the final scan/join targetlist is available (see grouping_planner).
 	 */
 	if (rel->reloptkind == RELOPT_BASEREL &&
-		!bms_equal(rel->relids, root->all_baserels))
+		!bms_equal(rel->relids, root->all_baserels)
+		&& (rel->subplan_params == NIL || rte->rtekind != RTE_SUBQUERY))
 		generate_useful_gather_paths(root, rel, false);
 
 	/* Now find the cheapest of the paths for this rel */
@@ -3037,7 +3038,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
 	simple_gather_path = (Path *)
 		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
-						   NULL, rowsp);
+						   rel->lateral_relids, rowsp);
 	add_path(rel, simple_gather_path);
 
 	/*
@@ -3054,7 +3055,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 
 		rows = subpath->rows * subpath->parallel_workers;
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
-										subpath->pathkeys, NULL, rowsp);
+										subpath->pathkeys, rel->lateral_relids, rowsp);
 		add_path(rel, &path->path);
 	}
 }
@@ -3156,11 +3157,15 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 	double	   *rowsp = NULL;
 	List	   *useful_pathkeys_list = NIL;
 	Path	   *cheapest_partial_path = NULL;
+	Relids		required_outer = rel->lateral_relids;
 
 	/* If there are no partial paths, there's nothing to do here. */
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	if (!bms_is_subset(required_outer, rel->relids))
+		return;
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -3249,7 +3254,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 											subpath,
 											rel->reltarget,
 											subpath->pathkeys,
-											NULL,
+											required_outer,
 											rowsp);
 
 			add_path(rel, &path->path);
@@ -3427,7 +3432,8 @@ standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels)
 			/*
 			 * Except for the topmost scan/join rel, consider gathering
 			 * partial paths.  We'll do the same for the topmost scan/join rel
-			 * once we know the final targetlist (see grouping_planner).
+			 * once we know the final targetlist (see
+			 * apply_scanjoin_target_to_paths).
 			 */
 			if (!bms_equal(rel->relids, root->all_baserels))
 				generate_useful_gather_paths(root, rel, false);
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index d345c0437a4..000e3ca9a25 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -1792,16 +1792,24 @@ match_unsorted_outer(PlannerInfo *root,
 	 * partial path and the joinrel is parallel-safe.  However, we can't
 	 * handle JOIN_UNIQUE_OUTER, because the outer path will be partial, and
 	 * therefore we won't be able to properly guarantee uniqueness.  Nor can
-	 * we handle joins needing lateral rels, since partial paths must not be
-	 * parameterized. Similarly, we can't handle JOIN_FULL and JOIN_RIGHT,
-	 * because they can produce false null extended rows.
+	 * we handle JOIN_FULL and JOIN_RIGHT, because they can produce false null
+	 * extended rows.
+	 *
+	 * Partial paths may only have parameters in limited cases
+	 * where the parameterization is fully satisfied without sharing state
+	 * between workers, so we only allow lateral rels on inputs to the join
+	 * if the resulting join contains no lateral rels, the inner rel's laterals
+	 * are fully satisfied by the outer rel, and the outer rel doesn't depend
+	 * on the inner rel to produce any laterals.
 	 */
 	if (joinrel->consider_parallel &&
 		save_jointype != JOIN_UNIQUE_OUTER &&
 		save_jointype != JOIN_FULL &&
 		save_jointype != JOIN_RIGHT &&
 		outerrel->partial_pathlist != NIL &&
-		bms_is_empty(joinrel->lateral_relids))
+		bms_is_empty(joinrel->lateral_relids) &&
+		bms_is_subset(innerrel->lateral_relids, outerrel->relids) &&
+		(bms_is_empty(outerrel->lateral_relids) || !bms_is_subset(outerrel->lateral_relids, innerrel->relids)))
 	{
 		if (nestjoinOK)
 			consider_parallel_nestloop(root, joinrel, outerrel, innerrel,
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index aa584848cf9..035471d05d0 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -816,6 +816,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		if (param->paramkind == PARAM_EXTERN)
 			return false;
 
+		if (param->paramkind == PARAM_EXEC)
+			return false;
+
 		if (param->paramkind != PARAM_EXEC ||
 			!list_member_int(context->safe_param_ids, param->paramid))
 		{
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 4478036bb6a..5667222e925 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -2437,6 +2437,8 @@ create_nestloop_path(PlannerInfo *root,
 	NestPath   *pathnode = makeNode(NestPath);
 	Relids		inner_req_outer = PATH_REQ_OUTER(inner_path);
 
+	/* TODO: Assert lateral relids subset safety? */
+
 	/*
 	 * If the inner path is parameterized by the outer, we must drop any
 	 * restrict_clauses that are due to be moved into the inner path.  We have
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index c20b7298a3d..a5ba65a5616 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -2962,7 +2962,7 @@ typedef struct MinMaxAggInfo
  * for conflicting purposes.
  *
  * In addition, PARAM_EXEC slots are assigned for Params representing outputs
- * from subplans (values that are setParam items for those subplans).  These
+ * from subplans (values that are setParam items for those subplans). [TODO: is this true, or only for init plans?]  These
  * IDs need not be tracked via PlannerParamItems, since we do not need any
  * duplicate-elimination nor later processing of the represented expressions.
  * Instead, we just record the assignment of the slot number by appending to
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index 0c3433f8e58..0cb7c1a49c0 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -1600,16 +1600,16 @@ from tenk1 t, generate_series(1, 1000);
                                    QUERY PLAN                                    
 ---------------------------------------------------------------------------------
  Unique
-   ->  Sort
-         Sort Key: t.unique1, ((SubPlan 1))
-         ->  Gather
-               Workers Planned: 2
+   ->  Gather Merge
+         Workers Planned: 2
+         ->  Sort
+               Sort Key: t.unique1, ((SubPlan 1))
                ->  Nested Loop
                      ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                      ->  Function Scan on generate_series
-               SubPlan 1
-                 ->  Index Only Scan using tenk1_unique1 on tenk1
-                       Index Cond: (unique1 = t.unique1)
+                     SubPlan 1
+                       ->  Index Only Scan using tenk1_unique1 on tenk1
+                             Index Cond: (unique1 = t.unique1)
 (11 rows)
 
 explain (costs off) select
@@ -1619,16 +1619,16 @@ from tenk1 t, generate_series(1, 1000)
 order by 1, 2;
                                 QUERY PLAN                                 
 ---------------------------------------------------------------------------
- Sort
-   Sort Key: t.unique1, ((SubPlan 1))
-   ->  Gather
-         Workers Planned: 2
+ Gather Merge
+   Workers Planned: 2
+   ->  Sort
+         Sort Key: t.unique1, ((SubPlan 1))
          ->  Nested Loop
                ->  Parallel Index Only Scan using tenk1_unique1 on tenk1 t
                ->  Function Scan on generate_series
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on tenk1
-                 Index Cond: (unique1 = t.unique1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on tenk1
+                       Index Cond: (unique1 = t.unique1)
 (10 rows)
 
 -- Parallel sort but with expression not available until the upper rel.
diff --git a/src/test/regress/expected/partition_prune.out b/src/test/regress/expected/partition_prune.out
index 7555764c779..5c45f9c0a50 100644
--- a/src/test/regress/expected/partition_prune.out
+++ b/src/test/regress/expected/partition_prune.out
@@ -1284,60 +1284,64 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM part p(x) ORDER BY x;
 --
 -- pruning won't work for mc3p, because some keys are Params
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = t1.b and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1;
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Nested Loop
-   ->  Append
-         ->  Seq Scan on mc2p1 t1_1
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p2 t1_2
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p_default t1_3
-               Filter: (a = 1)
-   ->  Aggregate
-         ->  Append
-               ->  Seq Scan on mc3p0 t2_1
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p1 t2_2
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p2 t2_3
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p3 t2_4
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p4 t2_5
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p5 t2_6
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p6 t2_7
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p7 t2_8
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p_default t2_9
-                     Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
-(28 rows)
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Nested Loop
+         ->  Parallel Append
+               ->  Parallel Seq Scan on mc2p1 t1_1
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p2 t1_2
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p_default t1_3
+                     Filter: (a = 1)
+         ->  Aggregate
+               ->  Append
+                     ->  Seq Scan on mc3p0 t2_1
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p1 t2_2
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p2 t2_3
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p3 t2_4
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p4 t2_5
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p5 t2_6
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p6 t2_7
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p7 t2_8
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p_default t2_9
+                           Filter: ((a = t1.b) AND (c = 1) AND (abs(b) = 1))
+(30 rows)
 
 -- pruning should work fine, because values for a prefix of keys (a, b) are
 -- available
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.c = t1.b and abs(t2.b) = 1 and t2.a = 1) s where t1.a = 1;
-                              QUERY PLAN                               
------------------------------------------------------------------------
- Nested Loop
-   ->  Append
-         ->  Seq Scan on mc2p1 t1_1
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p2 t1_2
-               Filter: (a = 1)
-         ->  Seq Scan on mc2p_default t1_3
-               Filter: (a = 1)
-   ->  Aggregate
-         ->  Append
-               ->  Seq Scan on mc3p0 t2_1
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p1 t2_2
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-               ->  Seq Scan on mc3p_default t2_3
-                     Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
-(16 rows)
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Gather
+   Workers Planned: 2
+   ->  Nested Loop
+         ->  Parallel Append
+               ->  Parallel Seq Scan on mc2p1 t1_1
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p2 t1_2
+                     Filter: (a = 1)
+               ->  Parallel Seq Scan on mc2p_default t1_3
+                     Filter: (a = 1)
+         ->  Aggregate
+               ->  Append
+                     ->  Seq Scan on mc3p0 t2_1
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p1 t2_2
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+                     ->  Seq Scan on mc3p_default t2_3
+                           Filter: ((c = t1.b) AND (a = 1) AND (abs(b) = 1))
+(18 rows)
 
 -- also here, because values for all keys are provided
 explain (costs off) select * from mc2p t1, lateral (select count(*) from mc3p t2 where t2.a = 1 and abs(t2.b) = 1 and t2.c = 1) s where t1.a = 1;
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 9b4d7dd44a4..01443e2ffbe 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -137,8 +137,8 @@ create table part_pa_test_p2 partition of part_pa_test for values from (0) to (m
 explain (costs off)
 	select (select max((select pa1.b from part_pa_test pa1 where pa1.a = pa2.a)))
 	from part_pa_test pa2;
-                          QUERY PLAN                          
---------------------------------------------------------------
+                           QUERY PLAN                           
+----------------------------------------------------------------
  Aggregate
    ->  Gather
          Workers Planned: 3
@@ -148,12 +148,14 @@ explain (costs off)
    SubPlan 2
      ->  Result
    SubPlan 1
-     ->  Append
-           ->  Seq Scan on part_pa_test_p1 pa1_1
-                 Filter: (a = pa2.a)
-           ->  Seq Scan on part_pa_test_p2 pa1_2
-                 Filter: (a = pa2.a)
-(14 rows)
+     ->  Gather
+           Workers Planned: 3
+           ->  Parallel Append
+                 ->  Parallel Seq Scan on part_pa_test_p1 pa1_1
+                       Filter: (a = pa2.a)
+                 ->  Parallel Seq Scan on part_pa_test_p2 pa1_2
+                       Filter: (a = pa2.a)
+(16 rows)
 
 drop table part_pa_test;
 -- test with leader participation disabled
@@ -320,19 +322,19 @@ explain (costs off, verbose) select
                                  QUERY PLAN                                 
 ----------------------------------------------------------------------------
  Gather
-   Output: (SubPlan 1)
+   Output: ((SubPlan 1))
    Workers Planned: 4
    ->  Nested Loop
-         Output: t.unique1
+         Output: (SubPlan 1)
          ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
                Output: t.unique1
          ->  Function Scan on pg_catalog.generate_series
                Output: generate_series.generate_series
                Function Call: generate_series(1, 10)
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (14 rows)
 
 explain (costs off, verbose) select
@@ -341,63 +343,69 @@ explain (costs off, verbose) select
                               QUERY PLAN                              
 ----------------------------------------------------------------------
  Gather
-   Output: (SubPlan 1)
+   Output: ((SubPlan 1))
    Workers Planned: 4
    ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
-         Output: t.unique1
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         Output: (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (9 rows)
 
 explain (costs off, verbose) select
   (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
   from tenk1 t
   limit 1;
-                            QUERY PLAN                             
--------------------------------------------------------------------
+                                 QUERY PLAN                                 
+----------------------------------------------------------------------------
  Limit
    Output: ((SubPlan 1))
-   ->  Seq Scan on public.tenk1 t
-         Output: (SubPlan 1)
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on public.tenk1
-                 Output: t.unique1
-                 Index Cond: (tenk1.unique1 = t.unique1)
-(8 rows)
+   ->  Gather
+         Output: ((SubPlan 1))
+         Workers Planned: 4
+         ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+               Output: (SubPlan 1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
+(11 rows)
 
 explain (costs off, verbose) select t.unique1
   from tenk1 t
   where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
-                         QUERY PLAN                          
--------------------------------------------------------------
- Seq Scan on public.tenk1 t
+                              QUERY PLAN                              
+----------------------------------------------------------------------
+ Gather
    Output: t.unique1
-   Filter: (t.unique1 = (SubPlan 1))
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
-(7 rows)
+   Workers Planned: 4
+   ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
+         Output: t.unique1
+         Filter: (t.unique1 = (SubPlan 1))
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
+(10 rows)
 
 explain (costs off, verbose) select *
   from tenk1 t
   order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1);
-                                                                                             QUERY PLAN                                                                                             
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Sort
+                                                                                                QUERY PLAN                                                                                                
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Gather Merge
    Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
-   Sort Key: ((SubPlan 1))
-   ->  Gather
-         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
-         Workers Planned: 4
+   Workers Planned: 4
+   ->  Sort
+         Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1))
+         Sort Key: ((SubPlan 1))
          ->  Parallel Seq Scan on public.tenk1 t
-               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4
-         SubPlan 1
-           ->  Index Only Scan using tenk1_unique1 on public.tenk1
-                 Output: t.unique1
-                 Index Cond: (tenk1.unique1 = t.unique1)
+               Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1)
+               SubPlan 1
+                 ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                       Output: t.unique1
+                       Index Cond: (tenk1.unique1 = t.unique1)
 (12 rows)
 
 -- test subplan in join/lateral join
@@ -409,14 +417,14 @@ explain (costs off, verbose, timing off) select t.unique1, l.*
                               QUERY PLAN                              
 ----------------------------------------------------------------------
  Gather
-   Output: t.unique1, (SubPlan 1)
+   Output: t.unique1, ((SubPlan 1))
    Workers Planned: 4
    ->  Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
-         Output: t.unique1
-   SubPlan 1
-     ->  Index Only Scan using tenk1_unique1 on public.tenk1
-           Output: t.unique1
-           Index Cond: (tenk1.unique1 = t.unique1)
+         Output: t.unique1, (SubPlan 1)
+         SubPlan 1
+           ->  Index Only Scan using tenk1_unique1 on public.tenk1
+                 Output: t.unique1
+                 Index Cond: (tenk1.unique1 = t.unique1)
 (9 rows)
 
 -- this is not parallel-safe due to use of random() within SubLink's testexpr:
@@ -1322,8 +1330,10 @@ SELECT 1 FROM tenk1_vw_sec
          ->  Parallel Index Only Scan using tenk1_unique1 on tenk1
    SubPlan 1
      ->  Aggregate
-           ->  Seq Scan on int4_tbl
-                 Filter: (f1 < tenk1_vw_sec.unique1)
-(9 rows)
+           ->  Gather
+                 Workers Planned: 1
+                 ->  Parallel Seq Scan on int4_tbl
+                       Filter: (f1 < tenk1_vw_sec.unique1)
+(11 rows)
 
 rollback;
-- 
2.39.0

From 1aa279e8e82a4e9771fb1a5068e9a5792a824a54 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Wed, 18 Jan 2023 19:51:38 +0100
Subject: [PATCH 3/5] review comments

---
 src/backend/optimizer/path/allpaths.c | 5 +++--
 src/backend/optimizer/util/clauses.c  | 5 +++++
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 7108501b9a7..357dfaab3e8 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -558,8 +558,8 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	 * the final scan/join targetlist is available (see grouping_planner).
 	 */
 	if (rel->reloptkind == RELOPT_BASEREL &&
-		!bms_equal(rel->relids, root->all_baserels)
-		&& (rel->subplan_params == NIL || rte->rtekind != RTE_SUBQUERY))
+		!bms_equal(rel->relids, root->all_baserels) &&
+		(rel->subplan_params == NIL || rte->rtekind != RTE_SUBQUERY))
 		generate_useful_gather_paths(root, rel, false);
 
 	/* Now find the cheapest of the paths for this rel */
@@ -3163,6 +3163,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	/* FIXME ??? */
 	if (!bms_is_subset(required_outer, rel->relids))
 		return;
 
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 035471d05d0..9585acf8e69 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -819,6 +819,11 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 		if (param->paramkind == PARAM_EXEC)
 			return false;
 
+		/*
+		 * XXX The first condition is certainly true, thanks to the preceding
+		 * check. The comment above should be updated to reflect this change,
+		 * probably.
+		 */
 		if (param->paramkind != PARAM_EXEC ||
 			!list_member_int(context->safe_param_ids, param->paramid))
 		{
-- 
2.39.0

From 45da97c1141ead1fe757fcff549e68fbd4c1ded9 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Wed, 18 Jan 2023 19:21:24 +0100
Subject: [PATCH 4/5] Possible additional checks

---
 src/backend/optimizer/path/allpaths.c | 29 +++++++++++++++++++++------
 1 file changed, 23 insertions(+), 6 deletions(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 357dfaab3e8..eaa972ec64b 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3019,11 +3019,16 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	ListCell   *lc;
 	double		rows;
 	double	   *rowsp = NULL;
+	Relids		required_outer = rel->lateral_relids;
 
 	/* If there are no partial paths, there's nothing to do here. */
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	if (!bms_is_subset(required_outer, rel->relids))
+		return;
+
+
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -3034,12 +3039,16 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	 * of partial_pathlist because of the way add_partial_path works.
 	 */
 	cheapest_partial_path = linitial(rel->partial_pathlist);
-	rows =
-		cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
-	simple_gather_path = (Path *)
-		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
-						   rel->lateral_relids, rowsp);
-	add_path(rel, simple_gather_path);
+	if (cheapest_partial_path->param_info == NULL ||
+			bms_is_subset(cheapest_partial_path->param_info->ppi_req_outer, rel->relids))
+	{
+		rows =
+			cheapest_partial_path->rows * cheapest_partial_path->parallel_workers;
+		simple_gather_path = (Path *)
+			create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
+							   rel->lateral_relids, rowsp);
+		add_path(rel, simple_gather_path);
+	}
 
 	/*
 	 * For each useful ordering, we can consider an order-preserving Gather
@@ -3053,6 +3062,10 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
+		if (subpath->param_info != NULL &&
+			!bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids))
+			break;
+
 		rows = subpath->rows * subpath->parallel_workers;
 		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
 										subpath->pathkeys, rel->lateral_relids, rowsp);
@@ -3223,6 +3236,10 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r
 				(presorted_keys == 0 || !enable_incremental_sort))
 				continue;
 
+			if (subpath->param_info != NULL &&
+				!bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids))
+				break;
+
 			/*
 			 * Consider regular sort for any path that's not presorted or if
 			 * incremental sort is disabled.  We've no need to consider both
-- 
2.39.0

From 2ca25bd5bbe1aa6849c55cab58dafe202adacce9 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Wed, 18 Jan 2023 19:52:46 +0100
Subject: [PATCH 5/5] review comments

---
 src/backend/optimizer/path/allpaths.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index eaa972ec64b..9dd10a1274c 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3025,10 +3025,10 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 	if (rel->partial_pathlist == NIL)
 		return;
 
+	/* FIXME ??? */
 	if (!bms_is_subset(required_outer, rel->relids))
 		return;
 
-
 	/* Should we override the rel's rowcount estimate? */
 	if (override_rows)
 		rowsp = &rows;
@@ -3062,6 +3062,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows)
 		if (subpath->pathkeys == NIL)
 			continue;
 
+		/* FIXME ??? */
 		if (subpath->param_info != NULL &&
 			!bms_is_subset(subpath->param_info->ppi_req_outer, rel->relids))
 			break;
-- 
2.39.0

Reply via email to