[ 
https://issues.apache.org/jira/browse/FLINK-5031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15685826#comment-15685826
 ] 

ASF GitHub Bot commented on FLINK-5031:
---------------------------------------

GitHub user Renkai opened a pull request:

    https://github.com/apache/flink/pull/2847

    [FLINK-5031]Consecutive DataStream.split() ignored

    I think this is one way to solve the issue, but it might not be the best
    one. Since I don't know the code base well enough, I hope someone can review
    it and give some advice.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Renkai/flink FLINK-5031

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2847.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2847
    
----
commit e216c79cd99c08d92847a6254c13fc8d75bb94c3
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T06:38:03Z

    add a new StreamNode when split

commit 8ce23e658102c35c58946429fd0fde5e72d722df
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:00:04Z

    fix test case

commit 6672354819f2a6411f1a4c9479653f421b6163c6
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:09:01Z

    delete output selectors check since those infos are moved to split1

commit d59900a19c0d966ee175fbbe9661004cec556670
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-21T07:44:58Z

    add unit test for consecutive split

commit 8198ee28e95371651d8c159db36dfcf6163c1659
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T02:22:16Z

    add unit test for consecutive split

commit 2a0d0ec730c1b7d5c955a4ef374ebd7fd6c58f5c
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:14:37Z

    add unit test for consecutive split

commit 410b8717b76df8bfa40b59f1593439442d34ec49
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:26:18Z

    add unit test for consecutive split

commit 290f2f6f6fc1a249cf30d04cc31429e95dc0ec44
Author: renkai <gaelook...@gmail.com>
Date:   2016-11-22T03:50:13Z

    add unit test for consecutive split

----


> Consecutive DataStream.split() ignored
> --------------------------------------
>
>                 Key: FLINK-5031
>                 URL: https://issues.apache.org/jira/browse/FLINK-5031
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Fabian Hueske
>            Assignee: Renkai Ge
>             Fix For: 1.2.0
>
>
> The output of the following program 
> {code}
> static final class ThresholdSelector implements OutputSelector<Long> {
>       long threshold;
>       public ThresholdSelector(long threshold) {
>               this.threshold = threshold;
>       }
>       @Override
>       public Iterable<String> select(Long value) {
>               if (value < threshold) {
>                       return Collections.singletonList("Less");
>               } else {
>                       return Collections.singletonList("GreaterEqual");
>               }
>       }
> }
> public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env =
>               StreamExecutionEnvironment.getExecutionEnvironment();
>       env.setParallelism(1);
>       SplitStream<Long> split1 = env.generateSequence(1, 11)
>               .split(new ThresholdSelector(6));
>       // stream11 should be [1,2,3,4,5]
>       DataStream<Long> stream11 = split1.select("Less");
>       SplitStream<Long> split2 = stream11
> //            .map(new MapFunction<Long, Long>() {
> //                    @Override
> //                    public Long map(Long value) throws Exception {
> //                            return value;
> //                    }
> //            })
>               .split(new ThresholdSelector(3));
>       DataStream<Long> stream21 = split2.select("Less");
>       // stream21 should be [1,2]
>       stream21.print();
>       env.execute();
> }
> {code}
> should be {{1, 2}}, however it is {{1, 2, 3, 4, 5}}. It seems that the second 
> {{split}} operation is ignored.
> The program is evaluated correctly if the identity {{MapFunction}} is added to 
> the program.
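
For reference, the expected result can be checked without Flink. The following is only a minimal sketch in plain Java that mimics the routing rule of the ThresholdSelector from the report on in-memory lists. The class name ConsecutiveSplitSketch and the helper splitAndSelect are hypothetical and are not part of the Flink API or of the pull request; the sketch says nothing about how the StreamGraph handles split internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the reported program; it uses plain lists, not Flink streams.
final class ConsecutiveSplitSketch {

    // Same routing rule as ThresholdSelector in the report, without the Flink interface.
    static Iterable<String> select(long value, long threshold) {
        return Collections.singletonList(value < threshold ? "Less" : "GreaterEqual");
    }

    // Hypothetical helper: keeps the values whose selector output contains the given name,
    // which is what split(...).select(name) is expected to do for a single split.
    static List<Long> splitAndSelect(List<Long> input, long threshold, String name) {
        List<Long> out = new ArrayList<>();
        for (long value : input) {
            for (String tag : select(value, threshold)) {
                if (tag.equals(name)) {
                    out.add(value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> source = new ArrayList<>();
        for (long i = 1; i <= 11; i++) {
            source.add(i);
        }
        // First split with threshold 6, then a second split with threshold 3 on its "Less" side.
        List<Long> stream11 = splitAndSelect(source, 6, "Less");
        List<Long> stream21 = splitAndSelect(stream11, 3, "Less");
        System.out.println(stream11); // prints [1, 2, 3, 4, 5]
        System.out.println(stream21); // prints [1, 2]
    }
}
```

If the second selection is skipped, stream21 equals stream11, which matches the output {{1, 2, 3, 4, 5}} described in the report.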



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
