On Mon, 2021-03-01 at 17:39 +0900, Amit Langote wrote:
> > I am not sure.
> > IMO, for a normal table, we use the following macro to get the
> > parallel_workers:
> > ----------------------
> > /*
> >  * RelationGetParallelWorkers
> >  *		Returns the relation's parallel_workers reloption setting.
> >  *		Note multiple eval of argument!
> >  */
> > #define RelationGetParallelWorkers(relation, defaultpw) \
> > 	((relation)->rd_options ? \
> > 	 ((StdRdOptions *) (relation)->rd_options)->parallel_workers : \
> > 	 (defaultpw))
> > ----------------------
> > Since we add the new struct "PartitionedTableRdOptions", it seems we
> > need to get parallel_workers in a different way.
> > Do we need a similar macro to get a partitioned table's parallel_workers?
>
> Oh, you're right.  The parallel_workers setting of a relation is only
> accessed through this macro, even for partitioned tables, and I can
> see that it is actually wrong to access a partitioned table's
> parallel_workers through this macro as-is.  I hadn't tested that,
> though, so thanks for pointing it out.
>
> > Like:
> > #define PartitionedTableGetParallelWorkers(relation, defaultpw) \
> > xxx
> > (PartitionedTableRdOptions *) (relation)->rd_options)->parallel_workers : \
> > (defaultpw))
>
> I'm thinking it would be better to just modify the existing macro to
> check relkind to decide which struct pointer type to cast the value in
> rd_options to.
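Concretely, the idea is to keep the single macro, but have it check relkind
and cast rd_options to the matching options struct.  A sketch of that
(this is essentially what the attached patch ends up with in rel.h):

/*
 * RelationGetParallelWorkers
 *		Returns the relation's parallel_workers reloption setting.
 *		Note multiple eval of argument!
 */
#define RelationGetParallelWorkers(relation, defaultpw) \
	((relation)->rd_options ? \
	 ((relation)->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? \
	  ((PartitionedTableRdOptions *) (relation)->rd_options)->parallel_workers : \
	  ((StdRdOptions *) (relation)->rd_options)->parallel_workers \
	 ) : (defaultpw))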
Here is an updated patch with this fix.  I added regression tests
and adapted the documentation a bit.

I also added support for lowering the number of parallel workers.

Yours,
Laurenz Albe
From d48874a61ac698dc284b83b7e3b147a71158d09d Mon Sep 17 00:00:00 2001
From: Laurenz Albe <laurenz.a...@cybertec.at>
Date: Mon, 1 Mar 2021 16:06:49 +0100
Subject: [PATCH] Allow setting parallel_workers on partitioned tables.

Sometimes it's beneficial to do

ALTER TABLE my_part_tbl SET (parallel_workers = 32);

when the query planner is planning too few workers to read from your
partitions, for example if you have a partitioned table where each
partition is a foreign table that cannot itself have this setting on it.
Shown to give a 100%+ performance increase (in my case, 700%) when used
with cstore_fdw column store partitions.
This is also useful to lower the number of parallel workers, for example
when the executor is expected to prune most partitions.

Authors: Seamus Abshere, Amit Langote, Laurenz Albe
Discussion: https://postgr.es/m/95b1dd96-8634-4545-b1de-e2ac779beb44%40www.fastmail.com
---
 doc/src/sgml/ref/create_table.sgml            |  15 +-
 src/backend/access/common/reloptions.c        |  14 +-
 src/backend/optimizer/path/allpaths.c         | 155 ++++++++++--------
 src/include/utils/rel.h                       |  15 +-
 .../regress/expected/partition_aggregate.out  |  51 +++++-
 src/test/regress/sql/partition_aggregate.sql  |  17 +-
 6 files changed, 188 insertions(+), 79 deletions(-)

diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index 3b2b227683..c8c14850c1 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -1337,8 +1337,9 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     If a table parameter value is set and the
     equivalent <literal>toast.</literal> parameter is not, the TOAST table
     will use the table's parameter value.
-    Specifying these parameters for partitioned tables is not supported,
-    but you may specify them for individual leaf partitions.
+    These parameters, with the exception of <literal>parallel_workers</literal>,
+    are not supported on partitioned tables, but you may specify them for
+    individual leaf partitions.
    </para>
 
    <variablelist>
@@ -1401,9 +1402,13 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     <para>
      This sets the number of workers that should be used to assist a parallel
      scan of this table.  If not set, the system will determine a value based
-     on the relation size.  The actual number of workers chosen by the planner
-     or by utility statements that use parallel scans may be less, for example
-     due to the setting of <xref linkend="guc-max-worker-processes"/>.
+     on the relation size and the number of scanned partitions.
+     When set on a partitioned table, the specified number of workers will
+     work on distinct partitions, so the number of partitions affected by the
+     parallel operation should be taken into account.
+     The actual number of workers chosen by the planner or by utility
+     statements that use parallel scans may be less, for example due
+     to the setting of <xref linkend="guc-max-worker-processes"/>.
     </para>
    </listitem>
   </varlistentry>
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index c687d3ee9e..f8443d2361 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -377,7 +377,7 @@ static relopt_int intRelOpts[] =
 	{
 		{
 			"parallel_workers",
 			"Number of parallel processes that can be used per executor node for this relation.",
-			RELOPT_KIND_HEAP,
+			RELOPT_KIND_HEAP | RELOPT_KIND_PARTITIONED,
 			ShareUpdateExclusiveLock
 		},
 		-1, 0, 1024
@@ -1962,12 +1962,18 @@ bytea *
 partitioned_table_reloptions(Datum reloptions, bool validate)
 {
 	/*
-	 * There are no options for partitioned tables yet, but this is able to do
-	 * some validation.
+	 * Currently the only setting known to be useful for partitioned tables
+	 * is parallel_workers.
 	 */
+	static const relopt_parse_elt tab[] = {
+		{"parallel_workers", RELOPT_TYPE_INT,
+		offsetof(PartitionedTableRdOptions, parallel_workers)},
+	};
+
 	return (bytea *) build_reloptions(reloptions, validate,
 									  RELOPT_KIND_PARTITIONED,
-									  0, NULL, 0);
+									  sizeof(PartitionedTableRdOptions),
+									  tab, lengthof(tab));
 }
 
 /*
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index d73ac562eb..e22cb6f33e 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -97,6 +97,9 @@ static void set_append_rel_size(PlannerInfo *root, RelOptInfo *rel,
 								Index rti, RangeTblEntry *rte);
 static void set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 									Index rti, RangeTblEntry *rte);
+static int	compute_append_parallel_workers(RelOptInfo *rel, List *subpaths,
+											int num_live_children,
+											bool parallel_append);
 static void generate_orderedappend_paths(PlannerInfo *root, RelOptInfo *rel,
 										 List *live_childrels,
 										 List *all_child_pathkeys);
@@ -1268,6 +1271,65 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
 	add_paths_to_append_rel(root, rel, live_childrels);
 }
 
+/*
+ * compute_append_parallel_workers
+ *		Computes the number of workers to assign to scan the subpaths appended
+ *		by a given Append path
+ */
+static int
+compute_append_parallel_workers(RelOptInfo *rel, List *subpaths,
+								int num_live_children,
+								bool parallel_append)
+{
+	ListCell   *lc;
+	int			parallel_workers = 0;
+
+	/*
+	 * For partitioned rels, first see if there is a root-level setting for
+	 * parallel_workers.  But this is only taken into account if a Parallel
+	 * Append plan is being considered.
+	 */
+	if (IS_PARTITIONED_REL(rel) &&
+		parallel_append &&
+		rel->rel_parallel_workers != -1)
+	{
+		parallel_workers = Min(rel->rel_parallel_workers,
+							   max_parallel_workers_per_gather);
+
+		/* an explicit setting overrides heuristics */
+		return parallel_workers;
+	}
+
+	/* Find the highest number of workers requested for any subpath. */
+	foreach(lc, subpaths)
+	{
+		Path	   *path = lfirst(lc);
+
+		parallel_workers = Max(parallel_workers, path->parallel_workers);
+	}
+	Assert(parallel_workers > 0 || subpaths == NIL);
+
+	/*
+	 * If the use of parallel append is permitted, always request at least
+	 * log2(# of children) workers.  We assume it can be useful to have
+	 * extra workers in this case because they will be spread out across
+	 * the children.  The precise formula is just a guess, but we don't
+	 * want to end up with a radically different answer for a table with N
+	 * partitions vs. an unpartitioned table with the same data, so the
+	 * use of some kind of log-scaling here seems to make some sense.
+	 */
+	if (parallel_append)
+	{
+		parallel_workers = Max(parallel_workers,
+							   fls(num_live_children));
+		parallel_workers = Min(parallel_workers,
+							   max_parallel_workers_per_gather);
+	}
+	Assert(parallel_workers > 0);
+
+	return parallel_workers;
+}
+
 /*
  * add_paths_to_append_rel
@@ -1464,50 +1526,28 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	if (partial_subpaths_valid && partial_subpaths != NIL)
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
+		int			parallel_workers =
+			compute_append_parallel_workers(rel, partial_subpaths,
+											list_length(live_childrels),
+											enable_parallel_append);
 
-		/* Find the highest number of workers requested for any subpath. */
-		foreach(lc, partial_subpaths)
+		if (parallel_workers > 0)
 		{
-			Path	   *path = lfirst(lc);
+			/* Generate a partial append path. */
+			appendpath = create_append_path(root, rel, NIL, partial_subpaths,
+											NIL, NULL, parallel_workers,
+											enable_parallel_append,
+											-1);
 
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
-		}
-		Assert(parallel_workers > 0);
+			/*
+			 * Make sure any subsequent partial paths use the same row count
+			 * estimate.
+			 */
+			partial_rows = appendpath->path.rows;
 
-		/*
-		 * If the use of parallel append is permitted, always request at least
-		 * log2(# of children) workers.  We assume it can be useful to have
-		 * extra workers in this case because they will be spread out across
-		 * the children.  The precise formula is just a guess, but we don't
-		 * want to end up with a radically different answer for a table with N
-		 * partitions vs. an unpartitioned table with the same data, so the
-		 * use of some kind of log-scaling here seems to make some sense.
-		 */
-		if (enable_parallel_append)
-		{
-			parallel_workers = Max(parallel_workers,
-								   fls(list_length(live_childrels)));
-			parallel_workers = Min(parallel_workers,
-								   max_parallel_workers_per_gather);
+			/* Add the path */
+			add_partial_path(rel, (Path *) appendpath);
 		}
-		Assert(parallel_workers > 0);
-
-		/* Generate a partial append path. */
-		appendpath = create_append_path(root, rel, NIL, partial_subpaths,
-										NIL, NULL, parallel_workers,
-										enable_parallel_append,
-										-1);
-
-		/*
-		 * Make sure any subsequent partial paths use the same row count
-		 * estimate.
-		 */
-		partial_rows = appendpath->path.rows;
-
-		/* Add the path. */
-		add_partial_path(rel, (Path *) appendpath);
 	}
 
 	/*
@@ -1519,36 +1559,19 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	if (pa_subpaths_valid && pa_nonpartial_subpaths != NIL)
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
+		int			parallel_workers =
+			compute_append_parallel_workers(rel, pa_partial_subpaths,
+											list_length(live_childrels),
+											true);
 
-		/*
-		 * Find the highest number of workers requested for any partial
-		 * subpath.
-		 */
-		foreach(lc, pa_partial_subpaths)
+		if (parallel_workers > 0)
 		{
-			Path	   *path = lfirst(lc);
-
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
+			appendpath = create_append_path(root, rel, pa_nonpartial_subpaths,
+											pa_partial_subpaths,
+											NIL, NULL, parallel_workers, true,
+											partial_rows);
+			add_partial_path(rel, (Path *) appendpath);
 		}
-
-		/*
-		 * Same formula here as above.  It's even more important in this
-		 * instance because the non-partial paths won't contribute anything to
-		 * the planned number of parallel workers.
-		 */
-		parallel_workers = Max(parallel_workers,
-							   fls(list_length(live_childrels)));
-		parallel_workers = Min(parallel_workers,
-							   max_parallel_workers_per_gather);
-		Assert(parallel_workers > 0);
-
-		appendpath = create_append_path(root, rel, pa_nonpartial_subpaths,
-										pa_partial_subpaths,
-										NIL, NULL, parallel_workers, true,
-										partial_rows);
-		add_partial_path(rel, (Path *) appendpath);
 	}
 
 	/*
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 10b63982c0..efa7b714cb 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -308,6 +308,16 @@ typedef struct StdRdOptions
 	bool		vacuum_truncate;	/* enables vacuum to truncate a relation */
 } StdRdOptions;
 
+/*
+ * PartitionedTableRdOptions
+ *		Contents of rd_options for partitioned tables
+ */
+typedef struct PartitionedTableRdOptions
+{
+	int32		vl_len_;		/* varlena header (do not touch directly!) */
+	int			parallel_workers;	/* max number of parallel workers */
+} PartitionedTableRdOptions;
+
 #define HEAP_MIN_FILLFACTOR			10
 #define HEAP_DEFAULT_FILLFACTOR		100
 
@@ -359,7 +369,10 @@ typedef struct StdRdOptions
  */
 #define RelationGetParallelWorkers(relation, defaultpw) \
 	((relation)->rd_options ? \
-	 ((StdRdOptions *) (relation)->rd_options)->parallel_workers : (defaultpw))
+	 ((relation)->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? \
+	  ((PartitionedTableRdOptions *) (relation)->rd_options)->parallel_workers : \
+	  ((StdRdOptions *) (relation)->rd_options)->parallel_workers \
+	 ) : (defaultpw))
 
 /* ViewOptions->check_option values */
 typedef enum ViewOptCheckOption
diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out
index dfa4b036b5..611dfc87ff 100644
--- a/src/test/regress/expected/partition_aggregate.out
+++ b/src/test/regress/expected/partition_aggregate.out
@@ -1166,9 +1166,58 @@ SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 O
  29 | 4500 |   500
 (12 rows)
 
--- Parallelism within partitionwise aggregates
+-- Override "parallel_workers" for a partitioned table
 SET min_parallel_table_scan_size TO '8kB';
 SET parallel_setup_cost TO 0;
+SET max_parallel_workers_per_gather TO 8;
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                            QUERY PLAN
+------------------------------------------------------------------
+ Gather
+   Workers Planned: 4
+   ->  Parallel Append
+         ->  Parallel Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+               Filter: (b = 42)
+(9 rows)
+
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 6);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                            QUERY PLAN
+------------------------------------------------------------------
+ Gather
+   Workers Planned: 6
+   ->  Parallel Append
+         ->  Parallel Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+               Filter: (b = 42)
+         ->  Parallel Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+               Filter: (b = 42)
+(9 rows)
+
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 0);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+                    QUERY PLAN
+---------------------------------------------------
+ Append
+   ->  Seq Scan on pagg_tab_ml_p1 pagg_tab_ml_1
+         Filter: (b = 42)
+   ->  Seq Scan on pagg_tab_ml_p2_s1 pagg_tab_ml_2
+         Filter: (b = 42)
+   ->  Seq Scan on pagg_tab_ml_p2_s2 pagg_tab_ml_3
+         Filter: (b = 42)
+(7 rows)
+
+ALTER TABLE pagg_tab_ml RESET (parallel_workers);
+-- Parallelism within partitionwise aggregates
+SET max_parallel_workers_per_gather TO 2;
 -- Full aggregation at level 1 as GROUP BY clause matches with PARTITION KEY
 -- for level 1 only. For subpartitions, GROUP BY clause does not match with
 -- PARTITION KEY, thus we will have a partial aggregation for them.
diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql
index c17294b15b..c058e2e181 100644
--- a/src/test/regress/sql/partition_aggregate.sql
+++ b/src/test/regress/sql/partition_aggregate.sql
@@ -253,10 +253,23 @@ EXPLAIN (COSTS OFF)
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 ORDER BY 1, 2, 3;
 SELECT a, sum(b), count(*) FROM pagg_tab_ml GROUP BY a, b, c HAVING avg(b) > 7 ORDER BY 1, 2, 3;
 
--- Parallelism within partitionwise aggregates
-
+-- Override "parallel_workers" for a partitioned table
 SET min_parallel_table_scan_size TO '8kB';
 SET parallel_setup_cost TO 0;
+SET max_parallel_workers_per_gather TO 8;
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 6);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml SET (parallel_workers = 0);
+EXPLAIN (COSTS OFF)
+SELECT a FROM pagg_tab_ml WHERE b = 42;
+ALTER TABLE pagg_tab_ml RESET (parallel_workers);
+
+-- Parallelism within partitionwise aggregates
+
+SET max_parallel_workers_per_gather TO 2;
 -- Full aggregation at level 1 as GROUP BY clause matches with PARTITION KEY
 -- for level 1 only. For subpartitions, GROUP BY clause does not match with
-- 
2.26.2