Hi Stephan,

I was going to suggest that using a flatMap and tracking the timestamp of each key yourself is a bit like having a per-key watermark. I wanted to wait a bit before answering because I'm currently working on a new type of function that will be released with Flink 1.2: ProcessFunction. It is somewhat like a FlatMap but also lets you access the element's timestamp, query the current processing time/event time, and set (per-key) timers for processing time and event time. With this you should be able to implement your per-key tracking quite easily, I hope.
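To make that a bit more concrete, here is a rough, untested Java sketch of what this could look like. Since the feature is not released yet, names and signatures may still change, and the 24-hour timeout and the tuple types are just placeholders for your setup:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the upcoming API: per element you can read its event timestamp,
// query the current watermark/processing time, and register timers that are
// scoped to the current key.
public class PerKeyTimeout extends ProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

    private static final long TIMEOUT = 24 * 60 * 60 * 1000L; // e.g. 24 hours

    @Override
    public void processElement(Tuple2<String, Double> value, Context ctx,
                               Collector<Tuple2<String, Double>> out) throws Exception {
        // shown just to illustrate the accessors:
        long elementTimestamp = ctx.timestamp();                 // this element's event time
        long watermark = ctx.timerService().currentWatermark();  // operator-wide event time

        // a timer scoped to the current key: fires even if the key goes silent
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + TIMEOUT);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Double>> out) throws Exception {
        // fires per key; a real implementation would keep the latest element
        // time in state and check here whether new data arrived before
        // emitting or purging the per-key tracking state
    }
}

You would apply it as sensorData.keyBy(0).process(new PerKeyTimeout()), though, again, that is what the API looks like right now and might still change before the release.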
Cheers,
Aljoscha

P.S. ProcessFunction is already in the Flink repository, but it's called TimelyFlatMapFunction right now because I was working on it under that working title.

On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epp...@zweitag.de> wrote:

> Hey Fabian,
>
> thank you very much.
>
> - Yes, I would window by event time and fire/purge by processing time.
> - "Cheaper in the end" meant that having too much state in the Flink
> cluster would be more expensive, as we store all data in Cassandra too. I
> think the fault tolerance would be okay, as we would do a compare-and-set
> with Cassandra.
>
> With the flatMap operator, wouldn't it be like running my own windowing
> mechanism? I need to keep the aggregate window per sensor open (with
> checkpointing and state management) until I receive an element for that
> sensor that is later in time than the window's end, and then purge the
> state and emit a new event (which is like having a watermark per sensor).
> Further, I need a timer that fires after, say, 24 hours in case a sensor
> dies and doesn't send any more data, which might be possible with a window
> assigner/trigger, right? But not inside normal functions, e.g. flatMap? We
> can guarantee that the data per sensor arrives almost in order (it might
> be out of order within a few seconds), but there can be gaps of several
> hours after network partitions.
>
> Is there no way to define/redefine the watermark per keyed stream? Or to
> adjust the window assigner + trigger to achieve the desired behaviour? I
> am a bit hesitant to implement the whole state management myself. Do you
> plan to support such use cases on keyed streams? Maybe the
> WatermarkAssigner could also receive information about the key for which
> the watermark should be calculated, etc.
>
> best, Stephan
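Just to make the flatMap idea quoted above concrete: yes, it is essentially running your own windowing mechanism on keyed state. Here is a minimal, untested Java sketch of such a stateful FlatMap, assuming readings arrive as (sensorId, eventTimestamp, value) tuples and using a plain sum as a stand-in aggregate:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// In: (sensorId, eventTimestamp, value); out: (sensorId, windowEnd, sum).
// Keeps one open window per sensor in checkpointed keyed state and emits and
// purges it as soon as an element arrives whose timestamp lies past the
// window end -- in effect a per-key watermark with zero out-of-orderness
// allowance.
public class PerSensorWindow
        extends RichFlatMapFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>> {

    private static final long WINDOW_SIZE = 60 * 1000L; // 1 minute, for example

    private transient ValueState<Tuple3<String, Long, Double>> openWindow;

    @Override
    public void open(Configuration parameters) {
        openWindow = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "open-window",
                TypeInformation.of(new TypeHint<Tuple3<String, Long, Double>>() {})));
    }

    @Override
    public void flatMap(Tuple3<String, Long, Double> in,
                        Collector<Tuple3<String, Long, Double>> out) throws Exception {
        Tuple3<String, Long, Double> window = openWindow.value();
        if (window != null && in.f1 >= window.f1) {
            // element lies past the open window: this key's "watermark" has
            // advanced, so emit the aggregate and purge the state
            out.collect(window);
            window = null;
        }
        if (window == null) {
            long windowEnd = (in.f1 / WINDOW_SIZE + 1) * WINDOW_SIZE;
            window = Tuple3.of(in.f0, windowEnd, 0.0);
        }
        // caveat: with your few-seconds out-of-orderness, an element arriving
        // just after its window was purged is silently counted into the next one
        window.f2 += in.f2;
        openWindow.update(window);
    }
}

You would apply it as sensorData.keyBy(0).flatMap(new PerSensorWindow()). As you say, the missing piece is a timer: without one, the last open window of a sensor that dies is never emitted. That is exactly what the timers in the upcoming ProcessFunction (sketched above) add.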
>
> On 14 Nov 2016, at 15:17, Fabian Hueske wrote:
>
> Hi Stephan,
>
> I'm skeptical about two things:
> - Using processing time will result in inaccurately bounded aggregates
> (or do you want to group by event time in a processing-time window?).
> - Writing to and reading from Cassandra might be expensive (not sure what
> you mean by "cheaper in the end"), and it is not integrated with Flink's
> checkpointing mechanism for fault tolerance.
>
> To me, the stateful FlatMap operator looks like the best approach. There
> is an upcoming feature for registering timers in user functions, i.e., a
> function that is called when the timer expires. This could help overcome
> the problem of closing a window when no new data arrives.
>
> Best,
> Fabian
>
> 2016-11-14 8:39 GMT+01:00 Stephan Epping:
>
> Hello Fabian,
>
> Thank you very much. What is your opinion on the following solution:
>
> - Window the data per time window, e.g. 15 minutes.
> - Use processing time as the trigger, e.g. every 15 minutes.
> - This results in an aggregate over the sensor values.
> - Then use Cassandra to select the previous aggregate (as there can be
> multiple aggregates for the same time window due to processing time).
> - Then update the aggregate and write it to a Cassandra sink again.
>
> The Cassandra select will be a bit slower than using in-memory Flink
> state, but will be cheaper in the end. Furthermore, what consequences
> does this have? For example, replaying events will be more difficult,
> right? Also, what about snapshots? Will they still work with this design?
>
> kind regards,
> Stephan
>
> On 11 Nov 2016, at 00:39, Fabian Hueske wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping:
>
> Hello,
>
> I found this question in the Nabble archive
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but didn't know how to reply there. Here is my question regarding the
> mentioned thread:
>
> Hello,
>
> I have similar requirements (see my Stack Overflow question:
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to Flink; could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would
> be the only reasonable way for us
> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my
> own watermarking per key, e.g.
> sensorData.keyBy('id').overwriteWatermarking()...? Or maybe use custom
> state plus a custom trigger? What happens if a sensor dies or is removed
> completely? How can this be detected, as its watermark would stop
> advancing and the windows would never be garbage collected? Or could we
> dynamically schedule a job for each sensor, which would result in 1,000
> jobs?
>
> Thanks,
> Stephan
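One more inline note, on the "custom state plus a custom trigger" idea in the quoted question: you can get part of the way there today with a custom Trigger that fires when the watermark passes the window end or, as a fallback for sensors that stop sending data, when a processing-time timeout elapses. A rough, untested Java sketch; the timeout and the fire-and-purge policy are illustrative choices, not recommendations:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires when the watermark passes the end of the window, or -- as a safety
// net for dead sensors -- when a processing-time timeout elapses after an
// element of the window was seen.
public class EventTimeOrTimeoutTrigger extends Trigger<Object, TimeWindow> {

    private final long timeoutMillis;

    public EventTimeOrTimeoutTrigger(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        // timeout hit before the watermark closed the window: emit what we have
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        // the processing-time timer's timestamp is not kept here; a real
        // implementation would remember it in ctx.getPartitionedState(...)
        // so it can be deleted as well
    }
}

Plugged in with something like sensorData.keyBy("id").window(TumblingEventTimeWindows.of(Time.minutes(1))).trigger(new EventTimeOrTimeoutTrigger(24 * 60 * 60 * 1000L)).sum("value"). This still isn't a true per-key watermark, though; the ProcessFunction sketched above is the more direct answer to that part of your question.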