Can you also go into more detail on why you think (1) is more challenging to implement?

On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:

> From my limited understanding, would not the stateful combinefn option
> require observing the whole input before being able to combine, and is
> the risk of blowing memory not actually very high except for trivial
> inputs?
>
> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com> wrote:
>
>> Hi everyone,
>> Currently BeamSQL does not support DISTINCT aggregations. These are
>> queries like:
>>
>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>
>> These are represented in Calcite's logical plan with a distinct flag on
>> aggregation calls, but we ignore the flag when converting to a pipeline.
>> I see two different ways that we could support this:
>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the
>> <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr, followed by
>> a second GBK on just <GROUP BY key> to perform the aggregation.
>> 2. Stateful CombineFn - We could implement a version of the combiners
>> used for SUM, COUNT, etc. [1] that maintains some state that tracks
>> previously seen elements and uses it to de-dupe new elements.
>>
>> Of course, something like (1) is much more scalable, but it is also much
>> more challenging to implement. (2) is trivial to implement, but runs the
>> risk of blowing up a worker's memory usage.
>>
>> Personally, I think it could be worthwhile to provide support for
>> DISTINCT quickly with approach (2), and implement (1) as an optimization
>> later. This combiner's state would be partitioned by key and by window,
>> so I think we would be pretty safe from OOM'ing a worker except in some
>> extreme cases (long windows, hot keys, very large batch pipelines, ...).
>>
>> But I understand this could be controversial, so I wanted to open it up
>> for discussion first: Would it be worthwhile to provide support for
>> DISTINCT aggregations quickly with approach #2, or is it better to just
>> reject these queries until we can implement them safely with approach #1?
>>
>> Thanks,
>> Brian
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
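
To make the two options in the quoted proposal concrete, here is a rough sketch of SUM(DISTINCT v) ... GROUP BY k in plain Python. It is a simulation only and does not use Beam or BeamSQL: the function and class names are hypothetical, the dictionaries stand in for GBK steps, and the combiner's method names merely mirror the usual create/add/merge/extract shape of a CombineFn. Windowing is left out entirely.

```python
from collections import defaultdict


def sum_distinct_two_groupings(rows):
    """Approach (1): two grouping steps (stand-ins for two GBKs)."""
    # First grouping: key on (group key, distinct expr) so that
    # duplicate (k, v) pairs collapse into a single entry.
    deduped = defaultdict(list)
    for k, v in rows:
        deduped[(k, v)].append(v)

    # Second grouping: key on the group key only, with one value
    # per distinct (k, v) pair, then aggregate.
    by_key = defaultdict(list)
    for k, v in deduped:
        by_key[k].append(v)
    return {k: sum(vs) for k, vs in by_key.items()}


class DistinctSumCombiner:
    """Approach (2): a combiner whose accumulator remembers seen values.

    Hypothetical class; the accumulator is a set, so it grows with the
    number of distinct values seen for a key.
    """

    def create_accumulator(self):
        return set()

    def add_input(self, acc, value):
        acc.add(value)
        return acc

    def merge_accumulators(self, accs):
        return set().union(*accs)

    def extract_output(self, acc):
        return sum(acc)


def sum_distinct_combiner(rows):
    """Drive the combiner per key, as a single grouping step would."""
    combiner = DistinctSumCombiner()
    accs = defaultdict(combiner.create_accumulator)
    for k, v in rows:
        accs[k] = combiner.add_input(accs[k], v)
    return {k: combiner.extract_output(acc) for k, acc in accs.items()}


rows = [("a", 1), ("a", 1), ("a", 2), ("b", 5), ("b", 5)]
result_two_groupings = sum_distinct_two_groupings(rows)
result_combiner = sum_distinct_combiner(rows)
```

Both functions give the same answer on this input. The difference is where the de-duplication state lives: in (1) it is spread across the keys of the first grouping, while in (2) each accumulator holds every distinct value for its key, which is the memory concern raised above.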