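It turns out the fix is to call getSideOutput(tag) on the stream returned by
process(…), and to apply map(…) to that same stream as a separate branch
instead of chaining it in front of getSideOutput: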

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tag = OutputTag[Tuple1[Int]]("late")
val stream = senv
  .addSource(new SourceFunction[Int] {
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
      (10000 to 10090).foreach(ctx.collect)
      Thread.sleep(1000)
      (20 to 30).foreach(ctx.collect)
    }
    override def cancel(): Unit = {}
  })
  .map(x => Tuple1(x))
  .assignAscendingTimestamps(_._1)
  .keyBy(_ => 1)
  .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
  .sideOutputLateData(tag)
  .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
    override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
      out.collect(elements.map(_._1).toList)
    }
  })
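// Read the late-data side output from the stream that process(…) returned.
stream
  .getSideOutput(tag)
  .map(a => s"late: $a")
  .print()
// Apply map(…) as a separate branch of that same stream.
stream
  .map(list => list :+ 42)
  .print()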

senv.execute()
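
As far as I understand it, this works because a side output belongs to the
operator that emits it. Calling getSideOutput(tag) on the result of map(…)
asks the map operator for the tag, and the map operator never emits it. Below
is a minimal sketch of the same wiring pulled into a helper. The names
SideOutputSketch, CollectToList, lateTag and wire are made up for
illustration, and the input stream is assumed to already have event-time
timestamps and watermarks assigned:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical helper object, not part of the original job.
object SideOutputSketch {
  // Tag identifying the late-data side output of the window operator.
  val lateTag = OutputTag[Tuple1[Int]]("late")

  // Collects every element of a session window into a single list.
  class CollectToList
      extends ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
    override def process(
        key: Int,
        context: Context,
        elements: Iterable[Tuple1[Int]],
        out: Collector[List[Int]]): Unit =
      out.collect(elements.map(_._1).toList)
  }

  // Assumption: `input` already carries timestamps and watermarks.
  // Returns (mapped main output, late elements).
  def wire(input: DataStream[Tuple1[Int]])
      : (DataStream[List[Int]], DataStream[Tuple1[Int]]) = {
    // Keep a reference to the stream produced by the window operator itself.
    val windowed: DataStream[List[Int]] = input
      .keyBy(_ => 1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
      .sideOutputLateData(lateTag)
      .process(new CollectToList)

    // Ask the window operator's stream for its side output...
    val late = windowed.getSideOutput(lateTag)
    // ...and derive further streams from the main output separately.
    val mapped = windowed.map(list => list :+ 42)
    (mapped, late)
  }
}
```

So operators can still follow process(…); the only constraint in this sketch
is that getSideOutput is called on the window operator's stream rather than
on anything downstream of it.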

On Thu, Sep 17, 2020 at 3:32 PM Ori Popowski <ori....@gmail.com> wrote:

> Hi,
>
> I have this simple flow:
>
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tag = OutputTag[Tuple1[Int]]("late")
> val stream = senv
>   .addSource(new SourceFunction[Int] {
>     override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
>       (10000 to 10090).foreach(ctx.collect)
>       Thread.sleep(1000)
>       (20 to 30).foreach(ctx.collect)
>     }
>     override def cancel(): Unit = {}
>   })
>   .map(x => Tuple1(x))
>   .assignAscendingTimestamps(_._1)
>   .keyBy(_ => 1)
>   .window(EventTimeSessionWindows.withGap(Time.milliseconds(2000)))
>   .sideOutputLateData(tag)
>   .process(new ProcessWindowFunction[Tuple1[Int], List[Int], Int, TimeWindow] {
>     override def process(key: Int, context: Context, elements: Iterable[Tuple1[Int]], out: Collector[List[Int]]): Unit = {
>       out.collect(elements.map(_._1).toList)
>     }
>   })
> stream
>   .print()
> stream
>   .getSideOutput(tag)
>   .map(a => s"late: $a")
>   .print()
>
> senv.execute()
>
> This is a simple stream which uses a session window on integers and then
> uses process(…) to just collect them into a list. There's also side
> output for late data.
> When I run this job I can see printing to stdout of the late messages
> without any problem.
>
> However, when I add a map(…) after process(…), the late data no longer
> reaches the side output and nothing is printed to stdout for it:
> …
> .sideOutputLateData(tag)
> .process(…)
> .map(list => list :+ 42)
> …
>
> Is this a bug or is it working as intended? If it's not a bug - does it
> mean I cannot add any operator after process(…)?
>
> Thanks
>
