I followed docs[1] and SideOutputITCase.scala(unittest case from flink-master), 
but encountered an Exception:

Caused by: java.lang.IllegalArgumentException: OutputTag must not be null.


code snippet implemented by Scala

```

  private final val backupOutputTag = OutputTag[String]("backup")

  val result = dataStream.assignAscendingTimestamps(_._3)

    .keyBy(0)

    .window(TumblingEventTimeWindows.of(Time.seconds(10)))

    .sum(1)

      .process(new ProcessFunction[(String, Int, Long), (String, Int, Long)] {

        override def processElement(value: (String, Int, Long), ctx: 
ProcessFunction[(String, Int, Long), (String, Int, Long)]#Context, out: 
Collector[(String, Int, Long)]): Unit = {

          out.collect(value)

          ctx.output(backupOutputTag, s"backup:${value}")

        }

      })

```

In my opinion, the reason is bcz `backupOutputTag` was created on JobManager, 
and `ctx.output(backupOutputTag)` was on TaskManager, so the `backupOutputTag` 
would be null.

But the doc example shows that way is ok, what's the correct usage in Scala?

flink-version:1.9.1

I post a question here but no 
resp:http://apache-flink.147419.n8.nabble.com/sideoutput-sql-state-td1533.html

Hope resp.Thanks.

1. 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/side_output.html



Reply via email to