[ https://issues.apache.org/jira/browse/FLINK-2998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249736#comment-15249736 ]
ASF GitHub Bot commented on FLINK-2998: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1838#discussion_r60395945 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java --- @@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception } @Test + public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception { + /* + * UDF Join on tuples with multiple key field positions and same customized distribution + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple5<Integer, Long, Integer, String, Integer>> ds1 = CollectionDataSets.get5TupleDataSet(env) + .map(new MapFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Integer>>() { + @Override + public Tuple5<Integer, Long, Integer, String, Integer> map(Tuple5<Integer, Long, Integer, String, Long> value) throws Exception { + return new Tuple5<>(value.f0, value.f1, value.f2, value.f3, value.f4.intValue()); + } + }); + + DataSet<Tuple3<Integer, Integer, String>> ds2 = CollectionDataSets.get3TupleDataSet(env) + .map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() { + @Override + public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception { + return new Tuple3<>(value.f0, value.f1.intValue(), value.f2); + } + }); + + env.setParallelism(4); + TestDistribution testDis = new TestDistribution(); + DataSet<Tuple3<Integer, Long, String>> coGrouped = + DataSetUtils.partitionByRange(ds1, testDis, 0, 4) + .coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1)) + .where(0, 4) + .equalTo(0, 1) + .with(new Tuple5Tuple3CoGroup2()); --- End diff -- Please use the same `CoGroupFunction` as in the original test. > Support range partition comparison for multi input nodes. 
> --------------------------------------------------------- > > Key: FLINK-2998 > URL: https://issues.apache.org/jira/browse/FLINK-2998 > Project: Flink > Issue Type: New Feature > Components: Optimizer > Reporter: Chengxiang Li > Priority: Minor > > The optimizer may have an opportunity to optimize the DAG when it > finds that two input range partitions are equivalent, but we do not support the > comparison yet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)