Xuyang Zhong created FLINK-38579:
------------------------------------
Summary: NonEquivCond in join and Filters pushed down in source
should also affect the upstream changelog just like Filter in Calc
Key: FLINK-38579
URL: https://issues.apache.org/jira/browse/FLINK-38579
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Xuyang Zhong
Currently, the planner checks filters in Calc: if the filter applies to
columns outside the upsert key, the upstream changelog is required to
contain both UA and UB.
However, non-equi conditions in joins and filters pushed down into the
source should go through the same check, to avoid generating a
drop-update-before step when UB messages are still needed.
For example:
{code:java}
@Test
def test(): Unit = {
  val testDataId = TestValuesTableFactory.registerData(
    Seq(
      changelogRow("+I", Int.box(1), "tom", Int.box(1)),
      changelogRow("-U", Int.box(1), "tom", Int.box(1)),
      changelogRow("+U", Int.box(1), "tom", Int.box(2))
    ))
  val ddl =
    s"""
       |CREATE TABLE t (
       |  a int primary key not enforced,
       |  b varchar,
       |  c int
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$testDataId',
       |  'changelog-mode' = 'I,UA,UB,D',
       |  'filterable-fields' = 'c'
       |)
       |""".stripMargin
  tEnv.executeSql(ddl)

  val sink =
    s"""
       |CREATE TABLE s (
       |  a int primary key not enforced,
       |  b varchar,
       |  c int
       |) WITH (
       |  'connector' = 'values',
       |  'sink-insert-only' = 'false',
       |  'sink-changelog-mode-enforced' = 'I,UA,D'
       |)
       |""".stripMargin
  tEnv.executeSql(sink)

  val sql = "insert into s select * from t where c < 2"
  println(tEnv.explainSql(sql, ExplainDetail.CHANGELOG_MODE))
  tEnv.executeSql(sql).await()

  // the result should be 'empty' instead of '1,tom,1'
  println(TestValuesTableFactory.getResultsAsStrings("s").sorted)
}
{code}
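
To illustrate why the filter needs UB, here is a minimal sketch in plain Scala that does not use Flink. All names in it (Kind, Row, filter, materialize, DropUpdateBeforeSketch) are hypothetical and only model the idea. It assumes a simplified sink keyed by a, which upserts on +I/+U and removes the key on -U/-D; it is not the planner's or the values connector's actual implementation.

```scala
// Simplified model of a changelog stream; these are NOT Flink classes.
object DropUpdateBeforeSketch {
  sealed trait Kind
  case object Insert extends Kind       // +I
  case object UpdateBefore extends Kind // -U
  case object UpdateAfter extends Kind  // +U
  case object Delete extends Kind       // -D

  final case class Row(kind: Kind, a: Int, b: String, c: Int)

  // Same changelog as the test data in the reproduction above.
  val input: Seq[Row] = Seq(
    Row(Insert, 1, "tom", 1),
    Row(UpdateBefore, 1, "tom", 1),
    Row(UpdateAfter, 1, "tom", 2)
  )

  // Filter on c, which is not part of the upsert key (a).
  def filter(rows: Seq[Row]): Seq[Row] = rows.filter(_.c < 2)

  // Assumed sink model, keyed by a: +I/+U upsert the row, -U/-D remove it.
  def materialize(rows: Seq[Row]): Map[Int, (String, Int)] =
    rows.foldLeft(Map.empty[Int, (String, Int)]) { (state, r) =>
      r.kind match {
        case Insert | UpdateAfter  => state.updated(r.a, (r.b, r.c))
        case UpdateBefore | Delete => state - r.a
      }
    }

  // UB kept: the filter passes +I(1,tom,1) and -U(1,tom,1), drops +U(1,tom,2).
  val withUpdateBefore: Map[Int, (String, Int)] =
    materialize(filter(input))

  // UB dropped upstream: only +I(1,tom,1) survives the filter.
  val withoutUpdateBefore: Map[Int, (String, Int)] =
    materialize(filter(input.filterNot(_.kind == UpdateBefore)))
}
```

Under these assumptions, withUpdateBefore ends up empty, while withoutUpdateBefore still holds the stale row (1, tom, 1), which matches the wrong result described in the test comment.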
--
This message was sent by Atlassian Jira
(v8.20.10#820010)