Shuai Xu created FLINK-36962:
--------------------------------

             Summary: push down non-deterministic filter after stream join to 
source by mistake
                 Key: FLINK-36962
                 URL: https://issues.apache.org/jira/browse/FLINK-36962
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.0-preview
            Reporter: Shuai Xu


A non-deterministic filter applied after a stream join is pushed down to the source by mistake.

To reproduce, modify 
org.apache.flink.table.planner.plan.stream.sql.CalcTest with the following 
snippet of code.
 
{code:java}
@BeforeEach
def setup(): Unit = {
  util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
  util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
  util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
}

@Test
def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
  val sqlQuery =
    "SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c FROM MyTable t1 join SourceTable t2 on t1.b = t2.b) t " +
      "WHERE TO_TIMESTAMP(t.t1c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW()) and t.t2c > '2022-01-01 00:00:00'"
  util.verifyRelPlan(sqlQuery)
}
{code}
We expected the following plan, with the NOW()-based filter kept above the join:
{code:java}
Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
But the actual plan pushes that filter below the join, onto the MyTable source side:
{code:java}
Calc(select=[a])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
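
The difference between the two plans matters because NOW() is evaluated at a different moment in each. The following is a toy model only, not Flink planner or runtime code; the class name, method name and timestamps are hypothetical and chosen purely for illustration. It assumes a left row whose timestamp column is 10:00, which reaches the source-side Calc at 10:30, and a matching right row that arrives at 13:00, so the join emits its result at 13:00.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model (not Flink code): the same NOW()-based predicate read at two different times. */
public class NonDeterministicPushdownDemo {

    // Mirrors the predicate from the report: ts > NOW() - 2 hours.
    // "now" is passed in explicitly so the evaluation time is visible.
    static boolean withinLastTwoHours(Instant ts, Instant now) {
        return ts.isAfter(now.minus(Duration.ofHours(2)));
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2022-01-01T10:00:00Z");          // value of column c on the left row
        Instant leftArrival = Instant.parse("2022-01-01T10:30:00Z"); // left row passes the source-side Calc
        Instant joinEmit = Instant.parse("2022-01-01T13:00:00Z");    // matching right row arrives, join emits

        // Expected plan: filter above the join, NOW() read when the join emits.
        boolean keptAfterJoin = withinLastTwoHours(ts, joinEmit);

        // Actual plan: filter pushed below the join, NOW() read when the left row arrives.
        boolean keptPushedDown = withinLastTwoHours(ts, leftArrival);

        System.out.println("filter after join keeps row:  " + keptAfterJoin);
        System.out.println("filter pushed down keeps row: " + keptPushedDown);
    }
}
```

Under these assumed timestamps the row is dropped when the filter runs after the join but kept when the filter is pushed down, so the two plans are not equivalent.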


