Hi,

A streaming distinct accumulator is actually supported in the SQL API [1]. The syntax is almost identical to the batch case. A simple example using a tumbling window would be:
> SELECT COUNT(DISTINCT col)
> FROM t
> GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)

Support in the Table API hasn't been added yet, but I think it should be easy to add there as well [2].

For TopN, I think this could be implemented as a user-defined aggregate function [3]. There is also an example in the test utilities of the Flink repo [4] that might be a good starting point.

In terms of the aggregation strategy, I believe a window is not necessarily needed if your data sink supports retraction / upsert. I think @Fabian or @Timo might have more context here.

Thanks,
Rong

[1] https://issues.apache.org/jira/browse/FLINK-8688
[2] https://issues.apache.org/jira/browse/FLINK-8691
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions
[4] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedAggFunctions.scala

On Wed, Jun 27, 2018 at 7:22 AM sihua zhou <summerle...@163.com> wrote:

> Hi aitozi,
>
> I think it can be implemented either with or without a window, but it
> cannot be implemented without keyBy(). A general approach to implementing
> this is as follows.
>
> {code}
> // Called per key after keyBy(); Record, isFilter() and agg() are placeholders
> void process(Iterable<Record> records) throws Exception {
>     for (Record record : records) {
>         if (!isFilter(record)) {  // skip records that were already seen
>             agg(record);          // e.g. increment the distinct count
>         }
>     }
> }
> {code}
>
> Here isFilter() filters out the duplicated records, and agg() is the
> function that does the aggregation; in your case that means count().
>
> In general, isFilter() can be implemented based on a MapState<String,
> Integer> that stores the previous records, so isFilter() may look like:
>
> {code}
> // mapState is a MapState<String, Integer>: distinct value -> times seen
> boolean isFilter(Record record) throws Exception {
>     String key = record.getDistinctKey(); // hypothetical accessor for the counted field
>     Integer oldVal = mapState.get(key);
>     if (oldVal == null) {
>         mapState.put(key, 1);           // first occurrence: keep the record
>         return false;
>     } else {
>         mapState.put(key, oldVal + 1);  // duplicate: update the count, filter it out
>         return true;
>     }
> }
> {code}
>
> As you can see, we need to query the state frequently. One way to get
> better performance is to use a BloomFilter to implement isFilter(), at
> the cost of an approximate result (the accuracy is configurable).
> Unfortunately, it's not easy to use a Bloom filter in Flink yet; some
> work needs to be done to introduce it
> (https://issues.apache.org/jira/browse/FLINK-8601).
>
> Best,
> Sihua
>
> On 06/27/2018 17:12, aitozi <gjying1...@gmail.com> wrote:
>
> Hi, community
>
> I am using Flink to handle a few scenarios:
>
> 1. "distinct count" to calculate the UV/PV.
> 2. calculate the topN over the past 1 hour or 1 day.
>
> Are these all realized by windows? Or is there a best practice for doing
> this?
>
> 3. When dealing with distinct, if there is no keyBy() beforehand, how
> does the window handle this?
>
> Thanks
> Aitozi.
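The "filter duplicates, then aggregate" approach Sihua outlines can be sketched in plain Java. This is a minimal sketch, not Flink code: the HashMaps stand in for per-key Flink state such as MapState, the page plays the role of the keyBy() field, and the `Visit` type with its `page`/`userId` fields is hypothetical. It computes PV (every event) and UV (distinct users) per page.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of "filter duplicates, then aggregate" per key.
// The HashMaps stand in for Flink keyed state (e.g. MapState); this is not Flink API code.
public class DistinctCountSketch {

    // Hypothetical event type: `page` is the keyBy() field, `userId` is the value counted distinctly.
    static final class Visit {
        final String page;
        final String userId;

        Visit(String page, String userId) {
            this.page = page;
            this.userId = userId;
        }
    }

    // Per key: userId -> number of times seen (mirrors the MapState<String, Integer> idea).
    private final Map<String, Map<String, Integer>> seenByKey = new HashMap<>();
    private final Map<String, Long> pvByKey = new HashMap<>(); // all events
    private final Map<String, Long> uvByKey = new HashMap<>(); // distinct users

    // Returns true if this userId was already seen for this key, i.e. the event should be filtered.
    boolean isFilter(Visit v) {
        Map<String, Integer> seen = seenByKey.computeIfAbsent(v.page, k -> new HashMap<>());
        // merge() stores 1 on first sight, otherwise increments, and returns the new count.
        return seen.merge(v.userId, 1, Integer::sum) > 1;
    }

    void process(List<Visit> visits) {
        for (Visit v : visits) {
            pvByKey.merge(v.page, 1L, Long::sum);     // PV counts every event
            if (!isFilter(v)) {
                uvByKey.merge(v.page, 1L, Long::sum); // UV counts first occurrences only
            }
        }
    }

    long pv(String page) { return pvByKey.getOrDefault(page, 0L); }

    long uv(String page) { return uvByKey.getOrDefault(page, 0L); }

    public static void main(String[] args) {
        DistinctCountSketch sketch = new DistinctCountSketch();
        sketch.process(List.of(
                new Visit("home", "u1"),
                new Visit("home", "u2"),
                new Visit("home", "u1"),
                new Visit("cart", "u1")));
        System.out.println("home pv=" + sketch.pv("home") + " uv=" + sketch.uv("home"));
        System.out.println("cart pv=" + sketch.pv("cart") + " uv=" + sketch.uv("cart"));
    }
}
```

Running `main` prints `home pv=3 uv=2` and `cart pv=1 uv=1`. Note that the per-key "seen" map grows with every new distinct value unless it is scoped to a window or cleaned up periodically, which is the cost that motivates the approximate Bloom-filter idea.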