Fabian Hueske created FLINK-5031:
------------------------------------

             Summary: Consecutive DataStream.split() ignored
                 Key: FLINK-5031
                 URL: https://issues.apache.org/jira/browse/FLINK-5031
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.1.3, 1.2.0
            Reporter: Fabian Hueske
             Fix For: 1.2.0


The output of the following program 
{code}
static final class ThresholdSelector implements OutputSelector<Long> {
        long threshold;

        public ThresholdSelector(long threshold) {
                this.threshold = threshold;
        }

        @Override
        public Iterable<String> select(Long value) {
                if (value < threshold) {
                        return Collections.singletonList("Less");
                } else {
                        return Collections.singletonList("GreaterEqual");
                }
        }
}

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SplitStream<Long> split1 = env.generateSequence(1, 11)
                .split(new ThresholdSelector(6));
        // stream11 should be [1,2,3,4,5]
        DataStream<Long> stream11 = split1.select("Less");
        SplitStream<Long> split2 = stream11
//              .map(new MapFunction<Long, Long>() {
//                      @Override
//                      public Long map(Long value) throws Exception {
//                              return value;
//                      }
//              })
                .split(new ThresholdSelector(3));
        DataStream<Long> stream21 = split2.select("Less");
        // stream21 should be [1,2]
        stream21.print();

        env.execute();
}
{code}

should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
{{split}} operation is ignored.

The program is correctly evaluate if the identity {{MapFunction}} is added to 
the program.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to