[ https://issues.apache.org/jira/browse/FLINK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler updated FLINK-9141:
------------------------------------
    Description:
Calling both {{getSideOutput()}} and {{split()}} on the same DataStream causes a {{NullPointerException}} to be thrown at runtime.
As a workaround, one can add a no-op {{map()}} call before the {{split()}} call.

Exception:
{code}
Caused by: java.lang.NullPointerException
	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:79)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:128)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)
{code}

Reproducer:
{code}
import java.util.Collections;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitReproducer { // class name is arbitrary

    private static final OutputTag<String> tag = new OutputTag<String>("tag") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStream1 = env.fromElements("foo");

        SingleOutputStreamOperator<String> processedStream = dataStream1
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                }
            });

        processedStream.getSideOutput(tag)
            .print();

        // split() on the same stream as getSideOutput(tag) above causes the NPE at runtime
        processedStream
            .split(Collections::singletonList)
            .select("bar")
            .print();

        env.execute();
    }
}
{code}

        was:
Calling both {{getSideOutput()}} and {{split()}} on the same DataStream causes a {{NullPointerException}} to be thrown at runtime.
As a workaround, one can add a no-op {{map()}} call before the {{split()}} call.
Exception:
{code}
Caused by: java.lang.NullPointerException
	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:79)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:128)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)
{code}

Reproducer:
{code}
import java.util.Collections;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitReproducer { // class name is arbitrary

    private static final OutputTag<String> tag = new OutputTag<String>("tag") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> dataStream1 = env.fromElements("foo");

        SingleOutputStreamOperator<String> processedStream = dataStream1
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                }
            });

        processedStream.getSideOutput(tag)
            .print();

        processedStream
            .map(record -> record) // no-op map: the workaround, so this version does not trigger the NPE
            .split(Collections::singletonList)
            .select("bar")
            .print();

        env.execute();
    }
}
{code}


> Calling getSideOutput() and split() on one DataStream causes NPE
> ----------------------------------------------------------------
>
>                 Key: FLINK-9141
>                 URL: https://issues.apache.org/jira/browse/FLINK-9141
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Assignee: vinoyang
>            Priority: Critical
>
> Calling both {{getSideOutput()}} and {{split()}} on one DataStream causes a
> {{NullPointerException}} to be thrown at runtime.
> As a work-around one can add a no-op map function before the split() call.
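To make the routing in the reproducer explicit: {{split(Collections::singletonList)}} routes each record to an output named after the record's own value, and {{select("bar")}} keeps only the records routed to "bar". The {{ProcessFunction}} in the reproducer also emits nothing, so no record would reach {{split()}} anyway; per the stack trace, the NPE is thrown while the {{OperatorChain}} is being constructed inside {{StreamTask.invoke()}}, i.e. during task setup rather than while records are processed. Below is a minimal plain-Java model of that routing. It is not Flink code: {{SplitSelectModel}} and {{splitThenSelect}} are hypothetical names, and the model assumes {{select()}} keeps a record if any of its output names was selected.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the split()/select() routing used in the reproducer.
// NOT Flink code: SplitSelectModel and splitThenSelect are hypothetical names.
public class SplitSelectModel {

    // The selector plays the role of an OutputSelector: it maps a record to the
    // names of the outputs it is routed to. Records routed to at least one of the
    // selected names are kept; this model adds each such record once.
    public static <T> List<T> splitThenSelect(List<T> records,
                                              Function<T, Iterable<String>> selector,
                                              String... selectedNames) {
        List<String> wanted = Arrays.asList(selectedNames);
        List<T> result = new ArrayList<>();
        for (T record : records) {
            for (String name : selector.apply(record)) {
                if (wanted.contains(name)) {
                    result.add(record);
                    break; // stop after the first matching output name
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same selector as the reproducer: each value is routed to an output named after itself.
        Function<String, Iterable<String>> selector = Collections::singletonList;
        List<String> selected = splitThenSelect(Arrays.asList("foo"), selector, "bar");
        System.out.println(selected); // prints "[]": "foo" is routed to "foo", never to "bar"
    }
}
{code}

In other words, even if the job ran without the exception, the {{select("bar")}} branch of the reproducer would print nothing for the element "foo".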