macdoor615 created FLINK-33365:

             Summary: Missing filter condition in execution plan containing 
lookup join with mysql jdbc connector
                 Key: FLINK-33365
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.17.1, 1.18.0
         Environment: Flink 1.17.1 & Flink 1.18.0 with 
            Reporter: macdoor615


create table in flink with

CREATE TABLE default_catalog.default_database.a (
  ip string, 
  proctime as proctime()
  'connector' = 'datagen'

create table in mysql

create table b (
  ip varchar(20), 
  type int
);  {code}

Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*

excute in 

explain SELECT * FROM default_catalog.default_database.a left join 
bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
a.ip = b.ip; {code}
get the execution plan

== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], 
excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 

get the execution plan

== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS 
INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], 
fields=[ip]) {code}

with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 


with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 

{*}lookup=[type=0, ip=ip], where=[(type = 0)]{*}{*}{*}

In out real world production environment, incorrect data is output



This message was sent by Atlassian Jira

Reply via email to