Hi Bart,

there are multiple ways how to specify a window function using the Scala
API. The most scalaesque way would probably be to use an anonymous function:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val input = env.fromElements(1,2,3,4,5,7)

val pair = input.map(x => (x, x))

val allWindow = pair.timeWindowAll(Time.seconds(10))

val result = allWindow{
  (window, iterable, collector: Collector[Int]) =>
    iterable foreach {
      case (l, r) =>
        collector.collect(l + r)
    }
}

result.print()

env.execute("Flink Scala API Skeleton")

I hope this helps you.

Cheers,
Till
​

On Tue, Mar 22, 2016 at 12:40 PM, Bart van Deenen <bartvandee...@fastmail.fm
> wrote:

>
> Hi all
>
> I'm using 1.0, and have all my data nicely bundled in one allWindow, but
> I don't understand the syntax in Scala to make on json out of those for
> dumping the whole window into Kafka.
>
> My type is:
>
> val stream: AllWindowedStream[(List[String], Long, Int), TimeWindow]
>
> and I want to do
>
> stream.apply ????????
>
> I've tried to convert the Java example from the documentation to Scala,
> but I can't get anything meaningful to compile
>
> allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
> Integer, Window>() {
>     public void apply (Window window,
>             Iterable<Tuple2<String, Integer>> values,
>             Collector<Integer> out) throws Exception {
>         int sum = 0;
>         for (value t: values) {
>             sum += t.f1;
>         }
>         out.collect (new Integer(sum));
>     }
> });
>
>
> Help very appreciated!
>
> Greetings
>
>
> --
>   Bart van Deenen
>   bartvandee...@fastmail.fm
>

Reply via email to