Re: sideOutputLateData doesn't work with map()

2020-09-17 Thread Chesnay Schepler
This is working as intended, but it is admittedly inconvenient. The original version does not work because the side output is scoped to the DataStream that the process function creates; the map function, however, creates another DataStream, one that does not retain the side-output of the pr
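To illustrate the scoping described above, here is a minimal sketch, not the code from the thread. It assumes the Flink 1.11-era Scala DataStream API. The input elements, the key function, the window size, and the names `input`, `windowed`, `late`, and `mapped` are all made up for illustration. The point is only where `getSideOutput` is called: on the stream returned by the window operator, before any `map()` is applied.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object LateDataSketch {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val tag = OutputTag[Tuple1[Int]]("late")

    // Hypothetical input: each value doubles as its event timestamp (ms).
    // Whether anything actually arrives late depends on watermark timing.
    val input: DataStream[Tuple1[Int]] = senv
      .fromElements(1, 2, 3, 10, 4)
      .assignAscendingTimestamps(_.toLong)
      .map(Tuple1(_))

    // Keep a reference to the stream produced by the window operator.
    val windowed: DataStream[Tuple1[Int]] = input
      .keyBy(_._1 % 2)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
      .sideOutputLateData(tag)
      .reduce((a, b) => Tuple1(a._1 + b._1))

    // The side output is read from the window operator's stream ...
    val late: DataStream[Tuple1[Int]] = windowed.getSideOutput(tag)

    // ... while map() yields a new DataStream that no longer carries it.
    val mapped: DataStream[Int] = windowed.map(_._1 * 2)

    mapped.print()
    late.print()
    senv.execute("late-data-sketch")
  }
}
```

Calling `getSideOutput(tag)` on `mapped` instead of `windowed` would compile, but per the explanation above it would not yield the late elements.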

Re: sideOutputLateData doesn't work with map()

2020-09-17 Thread Ori Popowski
Turns out that this is the way to solve this problem:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(

sideOutputLateData doesn't work with map()

2020-09-17 Thread Ori Popowski
Hi, I have this simple flow:

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(ctx: SourceFunction.Sour