Can you also go into more detail on why you think (1) is more challenging to
implement?

On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:

> From my limited understanding, would not the stateful CombineFn option
> require observing the whole input before being able to combine, making the
> risk of blowing up memory very high except for trivial inputs?
>
> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com> wrote:
>
>> Hi everyone,
>> Currently BeamSQL does not support DISTINCT aggregations. These are
>> queries like:
>>
>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>
>> These are represented in Calcite's logical plan with a distinct flag on
>> aggregation calls, but we ignore the flag when converting to a pipeline. I
>> see two different ways that we could support this:
>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the <GROUP BY
>> key> + <DISTINCT expr> to de-dupe values of expr followed by a second GBK
>> on just <GROUP BY key> to perform the aggregation.
>> 2. Stateful CombineFn - We could implement a version of the combiners
>> used for SUM, COUNT, etc. [1] that maintains some state tracking
>> previously seen elements and uses it to de-dupe new elements.
>>
>> Of course, something like (1) is much more scalable, but it is also much
>> more challenging to implement. (2) is trivial to implement, but runs
>> the risk of blowing up a worker's memory usage.
>>
>> Personally, I think it could be worthwhile to provide support for
>> DISTINCT quickly with approach (2), and implement (1) as an optimization
>> later. This combiner's state would be partitioned by key and by window, so
>> I think we would be pretty safe from OOM'ing a worker except in some
>> extreme cases (long windows, hot keys, very large batch pipelines, ...).
>>
>>
>> But I understand this could be controversial, so I wanted to open it up
>> for discussion first: Would it be worthwhile to provide support for
>> DISTINCT aggregations quickly with approach #2, or is it better to just
>> reject these queries until we can implement them safely with approach #1?
>>
>> Thanks,
>> Brian
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>
>
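
For reference while discussing, here is a minimal sketch of the two
approaches for SUM(DISTINCT v) ... GROUP BY k. It is plain Python over
in-memory lists, not Beam code: the names ROWS, sum_distinct_two_gbk,
DistinctSumCombiner and sum_distinct_combiner are hypothetical, the method
names only mirror the general shape of a CombineFn, and windowing is ignored.

```python
from collections import defaultdict

# Hypothetical rows of table t as (k, v) pairs.
ROWS = [("a", 1), ("a", 1), ("a", 2), ("b", 5), ("b", 5)]


def sum_distinct_two_gbk(rows):
    """Approach (1): de-dupe on (key, value), then aggregate per key."""
    # First grouping: key on <GROUP BY key> + <DISTINCT expr>.
    # Each group collapses to a single (k, v) pair.
    first = defaultdict(int)
    for k, v in rows:
        first[(k, v)] += 1
    deduped = list(first.keys())

    # Second grouping: key on <GROUP BY key> only, then aggregate.
    second = defaultdict(list)
    for k, v in deduped:
        second[k].append(v)
    return {k: sum(vs) for k, vs in second.items()}


class DistinctSumCombiner:
    """Approach (2): the accumulator is the set of values seen so far."""

    def create_accumulator(self):
        return set()

    def add_input(self, acc, value):
        acc.add(value)  # duplicates are absorbed by the set
        return acc

    def merge_accumulators(self, accs):
        return set().union(*accs)

    def extract_output(self, acc):
        return sum(acc)


def sum_distinct_combiner(rows):
    """Drive the combiner with one accumulator per key (no windows)."""
    combiner = DistinctSumCombiner()
    accs = {}
    for k, v in rows:
        acc = accs.get(k)
        if acc is None:
            acc = combiner.create_accumulator()
        accs[k] = combiner.add_input(acc, v)
    return {k: combiner.extract_output(acc) for k, acc in accs.items()}


if __name__ == "__main__":
    print(sum_distinct_two_gbk(ROWS))
    print(sum_distinct_combiner(ROWS))
```

In the sketch, the accumulator in approach (2) holds every distinct value
seen for a key, which is where the memory concern comes from. In approach (1)
each group in the first step only needs to emit its own (k, v) pair.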
