[ https://issues.apache.org/jira/browse/FLINK-1159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193222#comment-15193222 ]

ASF GitHub Bot commented on FLINK-1159:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1704#discussion_r55995217
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/acceptPartialFunctions/OnWindowedStream.scala ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream}
    +import org.apache.flink.streaming.api.windowing.windows.Window
    +
    +class OnWindowedStream[T, K, W <: Window](ds: WindowedStream[T, K, W]) {
    +
    +  /**
    +    * Applies a reduce function to the window. The window function is called for each evaluation
    +    * of the window for each key individually. The output of the reduce function is interpreted
    +    * as a regular non-windowed stream.
    +    *
    +    * This window will try and pre-aggregate data as much as the window policies permit.
    +    * For example, tumbling time windows can perfectly pre-aggregate the data, meaning that only one
    +    * element per key is stored. Sliding time windows will pre-aggregate on the granularity of the
    +    * slide interval, so a few elements are stored per key (one per slide interval).
    +    * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
    +    * aggregation tree.
    +    *
    +    * @param function The reduce function.
    +    * @return The data stream that is the result of applying the reduce function to the window.
    +    */
    +  def reduceWith(function: (T, T) => T) =
    +    ds.reduce(function)
    +
    +  /**
    +    * Applies the given fold function to each window. The window function is called for each
    +    * evaluation of the window for each key individually. The output of the fold function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * @param function The fold function.
    +    * @return The data stream that is the result of applying the fold function to the window.
    +    */
    +  def foldWith[R: TypeInformation](initialValue: R)(function: (R, T) => R) =
    +    ds.fold(initialValue)(function)
    +
    +  /**
    +    * Applies the given window function to each window. The window function is called for each
    +    * evaluation of the window for each key individually. The output of the window function is
    +    * interpreted as a regular non-windowed stream.
    +    *
    +    * Arriving data is incrementally aggregated using the given fold function.
    +    *
    +    * @param initialValue The initial value of the fold
    +    * @param foldFunction The fold function that is used for incremental aggregation
    +    * @param windowFunction The window function.
    +    * @return The data stream that is the result of applying the window function to the window.
    +    */
    +  def applyWith[R: TypeInformation](initialValue: R)
    +                                   (foldFunction: (R, T) => R,
    +                                    windowFunction: (K, W, R) => TraversableOnce[R]):
    --- End diff --
    
    Why does the `windowFunction` work on a single `R` element and not on all elements of a window?
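
One possible reading of the signature under discussion, sketched without Flink on the classpath: if arriving elements are folded incrementally, only the accumulator is still around when the window fires, so the window function can be handed a single `R` rather than the window's elements. Everything below (`FoldingWindow`, `add`, `fire`) is a hypothetical model for illustration and not Flink API; it also drops the window parameter `W` and uses `Iterable` instead of `TraversableOnce` to stay portable across Scala versions.

```scala
// Hypothetical, Flink-free model of one keyed window with incremental folding.
// Nothing here is Flink API; it only illustrates the shape of the signature.
final class FoldingWindow[K, T, R](key: K, initialValue: R)(foldFunction: (R, T) => R) {

  // Only the running accumulator is kept; individual elements are not stored.
  private var accumulator: R = initialValue

  // Called once per arriving element: fold it into the accumulator.
  def add(element: T): Unit =
    accumulator = foldFunction(accumulator, element)

  // When the window fires, the window function sees the single folded value.
  def fire[O](windowFunction: (K, R) => Iterable[O]): List[O] =
    windowFunction(key, accumulator).toList
}

object FoldingWindowDemo {
  def main(args: Array[String]): Unit = {
    val window = new FoldingWindow[String, Int, Int]("a", 0)(_ + _)
    Seq(1, 2, 3).foreach(window.add)
    // The window function receives the sum, not the elements 1, 2, 3.
    val out = window.fire((k, sum) => List(s"$k -> $sum"))
    println(out) // prints List(a -> 6)
  }
}
```

Under this model, a window function that needs every element would have to give up the pre-aggregation and take something like an `Iterable[T]` instead, which is the trade-off the question above is pointing at.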


> Case style anonymous functions not supported by Scala API
> ---------------------------------------------------------
>
>                 Key: FLINK-1159
>                 URL: https://issues.apache.org/jira/browse/FLINK-1159
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala API
>            Reporter: Till Rohrmann
>            Assignee: Stefano Baghino
>
> In Scala it is very common to define anonymous functions of the following form
> {code}
> {
> case foo: Bar => foobar(foo)
> case _ => throw new RuntimeException()
> }
> {code}
> These case style anonymous functions are not supported yet by the Scala API. 
> Thus, one has to write redundant code to name the function parameter.
> What works is the following pattern, but it is not intuitive for someone 
> coming from Scala:
> {code}
> dataset.map{
>   _ match{
>     case foo:Bar => ...
>   }
> }
> {code}
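
A minimal sketch of the extension-method approach that the pull request's `acceptPartialFunctions` package suggests, written in plain Scala so it runs without Flink. `Mapper`, `MiniDataSet`, `Extensions` and `mapWith` are hypothetical stand-ins chosen for this example; only the `...With` naming follows the methods shown in the diff. The assumption being illustrated is that an overloaded `map` can keep the compiler from typing a case-style literal (depending on the Scala version), while a method with a single function-typed signature accepts it directly.

```scala
// Hypothetical stand-ins: `Mapper` mimics a Java-style function interface and
// `MiniDataSet` mimics a data set whose `map` is overloaded. Neither is Flink API.
trait Mapper[T, R] { def apply(value: T): R }

final class MiniDataSet[T](val elements: List[T]) {
  def map[R](f: T => R): MiniDataSet[R] =
    new MiniDataSet(elements.map(f))
  def map[R](m: Mapper[T, R]): MiniDataSet[R] =
    new MiniDataSet(elements.map(e => m.apply(e)))
}

object Extensions {
  // A single, non-overloaded signature gives the case-style literal an
  // unambiguous expected type (T => R), so no named parameter is needed.
  implicit class OnMiniDataSet[T](ds: MiniDataSet[T]) {
    def mapWith[R](f: T => R): MiniDataSet[R] = ds.map(f)
  }
}

object MapWithDemo {
  import Extensions._

  def main(args: Array[String]): Unit = {
    val ds = new MiniDataSet(List(("a", 1), ("b", 2)))
    // Case-style anonymous function, as in the issue description.
    val out = ds.mapWith { case (name, n) => s"$name$n" }
    println(out.elements) // prints List(a1, b2)
  }
}
```

The extension only delegates to the existing `map`, so it adds the case-style syntax without changing the behaviour of the wrapped type.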



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
