Hi All, I'm having issues with creating side outputs. There are two input sources (both from Kafka); they are connected and fed into a co-process function. Inside the co-process function, the regular data stream outputs a POJO, and processElement2 sets a periodic timer which creates the side output. When I start the job I get the exception below. Is there something that I'm doing wrong?
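For reference, a minimal sketch of the kind of wiring described above, assuming the Flink 1.x DataStream API. This is not the actual job: the class names, the String element types, the constant key, and the one-second timer are all illustrative assumptions. Note that the API identifies a side output by an OutputTag rather than by a bare string.

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {

    // Anonymous subclass ({}) so that Flink can capture the element type.
    static final OutputTag<String> SIDE = new OutputTag<String>("side-output") {};

    // Hypothetical key selector: puts every record under one key.
    // Timers are only available on keyed streams.
    static class ConstantKey implements KeySelector<String, String> {
        @Override
        public String getKey(String value) {
            return "k";
        }
    }

    // Hypothetical co-process function with String inputs and output.
    static class Joiner extends CoProcessFunction<String, String, String> {
        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) {
            // Regular output of the operator.
            out.collect("main:" + value);
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out) {
            // Register a processing-time timer one second from now.
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(now + 1000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            // Emit to the side output and re-register to make the timer periodic.
            ctx.output(SIDE, "timer fired at " + timestamp);
            ctx.timerService().registerProcessingTimeTimer(timestamp + 1000L);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the two Kafka sources. With bounded sources the job
        // may finish before any timer fires.
        DataStream<String> first = env.fromElements("a1", "a2");
        DataStream<String> second = env.fromElements("b1", "b2");

        SingleOutputStreamOperator<String> result = first
                .connect(second)
                .keyBy(new ConstantKey(), new ConstantKey())
                .process(new Joiner());

        result.print();
        // The side output is read with the same tag that was used to emit it.
        result.getSideOutput(SIDE).print();

        env.execute("side-output-sketch");
    }
}
```

The same OutputTag instance is used in both ctx.output and getSideOutput, so the element type is checked at compile time.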
I used the below example to implement the side output.
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java

processElement2:
    ctx.output("side-output", POJO);

Job:
    dataStream.getSideOutput("side-output").print();

2018-04-03 10:18:38.821 [Co-Flat Map (4/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Flat Map (4/8) (20b92b7a8cdd1e63963886de0895882c) switched from CREATED to DEPLOYING.
2018-04-03 10:18:38.821 [Co-Process (1/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Process (1/8) (fd8f971eea2e103e340d2955b384eaa3) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
    at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
2018-04-03 10:18:38.880 [Co-Process (7/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Process (7/8) (a86274f9ac49b71f00d218a1533cbd51) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
    at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)

Thanks