gustavodemorais commented on code in PR #26627: URL: https://github.com/apache/flink/pull/26627#discussion_r2128368031
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRuleTest.scala: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api._ +import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE} +import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase} + +import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.tools.RuleSets +import org.junit.jupiter.api.{BeforeEach, Test} + +/** Tests for [[FlinkRightJoinToLeftJoinRule]]. 
*/ +class FlinkRightJoinToLeftJoinRuleTest extends TableTestBase { + private val util = batchTestUtil() + + @BeforeEach + def setup(): Unit = { + util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE) + val calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv.getConfig) + calciteConfig.getBatchProgram.get.addLast( + "right-to-left-rules", + FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(RuleSets.ofList(FlinkRightJoinToLeftJoinRule.INSTANCE)) + .build() + ) + + util.addTableSource[(Int, Long)]("T1", 'a, 'b) + util.addTableSource[(Int, Long, Long)]("T2", 'c, 'd, 'e) + util.addTableSource[(Int, Long)]("T3", 'f, 'g) + } + + @Test + def testRightJoinIsConverted(): Unit = { + val sqlQuery = "SELECT * FROM T1 RIGHT JOIN T2 ON a = c" + util.verifyRelPlan(sqlQuery) + } + + @Test + def testJoinChainIsConverted(): Unit = { + val sqlQuery = "SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f" + util.verifyRelPlan(sqlQuery) + } + + @Test + def testLeftJoinRemainsUnchanged(): Unit = { + val sqlQuery = "SELECT * FROM T1 LEFT JOIN T2 ON a = c" + util.verifyRelPlan(sqlQuery) + } + + @Test + def testRightJoinWithExpressionCondition(): Unit = { + val sqlQuery = "SELECT * FROM T1 RIGHT JOIN T2 ON a + 1 = c - 1" + util.verifyRelPlan(sqlQuery) + } + + @Test + def testRightJoinWithProjection(): Unit = { + val sqlQuery = "SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c" + util.verifyRelPlan(sqlQuery) + } +} Review Comment: Can you please add one test for multiple chained right joins? T1 RIGHT JOIN T2 RIGHT JOIN T3 ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRuleTest.scala: ########## @@ -0,0 +1,79 @@ +/* Review Comment: Can you please write the test in .java? 
We're trying to move away from Scala and implement new things in Java. ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRuleTest.scala: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api._ +import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE} +import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase} + +import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.tools.RuleSets +import org.junit.jupiter.api.{BeforeEach, Test} + +/** Tests for [[FlinkRightJoinToLeftJoinRule]]. */ +class FlinkRightJoinToLeftJoinRuleTest extends TableTestBase { + private val util = batchTestUtil() + + @BeforeEach + def setup(): Unit = { + util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE) Review Comment: Could we use streaming for the setup?
########## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRuleTest.xml: ########## @@ -0,0 +1,144 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testRightJoinIsConverted"> + <Resource name="sql"> + <![CDATA[SELECT * FROM T1 RIGHT JOIN T2 ON a = c]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[right]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2]) + +- LogicalJoin(condition=[=($3, $0)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, T2]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) +]]> + </Resource> + </TestCase> + + <TestCase name="testJoinChainIsConverted"> + <Resource name="sql"> + <![CDATA[SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f]]> + 
</Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6]) ++- LogicalJoin(condition=[=($0, $5)], joinType=[right]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) + : +- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T3]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6]) ++- LogicalProject(a=[$2], b=[$3], c=[$4], d=[$5], e=[$6], f=[$0], g=[$1]) + +- LogicalJoin(condition=[=($2, $0)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, T3]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) + +- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + </TestCase> + + <TestCase name="testLeftJoinRemainsUnchanged"> + <Resource name="sql"> + <![CDATA[SELECT * FROM T1 LEFT JOIN T2 ON a = c]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + </TestCase> + + <TestCase name="testRightJoinWithExpressionCondition"> 
+ <Resource name="sql"> + <![CDATA[SELECT * FROM T1 RIGHT JOIN T2 ON a + 1 = c - 1]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalProject(a=[$0], b=[$1], c=[$3], d=[$4], e=[$5]) + +- LogicalJoin(condition=[=($2, $6)], joinType=[right]) + :- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalProject(c=[$0], d=[$1], e=[$2], $f3=[-($0, 1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalProject(a=[$0], b=[$1], c=[$3], d=[$4], e=[$5]) + +- LogicalProject(a=[$4], b=[$5], $f2=[$6], c=[$0], d=[$1], e=[$2], $f3=[$3]) + +- LogicalJoin(condition=[=($6, $3)], joinType=[left]) + :- LogicalProject(c=[$0], d=[$1], e=[$2], $f3=[-($0, 1)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) + +- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) +]]> + </Resource> + </TestCase> + + <TestCase name="testRightJoinWithProjection"> + <Resource name="sql"> + <![CDATA[SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(b=[$1], d=[$3]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[right]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(b=[$1], d=[$3]) ++- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2]) Review Comment: I don't think we want to actually reorder the projected fields since the user expects the query to be returned as they wrote it? 
What we actually want is to make sure the indexes for the projected fields are correct ########## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRuleTest.xml: ########## @@ -0,0 +1,144 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<Root> + <TestCase name="testRightJoinIsConverted"> + <Resource name="sql"> + <![CDATA[SELECT * FROM T1 RIGHT JOIN T2 ON a = c]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[right]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2]) + +- LogicalJoin(condition=[=($3, $0)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, T2]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) +]]> + </Resource> + </TestCase> + + <TestCase name="testJoinChainIsConverted"> + <Resource name="sql"> + <![CDATA[SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6]) ++- LogicalJoin(condition=[=($0, $5)], joinType=[right]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) + : +- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T3]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6]) ++- LogicalProject(a=[$2], b=[$3], c=[$4], d=[$5], e=[$6], f=[$0], g=[$1]) + +- LogicalJoin(condition=[=($2, $0)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, T3]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) + +- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + :- 
LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + </TestCase> + + <TestCase name="testLeftJoinRemainsUnchanged"> + <Resource name="sql"> + <![CDATA[SELECT * FROM T1 LEFT JOIN T2 ON a = c]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + </TestCase> + + <TestCase name="testRightJoinWithExpressionCondition"> + <Resource name="sql"> + <![CDATA[SELECT * FROM T1 RIGHT JOIN T2 ON a + 1 = c - 1]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalProject(a=[$0], b=[$1], c=[$3], d=[$4], e=[$5]) + +- LogicalJoin(condition=[=($2, $6)], joinType=[right]) + :- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalProject(c=[$0], d=[$1], e=[$2], $f3=[-($0, 1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]) ++- LogicalProject(a=[$0], b=[$1], c=[$3], d=[$4], e=[$5]) + +- LogicalProject(a=[$4], b=[$5], $f2=[$6], c=[$0], d=[$1], e=[$2], $f3=[$3]) + +- LogicalJoin(condition=[=($6, $3)], joinType=[left]) + :- LogicalProject(c=[$0], d=[$1], e=[$2], $f3=[-($0, 1)]) + : +- 
LogicalTableScan(table=[[default_catalog, default_database, T2]]) + +- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) +]]> + </Resource> + </TestCase> + + <TestCase name="testRightJoinWithProjection"> + <Resource name="sql"> + <![CDATA[SELECT b, d FROM T1 RIGHT JOIN T2 ON a = c]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(b=[$1], d=[$3]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[right]) + :- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(b=[$1], d=[$3]) ++- LogicalProject(a=[$3], b=[$4], c=[$0], d=[$1], e=[$2]) Review Comment: Why are indexes here for the same key (d=[$1] and d=[$3]) different? ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkRightJoinToLeftJoinRuleTest.scala: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.table.api._ +import org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE} +import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase} + +import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.tools.RuleSets +import org.junit.jupiter.api.{BeforeEach, Test} + +/** Tests for [[FlinkRightJoinToLeftJoinRule]]. */ +class FlinkRightJoinToLeftJoinRuleTest extends TableTestBase { + private val util = batchTestUtil() + + @BeforeEach + def setup(): Unit = { + util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE) + val calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv.getConfig) + calciteConfig.getBatchProgram.get.addLast( + "right-to-left-rules", + FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(RuleSets.ofList(FlinkRightJoinToLeftJoinRule.INSTANCE)) + .build() + ) + + util.addTableSource[(Int, Long)]("T1", 'a, 'b) + util.addTableSource[(Int, Long, Long)]("T2", 'c, 'd, 'e) + util.addTableSource[(Int, Long)]("T3", 'f, 'g) + } + + @Test + def testRightJoinIsConverted(): Unit = { + val sqlQuery = "SELECT * FROM T1 RIGHT JOIN T2 ON a = c" + util.verifyRelPlan(sqlQuery) + } + + @Test + def testJoinChainIsConverted(): Unit = { + val sqlQuery = "SELECT * FROM (SELECT * FROM T1 JOIN T2 ON a = c) RIGHT JOIN T3 ON a = f" Review Comment: Nice test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org