[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15981414#comment-15981414
 ] 

Gábor Hermann commented on FLINK-2147:
--------------------------------------

I would prefer emitting updates on a window basis, as a Flink have quite rich 
options for triggering. With overpartitioning, there could be _count-min sketch 
partitions_ (CMS-partitions), more than the number of partitions (i.e. 
subtasks). We could assign the CMS-partition to the input data (based on 
hashing) and keyBy on CMS-partition. Then, we could fold over the CMS-partition 
(a compact array), which is (AFAIK) internally stored as a keyed state. This 
way, we would not keep state for every key separately (saving memory), while 
allowing scaling operators (inc./dec. parallelism). Does that make sense?

Using windows makes easier to define _when to delete old data_ and _when to 
emit results_ and deal with _out-of-orderness_. However, with windows there's 
slightly more memory overhead compared to e.g. storing one count-min sketch 
array per partition.

A question is then *what API should we provide?* The user could specify the 
key, window assigner, trigger, evictor, allowedLateness, and the count-min 
sketch properties (size, hash functions). Then, the window could be translated 
into a another window keyed by the CMS-partition (as I described). But should 
it be a simply function that takes a DataStream as input and returns a 
DataStream with the results? Or should we add DataStream a special 
countMinSketch function to KeyedDataStream?

Alternatively, we could implement count-min sketch without windows. The user 
would specify two streams: one queries and the other writes the count-min 
sketch. So the "triggering" is done by a stream. The problem is then how do we 
specify when to delete old data and how to deal with out-of-orderness?

Another question is *where could we place the API?* In flink-streaming-java 
module? Or flink-streaming-contrib? This, of course, highly depends on what API 
we would provide.

> Approximate calculation of frequencies in data streams
> ------------------------------------------------------
>
>                 Key: FLINK-2147
>                 URL: https://issues.apache.org/jira/browse/FLINK-2147
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Gabor Gevay
>              Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to