Fabian Hueske created FLINK-5031:
------------------------------------

Summary: Consecutive DataStream.split() ignored
Key: FLINK-5031
URL: https://issues.apache.org/jira/browse/FLINK-5031
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.1.3, 1.2.0
Reporter: Fabian Hueske
Fix For: 1.2.0
The output of the following program

{code}
import java.util.Collections;

import org.apache.flink.api.common.functions.MapFunction; // only needed if the map() below is uncommented
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Routes values below the threshold to "Less", all others to "GreaterEqual".
static final class ThresholdSelector implements OutputSelector<Long> {
    long threshold;

    public ThresholdSelector(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public Iterable<String> select(Long value) {
        if (value < threshold) {
            return Collections.singletonList("Less");
        } else {
            return Collections.singletonList("GreaterEqual");
        }
    }
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SplitStream<Long> split1 = env.generateSequence(1, 11)
        .split(new ThresholdSelector(6));
    // stream11 should be [1,2,3,4,5]
    DataStream<Long> stream11 = split1.select("Less");

    SplitStream<Long> split2 = stream11
//      .map(new MapFunction<Long, Long>() {
//          @Override
//          public Long map(Long value) throws Exception {
//              return value;
//          }
//      })
        .split(new ThresholdSelector(3));
    DataStream<Long> stream21 = split2.select("Less");

    // stream21 should be [1,2]
    stream21.print();
    env.execute();
}
{code}

should be {{1, 2}}; however, it is {{1, 2, 3, 4, 5}}. It seems that the second {{split}} operation is ignored.

The program is evaluated correctly if the identity {{MapFunction}} (commented out above) is inserted between the two {{split}} calls.
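To make the expected result concrete, here is a plain-Java sketch with no Flink dependency that models what two chained split()/select("Less") steps should produce. It only models the intended semantics (each split/select pair acting as a filter on the previous stream); it does not reproduce Flink's internals. The class name ConsecutiveSplitModel and the helpers routesToLess and selectLess are hypothetical, introduced only for illustration.

{code}
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical plain-Java model (no Flink) of two chained split()/select("Less") steps.
class ConsecutiveSplitModel {

    // Same routing rule as ThresholdSelector: values below the threshold go to "Less".
    static boolean routesToLess(long value, long threshold) {
        return value < threshold;
    }

    // Models split(new ThresholdSelector(threshold)).select("Less") on a finite input.
    static List<Long> selectLess(List<Long> input, long threshold) {
        return input.stream()
                .filter(v -> routesToLess(v, threshold))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Models the source sequence 1..11 (both bounds included).
        List<Long> source = LongStream.rangeClosed(1, 11).boxed().collect(Collectors.toList());

        List<Long> stream11 = selectLess(source, 6);   // first split/select
        List<Long> stream21 = selectLess(stream11, 3); // second split/select

        System.out.println("stream11          = " + stream11);
        System.out.println("expected stream21 = " + stream21);
        // If the second split is ignored, stream21 is effectively stream11.
        System.out.println("reported stream21 = " + stream11);
    }
}
{code}

Running main prints {{[1, 2, 3, 4, 5]}} for stream11 and {{[1, 2]}} as the expected stream21; the last line mirrors the reported output, where the second selector has no effect.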