Xuyang Zhong created FLINK-38579:
------------------------------------

             Summary: NonEquivCond in join and Filters pushed down in source 
should also affect the upstream changelog just like Filter in Calc
                 Key: FLINK-38579
                 URL: https://issues.apache.org/jira/browse/FLINK-38579
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Xuyang Zhong


Currently, we check filters in Calc: if a filter is applied to a column that 
is not part of the upsert key, the upstream changelog is required to contain 
both UPDATE_AFTER (UA) and UPDATE_BEFORE (UB). 

However, non-equi conditions in joins and filters pushed down into the 
source should go through the same check, so that UPDATE_BEFORE messages are 
not dropped incorrectly.

For example:
{code:scala}
@Test
def test(): Unit = {
  val testDataId = TestValuesTableFactory.registerData(
    Seq(
      changelogRow("+I", Int.box(1), "tom", Int.box(1)),
      changelogRow("-U", Int.box(1), "tom", Int.box(1)),
      changelogRow("+U", Int.box(1), "tom", Int.box(2))
    ))
  val ddl =
    s"""
       |CREATE TABLE t (
       |  a int primary key not enforced,
       |  b varchar,
       |  c int
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$testDataId',
       |  'changelog-mode' = 'I,UA,UB,D',
       |  'filterable-fields' = 'c'
       |)
       |""".stripMargin
  tEnv.executeSql(ddl)
  val sink =
    s"""
       |CREATE TABLE s (
       |  a int primary key not enforced,
       |  b varchar,
       |  c int
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'sink-changelog-mode-enforced' = 'I,UA,D'
       |)
       |""".stripMargin
  tEnv.executeSql(sink)

  val sql = "insert into s select * from t where c < 2"
  println(tEnv.explainSql(sql, ExplainDetail.CHANGELOG_MODE))
  tEnv.executeSql(sql).await()       

  // Expected: empty result, because the row is updated to c = 2 and no longer
  // satisfies c < 2. Actual: '1,tom,1' remains in the sink.
  println(TestValuesTableFactory.getResultsAsStrings("s").sorted)
}
{code}
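
The expected result can be traced by hand with a small simulation. The sketch below is not Flink code: the Change case class, the filter and materialize helpers, and the rule that a retraction removes the key are all simplifying assumptions made for illustration. It only shows why the filter c < 2 gives a different final state depending on whether UPDATE_BEFORE is kept.

```scala
// Simplified model (an assumption, not Flink's runtime): a changelog row is
// a kind marker plus the three columns of table t.
object DropUpdateBeforeSketch {
  final case class Change(kind: String, a: Int, b: String, c: Int)

  // Same three changes as in the test data above.
  val source: Seq[Change] = Seq(
    Change("+I", 1, "tom", 1),
    Change("-U", 1, "tom", 1),
    Change("+U", 1, "tom", 2)
  )

  // The filter from the query, applied to every change regardless of kind.
  def filter(changes: Seq[Change]): Seq[Change] = changes.filter(_.c < 2)

  // Hypothetical sink materialization keyed on column a:
  // +I/+U upsert the row, -U/-D remove it.
  def materialize(changes: Seq[Change]): Map[Int, (String, Int)] =
    changes.foldLeft(Map.empty[Int, (String, Int)]) { (state, ch) =>
      ch.kind match {
        case "+I" | "+U" => state.updated(ch.a, (ch.b, ch.c))
        case "-U" | "-D" => state - ch.a
        case _           => state
      }
    }

  def main(args: Array[String]): Unit = {
    // UB kept: -U(1,tom,1) passes the filter and retracts the stale row.
    val ubKept = materialize(filter(source))
    // UB dropped before the filter: +U(1,tom,2) is filtered out, so nothing
    // ever removes the row inserted by +I(1,tom,1).
    val ubDropped = materialize(filter(source.filterNot(_.kind == "-U")))
    println(s"UB kept:    $ubKept")
    println(s"UB dropped: $ubDropped")
  }
}
```

In this model the state is empty when UB is kept, and still holds (1, tom, 1) when UB is dropped, which matches the incorrect result reported above.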



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
