[ 
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433078#comment-15433078
 ] 

ASF GitHub Bot commented on FLINK-3899:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2368#discussion_r75895039
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit 
of incremental window compu
     the additional meta information that writing a `WindowFunction` provides.
     
     This is an example that shows how incremental aggregation functions can be 
combined with
    -a `WindowFunction`.
    +a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how 
to extract the
    +ending event-time of a window of sensor readings that contain a timestamp, 
    +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager 
window
    +aggregation (only a single element is kept in the window).
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -DataStream<Tuple2<String, Long>> input = ...;
    +DataStream<SensorReading> input = ...;
     
     // for folding incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
    -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
    +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myFoldFunction implements 
FoldFunction<SensorReading, Long> {
    +
    +    public Long fold(Long acc, SensorReading s) {
    +        return Math.max(acc, s.timestamp());
    +    }
    +}
    +
    +private static class MyWindowFunction implements WindowFunction<Long, 
Long, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, Iterable<Long> 
timestamps, Collector<Long> out) {
    +            out.collect(timestamps.iterator().next());
    +        }
    +}
     
     // for reducing incremental computation
     input
         .keyBy(<key selector>)
         .window(<window assigner>)
         .apply(new MyReduceFunction(), new MyWindowFunction());
    +
    +/* ... */
    +
    +private static  class myReduceFunction implements 
ReduceFunction<SensorReading> {
    +
    +    public SensorReading reduce(SensorReading s1, SensorReading s2)  {
    +        return s1;
    +    }
    +}
    +
    +private static class MyWindowFunction implements 
WindowFunction<SensorReading, SensorReading, String, TimeWindow> {
    +
    +    public void apply(String key, TimeWindow window, 
Iterable<SensorReading> readings, Collector<SensorReading> out) {
    +        out.collect(readings.iterator().next());
    --- End diff --
    
    Not sure if this is a good example. The same result could be achieved by a 
single `ReduceFunction`. How about the `ReduceFunction` searches for a 
`SensorReading` with a minimum value and the `WindowFunction` emits a `Tuple2` 
of start time of window and minimum `SensorReading`?


> Document window processing with Reduce/FoldFunction + WindowFunction
> --------------------------------------------------------------------
>
>                 Key: FLINK-3899
>                 URL: https://issues.apache.org/jira/browse/FLINK-3899
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 1.1.0
>            Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed 
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This 
> combination allows for eager window aggregation (only a single element is 
> kept in the window) and access of the Window object, e.g., to have access to 
> the window's start and end time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to