Sounds good to me. But I still need some kind of side output (Cassandra) that stores the accumulating aggregates at each time scale (minute, hour). Thus I would need something like this:

var hourly = stream.window(1.hour).apply(...)

// write to Cassandra
hourly.trigger(accumulating).addSink(cassandra)

// forward to the next aggregation step
var daily = hourly.trigger(discarding).window(1.day).apply(...)

// write to Cassandra
daily.trigger(accumulating).addSink(cassandra)
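In the actual Java API I imagine it would look roughly like the sketch below (untested). Since a window has only one trigger, I made the first stage discarding and left the per-hour accumulation to the sink; DiscardingEventTimeTrigger is the FIRE_AND_PURGE trigger you describe below, and CassandraAggregateSink is just a stand-in for whatever sink function we end up writing:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// stage 1: hourly aggregates; discarding, so late firings carry only
// the late data and nothing reaches the next stage twice
DataStream<ReadingAggregate> hourly = stream
        .keyBy("id")
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .trigger(new DiscardingEventTimeTrigger())   // hypothetical, sketched below
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

// side output: the sink only ever sees partial hourly aggregates, so the
// accumulation per hour would have to happen in Cassandra itself
// (e.g. counter columns or read-modify-write in the sink)
hourly.addSink(new CassandraAggregateSink("hourly"));

// stage 2: accumulate the pre-aggregated hourly values per day
// (default EventTimeTrigger, i.e. accumulating)
DataStream<ReadingAggregate> daily = hourly
        .keyBy("id")
        .timeWindow(Time.days(1))
        .allowedLateness(Time.hours(2))
        .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());

daily.addSink(new CassandraAggregateSink("daily"));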
Would this be possible?

best, Stephan

> On 23 Nov 2016, at 11:16, Aljoscha Krettek [via Apache Flink User Mailing List archive.] wrote:
>
> You can implement discarding behaviour by writing a custom trigger (based on EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you could maybe implement a cascade of windows, where the first aggregates for the smallest time interval and is discarding, and where the other triggers take these "pre-aggregated" values and accumulate.
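> A minimal version of such a trigger could look like this (just a sketch, modelled on Flink's EventTimeTrigger, untested):
>
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> public class DiscardingEventTimeTrigger extends Trigger<Object, TimeWindow> {
>
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>         if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
>             // late element: fire immediately and purge, so only the
>             // late data itself is emitted downstream
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>         // FIRE_AND_PURGE instead of FIRE: emit the pane and discard its
>         // contents, so nothing is ever emitted (and counted) twice
>         return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>     }
> }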
> On Tue, 22 Nov 2016 at 08:11 Stephan Epping wrote:
>
> Hey Aljoscha,
>
> the first solution did not work out as expected. When late elements arrive, the first window is triggered again and emits a new (accumulated) event, which is then counted twice in the second window (once in the on-time accumulation and once in the late accumulation). I could implement my own discarding strategy like in Apache Beam, but the output stream should contain accumulated events that are stored in Cassandra. The second solution just gave a compiler error, so I think it is not possible right now.
>
> best, Stephan
>
>> On 21 Nov 2016, at 17:56, Aljoscha Krettek wrote:
>>
>> Hi,
>> why did you settle for the last solution?
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 17 Nov 2016 at 15:57 kaelumania wrote:
>>
>> Hi Fabian,
>>
>> your proposed solution for multiple window aggregations:
>>
>>     You can construct a data flow of cascading window operators and fork
>>     off (to emit or for further processing) the result after each window.
>>
>>     Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>>                          \-> out_1        \-> out_2         \-> out_3
>>
>> does not work, am I missing something?
>>
>> First I tried the following:
>>
>> DataStream<Reading> values = input
>>     .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
>>
>> DataStream<ReadingAggregate> aggregatesPerMinute = values
>>     .keyBy("id")
>>     .timeWindow(Time.minutes(1))
>>     .allowedLateness(Time.minutes(2))
>>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>>
>> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>>     .keyBy("id")
>>     .timeWindow(Time.hours(1))
>>     .allowedLateness(Time.hours(2))
>>     .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
>>
>> but due to late data the first fold function would emit two rolling aggregates (one with and one without the late element), which results in them being counted twice within the second reducer.
>>
>> Therefore I tried:
>>
>> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>>     .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>>     .keyBy("id")
>>     .timeWindow(Time.minutes(1))
>>     .allowedLateness(Time.hours(2));
>>
>> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>>     .timeWindow(Time.hours(1))
>>     .allowedLateness(Time.hours(2));
>>
>> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute
>>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours
>>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>>
>> which gives me a compiler error, as WindowedStream does not provide a timeWindow method.
>>
>> Finally I settled on this:
>>
>> KeyedStream<Reading, Tuple> readings = input
>>     .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>>     .keyBy("id");
>>
>> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>>     .timeWindow(Time.minutes(1))
>>     .allowedLateness(Time.hours(2))
>>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>>
>> DataStream<ReadingAggregate> aggregatesPerHour = readings
>>     .timeWindow(Time.hours(1))
>>     .allowedLateness(Time.hours(2))
>>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>>
>> Feedback is very welcome.
>>
>> best, Stephan
>>
>>> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List archive.] wrote:
>>>
>>> Hi Stephan,
>>>
>>> I just wrote an answer to your SO question.
>>>
>>> Best, Fabian
>>>
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping:
>>>
>>> Hello,
>>>
>>> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html) but was unable to (don't know how to) reply there.
>>>
>>> Here is my question regarding the mentioned thread:
>>>
>>>> Hello,
>>>>
>>>> I have similar requirements (see Stack Overflow: http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data). I am pretty new to Flink; could you elaborate on a possible solution? We can guarantee good ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my own watermarking after sensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe use custom state plus a custom trigger? What happens if a sensor dies or is removed completely? How can this be detected, given that its watermarks would be ignored for window garbage collection? Or could we dynamically schedule a job for each sensor? That would result in 1000 jobs.
>>>
>>> Thanks,
>>> Stephan