Ahmet -
I think it would only require observing each key's partition of the input
independently, and the size of the state would only be proportional to the
number of distinct elements, not the entire input. Note the pipeline would
be a GBK with a key based on the GROUP BY, followed by a
Combined.GroupedValue with a (possibly very stateful) CombineFn.

Luke -
Here's A little background on why I think (1) is harder (It may also just
be that it looks daunting to me as someone who's not that familiar with the
code).

An aggregation node can have multiple aggregations. So, for example, the
query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM ...` would
yield a logical plan that has a single aggregation node with three
different aggregations. We then take that node and build up a CombineFn
that is a composite of all of the aggregations we need to make a combining
PTransform [1]. To implement (1) we would need to distinguish between all
the DISTINCT and non-DISTINCT aggregations, and come up with a way to unify
the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.

That's certainly not unsolvable, but approach (2) is much simpler - it just
requires implementing some variations on the CombineFn's that already exist
[2] and re-using the existing logic for converting an aggregation node to a
combining PTransform.


Hopefully that makes sense, let me know if I need to clarify further :)
Brian

[1]
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
[2]
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48


On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com> wrote:

> Can you also go into more detail why you think 1) is more challenging to
> implement?
>
> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:
>
>> From my limited understanding, would not the stateful combinefn option
>> require observing the whole input before being able combine and the risk of
>> blowing memory is actually very high except for trivial inputs?
>>
>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>> wrote:
>>
>>> Hi everyone,
>>> Currently BeamSQL does not support DISTINCT aggregations. These are
>>> queries like:
>>>
>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>
>>> These are represented in Calcite's logical plan with a distinct flag on
>>> aggregation calls, but we ignore the flag when converting to a pipeline. I
>>> see two different ways that we could support this:
>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the <GROUP
>>> BY key> + <DISTINCT expr> to de-dupe values of expr followed by a second
>>> GBK on just <GROUP BY key> to perform the aggregation.
>>> 2. Stateful CombineFn - We could implement a version of the combiners
>>> used for SUM, COUNT, etc [1] that maintain some state that tracks
>>> previously seen elements and uses it de-deupe new elements.
>>>
>>> Of course, something like (1) is much more scalable, but it is also much
>>> more challenging to implement. While (2) is trivial to implement, but runs
>>> the risk of blowing up a worker's memory usage.
>>>
>>> Personally, I think it could be worthwhile to provide support for
>>> DISTINCT quickly with approach (2), and implement (1) as an optimization
>>> later. This combiner's state would be partitioned by key and by window, so
>>> I think we would be pretty safe from OOM'ing a worker except in some
>>> extreme cases (long windows, hot keys, very large batch pipelines, ...).
>>>
>>>
>>> But I understand this could be controversial, so I wanted to open it up
>>> for discussion first: Would it be worthwhile to provide support for
>>> DISTINCT aggregations quickly with approach #2, or is it better to just
>>> reject these queries until we can implement them safely with approach #1?
>>>
>>> Thanks,
>>> Brian
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>
>>

Reply via email to