Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995668 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala --- @@ -302,8 +303,87 @@ class RetractionRulesTest extends TableTestBase { ) util.verifyTableTrait(resultTable, expected) } -} + @Test + def testInnerJoinWithoutAgg(): Unit = { + val util = streamTestForRetractionUtil() + val lTable = util.addTable[(Int, Int)]('a, 'b) + val rTable = util.addTable[(Int, Int)]('bb, 'c) + + val resultTable = lTable + .join(rTable) + .where('b === 'bb) + .select('a, 'b, 'c) + + val expected = + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, Acc" + ), + "false, Acc" + ) + util.verifyTableTrait(resultTable, expected) + } + + @Test + def testLeftJoin(): Unit = { + val util = streamTestForRetractionUtil() + val lTable = util.addTable[(Int, Int)]('a, 'b) + val rTable = util.addTable[(Int, String)]('bb, 'c) + + val resultTable = lTable + .leftOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + + val expected = + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", + "DataStreamScan(true, Acc)", + "false, AccRetract" + ), + "false, AccRetract" + ) + util.verifyTableTrait(resultTable, expected) + } + + @Test + def testAggFollowedWithLeftJoin(): Unit = { + val util = streamTestForRetractionUtil() + val lTable = util.addTable[(Int, Int)]('a, 'b) + val rTable = util.addTable[(Int, String)]('bb, 'c) + + val countDistinct = new CountDistinct + val resultTable = lTable + .leftOuterJoin(rTable, 'b === 'bb) + .select('a, 'b, 'c) + .groupBy('a) + .select('a, countDistinct('c)) + + val expected = + unaryNode( + "DataStreamGroupAggregate", + unaryNode( + "DataStreamCalc", + binaryNode( + "DataStreamJoin", + "DataStreamScan(true, Acc)", --- End diff -- `testJoin()` has covered this case.
---