Hi all,

Paul and I have recently been hacking on parallel grouping sets, and here we have two implementations.
Implementation 1
================

Attached is the patch; there is also a github branch [1] for this work.

PostgreSQL already supports parallel aggregation, which is implemented by aggregating in two stages. First, each worker performs an aggregation step, producing a partial result for each group of which that process is aware. Then the partial results are transferred to the leader via the Gather node. Finally, the leader merges the partial results and produces the final result for each group.

We implement parallel grouping sets in the same way. The only difference is that in the final stage, the leader performs a grouping sets aggregation rather than a normal aggregation. The plan looks like this:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3));
                       QUERY PLAN
---------------------------------------------------------
 Finalize MixedAggregate
   Output: c1, c2, avg(c3), c3
   Hash Key: t2.c2, t2.c3
   Group Key: t2.c1, t2.c2
   Group Key: t2.c1
   ->  Gather Merge
         Output: c1, c2, c3, (PARTIAL avg(c3))
         Workers Planned: 2
         ->  Sort
               Output: c1, c2, c3, (PARTIAL avg(c3))
               Sort Key: t2.c1, t2.c2
               ->  Partial HashAggregate
                     Output: c1, c2, c3, PARTIAL avg(c3)
                     Group Key: t2.c1, t2.c2, t2.c3
                     ->  Parallel Seq Scan on public.t2
                           Output: c1, c2, c3
(16 rows)

Because the partial aggregation can be performed in parallel, we can expect a speedup if the number of groups seen by the Finalize Aggregate node is substantially smaller than the number of input rows. For example, for the table provided in the test case within the patch, running the above query on my Linux box gives:

# explain analyze select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3)); -- without patch
Planning Time: 0.123 ms
Execution Time: 9459.362 ms

# explain analyze select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3)); -- with patch
Planning Time: 0.204 ms
Execution Time: 1077.654 ms

But sometimes we may not benefit from this patch.
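The two-stage flow above can be sketched in a few lines of Python. This is a simulation under stated assumptions, not the patch's code: rows are (c1, c2, c3) tuples, avg(c3) is modelled as a (sum, count) partial state, the leader's Finalize MixedAggregate is approximated by one pass per grouping set over the workers' partial states, and all function names are hypothetical. It also counts how many partial groups reach the leader, which is the quantity that decides whether the parallel stage pays off.

```python
from collections import defaultdict

# Hypothetical simulation of Implementation 1 (not the patch's C code).
# A row is (c1, c2, c3); grouping sets are given as column positions.
GROUPING_SETS = [(0, 1), (0,), (1, 2)]  # ((c1,c2), (c1), (c2,c3))
PARTIAL_KEY = (0, 1, 2)                 # union of all grouping-set columns
AGG_COL = 2                             # avg(c3)


def project(row, gset):
    """Group key for one grouping set; columns outside the set become NULL (None)."""
    return tuple(row[i] if i in gset else None for i in range(3))


def partial_aggregate(rows):
    """Worker stage (Partial HashAggregate): one (sum, count) state per (c1, c2, c3)."""
    states = defaultdict(lambda: [0, 0])
    for row in rows:
        state = states[tuple(row[i] for i in PARTIAL_KEY)]
        state[0] += row[AGG_COL]
        state[1] += 1
    return {key: tuple(state) for key, state in states.items()}


def finalize_grouping_sets(worker_outputs):
    """Leader stage (Finalize MixedAggregate): combine partial states per grouping set."""
    result = {}
    for gset_id, gset in enumerate(GROUPING_SETS):
        combined = defaultdict(lambda: [0, 0])
        for states in worker_outputs:
            for key, (s, n) in states.items():
                # A partial key is a full (c1, c2, c3) tuple, so project it like a row.
                state = combined[project(key, gset)]
                state[0] += s
                state[1] += n
        for key, (s, n) in combined.items():
            result[(gset_id, key)] = s / n
    return result


def serial_grouping_sets(rows):
    """Reference: plain, single-process grouping sets aggregation."""
    result = {}
    for gset_id, gset in enumerate(GROUPING_SETS):
        groups = defaultdict(list)
        for row in rows:
            groups[project(row, gset)].append(row[AGG_COL])
        for key, values in groups.items():
            result[(gset_id, key)] = sum(values) / len(values)
    return result


rows = [(i % 3, i % 5, i % 7) for i in range(1000)]
workers = [rows[0::2], rows[1::2]]      # two workers, as in "Workers Planned: 2"
partials = [partial_aggregate(chunk) for chunk in workers]
assert finalize_grouping_sets(partials) == serial_grouping_sets(rows)
print(sum(len(p) for p in partials), "partial groups from", len(rows), "rows")
# prints: 210 partial groups from 1000 rows
```

If every row had a distinct (c1, c2, c3), each worker would emit one partial group per input row and the leader would redo almost all of the work, which is the worst case described next.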
In the worst case, the number of groups seen by the Finalize Aggregate node could be as large as the number of input rows seen by all worker processes in the Partial Aggregate stage. This is especially likely with this patch, because the group key for the Partial Aggregate consists of all the columns involved in the grouping sets; in the above query, it is (c1, c2, c3).

So we have been working on another way to implement parallel grouping sets.

Implementation 2
================

This work can be found in github branch [2]. As it contains some hacky code and a list of TODO items, it is far from a proper patch, so please consider it a PoC.

The idea is that, instead of performing the grouping sets aggregation in the Finalize Aggregate, we perform it in the Partial Aggregate. The plan looks like this:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1));
                          QUERY PLAN
--------------------------------------------------------------
 Finalize GroupAggregate
   Output: c1, c2, avg(c3), (gset_id)
   Group Key: t2.c1, t2.c2, (gset_id)
   ->  Gather Merge
         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
         Workers Planned: 2
         ->  Sort
               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
               Sort Key: t2.c1, t2.c2, (gset_id)
               ->  Partial HashAggregate
                     Output: c1, c2, gset_id, PARTIAL avg(c3)
                     Hash Key: t2.c1, t2.c2
                     Hash Key: t2.c1
                     ->  Parallel Seq Scan on public.t2
                           Output: c1, c2, c3
(15 rows)

This method has a problem: in the final stage of aggregation, the leader has no way to tell which grouping set a tuple comes from, yet it needs that information to merge the partial results correctly. For instance, suppose we have a table t(c1, c2, c3) containing one row (1, NULL, 3), and we are selecting agg(c3) group by grouping sets ((c1,c2), (c1)). Then the leader would receive two tuples for that row via the Gather node, both (1, NULL, agg(3)): one from group by (c1,c2) and one from group by (c1).
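The collision can be reproduced with a small Python simulation. It is a sketch under stated assumptions, not the PoC's code: rows are (c1, c2, c3) tuples, agg(c3) is modelled as sum(c3), each worker keeps one hash table per grouping set, and the function names and the use_gset_id switch are hypothetical.

```python
from collections import defaultdict

# Hypothetical simulation of Implementation 2 (not the PoC's C code).
# A row is (c1, c2, c3); agg(c3) is modelled as sum(c3).
GROUPING_SETS = [(0, 1), (0,)]  # ((c1,c2), (c1)) as column positions
AGG_COL = 2


def partial_grouping_sets(rows):
    """Worker stage (Partial HashAggregate): one hash table per grouping set.

    Emits (c1, c2, gset_id, partial_sum) tuples; columns outside the set are NULL.
    """
    out = []
    for gset_id, gset in enumerate(GROUPING_SETS):
        table = defaultdict(int)
        for row in rows:
            key = tuple(row[i] if i in gset else None for i in (0, 1))
            table[key] += row[AGG_COL]
        out.extend(key + (gset_id, s) for key, s in table.items())
    return out


def finalize(tuples, use_gset_id):
    """Leader stage (Finalize GroupAggregate): merge partial sums sharing a group key."""
    merged = defaultdict(int)
    for c1, c2, gset_id, s in tuples:
        # Without gset_id in the group key, tuples from different sets can collide.
        key = (c1, c2, gset_id) if use_gset_id else (c1, c2)
        merged[key] += s
    return dict(merged)


# The single-row table t from the example; one worker is enough to show the problem.
partials = partial_grouping_sets([(1, None, 3)])
print(partials)                               # [(1, None, 0, 3), (1, None, 1, 3)]
print(finalize(partials, use_gset_id=False))  # {(1, None): 6}  (wrong: two sets merged)
print(finalize(partials, use_gset_id=True))   # {(1, None, 0): 3, (1, None, 1): 3}
```

Each grouping set should report 3 for this row; without a grouping-set identifier in the leader's group key, the two partial tuples are indistinguishable and are summed into 6.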
If the leader cannot tell that the two tuples come from two different grouping sets, it will merge them incorrectly. So we add a hidden column, 'gset_id', representing the grouping set id, to the targetlist of the Partial Aggregate node, as well as to the group key of the Finalize Aggregate node. This way, only tuples coming from the same grouping set get merged in the final stage of aggregation.

With this method, for grouping sets that need multiple rollups, we simplify the implementation by generating a separate aggregation path for each rollup and then appending them to form the final path.

References:
[1] https://github.com/greenplum-db/postgres/tree/parallel_groupingsets
[2] https://github.com/greenplum-db/postgres/tree/parallel_groupingsets_2

Any comments and feedback are welcome.

Thanks
Richard
v1-0001-Implementing-parallel-grouping-sets.patch