[ https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15685826#comment-15685826 ]
ASF GitHub Bot commented on FLINK-5031:
---------------------------------------

GitHub user Renkai opened a pull request:

    https://github.com/apache/flink/pull/2847

    [FLINK-5031]Consecutive DataStream.split() ignored

    I think this is a way to solve this issue, but it might not be the best one. Since I don't know the code base well enough, I hope someone can review it and give some advice.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Renkai/flink FLINK-5031

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2847.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2847

----
commit e216c79cd99c08d92847a6254c13fc8d75bb94c3
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T06:38:03Z

    add a new StreamNode when split

commit 8ce23e658102c35c58946429fd0fde5e72d722df
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:00:04Z

    fix test case

commit 6672354819f2a6411f1a4c9479653f421b6163c6
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:09:01Z

    delete output selectors check since those infos are moved to split1

commit d59900a19c0d966ee175fbbe9661004cec556670
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:44:58Z

    add unit test for consecutive split

commit 8198ee28e95371651d8c159db36dfcf6163c1659
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T02:22:16Z

    add unit test for consecutive split

commit 2a0d0ec730c1b7d5c955a4ef374ebd7fd6c58f5c
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:14:37Z

    add unit test for consecutive split

commit 410b8717b76df8bfa40b59f1593439442d34ec49
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:26:18Z

    add unit test for consecutive split

commit 290f2f6f6fc1a249cf30d04cc31429e95dc0ec44
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:50:13Z

    add unit test for consecutive split

----

> Consecutive DataStream.split() ignored
> --------------------------------------
>
>                 Key: FLINK-5031
>                 URL: https://issues.apache.org/jira/browse/FLINK-5031
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Fabian Hueske
>            Assignee: Renkai Ge
>             Fix For: 1.2.0
>
>
> The output of the following program
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>     long threshold;
>     public ThresholdSelector(long threshold) {
>         this.threshold = threshold;
>     }
>     @Override
>     public Iterable<String> select(Long value) {
>         if (value < threshold) {
>             return Collections.singletonList("Less");
>         } else {
>             return Collections.singletonList("GreaterEqual");
>         }
>     }
> }
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>     SplitStream<Long> split1 = env.generateSequence(1, 11)
>         .split(new ThresholdSelector(6));
>     // stream11 should be [1,2,3,4,5]
>     DataStream<Long> stream11 = split1.select("Less");
>     SplitStream<Long> split2 = stream11
> //      .map(new MapFunction<Long, Long>() {
> //          @Override
> //          public Long map(Long value) throws Exception {
> //              return value;
> //          }
> //      })
>         .split(new ThresholdSelector(3));
>     DataStream<Long> stream21 = split2.select("Less");
>     // stream21 should be [1,2]
>     stream21.print();
>     env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second
> {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added to
> the program.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
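
For reference, the semantics the report expects can be sketched without Flink. The following is only an illustration under stated assumptions: the class `ConsecutiveSplitSketch` and the helper `splitAndSelect` are hypothetical names, not part of the Flink API or of the pull request, and plain lists stand in for streams. It models each `split(...).select(...)` pair as a filter over the previous result, which is the behavior the reporter describes as correct.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ConsecutiveSplitSketch {

    // Mirrors the tagging rule of ThresholdSelector.select in the report.
    static String select(long value, long threshold) {
        return value < threshold ? "Less" : "GreaterEqual";
    }

    // Hypothetical stand-in for split(selector).select(name):
    // keeps only the values whose tag equals the requested name.
    static List<Long> splitAndSelect(List<Long> input, long threshold, String name) {
        List<Long> out = new ArrayList<>();
        for (long v : input) {
            if (select(v, threshold).equals(name)) {
                out.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same input as env.generateSequence(1, 11) in the report.
        List<Long> source = LongStream.rangeClosed(1, 11)
                .boxed()
                .collect(Collectors.toList());

        // First split with threshold 6, then a second split with threshold 3.
        List<Long> stream11 = splitAndSelect(source, 6, "Less");
        List<Long> stream21 = splitAndSelect(stream11, 3, "Less");

        System.out.println(stream11); // [1, 2, 3, 4, 5]
        System.out.println(stream21); // [1, 2]
    }
}
```

In the reported bug, the Flink job prints the contents of `stream11` where this sketch prints `stream21`, which is what "the second split is ignored" amounts to.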