macdoor615 created FLINK-33365:
----------------------------------

             Summary: Missing filter condition in execution plan containing lookup join with mysql jdbc connector
                 Key: FLINK-33365
                 URL: https://issues.apache.org/jira/browse/FLINK-33365
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.17.1, 1.18.0
         Environment: Flink 1.17.1 & Flink 1.18.0 with flink-connector-jdbc-3.1.1-1.17.jar
            Reporter: macdoor615

Create a table in Flink with sql-client.sh:
{code:java}
CREATE TABLE default_catalog.default_database.a (
  ip string,
  proctime as proctime()
) WITH (
  'connector' = 'datagen'
);{code}
Create a table in MySQL:
{code:java}
create table b (
  ip varchar(20),
  type int
);
{code}
With Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*, execute the following in sql-client.sh:
{code:java}
explain SELECT * FROM default_catalog.default_database.a
left join bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime
  on b.type = 0 and a.ip = b.ip;
{code}
This produces the execution plan:
{code:java}
...
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]){code}
Executing the same SQL in sql-client.sh with Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.0.0-1.16.jar* produces the execution plan:
{code:java}
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip])
{code}
With flink-connector-jdbc-3.1.1-1.17.jar, the condition is *lookup=[ip=ip]*; the filter on type = 0 is missing from the plan.

With flink-connector-jdbc-3.0.0-1.16.jar, the condition is *lookup=[type=0, ip=ip], where=[(type = 0)]*.

In our real-world production environment, this results in incorrect output data.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)