Huang Wei created FLINK-2495:
--------------------------------

             Summary: Add a null point check in API DataStream.union
                 Key: FLINK-2495
                 URL: https://issues.apache.org/jira/browse/FLINK-2495
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.10
            Reporter: Huang Wei
             Fix For: 0.10


The API(public DataStream<OUT> union(DataStream<OUT>... streams)) is a  
external interface for user.
The parameter "streams" maybe null and it will throw NullPointerException error.

This test below can be intuitive to explain this problem:

package org.apache.flink.streaming.api;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.junit.Test;

/**
 * Created by HuangWHWHW on 2015/8/7.
 */
public class test {

        public static class sourceFunction extends 
RichParallelSourceFunction<String> {

                public sourceFunction() {
                }

                @Override
                public void run(SourceContext<String> sourceContext) throws 
Exception {
                        sourceContext.collect("a");
                }

                @Override
                public void cancel() {

                }
        }

        @Test
        public void testUnion(){
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                DataStream<String> source = env.addSource(new sourceFunction());
                DataStream<String> temp1 = null;
                DataStream<String> temp2 = source.map(new MapFunction<String, 
String>() {
                        @Override
                        public String map(String value) throws Exception {
                                if (value == "a") {
                                        return "This is for test temp2.";
                                }
                                return null;
                        }
                });
                DataStream<String> sink = temp2.union(temp1);
                sink.print();
                try {
                        env.execute();
                }catch (Exception e){
                        e.printStackTrace();
                }
        }

}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to