On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:
Hi all,

Paul and I have been hacking recently to implement parallel grouping
sets, and here we have two implementations.

Implementation 1
================

Attached is the patch and also there is a github branch [1] for this
work.

Parallel aggregation is already supported in PostgreSQL, and it is
implemented by aggregating in two stages. First, each worker performs an
aggregation step, producing a partial result for each group it sees.
Second, the partial results are transferred to the leader via the Gather
node. Finally, the leader combines the partial results and produces the
final result for each group.

We are implementing parallel grouping sets in the same way. The only
difference is that in the final stage, the leader performs a grouping
sets aggregation, rather than a normal aggregation.
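To make the scheme concrete, here is a toy Python model of implementation 1
(purely illustrative, not PostgreSQL executor code): workers partially
aggregate avg(c3) keyed on the union of all grouping-set columns, and the
leader merges the partial states and then applies each grouping set.

```python
def partial_agg(rows):
    """Worker side: build a partial avg state (sum, count) per (c1, c2, c3)."""
    states = {}
    for c1, c2, c3 in rows:
        s, n = states.get((c1, c2, c3), (0, 0))
        states[(c1, c2, c3)] = (s + c3, n + 1)
    return states

def finalize_grouping_sets(partials, grouping_sets):
    """Leader side: merge partial states, then aggregate once per grouping set."""
    merged = {}
    for states in partials:
        for key, (s, n) in states.items():
            ms, mn = merged.get(key, (0, 0))
            merged[key] = (ms + s, mn + n)
    results = []
    for gset in grouping_sets:          # e.g. (0, 1) means GROUP BY (c1, c2)
        groups = {}
        for key, (s, n) in merged.items():
            gkey = tuple(key[i] for i in gset)
            gs, gn = groups.get(gkey, (0, 0))
            groups[gkey] = (gs + s, gn + n)
        results.extend((gset, k, s / n) for k, (s, n) in groups.items())
    return results

# Two "workers" each see part of t2(c1, c2, c3); sample data is made up.
worker1 = [(1, 1, 10), (1, 2, 20)]
worker2 = [(1, 1, 30), (2, 2, 40)]
partials = [partial_agg(worker1), partial_agg(worker2)]
out = finalize_grouping_sets(partials, [(0, 1), (0,)])  # sets (c1,c2), (c1)
```

Note that all grouping-sets logic lives in the leader-side finalize step,
which mirrors the Finalize MixedAggregate node in the plan below.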

The plan looks like:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
grouping sets((c1,c2), (c1), (c2,c3));
                      QUERY PLAN
---------------------------------------------------------
Finalize MixedAggregate
  Output: c1, c2, avg(c3), c3
  Hash Key: t2.c2, t2.c3
  Group Key: t2.c1, t2.c2
  Group Key: t2.c1
  ->  Gather Merge
        Output: c1, c2, c3, (PARTIAL avg(c3))
        Workers Planned: 2
        ->  Sort
              Output: c1, c2, c3, (PARTIAL avg(c3))
              Sort Key: t2.c1, t2.c2
              ->  Partial HashAggregate
                    Output: c1, c2, c3, PARTIAL avg(c3)
                    Group Key: t2.c1, t2.c2, t2.c3
                    ->  Parallel Seq Scan on public.t2
                          Output: c1, c2, c3
(16 rows)

Since the partial aggregation can be performed in parallel, we can expect
a speedup whenever the number of groups seen by the Finalize Aggregate
node is significantly smaller than the number of input rows.

For example, for the table provided in the test case within the patch,
running the above query on my Linux box gives:

# explain analyze select c1, c2, avg(c3) from t2 group by grouping
sets((c1,c2), (c1), (c2,c3)); -- without patch
Planning Time: 0.123 ms
Execution Time: 9459.362 ms

# explain analyze select c1, c2, avg(c3) from t2 group by grouping
sets((c1,c2), (c1), (c2,c3)); -- with patch
Planning Time: 0.204 ms
Execution Time: 1077.654 ms


Very nice. That's pretty much exactly how I imagined it'd work.

But sometimes we may not benefit from this patch. In the worst case, the
number of groups seen by the Finalize Aggregate node can be as large as
the total number of input rows seen by all worker processes in the
Partial Aggregate stage. This is prone to happen with this patch, because
the group key for the Partial Aggregate is the union of all columns
involved in the grouping sets; in the above query, that is (c1, c2, c3).
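This worst case is easy to demonstrate with made-up numbers: if c3 is
(nearly) unique, grouping on the union key (c1, c2, c3) barely reduces
the row count, so the partial stage does work without shrinking the data
sent to the leader.

```python
# Illustrative only: c1 and c2 have few distinct values, but c3 is unique,
# so the partial group key (c1, c2, c3) has one group per input row.
rows = [(i % 10, i % 100, i) for i in range(1000)]
partial_groups = {(c1, c2, c3) for c1, c2, c3 in rows}
assert len(partial_groups) == len(rows)  # no reduction at all
```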

So, we have been working on another way to implement parallel grouping
sets.

Implementation 2
================

This work can be found in github branch [2]. As it contains some hacky
code and a list of TODO items, it is far from a finished patch, so please
consider it a PoC.

The idea is that instead of performing the grouping sets aggregation in
the Finalize Aggregate, we perform it in the Partial Aggregate.

The plan looks like:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
grouping sets((c1,c2), (c1));
                         QUERY PLAN
--------------------------------------------------------------
Finalize GroupAggregate
  Output: c1, c2, avg(c3), (gset_id)
  Group Key: t2.c1, t2.c2, (gset_id)
  ->  Gather Merge
        Output: c1, c2, (gset_id), (PARTIAL avg(c3))
        Workers Planned: 2
        ->  Sort
              Output: c1, c2, (gset_id), (PARTIAL avg(c3))
              Sort Key: t2.c1, t2.c2, (gset_id)
              ->  Partial HashAggregate
                    Output: c1, c2, gset_id, PARTIAL avg(c3)
                    Hash Key: t2.c1, t2.c2
                    Hash Key: t2.c1
                    ->  Parallel Seq Scan on public.t2
                          Output: c1, c2, c3
(15 rows)
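In the same toy-Python style as before (again a sketch, not PostgreSQL
internals), implementation 2 has each worker run the grouping-sets
aggregation on its local rows, tagging every partial group with a
gset_id; the leader then only performs a plain combine keyed on
(gset_id, grouping key), with no grouping-sets logic in the finalize step.

```python
def partial_grouping_sets(rows, grouping_sets):
    """Worker side: grouping-sets aggregation on local rows, keyed by
    (gset_id, grouping key); the value is a partial avg state (sum, count)."""
    states = {}
    for c1, c2, c3 in rows:
        for gset_id, gset in enumerate(grouping_sets):
            key = (gset_id, tuple((c1, c2)[i] for i in gset))
            s, n = states.get(key, (0, 0))
            states[key] = (s + c3, n + 1)
    return states

def finalize(partials):
    """Leader side: plain two-stage combine per (gset_id, key), then finish avg."""
    merged = {}
    for states in partials:
        for key, (s, n) in states.items():
            ms, mn = merged.get(key, (0, 0))
            merged[key] = (ms + s, mn + n)
    return {key: s / n for key, (s, n) in merged.items()}

gsets = [(0, 1), (0,)]               # GROUPING SETS ((c1, c2), (c1))
worker1 = [(1, 1, 10), (1, 2, 20)]   # made-up sample of t2(c1, c2, c3)
worker2 = [(1, 1, 30), (2, 2, 40)]
result = finalize([partial_grouping_sets(w, gsets) for w in (worker1, worker2)])
```

The trade-off relative to implementation 1: each worker emits one partial
group per input group per grouping set (more partial rows), but the leader
never has to re-group on the wide union key.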


OK, I'm not sure I understand the point of this - can you give an
example that is supposed to benefit from this? Where does the speedup
come from?
regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
