Hi aitozi,

I think it can be implemented by window or non-window, but it can not be 
implemented without keyBy(). A general approach to implement this is as follow.


{code}
process(Record records) {
    for (Record record : records) (
        if (!isFilter(record)) {
            agg(record); 
        }
    }
}
{code}


Where the isFilter() is to filter the duplicated records, and the agg() is the 
function to do aggregation, in your case that means the count().


In general, the isFilter() can be implemented base on the MapState<String, 
Integer> to store the previous records, so the isFilter() may look like.


{code}
boolean isFilter(Record record) {
    Integer oldVal = mapState.get(record);
    if (oldVal == null) {
        mapState.put(record, 1L);
        return false;
    } else {
        mapState.put(record, oldVal + 1L);
        return true;
    }
}
{code}


as you can see, we need to query the state frequently, one way with better 
performance is to the use BloomFilter to implement the isFilter() but with an 
approximate result(the accuracy is configurable), unfortunately it's not easy 
to use the bloom filter in flink, there are some works need to do to introduce 
it (https://issues.apache.org/jira/browse/FLINK-8601).


Best, Sihua
On 06/27/2018 17:12,aitozi<gjying1...@gmail.com> wrote:
Hi, community

I am using flink to deal with some situation.

1. "distinct count" to calculate the uv/pv.
2.  calculate the topN of the past 1 hour or 1 day time.

Are these all realized by window? Or is there a best practice on doing this?

3. And when deal with the distinct, if there is no need to do the keyBy
previous, how does the window deal with this.

Thanks
Aitozi.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to