[ https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15678716#comment-15678716 ]
Renkai Ge commented on FLINK-5031:
----------------------------------

[~fhueske] The second split was not ignored; its result was unioned with the result of the first one:
{code}
union({1,2},{1,2,3,4,5}) = {1,2,3,4,5}
{code}
If the second select is changed to "GreaterEqual", the result is {3,4,5,6,7,8,9,10,11}, that is:
{code}
union({3,4,5,6,7,8,9,10,11},{6,7,8,9,10,11})
{code}
See https://github.com/apache/flink/blob/a612b9966f3ee020a5721ac2f039a3633c40146c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java#L114.

In the current implementation of split, you get the union of the results of all split & select combinations, which I find rather strange. We might solve this issue by reimplementing the split function as a OneInputTransformation.

> Consecutive DataStream.split() ignored
> --------------------------------------
>
>                 Key: FLINK-5031
>                 URL: https://issues.apache.org/jira/browse/FLINK-5031
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Fabian Hueske
>            Assignee: Renkai Ge
>             Fix For: 1.2.0
>
>
> The output of the following program
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>     long threshold;
>     public ThresholdSelector(long threshold) {
>         this.threshold = threshold;
>     }
>     @Override
>     public Iterable<String> select(Long value) {
>         if (value < threshold) {
>             return Collections.singletonList("Less");
>         } else {
>             return Collections.singletonList("GreaterEqual");
>         }
>     }
> }
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     SplitStream<Long> split1 = env.generateSequence(1, 11)
>         .split(new ThresholdSelector(6));
>     // stream11 should be [1,2,3,4,5]
>     DataStream<Long> stream11 = split1.select("Less");
>     SplitStream<Long> split2 = stream11
> //      .map(new MapFunction<Long, Long>() {
> //          @Override
> //          public Long map(Long value) throws Exception {
> //              return value;
> //          }
> //      })
>         .split(new ThresholdSelector(3));
>     DataStream<Long> stream21 = split2.select("Less");
>     // stream21 should be [1,2]
>     stream21.print();
>     env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second
> {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added to
> the program.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
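
The union behaviour described in the comment above can be reproduced with a small, Flink-free sketch. It assumes, as the comment suggests, that both selectors are evaluated for every record and that a record is forwarded as soon as any of them returns the selected name. The names SplitUnionSketch, thresholdSelector and run are hypothetical, and this is a simplified model rather than the actual DirectedOutput code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class SplitUnionSketch {

    // Hypothetical stand-in for ThresholdSelector from the issue description:
    // tags a value as "Less" or "GreaterEqual" relative to the threshold.
    static Function<Long, List<String>> thresholdSelector(long threshold) {
        return value -> Collections.singletonList(
                value < threshold ? "Less" : "GreaterEqual");
    }

    // Assumed model of the reported behaviour: every selector sees every
    // record of the sequence 1..11, and a record is emitted if ANY selector
    // returns the selected name, i.e. the per-selector results are unioned.
    static List<Long> run(String selectedName, long... thresholds) {
        List<Function<Long, List<String>>> selectors = new ArrayList<>();
        for (long t : thresholds) {
            selectors.add(thresholdSelector(t));
        }
        List<Long> emitted = new ArrayList<>();
        for (long value = 1; value <= 11; value++) {
            for (Function<Long, List<String>> selector : selectors) {
                if (selector.apply(value).contains(selectedName)) {
                    emitted.add(value);
                    break; // emit each record at most once
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Thresholds 6 and 3 mirror split1 and split2 in the issue.
        System.out.println(run("Less", 6, 3));         // [1, 2, 3, 4, 5]
        System.out.println(run("GreaterEqual", 6, 3)); // [3, 4, 5, 6, 7, 8, 9, 10, 11]
    }
}
```

Under this model, a chained split cannot narrow the stream: a record only needs to satisfy one of the selectors, which matches the two unions quoted in the comment.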