NicoK commented on a change in pull request #11845:
URL: https://github.com/apache/flink/pull/11845#discussion_r412425045
##########
File path: docs/tutorials/event_driven.md
##########
@@ -0,0 +1,298 @@
+---
+title: Event-driven Applications
+nav-id: etl

Review comment:
   is this nav-id actually the right one?

##########
File path: docs/tutorials/event_driven.md
##########
+nav-pos: 6
+nav-title: Event-driven Applications
+nav-parent_id: tutorials
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Process Functions
+
+### Introduction
+
+A `ProcessFunction` combines event processing with timers and state, making it a powerful building
+block for stream processing applications. This is the basis for creating event-driven applications
+with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers.
+
+### Example
+
+If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall

Review comment:
   TODO: add links to the exercise and the former page

##########
File path: docs/tutorials/event_driven.md
##########
+that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each
+hour, like this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .timeWindow(Time.hours(1))
+        .process(new AddTips());
+{% endhighlight %}
+
+It's reasonably straightforward, and educational, to do the same thing with a

Review comment:
   ```suggestion
   It is reasonably straightforward, and educational, to do the same thing with a
   ```

##########
File path: docs/tutorials/event_driven.md
##########
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:
+
+{% highlight java %}
+// compute the sum of the tips per hour for each driver
+DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
+        .keyBy((TaxiFare fare) -> fare.driverId)
+        .process(new PseudoWindow(Time.hours(1)));
+{% endhighlight %}
+
+In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed
+stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream
+produced by the implementation that uses Flink's built-in TimeWindows).
+
+The overall outline of `PseudoWindow` has this shape:
+
+{% highlight java %}
+// Compute the sum of the tips for each driver in hour-long windows.
+// The keys are driverIds.
+public static class PseudoWindow extends
+        KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
+
+    private final long durationMsec;
+
+    public PseudoWindow(Time duration) {
+        this.durationMsec = duration.toMilliseconds();
+    }
+
+    @Override
+    // Called once during initialization.
+    public void open(Configuration conf) {
+        . . .
+    }
+
+    @Override
+    // Called as each fare arrives to be processed.
+    public void processElement(
+            TaxiFare fare,
+            Context ctx,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+
+    @Override
+    // Called when the current watermark indicates that a window is now complete.
+    public void onTimer(long timestamp,
+            OnTimerContext context,
+            Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+        . . .
+    }
+}
+{% endhighlight %}
+
+Things to be aware of:
+
+* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also
+  CoProcessFunctions, BroadcastProcessFunctions, etc.

Review comment:
   ```suggestion
   * There are several types of ProcessFunctions -- this is a `KeyedProcessFunction`, but there are also
     `CoProcessFunctions`, `BroadcastProcessFunctions`, etc.
   ```

##########
File path: docs/tutorials/event_driven.md
##########
+* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open
+  and getRuntimeContext methods needed for working with managed keyed state.

Review comment:
   ```suggestion
   * A `KeyedProcessFunction` is a kind of `RichFunction`. Being a `RichFunction`, it has access to the `open`
     and `getRuntimeContext` methods needed for working with managed keyed state.
   ```

##########
File path: docs/tutorials/event_driven.md
##########
+`KeyedProcessFunction`. Let's begin by replacing the code above with this:

Review comment:
   ```suggestion
   `KeyedProcessFunction`. Let us begin by replacing the code above with this:
   ```

##########
File path: docs/tutorials/event_driven.md
##########
+* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called
+  with each incoming event; `onTimer` is called when timers fire. These can be either event time or
+  processing time timers. Both `processElement` and `onTimer` are provided with a context object
+  that can be used to interact with a `TimerService` (among other things). Both callbacks are also
+  passed a `Collector` that can be used to emit results.
+
+#### The `open()` method
+
+{% highlight java %}
+// Keyed, managed state, with an entry for each window, keyed by the window's end time.
+// There is a separate MapState object for each driver.
+private transient MapState<Long, Float> sumOfTips;
+
+@Override
+public void open(Configuration conf) {
+
+    MapStateDescriptor<Long, Float> sumDesc =
+            new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
+    sumOfTips = getRuntimeContext().getMapState(sumDesc);
+}
+{% endhighlight %}
+
+Because the fare events can arrive out-of-order, it will sometimes be necessary to process events

Review comment:
   out-of-order event vs. event being out of order?
   ```suggestion
   Because the fare events can arrive out of order, it will sometimes be necessary to process events
   ```

##########
File path: docs/tutorials/event_driven.md
##########
+for one hour before having finished computing the results for the previous hour. In fact, if the
+watermarking delay is much longer than the window length, then there may be many windows open
+simultaneously, rather than just two. This implementation supports this by using `MapState` that

Review comment:
   ```suggestion
   simultaneously, rather than just two. This implementation supports this by using a `MapState` that
   ```

##########
File path: docs/tutorials/event_driven.md
##########
+maps the timestamp for the end of each window to the sum of the tips for that window.
+
+#### The `processElement()` method
+
+{% highlight java %}
+public void processElement(
+        TaxiFare fare,
+        Context ctx,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long eventTime = fare.getEventTime();
+    TimerService timerService = ctx.timerService();
+
+    if (eventTime <= timerService.currentWatermark()) {
+        // This event is late; its window has already been triggered.
+    } else {
+        // Round up eventTime to the end of the window containing this event.
+        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
+
+        // Schedule a callback for when the window has been completed.
+        timerService.registerEventTimeTimer(endOfWindow);
+
+        // Add this fare's tip to the running total for that window.
+        Float sum = sumOfTips.get(endOfWindow);
+        if (sum == null) {
+            sum = 0.0F;
+        }
+        sum += fare.tip;
+        sumOfTips.put(endOfWindow, sum);
+    }
+}
+{% endhighlight %}
+
+Things to consider:
+
+* What happens with late events? Events that are behind the watermark (i.e., late) are being
+  dropped. If you want to do something better than this, consider using a side output, which is
+  explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md
+  %}#side-outputs).
+
+* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same
+  timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information
+  when the timer fires.
+
+#### The `onTimer()` method
+
+{% highlight java %}
+public void onTimer(
+        long timestamp,
+        OnTimerContext context,
+        Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+
+    long driverId = context.getCurrentKey();
+    // Look up the result for the hour that just ended.
+ Float sumOfTips = this.sumOfTips.get(timestamp); + + Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips);

Review comment:
```suggestion
Tuple3<Long, Long, Float> result = Tuple3.of(driverId, timestamp, sumOfTips);
```

########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ ... +Things to consider: + +* What happens with late events? Events that are behind the watermark (i.e., late) are being + dropped. If you want to do something better than this, consider using a side output, which is + explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md + %}#side-outputs). + +* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same

Review comment:
```suggestion
* This example uses a `MapState` where the keys are timestamps, and sets a `Timer` for that same
```

########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ ... +In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed +stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream +produced by the implementation that uses Flink's built-in TimeWindows).

Review comment: You used this spelling a few lines down; I also wouldn't see "TimeWindow" as a Flink term, so I think this is more appropriate:
```suggestion
produced by the implementation that uses Flink's built-in time windows).
```

########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ ... +#### The `onTimer()` method + +{% highlight java %} +public void onTimer( + long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long driverId = context.getCurrentKey(); + // Look up the result for the hour that just ended. + Float sumOfTips = this.sumOfTips.get(timestamp); + + Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips); + out.collect(result); + this.sumOfTips.remove(timestamp); +} +{% endhighlight %} + +Observations: + +* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key. + +* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at + which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`, + which has the effect of making it impossible to accommodate late events. This is the equivalent of + setting the allowedLateness to zero when working with Flink's time windows. + +### Performance Considerations + +Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible,

Review comment: FYI: I was actually surprised to read (and then see) that these are really only optimized for RocksDB... I thought the MVCC hashmap also extended its reach into the `MapState` on heap, but that doesn't seem to be the case (as you wrote).
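For anyone who wants to check the timer/`MapState` pattern from the `onTimer()` hunk without a Flink cluster, here is a plain-Java sketch that simulates `PseudoWindow`'s logic. It is not Flink code and uses none of Flink's API: `PseudoWindowSimulation`, `advanceWatermark`, `Result`, and `lateEventTimes` are hypothetical names standing in for the `TimerService`, the watermark, the emitted `Tuple3`, and (for late events) the side output the tutorial introduces next. It assumes non-negative timestamps and that an event-time timer fires once the watermark reaches the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation of the PseudoWindow pattern (hypothetical; no Flink classes involved).
final class PseudoWindowSimulation {

    // Hypothetical stand-in for the Tuple3<driverId, windowEnd, sumOfTips> emitted by onTimer().
    static final class Result {
        final long driverId;
        final long windowEnd;
        final float sumOfTips;

        Result(long driverId, long windowEnd, float sumOfTips) {
            this.driverId = driverId;
            this.windowEnd = windowEnd;
            this.sumOfTips = sumOfTips;
        }

        @Override
        public String toString() {
            return "(" + driverId + ", " + windowEnd + ", " + sumOfTips + ")";
        }
    }

    private final long durationMsec;
    private long currentWatermark = Long.MIN_VALUE; // no watermark seen yet
    // Keyed "MapState": driverId -> (window end timestamp -> running sum of tips).
    private final Map<Long, Map<Long, Float>> sumOfTips = new HashMap<>();
    // Event-time "timers": timestamp -> keys with a timer there (a Set, so re-registering is a no-op).
    private final TreeMap<Long, Set<Long>> timers = new TreeMap<>();

    final List<Result> results = new ArrayList<>();
    final List<Long> lateEventTimes = new ArrayList<>(); // where a side output would route late fares

    PseudoWindowSimulation(long durationMsec) {
        this.durationMsec = durationMsec;
    }

    // Mirrors processElement(): set late events aside, otherwise add the tip to its window's sum.
    void processElement(long driverId, long eventTime, float tip) {
        if (eventTime <= currentWatermark) {
            lateEventTimes.add(eventTime);
            return;
        }
        // Last millisecond of the window containing eventTime (assumes eventTime >= 0).
        long endOfWindow = eventTime - (eventTime % durationMsec) + durationMsec - 1;
        timers.computeIfAbsent(endOfWindow, t -> new TreeSet<>()).add(driverId);
        sumOfTips.computeIfAbsent(driverId, k -> new HashMap<>())
                 .merge(endOfWindow, tip, Float::sum);
    }

    // Advancing the watermark to wm fires every pending timer with timestamp <= wm, oldest first.
    void advanceWatermark(long wm) {
        currentWatermark = wm;
        while (!timers.isEmpty() && timers.firstKey() <= wm) {
            Map.Entry<Long, Set<Long>> due = timers.pollFirstEntry();
            for (long driverId : due.getValue()) {
                onTimer(driverId, due.getKey());
            }
        }
    }

    // Mirrors onTimer(): emit the finished window's sum and remove that entry from the state.
    private void onTimer(long driverId, long timestamp) {
        Float sum = sumOfTips.get(driverId).remove(timestamp);
        results.add(new Result(driverId, timestamp, sum));
    }

    public static void main(String[] args) {
        PseudoWindowSimulation sim = new PseudoWindowSimulation(3_600_000L); // one-hour windows
        sim.processElement(1L, 1_000L, 2.0f);
        sim.processElement(1L, 3_000_000L, 1.5f);
        sim.processElement(2L, 3_700_000L, 4.0f); // lands in the second hour
        sim.advanceWatermark(3_599_999L);         // completes the first hour
        sim.processElement(1L, 10_000L, 9.9f);    // now behind the watermark, so late
        sim.advanceWatermark(7_199_999L);         // completes the second hour
        System.out.println(sim.results);
        System.out.println(sim.lateEventTimes);
    }
}
```

Running `main` prints `[(1, 3599999, 3.5), (2, 7199999, 4.0)]` followed by `[10000]`: driver 1's two fares in the first hour are summed into one window, driver 2's fare lands in the second hour, and the fare arriving after the first watermark is treated as late. Keying the state by window end, and registering a timer for that same timestamp, is what lets `onTimer` find and clear exactly one entry.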
########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ ... +### Performance Considerations + +Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible, +these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB +state backend can append to `ListState` without going through ser/de, and for `MapState`, each +key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated. + +{% top %} + +## Side Outputs + +### Introduction + +There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting: + +* exceptions +* malformed events +* late events +* operational alerts, such as timed-out connections to external services + +Side outputs are a convenient way to do this. + +Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that +correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the +same name should have the same type, and will refer to the same side output. + +### Example + +You are now in a position to do something with the late events that were ignored in the previous +section. In the `processElement` method of `PseudoWindow` you can now do this: + +{% highlight java %} +if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {};

Review comment: Performance-wise, you should probably not do it that way, though, and instead define the output tag outside, once(!). Since you show both occurrences here, I would recommend showing it correctly.

########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ ... +There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting: + +* exceptions +* malformed events +* late events +* operational alerts, such as timed-out connections to external services

Review comment: All of these are some sort of error; maybe also mention normal splits in the application logic, e.g., depending on what input type you have?

########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ ... +## Process Functions + +### Introduction + +A `ProcessFunction` combines event processing with timers and state, making it a powerful building +block for stream processing applications.
This is the basis for creating event-driven applications +with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers. + +### Example + +If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall +that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each +hour, like this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .timeWindow(Time.hours(1)) + .process(new AddTips()); +{% endhighlight %} + +It's reasonably straightforward, and educational, to do the same thing with a +`KeyedProcessFunction`. Let's begin by replacing the code above with this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); +{% endhighlight %} + +In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed +stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream +produced by the implementation that uses Flink's built-in TimeWindows). + +The overall outline of `PseudoWindow` has this shape: + +{% highlight java %} +// Compute the sum of the tips for each driver in hour-long windows. +// The keys are driverIds. +public static class PseudoWindow extends + KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> { + + private final long durationMsec; + + public PseudoWindow(Time duration) { + this.durationMsec = duration.toMilliseconds(); + } + + @Override + // Called once during initialization. + public void open(Configuration conf) { + . . . + } + + @Override + // Called as each fare arrives to be processed. 
+ public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } + + @Override + // Called when the current watermark indicates that a window is now complete. + public void onTimer(long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } +} +{% endhighlight %} + +Things to be aware of: + +* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also + CoProcessFunctions, BroadcastProcessFunctions, etc. + +* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open + and getRuntimeContext methods needed for working with managed keyed state. + +* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called + with each incoming event; `onTimer` is called when timers fire. These can be either event time or + processing time timers. Both `processElement` and `onTimer` are provided with a context object + that can be used to interact with a `TimerService` (among other things). Both callbacks are also + passed a `Collector` that can be used to emit results. + +#### The `open()` method + +{% highlight java %} +// Keyed, managed state, with an entry for each window, keyed by the window's end time. +// There is a separate MapState object for each driver. +private transient MapState<Long, Float> sumOfTips; + +@Override +public void open(Configuration conf) { + + MapStateDescriptor<Long, Float> sumDesc = + new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); + sumOfTips = getRuntimeContext().getMapState(sumDesc); +} +{% endhighlight %} + +Because the fare events can arrive out-of-order, it will sometimes be necessary to process events +for one hour before having finished computing the results for the previous hour. 
In fact, if the +watermarking delay is much longer than the window length, then there may be many windows open +simultaneously, rather than just two. This implementation supports this by using `MapState` that +maps the timestamp for the end of each window to the sum of the tips for that window. + +#### The `processElement()` method + +{% highlight java %} +public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long eventTime = fare.getEventTime(); + TimerService timerService = ctx.timerService(); + + if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + } else { + // Round up eventTime to the end of the window containing this event. + long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); + + // Schedule a callback for when the window has been completed. + timerService.registerEventTimeTimer(endOfWindow); + + // Add this fare's tip to the running total for that window. + Float sum = sumOfTips.get(endOfWindow); + if (sum == null) { + sum = 0.0F; + } + sum += fare.tip; + sumOfTips.put(endOfWindow, sum); + } +} +{% endhighlight %} + +Things to consider: + +* What happens with late events? Events that are behind the watermark (i.e., late) are being + dropped. If you want to do something better than this, consider using a side output, which is + explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md + %}#side-outputs). + +* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same + timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information + when the timer fires. 
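The rounding step in `processElement` is where off-by-one mistakes tend to creep in, so here it is isolated as plain Java that can be checked without a cluster. `WindowMath`, `endOfWindow`, and `isLate` are hypothetical helper names. The sketch assumes epoch-aligned windows and non-negative millisecond timestamps. For negative timestamps Java's `%` yields a negative remainder, so `Math.floorMod` would be needed.

```java
// Hypothetical helpers isolating the arithmetic used in PseudoWindow.processElement.
// Assumes epoch-aligned windows and non-negative epoch-millisecond timestamps.
final class WindowMath {
    private WindowMath() {}

    // Last millisecond of the window that contains eventTime (window end minus one).
    static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    // Same test as processElement: anything at or behind the current watermark
    // is treated as late.
    static boolean isLate(long eventTime, long currentWatermark) {
        return eventTime <= currentWatermark;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        System.out.println(endOfWindow(0L, hour));         // 3599999
        System.out.println(endOfWindow(3_599_999L, hour)); // 3599999 (same window)
        System.out.println(endOfWindow(3_600_000L, hour)); // 7199999 (next window)
        System.out.println(isLate(1_000L, 1_000L));        // true
    }
}
```

The result is the last millisecond that still belongs to the window. This lines up with what Flink's `TimeWindow#maxTimestamp()` returns (the window end minus one) for the built-in windows.

The late check treats an event as late as soon as its own timestamp is at or behind the watermark, even if the timer for its window has not fired yet. Flink's built-in window operator only drops an element once its window's `maxTimestamp()` plus any allowed lateness is at or behind the watermark, so the check used here is somewhat stricter.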
+ +#### The `onTimer()` method + +{% highlight java %} +public void onTimer( + long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long driverId = context.getCurrentKey(); + // Look up the result for the hour that just ended. + Float sumOfTips = this.sumOfTips.get(timestamp); + + Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips); + out.collect(result); + this.sumOfTips.remove(timestamp); +} +{% endhighlight %} + +Observations: + +* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key. + +* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at + which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`, + which has the effect of making it impossible to accommodate late events. This is the equivalent of + setting the allowedLateness to zero when working with Flink's time windows. + +### Performance Considerations + +Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible, +these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB +state backend can append to `ListState` without going through ser/de, and for `MapState`, each +key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated. + +{% top %} + +## Side Outputs + +### Introduction + +There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting: + +* exceptions +* malformed events +* late events +* operational alerts, such as timed-out connections to external services + +Side outputs are a convenient way to do this. + +Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that +correspond to the type of the side output's DataStream, and they have names. 
Two OutputTags with the +same name should have the same type, and will refer to the same side output. + +### Example + +You are now in a position to do something with the late events that were ignored in the previous +section. In the `processElement` method of `PseudoWindow` you can now do this: + +{% highlight java %} +if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; + ctx.output(lateFares, fare); +} else { + . . . +} +{% endhighlight %} + +And the job can access this side output: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +SingleOutputStreamOperator hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); + +OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; +hourlyTips.getSideOutput(lateFares).print(); +{% endhighlight %} + +{% top %} + +## Closing Remarks + +In this example you've seen how a ProcessFunction can be used to reimplement a straightforward time Review comment: ```suggestion In this example you have seen how a `ProcessFunction` can be used to reimplement a straightforward time ``` ########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ +--- +title: Event-driven Applications +nav-id: etl +nav-pos: 6 +nav-title: Event-driven Applications +nav-parent_id: tutorials +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +## Process Functions + +### Introduction + +A `ProcessFunction` combines event processing with timers and state, making it a powerful building +block for stream processing applications. This is the basis for creating event-driven applications +with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers. + +### Example + +If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall +that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each +hour, like this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .timeWindow(Time.hours(1)) + .process(new AddTips()); +{% endhighlight %} + +It's reasonably straightforward, and educational, to do the same thing with a +`KeyedProcessFunction`. Let's begin by replacing the code above with this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); +{% endhighlight %} + +In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed +stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream +produced by the implementation that uses Flink's built-in TimeWindows). 
+ +The overall outline of `PseudoWindow` has this shape: + +{% highlight java %} +// Compute the sum of the tips for each driver in hour-long windows. +// The keys are driverIds. +public static class PseudoWindow extends + KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> { + + private final long durationMsec; + + public PseudoWindow(Time duration) { + this.durationMsec = duration.toMilliseconds(); + } + + @Override + // Called once during initialization. + public void open(Configuration conf) { + . . . + } + + @Override + // Called as each fare arrives to be processed. + public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } + + @Override + // Called when the current watermark indicates that a window is now complete. + public void onTimer(long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } +} +{% endhighlight %} + +Things to be aware of: + +* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also + CoProcessFunctions, BroadcastProcessFunctions, etc. + +* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open + and getRuntimeContext methods needed for working with managed keyed state. + +* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called + with each incoming event; `onTimer` is called when timers fire. These can be either event time or + processing time timers. Both `processElement` and `onTimer` are provided with a context object + that can be used to interact with a `TimerService` (among other things). Both callbacks are also + passed a `Collector` that can be used to emit results. + +#### The `open()` method + +{% highlight java %} +// Keyed, managed state, with an entry for each window, keyed by the window's end time. 
+// There is a separate MapState object for each driver. +private transient MapState<Long, Float> sumOfTips; + +@Override +public void open(Configuration conf) { + + MapStateDescriptor<Long, Float> sumDesc = + new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); + sumOfTips = getRuntimeContext().getMapState(sumDesc); +} +{% endhighlight %} + +Because the fare events can arrive out-of-order, it will sometimes be necessary to process events +for one hour before having finished computing the results for the previous hour. In fact, if the +watermarking delay is much longer than the window length, then there may be many windows open +simultaneously, rather than just two. This implementation supports this by using `MapState` that +maps the timestamp for the end of each window to the sum of the tips for that window. + +#### The `processElement()` method + +{% highlight java %} +public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long eventTime = fare.getEventTime(); + TimerService timerService = ctx.timerService(); + + if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + } else { + // Round up eventTime to the end of the window containing this event. + long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); + + // Schedule a callback for when the window has been completed. + timerService.registerEventTimeTimer(endOfWindow); + + // Add this fare's tip to the running total for that window. + Float sum = sumOfTips.get(endOfWindow); + if (sum == null) { + sum = 0.0F; + } + sum += fare.tip; + sumOfTips.put(endOfWindow, sum); + } +} +{% endhighlight %} + +Things to consider: + +* What happens with late events? Events that are behind the watermark (i.e., late) are being + dropped. 
If you want to do something better than this, consider using a side output, which is + explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md + %}#side-outputs). + +* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same + timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information + when the timer fires. + +#### The `onTimer()` method + +{% highlight java %} +public void onTimer( + long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long driverId = context.getCurrentKey(); + // Look up the result for the hour that just ended. + Float sumOfTips = this.sumOfTips.get(timestamp); + + Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips); + out.collect(result); + this.sumOfTips.remove(timestamp); +} +{% endhighlight %} + +Observations: + +* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key. + +* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at + which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`, + which has the effect of making it impossible to accommodate late events. This is the equivalent of + setting the allowedLateness to zero when working with Flink's time windows. + +### Performance Considerations + +Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible, +these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB +state backend can append to `ListState` without going through ser/de, and for `MapState`, each +key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated. 
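The `MapState` half of the performance claim can be made concrete with a back-of-the-envelope model. The sketch below is a hypothetical cost counter, not a benchmark and not RocksDB code. It assumes that with a RocksDB-like backend:

* a `ValueState<HashMap<K, V>>` must deserialize and re-serialize the whole map on every read-modify-write;
* `MapState` touches only the single entry being read or written.

It simply counts how many map entries pass through ser/de in each case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical cost model (not RocksDB code): counts map entries that go through
// serialization/deserialization when one entry is updated per incoming event.
final class SerDeCostModel {
    private SerDeCostModel() {}

    // ValueState<HashMap<Long, Float>>: value() deserializes the whole map and
    // update() serializes the whole map again after the change.
    static long valueStateOfMapCost(int updates, int distinctKeys) {
        Map<Long, Float> map = new HashMap<>();
        long entriesTouched = 0;
        for (int i = 0; i < updates; i++) {
            entriesTouched += map.size();                          // value(): read everything
            map.merge((long) (i % distinctKeys), 1.0f, Float::sum); // modify one entry
            entriesTouched += map.size();                          // update(): write everything
        }
        return entriesTouched;
    }

    // MapState<Long, Float>: get(k) and put(k, v) each touch a single entry.
    static long mapStateCost(int updates) {
        return 2L * updates;
    }
}
```

With 1,000 updates spread over 100 map keys, the model counts 190,000 entries through ser/de for the `ValueState<HashMap>` variant versus 2,000 for `MapState`. The gap grows with the size of the map.

With the heap-based state backends, state objects live on the JVM heap and this particular ser/de cost largely disappears. The model is therefore only meant to illustrate the RocksDB case.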
+ +{% top %} + +## Side Outputs + +### Introduction + +There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting: + +* exceptions +* malformed events +* late events +* operational alerts, such as timed-out connections to external services + +Side outputs are a convenient way to do this. + +Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that +correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the Review comment: ```suggestion correspond to the type of the side output's `DataStream`, and they have names. Two `OutputTag`s with the ``` ########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ +--- +title: Event-driven Applications +nav-id: etl +nav-pos: 6 +nav-title: Event-driven Applications +nav-parent_id: tutorials +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +## Process Functions + +### Introduction + +A `ProcessFunction` combines event processing with timers and state, making it a powerful building +block for stream processing applications. This is the basis for creating event-driven applications +with Flink. 
It is very similar to a `RichFlatMapFunction`, but with the addition of timers. + +### Example + +If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall +that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each +hour, like this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .timeWindow(Time.hours(1)) + .process(new AddTips()); +{% endhighlight %} + +It's reasonably straightforward, and educational, to do the same thing with a +`KeyedProcessFunction`. Let's begin by replacing the code above with this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); +{% endhighlight %} + +In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed +stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream +produced by the implementation that uses Flink's built-in TimeWindows). + +The overall outline of `PseudoWindow` has this shape: + +{% highlight java %} +// Compute the sum of the tips for each driver in hour-long windows. +// The keys are driverIds. +public static class PseudoWindow extends + KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> { + + private final long durationMsec; + + public PseudoWindow(Time duration) { + this.durationMsec = duration.toMilliseconds(); + } + + @Override + // Called once during initialization. + public void open(Configuration conf) { + . . . + } + + @Override + // Called as each fare arrives to be processed. + public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . 
+ } + + @Override + // Called when the current watermark indicates that a window is now complete. + public void onTimer(long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } +} +{% endhighlight %} + +Things to be aware of: + +* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also + CoProcessFunctions, BroadcastProcessFunctions, etc. + +* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open + and getRuntimeContext methods needed for working with managed keyed state. + +* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called + with each incoming event; `onTimer` is called when timers fire. These can be either event time or + processing time timers. Both `processElement` and `onTimer` are provided with a context object + that can be used to interact with a `TimerService` (among other things). Both callbacks are also + passed a `Collector` that can be used to emit results. + +#### The `open()` method + +{% highlight java %} +// Keyed, managed state, with an entry for each window, keyed by the window's end time. +// There is a separate MapState object for each driver. +private transient MapState<Long, Float> sumOfTips; + +@Override +public void open(Configuration conf) { + + MapStateDescriptor<Long, Float> sumDesc = + new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); + sumOfTips = getRuntimeContext().getMapState(sumDesc); +} +{% endhighlight %} + +Because the fare events can arrive out-of-order, it will sometimes be necessary to process events +for one hour before having finished computing the results for the previous hour. In fact, if the +watermarking delay is much longer than the window length, then there may be many windows open +simultaneously, rather than just two. 
This implementation supports this by using `MapState` that +maps the timestamp for the end of each window to the sum of the tips for that window. + +#### The `processElement()` method + +{% highlight java %} +public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long eventTime = fare.getEventTime(); + TimerService timerService = ctx.timerService(); + + if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + } else { + // Round up eventTime to the end of the window containing this event. + long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); + + // Schedule a callback for when the window has been completed. + timerService.registerEventTimeTimer(endOfWindow); + + // Add this fare's tip to the running total for that window. + Float sum = sumOfTips.get(endOfWindow); + if (sum == null) { + sum = 0.0F; + } + sum += fare.tip; + sumOfTips.put(endOfWindow, sum); + } +} +{% endhighlight %} + +Things to consider: + +* What happens with late events? Events that are behind the watermark (i.e., late) are being + dropped. If you want to do something better than this, consider using a side output, which is + explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md + %}#side-outputs). + +* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same + timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information + when the timer fires. + +#### The `onTimer()` method + +{% highlight java %} +public void onTimer( + long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long driverId = context.getCurrentKey(); + // Look up the result for the hour that just ended. 
+ Float sumOfTips = this.sumOfTips.get(timestamp); + + Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips); + out.collect(result); + this.sumOfTips.remove(timestamp); +} +{% endhighlight %} + +Observations: + +* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key. + +* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at + which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`, + which has the effect of making it impossible to accommodate late events. This is the equivalent of + setting the allowedLateness to zero when working with Flink's time windows. + +### Performance Considerations + +Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible, +these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB +state backend can append to `ListState` without going through ser/de, and for `MapState`, each +key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated. + +{% top %} + +## Side Outputs + +### Introduction + +There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting: + +* exceptions +* malformed events +* late events +* operational alerts, such as timed-out connections to external services + +Side outputs are a convenient way to do this. + +Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that +correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the +same name should have the same type, and will refer to the same side output. + +### Example + +You are now in a position to do something with the late events that were ignored in the previous +section. 
In the `processElement` method of `PseudoWindow` you can now do this: + +{% highlight java %} +if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; + ctx.output(lateFares, fare); +} else { + . . . +} +{% endhighlight %} + +And the job can access this side output: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +SingleOutputStreamOperator hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); + +OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; +hourlyTips.getSideOutput(lateFares).print(); +{% endhighlight %} + +{% top %} + +## Closing Remarks + +In this example you've seen how a ProcessFunction can be used to reimplement a straightforward time +window. Of course, if Flink's built-in windowing API meets your needs, by all means, go ahead and +use it. But if you find yourself considering doing something contorted with Flink's windows, don't +be afraid to roll your own. + +Also, ProcessFunctions are useful for many other use cases beyond computing analytics. The hands-on +exercise below provides an example of something completely different. + +Another common use case for ProcessFunctions is for expiring stale state. If you think back to the +[Rides and Fares Exercise](https://github.com/apache/flink-training/tree/master/rides-and-fares), +where a `RichCoFlatMapFunction` is used to compute a simple join, the sample solution assumes that +the TaxiRides and TaxiFares are perfectly matched, one-to-one for each rideId. If an event is lost, Review comment: I'm not sure whether to also put `TaxiFare` and `TaxiRide` into backticks since they append an "s" and that usually renders strangely... ```suggestion the TaxiRides and TaxiFares are perfectly matched, one-to-one for each `rideId`. 
If an event is lost, ``` ########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ +--- +title: Event-driven Applications +nav-id: etl +nav-pos: 6 +nav-title: Event-driven Applications +nav-parent_id: tutorials +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +## Process Functions + +### Introduction + +A `ProcessFunction` combines event processing with timers and state, making it a powerful building +block for stream processing applications. This is the basis for creating event-driven applications +with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers. 
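As a rough mental model of what timers add on top of the keyed state that a `RichFlatMapFunction` already has, here is a toy, single-threaded event-time timer service in plain Java. It mimics two behaviors described in Flink's `ProcessFunction` documentation:

* timers are deduplicated, so there is at most one timer per key and timestamp;
* an event-time timer fires once the watermark reaches its timestamp.

`ToyTimer`, `ToyTimerService`, and their methods are hypothetical. This is not how Flink implements its `TimerService`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical toy model of a keyed event-time timer, identified by (key, timestamp).
final class ToyTimer {
    final long key;
    final long timestamp;

    ToyTimer(long key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ToyTimer)) return false;
        ToyTimer other = (ToyTimer) o;
        return key == other.key && timestamp == other.timestamp;
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, timestamp);
    }
}

// Hypothetical single-threaded timer service; NOT Flink's implementation.
final class ToyTimerService {
    // Pending timers ordered by timestamp; the set enforces one timer per (key, timestamp).
    private final PriorityQueue<ToyTimer> queue =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Set<ToyTimer> pending = new HashSet<>();
    private long watermark = Long.MIN_VALUE;

    long currentWatermark() {
        return watermark;
    }

    void registerEventTimeTimer(long key, long timestamp) {
        ToyTimer t = new ToyTimer(key, timestamp);
        if (pending.add(t)) { // a duplicate (key, timestamp) is silently collapsed
            queue.add(t);
        }
    }

    // Fires (and removes) every timer whose timestamp <= the new watermark, in timestamp order.
    List<ToyTimer> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark); // watermarks never move backwards
        List<ToyTimer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            ToyTimer t = queue.poll();
            pending.remove(t);
            fired.add(t);
        }
        return fired;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because duplicate registrations collapse into one timer, code such as the example's `processElement` can register a timer for every incoming event without accumulating one callback per event.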
+ +### Example + +If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall +that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each +hour, like this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .timeWindow(Time.hours(1)) + .process(new AddTips()); +{% endhighlight %} + +It's reasonably straightforward, and educational, to do the same thing with a +`KeyedProcessFunction`. Let's begin by replacing the code above with this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); +{% endhighlight %} + +In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed +stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream +produced by the implementation that uses Flink's built-in TimeWindows). + +The overall outline of `PseudoWindow` has this shape: + +{% highlight java %} +// Compute the sum of the tips for each driver in hour-long windows. +// The keys are driverIds. +public static class PseudoWindow extends + KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> { + + private final long durationMsec; + + public PseudoWindow(Time duration) { + this.durationMsec = duration.toMilliseconds(); + } + + @Override + // Called once during initialization. + public void open(Configuration conf) { + . . . + } + + @Override + // Called as each fare arrives to be processed. + public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } + + @Override + // Called when the current watermark indicates that a window is now complete. 
+ public void onTimer(long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } +} +{% endhighlight %} + +Things to be aware of: + +* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also + CoProcessFunctions, BroadcastProcessFunctions, etc. + +* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open + and getRuntimeContext methods needed for working with managed keyed state. + +* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called + with each incoming event; `onTimer` is called when timers fire. These can be either event time or + processing time timers. Both `processElement` and `onTimer` are provided with a context object + that can be used to interact with a `TimerService` (among other things). Both callbacks are also + passed a `Collector` that can be used to emit results. + +#### The `open()` method + +{% highlight java %} +// Keyed, managed state, with an entry for each window, keyed by the window's end time. +// There is a separate MapState object for each driver. +private transient MapState<Long, Float> sumOfTips; + +@Override +public void open(Configuration conf) { + + MapStateDescriptor<Long, Float> sumDesc = + new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); + sumOfTips = getRuntimeContext().getMapState(sumDesc); +} +{% endhighlight %} + +Because the fare events can arrive out-of-order, it will sometimes be necessary to process events +for one hour before having finished computing the results for the previous hour. In fact, if the +watermarking delay is much longer than the window length, then there may be many windows open +simultaneously, rather than just two. This implementation supports this by using `MapState` that +maps the timestamp for the end of each window to the sum of the tips for that window. 
+ +#### The `processElement()` method + +{% highlight java %} +public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long eventTime = fare.getEventTime(); + TimerService timerService = ctx.timerService(); + + if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + } else { + // Round up eventTime to the end of the window containing this event. + long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); + + // Schedule a callback for when the window has been completed. + timerService.registerEventTimeTimer(endOfWindow); + + // Add this fare's tip to the running total for that window. + Float sum = sumOfTips.get(endOfWindow); + if (sum == null) { + sum = 0.0F; + } + sum += fare.tip; + sumOfTips.put(endOfWindow, sum); + } +} +{% endhighlight %} + +Things to consider: + +* What happens with late events? Events that are behind the watermark (i.e., late) are being + dropped. If you want to do something better than this, consider using a side output, which is + explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md + %}#side-outputs). + +* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same + timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information + when the timer fires. + +#### The `onTimer()` method + +{% highlight java %} +public void onTimer( + long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long driverId = context.getCurrentKey(); + // Look up the result for the hour that just ended. 
+ Float sumOfTips = this.sumOfTips.get(timestamp); + + Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips); + out.collect(result); + this.sumOfTips.remove(timestamp); +} +{% endhighlight %} + +Observations: + +* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key. + +* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at + which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`, + which has the effect of making it impossible to accommodate late events. This is the equivalent of + setting the allowedLateness to zero when working with Flink's time windows. + +### Performance Considerations + +Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible, +these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB +state backend can append to `ListState` without going through ser/de, and for `MapState`, each Review comment: For a training/hands-on I would actually spell ser/de out, maybe in this form? ```suggestion state backend can append to `ListState` without going through (de)serialization, and for `MapState`, each ``` ########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ +--- +title: Event-driven Applications +nav-id: etl +nav-pos: 6 +nav-title: Event-driven Applications +nav-parent_id: tutorials +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +## Process Functions + +### Introduction + +A `ProcessFunction` combines event processing with timers and state, making it a powerful building +block for stream processing applications. This is the basis for creating event-driven applications +with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers. + +### Example + +If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall +that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each +hour, like this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .timeWindow(Time.hours(1)) + .process(new AddTips()); +{% endhighlight %} + +It's reasonably straightforward, and educational, to do the same thing with a +`KeyedProcessFunction`. Let's begin by replacing the code above with this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); +{% endhighlight %} + +In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed +stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream +produced by the implementation that uses Flink's built-in TimeWindows). 
+ +The overall outline of `PseudoWindow` has this shape: + +{% highlight java %} +// Compute the sum of the tips for each driver in hour-long windows. +// The keys are driverIds. +public static class PseudoWindow extends + KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> { + + private final long durationMsec; + + public PseudoWindow(Time duration) { + this.durationMsec = duration.toMilliseconds(); + } + + @Override + // Called once during initialization. + public void open(Configuration conf) { + . . . + } + + @Override + // Called as each fare arrives to be processed. + public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } + + @Override + // Called when the current watermark indicates that a window is now complete. + public void onTimer(long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } +} +{% endhighlight %} + +Things to be aware of: + +* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also + CoProcessFunctions, BroadcastProcessFunctions, etc. + +* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open + and getRuntimeContext methods needed for working with managed keyed state. + +* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called + with each incoming event; `onTimer` is called when timers fire. These can be either event time or + processing time timers. Both `processElement` and `onTimer` are provided with a context object + that can be used to interact with a `TimerService` (among other things). Both callbacks are also + passed a `Collector` that can be used to emit results. + +#### The `open()` method + +{% highlight java %} +// Keyed, managed state, with an entry for each window, keyed by the window's end time. 
+// There is a separate MapState object for each driver. +private transient MapState<Long, Float> sumOfTips; + +@Override +public void open(Configuration conf) { + + MapStateDescriptor<Long, Float> sumDesc = + new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); + sumOfTips = getRuntimeContext().getMapState(sumDesc); +} +{% endhighlight %} + +Because the fare events can arrive out-of-order, it will sometimes be necessary to process events +for one hour before having finished computing the results for the previous hour. In fact, if the +watermarking delay is much longer than the window length, then there may be many windows open +simultaneously, rather than just two. This implementation supports this by using `MapState` that +maps the timestamp for the end of each window to the sum of the tips for that window. + +#### The `processElement()` method + +{% highlight java %} +public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long eventTime = fare.getEventTime(); + TimerService timerService = ctx.timerService(); + + if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + } else { + // Round up eventTime to the end of the window containing this event. + long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); + + // Schedule a callback for when the window has been completed. + timerService.registerEventTimeTimer(endOfWindow); + + // Add this fare's tip to the running total for that window. + Float sum = sumOfTips.get(endOfWindow); + if (sum == null) { + sum = 0.0F; + } + sum += fare.tip; + sumOfTips.put(endOfWindow, sum); + } +} +{% endhighlight %} + +Things to consider: + +* What happens with late events? Events that are behind the watermark (i.e., late) are being + dropped. 
If you want to do something better than this, consider using a side output, which is + explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md + %}#side-outputs). + +* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same + timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information + when the timer fires. + +#### The `onTimer()` method + +{% highlight java %} +public void onTimer( + long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long driverId = context.getCurrentKey(); + // Look up the result for the hour that just ended. + Float sumOfTips = this.sumOfTips.get(timestamp); + + Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips); + out.collect(result); + this.sumOfTips.remove(timestamp); +} +{% endhighlight %} + +Observations: + +* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key. + +* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at + which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`, + which has the effect of making it impossible to accommodate late events. This is the equivalent of + setting the allowedLateness to zero when working with Flink's time windows. + +### Performance Considerations + +Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible, +these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB +state backend can append to `ListState` without going through ser/de, and for `MapState`, each +key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated. 
+ +{% top %} + +## Side Outputs + +### Introduction + +There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting: + +* exceptions +* malformed events +* late events +* operational alerts, such as timed-out connections to external services + +Side outputs are a convenient way to do this. + +Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that +correspond to the type of the side output's DataStream, and they have names. Two OutputTags with the +same name should have the same type, and will refer to the same side output. + +### Example + +You are now in a position to do something with the late events that were ignored in the previous +section. In the `processElement` method of `PseudoWindow` you can now do this: + +{% highlight java %} +if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; + ctx.output(lateFares, fare); +} else { + . . . +} +{% endhighlight %} + +And the job can access this side output: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +SingleOutputStreamOperator hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); + +OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; +hourlyTips.getSideOutput(lateFares).print(); +{% endhighlight %} + +{% top %} + +## Closing Remarks + +In this example you've seen how a ProcessFunction can be used to reimplement a straightforward time +window. Of course, if Flink's built-in windowing API meets your needs, by all means, go ahead and +use it. But if you find yourself considering doing something contorted with Flink's windows, don't +be afraid to roll your own. + +Also, ProcessFunctions are useful for many other use cases beyond computing analytics. 
The hands-on +exercise below provides an example of something completely different. + +Another common use case for ProcessFunctions is for expiring stale state. If you think back to the +[Rides and Fares Exercise](https://github.com/apache/flink-training/tree/master/rides-and-fares), +where a `RichCoFlatMapFunction` is used to compute a simple join, the sample solution assumes that +the TaxiRides and TaxiFares are perfectly matched, one-to-one for each rideId. If an event is lost, +the other event for the same rideId will be held in state forever. This could instead be implemented Review comment: ```suggestion the other event for the same `rideId` will be held in state forever. This could instead be implemented ``` ########## File path: docs/tutorials/event_driven.md ########## @@ -0,0 +1,298 @@ +--- +title: Event-driven Applications +nav-id: etl +nav-pos: 6 +nav-title: Event-driven Applications +nav-parent_id: tutorials +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +## Process Functions + +### Introduction + +A `ProcessFunction` combines event processing with timers and state, making it a powerful building +block for stream processing applications. 
This is the basis for creating event-driven applications +with Flink. It is very similar to a `RichFlatMapFunction`, but with the addition of timers. + +### Example + +If you've done the [HourlyTips exercise]() in the [Streaming Analytics]() section, you will recall +that it uses a `TumblingEventTimeWindow` to compute the sum of the tips for each driver during each +hour, like this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .timeWindow(Time.hours(1)) + .process(new AddTips()); +{% endhighlight %} + +It's reasonably straightforward, and educational, to do the same thing with a +`KeyedProcessFunction`. Let's begin by replacing the code above with this: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); +{% endhighlight %} + +In this code snippet a `KeyedProcessFunction` called `PseudoWindow` is being applied to a keyed +stream, the result of which is a `DataStream<Tuple3<Long, Long, Float>>` (the same kind of stream +produced by the implementation that uses Flink's built-in TimeWindows). + +The overall outline of `PseudoWindow` has this shape: + +{% highlight java %} +// Compute the sum of the tips for each driver in hour-long windows. +// The keys are driverIds. +public static class PseudoWindow extends + KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> { + + private final long durationMsec; + + public PseudoWindow(Time duration) { + this.durationMsec = duration.toMilliseconds(); + } + + @Override + // Called once during initialization. + public void open(Configuration conf) { + . . . + } + + @Override + // Called as each fare arrives to be processed. 
+ public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } + + @Override + // Called when the current watermark indicates that a window is now complete. + public void onTimer(long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + . . . + } +} +{% endhighlight %} + +Things to be aware of: + +* There are several types of ProcessFunctions -- this is a KeyedProcessFunction, but there are also + CoProcessFunctions, BroadcastProcessFunctions, etc. + +* A KeyedProcessFunction is a kind of RichFunction. Being a RichFunction, it has access to the open + and getRuntimeContext methods needed for working with managed keyed state. + +* There are two callbacks to implement: `processElement` and `onTimer`. `processElement` is called + with each incoming event; `onTimer` is called when timers fire. These can be either event time or + processing time timers. Both `processElement` and `onTimer` are provided with a context object + that can be used to interact with a `TimerService` (among other things). Both callbacks are also + passed a `Collector` that can be used to emit results. + +#### The `open()` method + +{% highlight java %} +// Keyed, managed state, with an entry for each window, keyed by the window's end time. +// There is a separate MapState object for each driver. +private transient MapState<Long, Float> sumOfTips; + +@Override +public void open(Configuration conf) { + + MapStateDescriptor<Long, Float> sumDesc = + new MapStateDescriptor<>("sumOfTips", Long.class, Float.class); + sumOfTips = getRuntimeContext().getMapState(sumDesc); +} +{% endhighlight %} + +Because the fare events can arrive out-of-order, it will sometimes be necessary to process events +for one hour before having finished computing the results for the previous hour. 
In fact, if the +watermarking delay is much longer than the window length, then there may be many windows open +simultaneously, rather than just two. This implementation supports this by using `MapState` that +maps the timestamp for the end of each window to the sum of the tips for that window. + +#### The `processElement()` method + +{% highlight java %} +public void processElement( + TaxiFare fare, + Context ctx, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long eventTime = fare.getEventTime(); + TimerService timerService = ctx.timerService(); + + if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + } else { + // Round up eventTime to the end of the window containing this event. + long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); + + // Schedule a callback for when the window has been completed. + timerService.registerEventTimeTimer(endOfWindow); + + // Add this fare's tip to the running total for that window. + Float sum = sumOfTips.get(endOfWindow); + if (sum == null) { + sum = 0.0F; + } + sum += fare.tip; + sumOfTips.put(endOfWindow, sum); + } +} +{% endhighlight %} + +Things to consider: + +* What happens with late events? Events that are behind the watermark (i.e., late) are being + dropped. If you want to do something better than this, consider using a side output, which is + explained in the [next section]({{ site.baseurl }}{% link tutorials/event_driven.md + %}#side-outputs). + +* This example uses `MapState` where the keys are timestamps, and sets a `Timer` for that same + timestamp. This is a common pattern; it makes it easy and efficient to lookup relevant information + when the timer fires. 
+ +#### The `onTimer()` method + +{% highlight java %} +public void onTimer( + long timestamp, + OnTimerContext context, + Collector<Tuple3<Long, Long, Float>> out) throws Exception { + + long driverId = context.getCurrentKey(); + // Look up the result for the hour that just ended. + Float sumOfTips = this.sumOfTips.get(timestamp); + + Tuple3 result = new Tuple3(driverId, timestamp, sumOfTips); + out.collect(result); + this.sumOfTips.remove(timestamp); +} +{% endhighlight %} + +Observations: + +* The `OnTimerContext context` passed in to `onTimer` can be used to determine the current key. + +* Our pseudo-windows are being triggered when the current watermark reaches the end of each hour, at + which point `onTimer` is called. This onTimer method removes the related entry from `sumOfTips`, + which has the effect of making it impossible to accommodate late events. This is the equivalent of + setting the allowedLateness to zero when working with Flink's time windows. + +### Performance Considerations + +Flink provides `MapState` and `ListState` types that are optimized for RocksDB. Where possible, +these should be used instead of a `ValueState` object holding some sort of collection. The RocksDB +state backend can append to `ListState` without going through ser/de, and for `MapState`, each +key/value pair is a separate RocksDB object, so `MapState` can be efficiently accessed and updated. + +{% top %} + +## Side Outputs + +### Introduction + +There are several good reasons to want to have more than one output stream from a Flink operator, such as reporting: + +* exceptions +* malformed events +* late events +* operational alerts, such as timed-out connections to external services + +Side outputs are a convenient way to do this. + +Each side output channel is associated with an `OutputTag<T>`. The tags have generic types that +correspond to the type of the side output's DataStream, and they have names. 
Two OutputTags with the +same name should have the same type, and will refer to the same side output. + +### Example + +You are now in a position to do something with the late events that were ignored in the previous +section. In the `processElement` method of `PseudoWindow` you can now do this: + +{% highlight java %} +if (eventTime <= timerService.currentWatermark()) { + // This event is late; its window has already been triggered. + OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; + ctx.output(lateFares, fare); +} else { + . . . +} +{% endhighlight %} + +And the job can access this side output: + +{% highlight java %} +// compute the sum of the tips per hour for each driver +SingleOutputStreamOperator hourlyTips = fares + .keyBy((TaxiFare fare) -> fare.driverId) + .process(new PseudoWindow(Time.hours(1))); + +OutputTag<TaxiFare> lateFares = new OutputTag<TaxiFare>("lateFares") {}; +hourlyTips.getSideOutput(lateFares).print(); +{% endhighlight %} + +{% top %} + +## Closing Remarks + +In this example you've seen how a ProcessFunction can be used to reimplement a straightforward time +window. Of course, if Flink's built-in windowing API meets your needs, by all means, go ahead and +use it. But if you find yourself considering doing something contorted with Flink's windows, don't +be afraid to roll your own. + +Also, ProcessFunctions are useful for many other use cases beyond computing analytics. The hands-on Review comment: ```suggestion Also, `ProcessFunctions` are useful for many other use cases beyond computing analytics. The hands-on ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
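The tutorial's `PseudoWindow` combines three pieces of logic. `processElement()` rounds each event time up to the end of its window. The function keeps per-key `MapState` holding running sums. Event-time timers fire once the watermark reaches a window's end, and the entry is removed after the result is emitted. That logic can be exercised without a Flink cluster.

The sketch below is a minimal, Flink-free simulation built on some assumptions:

* `PseudoWindowSketch`, `Result`, `advanceWatermark` and `lateEventTimes` are hypothetical names, not Flink API.
* Plain `TreeMap`s stand in for keyed `MapState`.
* Advancing a simulated watermark stands in for timers firing.
* A list stands in for the `lateFares` side output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free simulation of the tutorial's PseudoWindow (not Flink API).
// TreeMaps stand in for keyed MapState; "timers" are the window-end keys themselves,
// fired when the simulated watermark reaches them.
class PseudoWindowSketch {

    // One emitted result, mirroring Tuple3<driverId, windowEnd, sumOfTips>.
    static final class Result {
        final long driverId;
        final long windowEnd;
        final float sumOfTips;

        Result(long driverId, long windowEnd, float sumOfTips) {
            this.driverId = driverId;
            this.windowEnd = windowEnd;
            this.sumOfTips = sumOfTips;
        }
    }

    private final long durationMsec;
    // driverId -> (windowEnd -> running sum of tips); sorted for deterministic output order.
    private final Map<Long, TreeMap<Long, Float>> state = new TreeMap<>();
    // Event times of late fares, standing in for a "lateFares" side output.
    final List<Long> lateEventTimes = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    PseudoWindowSketch(long durationMsec) {
        this.durationMsec = durationMsec;
    }

    // Same rounding as the tutorial: the last millisecond of the window containing eventTime.
    // Assumes non-negative timestamps (Java's % is negative for negative operands).
    static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    void processElement(long driverId, long eventTime, float tip) {
        if (eventTime <= currentWatermark) {
            // Late: its window has already fired, so divert it instead of adding the tip.
            lateEventTimes.add(eventTime);
            return;
        }
        long end = endOfWindow(eventTime, durationMsec);
        state.computeIfAbsent(driverId, k -> new TreeMap<>()).merge(end, tip, Float::sum);
    }

    // Advances the watermark and fires every window whose end is <= the watermark,
    // emitting its sum and then removing the entry (like onTimer + sumOfTips.remove).
    List<Result> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Result> out = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<Long, Float>> e : state.entrySet()) {
            Map<Long, Float> due = e.getValue().headMap(currentWatermark, true);
            for (Map.Entry<Long, Float> w : due.entrySet()) {
                out.add(new Result(e.getKey(), w.getKey(), w.getValue()));
            }
            due.clear(); // clearing the headMap view removes these entries from the TreeMap
        }
        return out;
    }
}
```

Here is a worked example with `durationMsec = 3_600_000`:

* Fares for driver 1 at 1000 ms and 2000 ms both round to window end 3599999.
* Advancing the watermark to 3599999 emits their combined tip (3.5 for tips of 1.5 and 2.0).
* After that, a fare at 500 ms is treated as late.

Because each window's entry is removed once it has fired, late events cannot be accommodated. This matches setting allowed lateness to zero.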