Hi,

Distinct aggregation on streams is actually supported in the SQL API [1]. The
syntax is pretty much identical to the batch case. A simple example using a
tumbling window would be:

> SELECT COUNT(DISTINCT col)
> FROM t
> GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)
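
To make the semantics of that query concrete, here is a plain-Java sketch (no Flink dependency) of what it computes. Events are bucketed into 1-minute windows aligned to the epoch, and the number of distinct `col` values is reported per window. The `Event` class and method names are hypothetical, and the sketch ignores watermarks and late data, which a real Flink job has to handle.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class TumblingCountDistinct {
    // Length of the tumbling window, matching INTERVAL '1' MINUTE.
    static final long WINDOW_MS = 60_000L;

    // Hypothetical input row: event time (rowtime) in epoch millis plus the value of col.
    static final class Event {
        final long rowtimeMs;
        final String col;

        Event(long rowtimeMs, String col) {
            this.rowtimeMs = rowtimeMs;
            this.col = col;
        }
    }

    // Returns window start -> COUNT(DISTINCT col) for that window.
    static Map<Long, Integer> countDistinctPerWindow(List<Event> events) {
        Map<Long, Set<String>> seen = new TreeMap<>();
        for (Event e : events) {
            // Tumbling windows are aligned to multiples of the window size.
            long windowStart = e.rowtimeMs - Math.floorMod(e.rowtimeMs, WINDOW_MS);
            seen.computeIfAbsent(windowStart, k -> new HashSet<>()).add(e.col);
        }
        Map<Long, Integer> result = new TreeMap<>();
        for (Map.Entry<Long, Set<String>> entry : seen.entrySet()) {
            result.put(entry.getKey(), entry.getValue().size());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(1_000L, "a"),
            new Event(2_000L, "b"),
            new Event(3_000L, "a"),   // duplicate of "a" in the first minute
            new Event(61_000L, "a")); // same value, but a new window
        System.out.println(countDistinctPerWindow(events)); // {0=2, 60000=1}
    }
}
```

Each window keeps its own set of seen values, which is why the state for a distinct count grows with the number of distinct values per window.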

Support has not been added to the Table API yet, but I think it should be
easy to add there as well [2].

For TopN, I think this could be implemented as a user-defined aggregate
function (UDAF) [3]. There is also an example in the test utilities of the
Flink repo [4] that might be a good starting point.
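
A minimal sketch of the accumulator logic such a TopN UDAF could use, written as plain Java so it runs without Flink. It mirrors the createAccumulator / accumulate / getValue methods that Flink's AggregateFunction expects [3], but the class and method names here are hypothetical and this is not the code from [4]. A min-heap of size N keeps the N largest values seen so far.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class TopNSketch {
    // Accumulator holding at most n values; a min-heap lets us evict the
    // smallest retained value in O(log n) when a larger one arrives.
    static final class TopNAccumulator {
        final int n;
        final PriorityQueue<Long> heap = new PriorityQueue<>();

        TopNAccumulator(int n) {
            this.n = n;
        }
    }

    static TopNAccumulator createAccumulator(int n) {
        return new TopNAccumulator(n);
    }

    // Adds one value and drops the smallest once more than n are held.
    static void accumulate(TopNAccumulator acc, long value) {
        acc.heap.offer(value);
        if (acc.heap.size() > acc.n) {
            acc.heap.poll();
        }
    }

    // Returns the retained values in descending order.
    static List<Long> getValue(TopNAccumulator acc) {
        List<Long> result = new ArrayList<>(acc.heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        TopNAccumulator acc = createAccumulator(3);
        for (long v : new long[] {5, 1, 9, 7, 3, 9}) {
            accumulate(acc, v);
        }
        System.out.println(getValue(acc)); // [9, 9, 7]
    }
}
```

This sketch only supports adding values. Undoing the effect of a value that was already evicted would require the full history, which is what makes a retractable TopN harder than an append-only one.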

In terms of the aggregation strategy, I believe a window is not necessarily
needed if your data sink supports retraction/upsert. @Fabian or @Timo might
have more context here.
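
To illustrate what a non-windowed aggregation asks of the sink, here is a hypothetical plain-Java sketch (not Flink's API) of a running COUNT(DISTINCT value) per key. On every change it emits a retraction of the previous result followed by the new result; a sink that can apply such pairs, or overwrite the row by key (upsert), can consume this without a window. The class name and the "+"/"-" string encoding are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RetractingCountDistinct {
    // Distinct values seen so far, per grouping key. Without a window this
    // state keeps growing unless old entries are expired somehow.
    private final Map<String, Set<String>> distinctPerKey = new HashMap<>();
    // Changelog sent to the sink: "+" adds a result row, "-" retracts one.
    final List<String> changelog = new ArrayList<>();

    void onRecord(String key, String value) {
        Set<String> seen = distinctPerKey.computeIfAbsent(key, k -> new HashSet<>());
        int before = seen.size();
        if (!seen.add(value)) {
            return; // duplicate: the count is unchanged, nothing to emit
        }
        if (before > 0) {
            changelog.add("-(" + key + "," + before + ")"); // retract the old result
        }
        changelog.add("+(" + key + "," + seen.size() + ")"); // emit the new result
    }

    public static void main(String[] args) {
        RetractingCountDistinct agg = new RetractingCountDistinct();
        agg.onRecord("page1", "userA");
        agg.onRecord("page1", "userB");
        agg.onRecord("page1", "userA");
        System.out.println(agg.changelog); // [+(page1,1), -(page1,1), +(page1,2)]
    }
}
```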

Thanks,
Rong

[1] https://issues.apache.org/jira/browse/FLINK-8688
[2] https://issues.apache.org/jira/browse/FLINK-8691
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions
[4] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedAggFunctions.scala

On Wed, Jun 27, 2018 at 7:22 AM sihua zhou <summerle...@163.com> wrote:

> Hi aitozi,
>
> I think it can be implemented with or without a window, but it cannot be
> implemented without keyBy(). A general approach to implementing this is as
> follows:
>
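> {code}
> // Counts each record only the first time it is seen (per key, after keyBy()).
> void process(Iterable<Record> records) throws Exception {
>     for (Record record : records) {
>         if (!isFilter(record)) {
>             agg(record);
>         }
>     }
> }
> {code}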
>
> Here, isFilter() filters out duplicate records, and agg() performs the
> aggregation, which in your case is count().
>
> In general, isFilter() can be implemented based on a MapState<String,
> Integer> that stores the previously seen records, so isFilter() may look
> like this:
>
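> {code}
> // mapState is a MapState<String, Integer>: distinct value -> times seen.
> // getDistinctKey() is a hypothetical accessor for the value being
> // deduplicated, e.g. the user ID when counting UV.
> boolean isFilter(Record record) throws Exception {
>     String key = record.getDistinctKey();
>     Integer oldVal = mapState.get(key);
>     if (oldVal == null) {
>         // First occurrence: remember it and let it through.
>         mapState.put(key, 1);
>         return false;
>     } else {
>         // Duplicate: bump the counter and filter it out.
>         mapState.put(key, oldVal + 1);
>         return true;
>     }
> }
> {code}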
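
Putting process() and isFilter() together, here is a self-contained sketch of the same logic. A java.util.HashMap stands in for Flink's keyed MapState (which in a real job is scoped to the current key after keyBy()), plain strings stand in for the Record type, and agg() is a simple counter, so the result is the distinct count. The class name DedupCount is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupCount {
    // Stand-in for MapState<String, Integer>: value -> times seen.
    private final Map<String, Integer> mapState = new HashMap<>();
    // Result of agg(): number of distinct values seen so far.
    private long count = 0;

    // Returns true if the record was seen before and should be filtered out.
    boolean isFilter(String record) {
        Integer oldVal = mapState.get(record);
        if (oldVal == null) {
            mapState.put(record, 1);
            return false;
        }
        mapState.put(record, oldVal + 1);
        return true;
    }

    // The aggregation; for a distinct count it only needs to increment.
    void agg(String record) {
        count++;
    }

    void process(Iterable<String> records) {
        for (String record : records) {
            if (!isFilter(record)) {
                agg(record);
            }
        }
    }

    long getCount() {
        return count;
    }

    public static void main(String[] args) {
        DedupCount uv = new DedupCount();
        uv.process(List.of("userA", "userB", "userA", "userC", "userB"));
        System.out.println(uv.getCount()); // 3
    }
}
```

Every record costs one state lookup and one state write, which is the per-record cost that motivates the Bloom filter idea.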
>
> As you can see, we need to query the state for every record. A faster
> option is to implement isFilter() with a Bloom filter, at the cost of an
> approximate result (the accuracy is configurable). Unfortunately, it is not
> easy to use a Bloom filter in Flink yet; some work is needed to introduce it
> (https://issues.apache.org/jira/browse/FLINK-8601).
>
> Best, Sihua
> On 06/27/2018 17:12, aitozi <gjying1...@gmail.com> wrote:
>
> Hi community,
>
> I am using Flink to handle the following situations:
>
> 1. A "distinct count" to calculate UV/PV.
> 2. Calculating the top N over the past hour or day.
>
> Are these all implemented with windows? Or is there a best practice for
> doing this?
>
> 3. When dealing with the distinct count, if no keyBy() is done beforehand,
> how does the window handle it?
>
> Thanks,
> Aitozi
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/