[ 
https://issues.apache.org/jira/browse/FLINK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433602#comment-16433602
 ] 

ASF GitHub Bot commented on FLINK-9141:
---------------------------------------

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5836

    [FLINK-9141][datastream] Fail early when using both split and side-outputs

    ## What is the purpose of the change
    
    With this PR we fail early if a user attempts to use split() and side-outputs on a single DataStream. Previously this would lead to a NullPointerException at runtime.
    
    ## Brief change log
    
    * keep track of split() calls in `SingleOutputStreamOperator` by overriding split() and setting the `wasSplitApplied` flag
    * add checks to split() and getSideOutput() that throw an exception if the other method was already called
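
The two checks in the change log can be illustrated with a minimal, Flink-free sketch. It is not the code from this PR: `GuardedStream`, its `requestedSideOutputs` set, the string return values, and the choice of `UnsupportedOperationException` are all assumptions made for illustration. Only the `wasSplitApplied` flag name and the mutual-exclusion rule come from the description above.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical stand-in for an operator that tracks whether split() or
 * getSideOutput() was called, and rejects the second kind of call.
 */
class GuardedStream {
    // Flag named in the change log; set once split() has been applied.
    private boolean wasSplitApplied = false;
    // Assumed bookkeeping of side-output tags requested so far.
    private final Set<String> requestedSideOutputs = new HashSet<>();

    /** Fails early if a side output was already requested on this stream. */
    String split() {
        if (!requestedSideOutputs.isEmpty()) {
            // Exception type is an assumption; the description only says "an exception".
            throw new UnsupportedOperationException(
                "getSideOutput() and split() may not be called on the same stream");
        }
        wasSplitApplied = true;
        return "split";
    }

    /** Fails early if split() was already applied to this stream. */
    String getSideOutput(String tag) {
        if (wasSplitApplied) {
            throw new UnsupportedOperationException(
                "getSideOutput() and split() may not be called on the same stream");
        }
        requestedSideOutputs.add(tag);
        return "side-output:" + tag;
    }
}
```

With this shape the misuse is reported while the pipeline is being built, in whichever order the two methods are called, rather than surfacing as a NullPointerException when the task starts.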
    
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    * run SplitSideOutputTest
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9141

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5836.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5836
    
----
commit ed3ec8716c6d26eee31c4d0ff02c8bfdd70a19d4
Author: zentol <chesnay@...>
Date:   2018-04-11T09:13:52Z

    [FLINK-9141][datastream] Fail early when using both split and side-outputs

----


> Calling getSideOutput() and split() on one DataStream causes NPE
> ----------------------------------------------------------------
>
>                 Key: FLINK-9141
>                 URL: https://issues.apache.org/jira/browse/FLINK-9141
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>
> Calling both {{getSideOutput()}} and {{split()}} on one DataStream causes a 
> {{NullPointerException}} to be thrown at runtime.
> As a work-around one can add a no-op map function before the split() call.
> Exception:
> {code}
> Caused by: java.lang.NullPointerException
>       at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:79)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:128)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> Reproducer:
> {code}
> private static final OutputTag<String> tag = new OutputTag<String>("tag") {};
> public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>       DataStream<String> dataStream1 = env.fromElements("foo");
>       SingleOutputStreamOperator<String> processedStream = dataStream1
>               .process(new ProcessFunction<String, String>() {
>                       @Override
>                       public void processElement(String value, Context ctx, Collector<String> out) {
>                       }
>               });
>       processedStream.getSideOutput(tag)
>               .print();
>       processedStream
>               .split(Collections::singletonList)
>               .select("bar")
>               .print();
>       env.execute();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
