Hi aitozi,
I think this can be implemented either with or without a window, but it cannot be implemented without keyBy(). A general approach is as follows:

{code}
// Called with the records of one key (i.e. after keyBy()).
void process(Iterable<Record> records) throws Exception {
    for (Record record : records) {
        // Only records seen for the first time reach the aggregation.
        if (!isFilter(record)) {
            agg(record);
        }
    }
}
{code}

Here isFilter() filters out duplicated records, and agg() performs the aggregation; in your case that is the count(). In general, isFilter() can be implemented on top of a MapState<Record, Long> that stores the records seen so far, so isFilter() may look like this:

{code}
// mapState is keyed state of type MapState<Record, Long>: record -> number of times seen.
// MapState.get()/put() declare "throws Exception", hence the throws clause.
boolean isFilter(Record record) throws Exception {
    Long oldVal = mapState.get(record);
    if (oldVal == null) {
        // First occurrence: remember it and let it through.
        mapState.put(record, 1L);
        return false;
    } else {
        // Duplicate: bump the occurrence count and filter it out.
        mapState.put(record, oldVal + 1L);
        return true;
    }
}
{code}

As you can see, this queries the state for every record. A faster alternative is to implement isFilter() with a Bloom filter, at the cost of an approximate result (the accuracy is configurable). Unfortunately, it is not easy to use a Bloom filter in Flink yet; some work is needed to introduce it (https://issues.apache.org/jira/browse/FLINK-8601).

Best,
Sihua

On 06/27/2018 17:12, aitozi<gjying1...@gmail.com> wrote:

Hi, community

I am using Flink to deal with some situations:
1. "distinct count" to calculate the uv/pv.
2. calculate the topN of the past 1 hour or 1 day.
Are these all realized by window? Or is there a best practice for doing this?
3. And when dealing with distinct, if there is no need to do a keyBy beforehand, how does the window deal with this?

Thanks
Aitozi.
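To make the isFilter()/agg() pattern described in the reply above concrete for the uv/pv case, here is a minimal plain-Java sketch. It is a simulation under stated assumptions, not Flink code: PageView, DistinctCountSketch and their methods are hypothetical names, and ordinary HashMaps keyed by page id stand in for the keyed MapState that keyBy() would scope per key automatically. No Flink API is used.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type for illustration; not a Flink class.
final class PageView {
    final String pageId; // plays the role of the keyBy() key
    final String userId; // the value whose distinct count is the UV

    PageView(String pageId, String userId) {
        this.pageId = pageId;
        this.userId = userId;
    }
}

// Hypothetical plain-Java simulation of the keyed isFilter()/agg() pattern.
// In Flink the per-page maps would be keyed state (e.g. MapState<String, Long>);
// here they are scoped by pageId by hand.
final class DistinctCountSketch {
    // pageId -> (userId -> times seen): stand-in for per-key MapState
    private final Map<String, Map<String, Long>> seen = new HashMap<>();
    private final Map<String, Long> pvCounts = new HashMap<>();
    private final Map<String, Long> uvCounts = new HashMap<>();

    void process(List<PageView> records) {
        for (PageView record : records) {
            // Every record counts toward PV, duplicates included.
            pvCounts.merge(record.pageId, 1L, Long::sum);
            if (!isFilter(record)) {
                agg(record);
            }
        }
    }

    // Returns true if this user was already seen for this page (a duplicate).
    private boolean isFilter(PageView record) {
        Map<String, Long> state = seen.computeIfAbsent(record.pageId, k -> new HashMap<>());
        Long oldVal = state.get(record.userId);
        if (oldVal == null) {
            state.put(record.userId, 1L);
            return false;
        }
        state.put(record.userId, oldVal + 1L);
        return true;
    }

    // Aggregation for first-seen users only: increment the distinct count (UV).
    private void agg(PageView record) {
        uvCounts.merge(record.pageId, 1L, Long::sum);
    }

    long pv(String pageId) {
        return pvCounts.getOrDefault(pageId, 0L);
    }

    long uv(String pageId) {
        return uvCounts.getOrDefault(pageId, 0L);
    }

    public static void main(String[] args) {
        DistinctCountSketch sketch = new DistinctCountSketch();
        sketch.process(List.of(
                new PageView("home", "alice"),
                new PageView("home", "bob"),
                new PageView("home", "alice"),
                new PageView("cart", "alice")));
        System.out.println("home pv=" + sketch.pv("home") + " uv=" + sketch.uv("home"));
        System.out.println("cart pv=" + sketch.pv("cart") + " uv=" + sketch.uv("cart"));
    }
}
```

Running main() prints `home pv=3 uv=2` and `cart pv=1 uv=1`: every record counts toward pv, while only a user's first record for a given page passes isFilter() and reaches agg(). Note that the exact approach keeps one entry per distinct user per key, which is the state cost that motivates the approximate Bloom filter alternative.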