Hi All,

I'm having issues with creating side outputs. There are two input sources
(both from kafka) and they are connected and fed into a co-process
function. Inside the co-process, the regular data stream outputs a POJO and
in processElement2 there is a periodic timer which creates the side output.
When I start the job I get the below exception. Is there something that I'm
doing wrong?

I used the below example to implement the side output.
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java

processElement2
ctx.output("side-output", POJO);

Job
dataStream.getSideOutput("side-output").print();


2018-04-03 10:18:38.821 [Co-Flat Map (4/8)] INFO
org.apache.flink.runtime.taskmanager.Task  - Co-Flat Map (4/8)
(20b92b7a8cdd1e63963886de0895882c) switched from CREATED to DEPLOYING.
2018-04-03 10:18:38.821 [Co-Process (1/8)] INFO
org.apache.flink.runtime.taskmanager.Task  - Co-Process (1/8)
(fd8f971eea2e103e340d2955b384eaa3) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at
org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
at
org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

2018-04-03 10:18:38.880 [Co-Process (7/8)] INFO
org.apache.flink.runtime.taskmanager.Task  - Co-Process (7/8)
(a86274f9ac49b71f00d218a1533cbd51) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at
org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
at
org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

Thanks

Reply via email to