Hi Felipe,

I am not sure the algorithm requires constructing a new extension of the window operator. I think you can implement your CountMinSketch object as an aggregator, e.g.:
1. The aggregate state (ACC) should be the accumulated count-min-sketch 2-D hash array (plus a few other needed fields).
2. The accumulate method simply does the update.
3. getResult simply gets the frequency from the sketch.
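The three steps above can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not a tested Flink implementation. To keep it compilable without Flink on the classpath, `AggregateFunctionLike` is a hypothetical stand-in that mirrors the four methods of Flink's `org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>` (`createAccumulator`, `add`, `getResult`, `merge`); in a real job you would implement the Flink interface directly, and the update step corresponds to its `add` method. The sketch dimensions, the seeded hash family, and the `queryKey` whose frequency is reported per window are likewise illustrative assumptions.

```java
import java.io.Serializable;
import java.util.Random;

// Hypothetical stand-in mirroring the methods of Flink's
// org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>,
// so that this sketch compiles without Flink on the classpath.
interface AggregateFunctionLike<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Accumulator (ACC): the 2-D count-min-sketch table plus the per-row hash parameters.
class CountMinSketchAcc implements Serializable {
    private static final long PRIME = 2147483647L; // 2^31 - 1
    final long[][] table; // depth x width counters
    private final long[] a; // per-row hash coefficients, derived from a fixed seed
    private final long[] b;

    CountMinSketchAcc(int depth, int width, long seed) {
        table = new long[depth][width];
        a = new long[depth];
        b = new long[depth];
        Random rnd = new Random(seed); // same seed => same hash family in every accumulator
        for (int i = 0; i < depth; i++) {
            a[i] = 1 + rnd.nextInt(Integer.MAX_VALUE - 1);
            b[i] = rnd.nextInt(Integer.MAX_VALUE);
        }
    }

    // Universal hash ((a*h + b) mod p) mod width; no overflow since a, b, h < 2^31.
    private int bucket(int row, Object item) {
        long h = item.hashCode() & 0x7fffffffL;
        return (int) (((a[row] * h + b[row]) % PRIME) % table[row].length);
    }

    // Increment one counter per row.
    void update(Object item) {
        for (int i = 0; i < table.length; i++) {
            table[i][bucket(i, item)]++;
        }
    }

    // Point query: the minimum counter across rows (never below the true count).
    long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < table.length; i++) {
            min = Math.min(min, table[i][bucket(i, item)]);
        }
        return min;
    }
}

// Count-Min Sketch as a window aggregate: the sketch is the accumulator,
// add() updates it, and getResult() reads one frequency out of it.
class CountMinSketchAggregate implements AggregateFunctionLike<String, CountMinSketchAcc, Long> {
    private final int depth;
    private final int width;
    private final long seed;
    private final String queryKey; // hypothetical: the item whose frequency is emitted per window

    CountMinSketchAggregate(int depth, int width, long seed, String queryKey) {
        this.depth = depth;
        this.width = width;
        this.seed = seed;
        this.queryKey = queryKey;
    }

    @Override
    public CountMinSketchAcc createAccumulator() {
        return new CountMinSketchAcc(depth, width, seed);
    }

    @Override
    public CountMinSketchAcc add(String value, CountMinSketchAcc acc) {
        acc.update(value);
        return acc;
    }

    @Override
    public Long getResult(CountMinSketchAcc acc) {
        return acc.estimate(queryKey);
    }

    // Cell-wise sum; only valid because both sketches share dimensions and seed.
    @Override
    public CountMinSketchAcc merge(CountMinSketchAcc x, CountMinSketchAcc y) {
        for (int i = 0; i < x.table.length; i++) {
            for (int j = 0; j < x.table[i].length; j++) {
                x.table[i][j] += y.table[i][j];
            }
        }
        return x;
    }
}
```

Because `merge` just adds the two tables cell by cell, it is only meaningful when both accumulators use the same dimensions and hash functions, which is why the hash parameters are derived from a fixed seed here; Flink calls `merge` for merging windows such as session windows. A Count-Min estimate can overcount because of hash collisions but never undercounts.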
Thus you will not need to use a customized ValueStateDescriptor. But I agree that it may be a good idea to support a class of use cases that require approximate aggregate state (like HyperLogLog?); in my opinion this could be a good value-add. I think some further discussion is needed if we go down that path. Do you think the original FLINK-2147 <https://issues.apache.org/jira/browse/FLINK-2147> JIRA ticket is a good place to carry out that discussion? We can probably continue there or create a new JIRA for the discussion.

--
Rong

On Wed, Apr 24, 2019 at 1:32 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> Hi Rong,
>
> thanks for your reply. I guess I have already done something along the lines of what you told me. I have one example in this application [1], which uses this state [2] and computes a CountMinSketch [3].
>
> I am looking into how to implement my own operator over a window in order to have more fine-grained control over it and to learn from it. And hopefully, to build a path to contributing to Flink in the future [4].
>
> [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L69
> [2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L182
> [3] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/util/CountMinSketch.java
> [4] https://issues.apache.org/jira/browse/FLINK-2147
>
> Best,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Wed, Apr 24, 2019 at 2:06 AM Rong Rong <walter...@gmail.com> wrote:
>
>> Hi Felipe,
>>
>> At a quick glance, the answer can depend on what your window looks like (is there any overlap, as in a sliding window?) and how much data you would like to process.
>>
>> In general, you can always buffer all the data into a ListState and apply your window function by iterating through all those buffered elements [1], provided that the data is small enough to be held efficiently in the state backend.
>> If this algorithm has some sort of pre-aggregation that can simplify the buffering through an incremental, orderless aggregation, you can also consider using [2].
>> With these two approaches, you do not necessarily need to implement your own window operator (extending the window operator can be tricky), and you also have access to the internal state [3].
>>
>> Hope this helps your exploration.
>>
>> Thanks,
>> Rong
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>
>> On Tue, Apr 23, 2019 at 8:16 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I want to implement my own operator that computes the Count-Min Sketch over a window in Flink. Then I found this Jira issue [1], which is exactly what I want. I believe I still have to work on my skills to arrive at a mature solution.
>>>
>>> So, the first thing that comes to my mind is to create my custom operator like the AggregateApplyWindowFunction [2]. Through this I can create the summary of my data over a window.
>>>
>>> Also, I found this custom JoinOperator example by Till Rohrmann [3], on which I think I can base my implementation since it is done over a DataStream.
>>>
>>> What would you suggest to get started implementing a custom stream operator that computes the Count-Min Sketch? Do you know of any custom operator over window/keyBy whose source code I can learn from?
>>>
>>> P.S.: I have implemented (looking at the Blink source code) a custom Combiner [4] (map-combiner-reduce) operator.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-2147
>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.html
>>> [3] https://github.com/tillrohrmann/custom-join
>>> [4] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operator/AbstractRichMapStreamBundleOperator.java
>>>
>>> Thanks,
>>> Felipe