Shuai Xu created FLINK-36962:
--------------------------------

Summary: push down non-deterministic filter after stream join to source by mistake
Key: FLINK-36962
URL: https://issues.apache.org/jira/browse/FLINK-36962
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.0-preview
Reporter: Shuai Xu

A non-deterministic filter applied after a stream join is pushed down to the source by mistake.

To reproduce, modify org.apache.flink.table.planner.plan.stream.sql.CalcTest with the following snippet:
{code:java}
@BeforeEach
def setup(): Unit = {
  util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
  util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
  util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
}

@Test
def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
  // The outer WHERE clause combines a non-deterministic predicate (uses NOW())
  // on t1.c with a deterministic predicate on t2.c.
  val sqlQuery =
    "SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c FROM MyTable t1 join SourceTable t2 on t1.b = t2.b) t " +
      "WHERE TO_TIMESTAMP(t.t1c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW()) and t.t2c > '2022-01-01 00:00:00'"
  util.verifyRelPlan(sqlQuery)
}
{code}

The expected plan keeps the non-deterministic filter above the join and pushes down only the deterministic filter on SourceTable:
{code:java}
Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}

But the actual plan pushes the non-deterministic filter below the join, into a Calc directly above the MyTable scan:
{code:java}
Calc(select=[a])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)