Thanks for having a look at this.

On Fri, 4 Feb 2022 at 13:48, Robert Haas <robertmh...@gmail.com> wrote:
> I think the actual rule is: every path under a Gather or GatherMerge
> must be parallel-safe.

I've adjusted the patch so that it counts parallel_aware and
parallel_safe Paths independently and verifies that everything below a
Gather[Merge] is parallel_safe.
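
In outline, the new checks in create_plan_recurse() amount to the
following (condensed from the attached patch; the two contains_*
helpers walk the whole path tree using a new path_tree_walker()
function):

    /* for every path: parallel aware paths may only have parallel safe subpaths */
    Assert(!best_path->parallel_aware ||
           contains_only_parallel_safe_paths(best_path));

    /* for a Gather (unless single_copy), and similarly for a GatherMerge */
    Assert(((GatherPath *) best_path)->single_copy ||
           contains_a_parallel_aware_path(((GatherPath *) best_path)->subpath));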

The diff stat currently looks like:

src/backend/optimizer/plan/createplan.c | 230
1 file changed, 230 insertions(+)

I still feel this is quite a bit of code for what we're getting here.
I'd be more for it if the path traversal function existed for some
other reason and I was just adding the callback functions and Asserts.

I'm keen to hear what others think about that.

David
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index cd6d72c763..898046ca07 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -313,6 +313,20 @@ static ModifyTable *make_modifytable(PlannerInfo *root, Plan *subplan,
                                      List *rowMarks, OnConflictExpr *onconflict, int epqParam);
 static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
                                              GatherMergePath *best_path);
+static bool contains_a_parallel_aware_path(Path *path);
+static bool contains_only_parallel_safe_paths(Path *path);
+
+/*
+ * PathTypeCount
+ *             Used for various checks to assert plans are sane in assert-enabled
+ *             builds.
+ */
+typedef struct PathTypeCount
+{
+       uint64 count;
+       uint64 parallel_safe_count;
+       uint64 parallel_aware_count;
+} PathTypeCount;
 
 
 /*
@@ -389,6 +403,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
        /* Guard against stack overflow due to overly complex plans */
        check_stack_depth();
 
+       /* Parallel aware paths should contain only parallel safe subpaths. */
+       Assert(!best_path->parallel_aware ||
+                  contains_only_parallel_safe_paths(best_path));
+
        switch (best_path->pathtype)
        {
                case T_SeqScan:
@@ -481,6 +499,14 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
                case T_Gather:
                        plan = (Plan *) create_gather_plan(root,
                                                           (GatherPath *) best_path);
+
+                       /*
+                        * We expect a Gather to contain at least one parallel aware path
+                        * unless running in single_copy mode.
+                        */
+                       Assert(((GatherPath *) best_path)->single_copy ||
+                                  contains_a_parallel_aware_path(((GatherPath *)
+                                                                  best_path)->subpath));
                        break;
                case T_Sort:
                        plan = (Plan *) create_sort_plan(root,
@@ -537,6 +563,9 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
                case T_GatherMerge:
                        plan = (Plan *) create_gather_merge_plan(root,
                                                                 (GatherMergePath *) best_path);
+                       /* GatherMerge must contain at least one parallel aware path */
+                       Assert(contains_a_parallel_aware_path(((GatherMergePath *)
+                                                              best_path)->subpath));
                        break;
                default:
                        elog(ERROR, "unrecognized node type: %d",
@@ -7052,6 +7081,207 @@ make_modifytable(PlannerInfo *root, Plan *subplan,
        return node;
 }
 
+/*
+ * path_tree_walker
+ *             Walk a path tree beginning with 'path' and call the 'walker' function
+ *             for that path and each of its subpaths recursively.
+ */
+static void
+path_tree_walker(Path *path, void (*walker) (), void *context)
+
+{
+       if (path == NULL)
+               return;
+
+       /* Guard against stack overflow due to overly complex path trees */
+       check_stack_depth();
+
+       walker(path, context);
+
+       switch (path->pathtype)
+       {
+               case T_SeqScan:
+               case T_SampleScan:
+               case T_IndexScan:
+               case T_IndexOnlyScan:
+               case T_BitmapHeapScan:
+               case T_TidScan:
+               case T_TidRangeScan:
+               case T_SubqueryScan:
+               case T_FunctionScan:
+               case T_TableFuncScan:
+               case T_ValuesScan:
+               case T_CteScan:
+               case T_WorkTableScan:
+               case T_NamedTuplestoreScan:
+               case T_ForeignScan:
+               case T_CustomScan:
+                       /* Scan paths have no subpaths */
+                       break;
+               case T_HashJoin:
+               case T_MergeJoin:
+               case T_NestLoop:
+                       path_tree_walker(((JoinPath *) path)->outerjoinpath, walker, context);
+                       path_tree_walker(((JoinPath *) path)->innerjoinpath, walker, context);
+                       break;
+               case T_Append:
+                       {
+                               AppendPath *apath = (AppendPath *) path;
+                               ListCell           *lc;
+
+                               foreach(lc, apath->subpaths)
+                               {
+                                       Path *subpath = lfirst(lc);
+
+                                       path_tree_walker(subpath, walker, context);
+                               }
+                       }
+                       break;
+               case T_MergeAppend:
+                       {
+                               MergeAppendPath *mpath = (MergeAppendPath *) path;
+                               ListCell           *lc;
+
+                               foreach(lc, mpath->subpaths)
+                               {
+                                       Path *subpath = lfirst(lc);
+
+                                       path_tree_walker(subpath, walker, context);
+                               }
+                       }
+                       break;
+               case T_Result:
+                       if (IsA(path, ProjectionPath))
+                       {
+                               path_tree_walker(((ProjectionPath *) path)->subpath, walker, context);
+                       }
+                       else if (IsA(path, MinMaxAggPath))
+                       {
+                               /* MinMaxAggPath has no subpaths */
+                       }
+                       else if (IsA(path, GroupResultPath))
+                       {
+                               /* GroupResultPath has no subpaths */
+                       }
+                       else
+                       {
+                               /* No subpaths for any other Result type path */
+                       }
+                       break;
+               case T_ProjectSet:
+                       path_tree_walker(((ProjectSetPath *) path)->subpath, walker, context);
+                       break;
+               case T_Material:
+                       path_tree_walker(((MaterialPath *) path)->subpath, walker, context);
+                       break;
+               case T_Memoize:
+                       path_tree_walker(((MemoizePath *) path)->subpath, walker, context);
+                       break;
+               case T_Unique:
+                       if (IsA(path, UpperUniquePath))
+                               path_tree_walker(((UpperUniquePath *) path)->subpath, walker, context);
+                       else
+                       {
+                               Assert(IsA(path, UniquePath));
+                               path_tree_walker(((UniquePath *) path)->subpath, walker, context);
+                       }
+                       break;
+               case T_Gather:
+                       path_tree_walker(((GatherPath *) path)->subpath, walker, context);
+                       break;
+               case T_Sort:
+                       path_tree_walker(((SortPath *) path)->subpath, walker, context);
+                       break;
+               case T_IncrementalSort:
+                       path_tree_walker(((IncrementalSortPath *) path)->spath.subpath, walker, context);
+                       break;
+               case T_Group:
+                       path_tree_walker(((GroupPath *) path)->subpath, walker, context);
+                       break;
+               case T_Agg:
+                       if (IsA(path, GroupingSetsPath))
+                               path_tree_walker(((GroupingSetsPath *) path)->subpath, walker, context);
+                       else
+                       {
+                               Assert(IsA(path, AggPath));
+                               path_tree_walker(((AggPath *) path)->subpath, walker, context);
+                       }
+                       break;
+               case T_WindowAgg:
+                       path_tree_walker(((WindowAggPath *) path)->subpath, walker, context);
+                       break;
+               case T_SetOp:
+                       path_tree_walker(((SetOpPath *) path)->subpath, walker, context);
+                       break;
+               case T_RecursiveUnion:
+                       path_tree_walker(((RecursiveUnionPath *) path)->leftpath, walker, context);
+                       path_tree_walker(((RecursiveUnionPath *) path)->rightpath, walker, context);
+                       break;
+               case T_LockRows:
+                       path_tree_walker(((LockRowsPath *) path)->subpath, walker, context);
+                       break;
+               case T_ModifyTable:
+                       path_tree_walker(((ModifyTablePath *) path)->subpath, walker, context);
+                       break;
+               case T_Limit:
+                       path_tree_walker(((LimitPath *) path)->subpath, walker, context);
+                       break;
+               case T_GatherMerge:
+                       path_tree_walker(((GatherMergePath *) path)->subpath, walker, context);
+                       break;
+               default:
+                       elog(ERROR, "unrecognized node type: %d", (int) path->pathtype);
+                       break;
+       }
+}
+
+/*
+ * path_type_counter
+ *             Count the total number of paths along with the number that are
+ *             parallel_aware and the number that are parallel_safe.
+ */
+static void
+path_type_counter(Path *path, PathTypeCount *pathcount)
+{
+       pathcount->count++;
+       if (path->parallel_aware)
+               pathcount->parallel_aware_count++;
+       if (path->parallel_safe)
+               pathcount->parallel_safe_count++;
+}
+
+/*
+ * contains_a_parallel_aware_path
+ *             Determine if 'path' or any of its subpaths are parallel aware
+ */
+static bool
+contains_a_parallel_aware_path(Path *path)
+{
+       PathTypeCount pathcount;
+
+       memset(&pathcount, 0, sizeof(pathcount));
+
+       path_tree_walker(path, path_type_counter, (void *) &pathcount);
+
+       return (pathcount.parallel_aware_count > 0);
+}
+
+/*
+ * contains_only_parallel_safe_paths
+ *             Returns true if 'path' and all of its subpaths are parallel safe
+ */
+static bool
+contains_only_parallel_safe_paths(Path *path)
+{
+       PathTypeCount pathcount;
+
+       memset(&pathcount, 0, sizeof(pathcount));
+
+       path_tree_walker(path, path_type_counter, (void *) &pathcount);
+
+       return (pathcount.parallel_safe_count == pathcount.count);
+}
+
 /*
  * is_projection_capable_path
  *             Check whether a given Path node is able to do projection.
