[ https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433883#comment-15433883 ]
ASF GitHub Bot commented on FLINK-3899:
---------------------------------------

Github user danielblazevski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75973261

    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.

     This is an example that shows how incremental aggregation functions can be combined with
    -a `WindowFunction`.
    +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
    +ending event-time of a window of sensor readings that contain a timestamp,
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
    +aggregation (only a single element is kept in the window).

     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;

     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    --- End diff --

    @fhueske does this look OK for this case?
    If so, I'll finish things up by adding the Reduce example and both corresponding Scala examples.

    ```java
    // for folding incremental computation
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .apply(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyWindowFunction());

    /* ... */

    // Counts the readings in the window; only the count field (f2) of the accumulator changes.
    private static class MyFoldFunction
        implements FoldFunction<SensorReading, Tuple3<String, Long, Integer>> {

        public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
            return new Tuple3<String, Long, Integer>(acc.f0, acc.f1, acc.f2 + 1);
        }
    }

    // Receives the single pre-aggregated accumulator and emits (key, window end, count).
    private static class MyWindowFunction
        implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple3<String, Long, Integer>> counts,
                          Collector<Tuple3<String, Long, Integer>> out) {
            Integer count = counts.iterator().next().f2;
            out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(), count));
        }
    }
    ```

    I found that the `FoldFunction` has to use `Tuple3` as its accumulator type, since the `WindowFunction` must be of the form `WindowFunction<ACC, ACC, K, W>` according to [here](https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java)


> Document window processing with Reduce/FoldFunction + WindowFunction
> --------------------------------------------------------------------
>
>                 Key: FLINK-3899
>                 URL: https://issues.apache.org/jira/browse/FLINK-3899
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 1.1.0
>            Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This
> combination allows for eager window aggregation (only a single element is
> kept in the window) and access of the Window object, e.g., to have access to
> the window's start and end time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
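
For readers without a Flink setup at hand, the eager-aggregation idea from the issue description can be sketched in plain Java. This is only an illustration under stated assumptions: `Reading`, `Window`, `Result`, `fold`, `applyWindow`, and `process` are hypothetical names invented for this sketch and are not Flink API. It shows the fold step reducing each element into one running count, and the window step then seeing a one-element iterable together with the window bounds.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch of fold-then-window-function; all types here are hypothetical, not Flink API. */
public class EagerWindowSketch {

    /** Hypothetical stand-in for the SensorReading type mentioned in the comment. */
    static final class Reading {
        final String key;
        final long timestamp;

        Reading(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical stand-in for a time window; only its bounds matter here. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Value emitted per window: (key, window end, count). */
    static final class Result {
        final String key;
        final long windowEnd;
        final int count;

        Result(String key, long windowEnd, int count) {
            this.key = key;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    /** Fold step: called once per element, returns the updated running count. */
    static int fold(int acc, Reading r) {
        return acc + 1;
    }

    /** Window step: receives the single pre-aggregated value plus the window metadata. */
    static Result applyWindow(String key, Window window, Iterable<Integer> counts) {
        return new Result(key, window.end, counts.iterator().next());
    }

    /** Drives both steps for one key and one window. */
    static Result process(String key, Window window, List<Reading> readings) {
        int acc = 0; // initial value of the fold
        for (Reading r : readings) {
            acc = fold(acc, r); // each element is folded into the accumulator as it is visited
        }
        // The window step only ever sees one element: the final accumulator.
        return applyWindow(key, window, Collections.singletonList(acc));
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
            new Reading("sensor-1", 100L),
            new Reading("sensor-1", 250L),
            new Reading("sensor-1", 900L));
        Result r = process("sensor-1", new Window(0L, 1000L), readings);
        System.out.println(r.key + " " + r.windowEnd + " " + r.count);
    }
}
```

The split into `fold` and `applyWindow` mirrors the division of labour discussed above: the first carries the per-element state, the second adds information that is only known once the window is complete.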