I followed the docs [1] and SideOutputITCase.scala (a test case from flink-master), but ran into an exception:
Caused by: java.lang.IllegalArgumentException: OutputTag must not be null.

The code snippet, written in Scala:

```
private final val backupOutputTag = OutputTag[String]("backup")

val result = dataStream.assignAscendingTimestamps(_._3)
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .sum(1)
  .process(new ProcessFunction[(String, Int, Long), (String, Int, Long)] {
    override def processElement(
        value: (String, Int, Long),
        ctx: ProcessFunction[(String, Int, Long), (String, Int, Long)]#Context,
        out: Collector[(String, Int, Long)]): Unit = {
      out.collect(value)
      ctx.output(backupOutputTag, s"backup:${value}")
    }
  })
```

My guess is that `backupOutputTag` is created on the JobManager while `ctx.output(backupOutputTag)` runs on the TaskManager, so `backupOutputTag` is null there. But the doc example shows that this approach should work, so what is the correct usage in Scala?

Flink version: 1.9.1

I posted this question here earlier but got no response: http://apache-flink.147419.n8.nabble.com/sideoutput-sql-state-td1533.html

Hoping for a response. Thanks.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/side_output.html