Hey Aljoscha,

that sounds very promising, awesome! I would still need to implement my own window management logic (window assignment and window state purging), though, right? I was thinking about reusing some of the existing components (TimeWindow and WindowAssigner) but running my own WindowOperator (i.e. as a ProcessFunction). I am not sure whether that can be done easily. I would love to hear your opinion on that and on what the tricky parts will be, for example common mistakes you ran into while developing the windowing mechanism.
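To make the question concrete, here is a minimal sketch of the window logic described above (tumbling-window assignment, emission once a later element arrives for the same key, and purging of window state), written in plain Java without any Flink dependency. All names (PerKeyWindowSketch, onElement, onTimeout, allowedLatenessMs) are hypothetical and are not Flink API. In a real job the two maps would presumably live in keyed state, and onTimeout would be driven by a timer, such as the 24-hour one mentioned further down in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink API: per-key tumbling windows that are
// closed by the highest timestamp seen for that key (a per-key "watermark").
class PerKeyWindowSketch {

    // One emitted aggregate: key, window start in ms, and sum of the values.
    static final class Result {
        final String key;
        final long windowStart;
        final double sum;

        Result(String key, long windowStart, double sum) {
            this.key = key;
            this.windowStart = windowStart;
            this.sum = sum;
        }
    }

    private final long windowSizeMs;
    private final long allowedLatenessMs;
    // Per key: window start -> running sum, ordered by window start.
    private final Map<String, TreeMap<Long, Double>> openWindows = new HashMap<>();
    // Per key: highest event timestamp seen so far.
    private final Map<String, Long> maxTimestamp = new HashMap<>();

    PerKeyWindowSketch(long windowSizeMs, long allowedLatenessMs) {
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // A window is closed once the key's watermark reaches end + lateness.
    private boolean isClosed(long windowStart, long watermark) {
        return windowStart + windowSizeMs + allowedLatenessMs <= watermark;
    }

    // Adds one element and returns the windows of this key that it closes.
    List<Result> onElement(String key, long timestampMs, double value) {
        List<Result> out = new ArrayList<>();
        long previousMax = maxTimestamp.getOrDefault(key, Long.MIN_VALUE);
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        if (isClosed(windowStart, previousMax)) {
            return out; // too late: the key's watermark already passed this window
        }
        long watermark = Math.max(previousMax, timestampMs);
        maxTimestamp.put(key, watermark);

        TreeMap<Long, Double> windows = openWindows.computeIfAbsent(key, k -> new TreeMap<>());
        windows.merge(windowStart, value, Double::sum);

        // Emit and purge every window the per-key watermark has passed.
        while (!windows.isEmpty() && isClosed(windows.firstKey(), watermark)) {
            Map.Entry<Long, Double> closed = windows.pollFirstEntry();
            out.add(new Result(key, closed.getKey(), closed.getValue()));
        }
        return out;
    }

    // Stand-in for a timer callback: flush and forget a key that went silent.
    List<Result> onTimeout(String key) {
        List<Result> out = new ArrayList<>();
        TreeMap<Long, Double> windows = openWindows.remove(key);
        maxTimestamp.remove(key);
        if (windows != null) {
            for (Map.Entry<Long, Double> e : windows.entrySet()) {
                out.add(new Result(key, e.getKey(), e.getValue()));
            }
        }
        return out;
    }
}
```

Because all bookkeeping is per key, a gap of several hours on one sensor neither holds back nor prematurely closes the windows of another sensor. The tricky parts seem to be exactly the ones this sketch glosses over: making the two maps fault tolerant and deciding when state for a silent key can be dropped.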
best,
Stephan

> On 14 Nov 2016, at 19:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Stephan,
> I was going to suggest that using a flatMap and tracking the timestamp of
> each key yourself is a bit like having a per-key watermark. I wanted to wait
> a bit before answering because I'm currently working on a new type of
> Function that will be released with Flink 1.2: ProcessFunction. This is
> somewhat like a FlatMap but also lets you access the element timestamp,
> query the current processing time/event time, and set (per-key) timers for
> processing time and event time. With this you should be able to easily
> implement your per-key tracking, I hope.
>
> Cheers,
> Aljoscha
>
> P.S. ProcessFunction is already in the Flink repository but it's called
> TimelyFlatMapFunction right now, because I was working on it under that
> working title.
>
> On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epp...@zweitag.de> wrote:
> Hey Fabian,
>
> thank you very much.
>
> - Yes, I would window by event time and fire/purge by processing time.
> - "Cheaper in the end" meant that keeping too much state in the Flink
>   cluster would be more expensive, as we store all data in Cassandra too. I
>   think the fault tolerance would be okay, as we would do a compare-and-set
>   with Cassandra.
>
> With the flatMap operator, wouldn't it be like running my own windowing
> mechanism? I need to keep the aggregate window per sensor open (with
> checkpointing and state management) until I receive an element for a sensor
> that is later in time than the window's time, and then purge the state and
> emit a new event (which is like having a watermark per sensor). Further, I
> need a timer that fires after, say, 24 hours in case a sensor dies and
> doesn't send more data, which might be possible with a window
> assigner/trigger, right? But not inside normal functions, e.g. flatMap?
> We can guarantee that all sensor data per sensor comes almost in order (it
> might be out of order within a few seconds), but there might be gaps of
> several hours after network partitions.
>
> Is there no way to define/redefine the watermark per keyed stream? Or to
> adjust the window assigner + trigger to achieve the desired behaviour? I am a
> bit hesitant to implement the whole state management myself. Do you plan to
> support such use cases on keyed streams? Maybe the WatermarkAssigner could
> also receive information about the key for which the watermark should be
> calculated, etc.
>
> best, Stephan
>
>> On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing
>> List archive.] <[hidden email]> wrote:
>>
>> Hi Stephan,
>>
>> I'm skeptical about two things:
>> - using processing time will result in inaccurately bounded aggregates (or
>>   do you want to group by event time in a processing time window?)
>> - writing to and reading from Cassandra might be expensive (not sure what
>>   you mean by cheaper in the end) and it is not integrated with Flink's
>>   checkpointing mechanism for fault-tolerance.
>>
>> To me, the stateful FlatMapOperator looks like the best approach. There is
>> an upcoming feature for registering timers in user functions, i.e., a
>> function is called after the timer expires. This could be helpful to
>> overcome the problem of closing the window without new data.
>>
>> Best,
>> Fabian
>>
>> 2016-11-14 8:39 GMT+01:00 Stephan Epping <[hidden email]>:
>> Hello Fabian,
>>
>> Thank you very much. What is your opinion on the following solution:
>>
>> - Window data per time window, e.g. 15 minutes
>> - using processing time as trigger, e.g. 15 minutes
>> - which results in an aggregate over sensor values
>> - then use Cassandra to select the previous aggregate (as there can be
>>   multiple for the time window due to processing time)
>> - then update the aggregate and put it into a Cassandra sink again
>>
>> The Cassandra select will be a bit slower than using in-memory/Flink
>> state, but will be cheaper in the end. Further, what consequences does this
>> have? For example, replaying events will be more difficult, right? Also,
>> what about snapshots? Will they work with the mentioned design?
>>
>> kind regards,
>> Stephan
>>
>>> On 11 Nov 2016, at 00:39, Fabian Hueske <[hidden email]> wrote:
>>>
>>> Hi Stephan,
>>>
>>> I just wrote an answer to your SO question.
>>>
>>> Best, Fabian
>>>
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
>>>
>>> Hello,
>>>
>>> I found this question in the Nabble archive
>>> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
>>> but was unable to reply (I don't know how).
>>>
>>> Here is my question regarding the mentioned thread:
>>>
>>>> Hello,
>>>>
>>>> I have similar requirements (see Stack Overflow:
>>>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>>>> I am pretty new to Flink, could you elaborate on a possible solution? We
>>>> can guarantee good ordering by sensor_id, thus watermarking by key would
>>>> be the only reasonable way for us
>>>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my
>>>> own watermarking after sensorData.keyBy('id').overwriteWatermarking()...
>>>> per key? Or maybe use custom state plus a custom trigger? What happens
>>>> if a sensor dies or is removed completely? How can this be detected,
>>>> given that watermarks would be ignored for window garbage collection? Or
>>>> could we dynamically schedule a job for each sensor, which would result
>>>> in 1000 jobs?
>>>
>>> Thanks,
>>> Stephan
>
> View this message in context: Re: Maintaining watermarks per key, instead of
> per operator instance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10098.html>
> Sent from the Apache Flink User Mailing List archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com <http://nabble.com/>.