I think you would need something like this:

var hourlyDiscarding = stream
    .window(1.hour)
    .trigger(discarding)
    .apply(..)
//write to cassandra
hourlyDiscarding
    .window(1.hour)
    .trigger(accumulating)
    .apply(..)
    .addSink(cassandra)

//forward to next acc step
var daily = hourlyDiscarding
    .window(1.day)
    .trigger(accumulating)
    .apply(…)

//write to cassandra
daily.addSink(cassandra)

The decision between accumulating/discarding happens at the point where the window is defined, not downstream (this is the same as in Beam).

On Wed, 23 Nov 2016 at 11:37 kaelumania <stephan.epp...@zweitag.de> wrote:

> Sounds good to me. But I still need some kind of side output (cassandra)
> that stores the accumulating aggregates on each time scale (minute, hour).
> Thus I would need something like this:
>
> var hourly = stream.window(1.hour).apply(..)
> //write to cassandra
> hourly.trigger(accumulating).addSink(cassandra)
> //forward to next acc step
> var daily = hourly.trigger(discarding).window(1.day).apply(…)
> //write to cassandra
> daily.trigger(accumulating).addSink(cassandra)
>
> Would this be possible?
>
> best, Stephan
>
> On 23 Nov 2016, at 11:16, Aljoscha Krettek [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>
> You can implement discarding behaviour by writing a custom trigger (based
> on EventTimeTrigger) that returns FIRE_AND_PURGE when firing. With this you
> could maybe implement a cascade of windows where the first aggregates for
> the smallest time interval and is discarding, and where the other windows
> take these "pre-aggregated" values and accumulate.
>
> On Tue, 22 Nov 2016 at 08:11 Stephan Epping <[hidden email]> wrote:
>
> Hey Aljoscha,
>
> the first solution did not work out as expected.
> When late elements arrive, the first window is triggered again and emits a
> new (accumulated) event, which is then counted twice (once in the on-time
> accumulation and once in the late accumulation) in the second window. I
> could implement my own discarding strategy like in Apache Beam, but the
> output stream should contain the accumulated events that are stored in
> cassandra. The second solution just gave a compiler error, thus I think it
> is not possible right now.
>
> best Stephan
>
> On 21 Nov 2016, at 17:56, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi,
> why did you settle for the last solution?
>
> Cheers,
> Aljoscha
>
> On Thu, 17 Nov 2016 at 15:57 kaelumania <[hidden email]> wrote:
>
> Hi Fabian,
>
> your proposed solution for:
>
> 1. Multiple window aggregations
>
> You can construct a data flow of cascading window operators and fork off
> (to emit or for further processing) the result after each window.
>
> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>                \-> out_1          \-> out_2          \-> out_3
>
> does not work. Am I missing something?
> First I tried the following:
>
> DataStream<Reading> values = input
>     .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
>
> DataStream<ReadingAggregate> aggregatesPerMinute = values
>     .keyBy("id")
>     .timeWindow(Time.minutes(1))
>     .allowedLateness(Time.minutes(2))
>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>     .keyBy("id")
>     .timeWindow(Time.hours(1))
>     .allowedLateness(Time.hours(2))
>     .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
>
> But due to late data the first fold function would emit two rolling
> aggregates (one with and one without the late element), which results in
> the late data being counted twice within the second reducer. Therefore I
> tried:
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>     .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>     .keyBy("id")
>     .timeWindow(Time.minutes(1))
>     .allowedLateness(Time.hours(2));
>
> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>     .timeWindow(Time.hours(1))
>     .allowedLateness(Time.hours(2));
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute
>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours
>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> This gives me a compiler error, as WindowedStream does not provide a
> timeWindow method.
> Finally I settled with this:
>
> KeyedStream<Reading, Tuple> readings = input
>     .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>     .keyBy("id");
>
> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>     .timeWindow(Time.minutes(1))
>     .allowedLateness(Time.hours(2))
>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> DataStream<ReadingAggregate> aggregatesPerHour = readings
>     .timeWindow(Time.hours(1))
>     .allowedLateness(Time.hours(2))
>     .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>
> Feedback is very welcome.
>
> best, Stephan
>
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
>
> Hello,
>
> I found this question in the Nabble archive
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but don't know how to reply there.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflow:
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to Flink; could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would be
> the only reasonable way for us
> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my
> own watermarking after sensorData.keyBy('id').overwriteWatermarking()...
> per key? Or maybe using custom state plus a custom trigger?
> What happens if a sensor dies or is removed completely? How could this be
> detected, as its watermark would stop advancing and the windows would never
> be garbage collected. Or could we dynamically schedule a job for each
> sensor? That would result in 1000 jobs.
>
> Thanks,
> Stephan
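The double-counting problem discussed in this thread can be reproduced without Flink at all. A minimal plain-Java sketch (class and variable names are hypothetical, the numbers are made up for illustration): a minute window first fires with sum 10, then a late element of 5 arrives; an accumulating trigger re-fires the updated total, while a discarding (FIRE_AND_PURGE) trigger fires only what arrived since the last purge. A downstream hourly sum then over-counts in the accumulating case:

```java
import java.util.List;

public class DoubleCountingDemo {

    // Sum whatever the upstream (minute) window emits, as the hourly window would.
    static int downstreamSum(List<Integer> minuteEmissions) {
        return minuteEmissions.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        // Accumulating trigger: fires sum=10, then re-fires the updated total 15
        // (10 + late 5) because the window state was kept.
        List<Integer> accumulating = List.of(10, 15);

        // Discarding trigger: state was purged after the first firing, so the
        // late firing carries only the late element's contribution.
        List<Integer> discarding = List.of(10, 5);

        System.out.println(downstreamSum(accumulating)); // 25: the first 10 counted twice
        System.out.println(downstreamSum(discarding));   // 15: the correct total
    }
}
```

This is why the thread converges on making the first window of the cascade discarding and only the forks that write to Cassandra accumulating.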
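For reference, the custom trigger Aljoscha suggests (an EventTimeTrigger variant that returns FIRE_AND_PURGE instead of FIRE so window state is cleared after each firing) could look roughly like the following sketch against the Flink 1.x Trigger API. This is an untested illustration and signatures may differ between Flink versions:

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Like EventTimeTrigger, but purges window state on every firing so that a
// late firing emits only the elements that arrived since the last purge.
public class DiscardingEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // Late element: the watermark already passed the window end,
            // so fire (and purge) immediately.
            return TriggerResult.FIRE_AND_PURGE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```

It would be attached at window definition time, e.g. `.timeWindow(Time.minutes(1)).trigger(new DiscardingEventTimeTrigger()).apply(...)`, matching the point made above that the accumulating/discarding decision happens where the window is defined.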