It turns out this is the way to solve the problem: call getSideOutput(tag) on the stream returned by process(…), and attach map(…) to that same stream as a separate branch. getSideOutput(tag) returns the side output of the operator it is called on. If you call it after map(…), you are asking the map operator for its "late" output, and the map operator never emits anything there. You can add as many operators after process(…) as you like, as long as the side output is taken from the process(…) result itself:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")

// Keep a reference to the output of process(...): this is the window
// operator, which emits late records to the "late" side output.
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
      (10000 to 10090).foreach(ctx.collect)
      Thread.sleep(1000)
      (20 to 30).foreach(ctx.collect)
    }
    override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
  .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
    override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
      out.collect(elements.map(_._1).toList)
    }
  })

// Take the late data from the process(...) result...
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()

// ...and attach further operators to the main output as a separate branch.
stream
  .map(list => list :+ 42)
  .print()

senv.execute()

On Thu, Sep 17, 2020 at 3:32 PM Ori Popowski <ori....@gmail.com> wrote:

> Hi,
>
> I have this simple flow:
>
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tag = OutputTag[Tuple1[Int]]("late")
> val stream = senv
>   .addSource(new SourceFunction[Int] {
>     override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
>       (10000 to 10090).foreach(ctx.collect)
>       Thread.sleep(1000)
>       (20 to 30).foreach(ctx.collect)
>     }
>     override def cancel(): Unit = {}
>   })
>   .map(x => Tuple1(x))
>   .assignAscendingTimestamps(_._1)
>   .keyBy(_ => 1)
>   .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
>   .sideOutputLateData(tag)
>   .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
>     override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
>       out.collect(elements.map(_._1).toList)
>     }
>   })
> stream
>   .print()
> stream
>   .getSideOutput(tag)
>   .map(a => s"late: $a")
>   .print()
>
> senv.execute()
>
> This is a simple stream which uses a session window on integers and then
> uses process(…) to just collect them into a list. There's also a side
> output for late data.
> When I run this job I can see the late messages printed to stdout
> without any problem.
>
> However, when I add a map(…) after process(…), the late data isn't
> getting into the side output and I cannot see it printed to stdout:
>
> …
>   .sideOutputLateData(tag)
>   .process(…)
>   .map(list => list :+ 42)
> …
>
> Is this a bug or is it working as intended? If it's not a bug, does it
> mean I cannot add any operator after process(…)?
>
> Thanks