Hi Fabian,
your proposed solution for:
Multiple window aggregations
You can construct a data flow of cascading window operators and fork off (to
emit or further processing) the result after each window.
Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
\-> out_1 \-> out_2 \-> out_3
does not work, am I missing something?
First I tried the following
DataStream<Reading> values = input.assignTimestampsAndWatermarks(new
StrictWatermarkAssigner()); // force lateness
DataStream<ReadingAggregate> aggregatesPerMinute = values
.keyBy("id")
.timeWindow(Time.minutes(1))
.allowedLateness(Time.minutes(2))
.apply(new ReadingAggregate(), new AggregateReadings(), new
AggregateReadings());
DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
.keyBy("id")
.timeWindow(Time.hours(1))
.allowedLateness(Time.hours(2))
.apply(new AggregateReadingAggregates(), new
AggregateReadingAggregates());
but due to late data the first fold function would emit 2 rolling aggregates
(one with and one without the late element), which results in being counted
twice within the second reducer. Therefore i tried
WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force
lateness
.keyBy("id")
.timeWindow(Time.minutes(1))
.allowedLateness(Time.hours(2));
WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
.timeWindow(Time.hours(1))
.allowedLateness(Time.hours(2));
DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute.apply(new
ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new
ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
which gives me a compiler error as WindowedStream does not provide a timeWindow
method.
Finally I settled with this:
KeyedStream<Reading, Tuple> readings = input
.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force
lateness
.keyBy("id");
DataStream<ReadingAggregate> aggregatesPerMinute = readings
.timeWindow(Time.minutes(1))
.allowedLateness(Time.hours(2))
.apply(new ReadingAggregate(), new AggregateReadings(), new
AggregateReadings());
DataStream<ReadingAggregate> aggregatesPerHour = readings
.timeWindow(Time.hours(1))
.allowedLateness(Time.hours(2))
.apply(new ReadingAggregate(), new AggregateReadings(), new
AggregateReadings());
Feedback is very welcome.
best, Stephan
> On 11 Nov 2016, at 00:29, Fabian Hueske-2 [via Apache Flink User Mailing List
> archive.] <[email protected]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]
> <x-msg://3/user/SendEmail.jtp?type=node&node=10033&i=0>>:
> Hello,
>
> I found this question in the Nabble archive
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html>)
> but was unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
>> Hello,
>>
>> I have similar requirements (see StackOverflor
>> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data
>>
>> <http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data>).
>> I am pretty new to flink, could you elaborate on a possible solution? We
>> can guarantee good ordering by sensor_id, thus watermarking by key would be
>> the only reasonable way for us
>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')), could I do my
>> own watermarking aftersensorData.keyBy('id').overwriteWatermarking()... per
>> key? Or maybe using custom state plus a custom trigger? What happens if a
>> sensor dies or is being removed completely, how can this be detected as
>> watermarks would be ignored for window garbage collection. Or could we
>> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>
>
>
>
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10033.html>
> To unsubscribe from Maintaining watermarks per key, instead of per operator
> instance, click here
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=7288&code=c3RlcGhhbi5lcHBpbmdAendlaXRhZy5kZXw3Mjg4fC0yNzYyODY4NzI=>.
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10179.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at
Nabble.com.