Hi,
yes, the input does indeed play a role. If no elements are incoming, then there will also be no window, so the window function is not called for that interval.
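To illustrate, here is a minimal sketch in plain Scala. It is a simplified model and does not use the Flink API. The names `TumblingWindowSketch`, `windowStart` and `firedWindows` and the arrival timestamps are made up for this example. It assigns each element to a tumbling window by its timestamp and counts only the windows that received at least one element:

```scala
object TumblingWindowSketch {
  // Start of the tumbling window that a timestamp falls into.
  // Simplified model: assumes non-negative timestamps and no window offset.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // Group elements by the window they fall into. A window only appears in
  // the result if at least one element was assigned to it.
  def firedWindows(timestampsMs: Seq[Long], sizeMs: Long): Map[Long, Seq[Long]] =
    timestampsMs.groupBy(ts => windowStart(ts, sizeMs))

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    // Hypothetical input: three elements, all arriving within the second minute.
    val arrivals = Seq(65000L, 70000L, 110000L)

    val oneMinute = firedWindows(arrivals, minute)
    val fiveMinutes = firedWindows(arrivals, 5 * minute)

    println(s"1-minute windows fired: ${oneMinute.size}")   // prints 1
    println(s"5-minute windows fired: ${fiveMinutes.size}") // prints 1
  }
}
```

With input in only one of the five minutes, the 1-minute window fires once rather than five times, which would match the output you observed if the text you typed into nc all arrived within a single minute.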
Cheers,
Aljoscha

On Fri, 6 May 2016 at 12:18 Balaji Rajagopalan <balaji.rajagopa...@olacabs.com> wrote:

> I have a requirement where I want to do aggregation on one data stream
> every 5 minutes, and on a different data stream every 1 minute. I wrote an
> example to test this out, but the behavior is different from what I
> expected. I expected window2 to be called 5 times and window1 to be called
> once, but in a 5 minute interval window1 is called once and window2 is
> called only once. Have I understood the windowed function incorrectly? Does
> the input play a role in the number of times a window apply is called? I
> use the nc command to write to the socket ports 9999 and 9998.
>
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.scala.function.AllWindowFunction
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}
>
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.windowing.windows.Window
>
> object WindowWordCount {
>   def main(args: Array[String]) {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>     val text = env.socketTextStream("localhost", 9999)
>     val text1 = env.socketTextStream("localhost", 9998)
>     val stream: DataStream[String] = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>     val count = stream.windowAll(TumblingEventTimeWindows.of(Time.minutes(5))).apply { new MyAllWindowFunction }
>
>     count.print
>
>     val counts1 = text1.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
>       .windowAll(TumblingEventTimeWindows.of(Time.minutes(1))).apply { new MyAllWindowFunction2 }
>
>     counts1.print
>
>     env.execute("Window Stream WordCount")
>   }
>
>   class MyAllWindowFunction extends AllWindowFunction[String, String, TimeWindow] {
>     def apply(window: TimeWindow, input: scala.Iterable[String], out: org.apache.flink.util.Collector[String]): Unit = {
>       System.out.println("timed window1 is called")
>     }
>   }
>
>   class MyAllWindowFunction2 extends AllWindowFunction[String, String, TimeWindow] {
>     def apply(window: TimeWindow, input: scala.Iterable[String], out: org.apache.flink.util.Collector[String]): Unit = {
>       System.out.println("timed window2 is called")
>     }
>   }
> }
>
> The output was:
>
> timed window2 is called
> timed window1 is called