This is working as intended, but is admittedly inconvenient.
The reason why the original version does not work is that the
side-output is scoped to the DataStream that the process function
creates; the Map function creates another DataStream though that does
not retain the side-output of the pr
Turns out that this is the way to solve this problem:
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
.addSource(new SourceFunction[Int] {
override def run(
Hi,
I have this simple flow:
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
.addSource(new SourceFunction[Int] {
override def run(ctx: SourceFunction.Sour