[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

macdoor615 updated FLINK-33365:
-------------------------------
    Description: 
create table in flink with sql-client.sh
{code:java}
CREATE TABLE default_catalog.default_database.a (
  ip string, 
  proctime as proctime()
) 
WITH (
  'connector' = 'datagen'
);{code}
create table in mysql
{code:java}
create table b (
  ip varchar(20), 
  type int
);  {code}
 

Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*

excute in sql-client.sh 
{code:java}
explain SELECT * FROM default_catalog.default_database.a left join 
bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
a.ip = b.ip; {code}
get the execution plan
{code:java}
...
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], 
fields=[ip]){code}
 
excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
*flink-connector-jdbc-3.0.0-1.16.jar*

get the execution plan
{code:java}
...
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS 
INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], 
fields=[ip]) {code}
with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 

*lookup=[ip=ip]*

with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 

*lookup=[type=0, ip=ip], where=[(type = 0)]*

 

In out real world production environment, this lead incorrect data output

 

 

  was:
 

create table in flink with sql-client.sh

 
{code:java}
CREATE TABLE default_catalog.default_database.a (
  ip string, 
  proctime as proctime()
) 
WITH (
  'connector' = 'datagen'
);{code}
 

create table in mysql

 
{code:java}
create table b (
  ip varchar(20), 
  type int
);  {code}
 

Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*

excute in sql-client.sh 

 
{code:java}
explain SELECT * FROM default_catalog.default_database.a left join 
bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
a.ip = b.ip; {code}
get the execution plan

 
{code:java}
...
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], 
fields=[ip]){code}
 
excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
*flink-connector-jdbc-3.0.0-1.16.jar*

get the execution plan

 
{code:java}
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS 
INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
   +- Calc(select=[ip, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, a]], 
fields=[ip]) {code}
 

with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 

*lookup=[ip=ip]*

with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 

{*}lookup=[type=0, ip=ip], where=[(type = 0)]{*}{*}{*}

In out real world production environment, incorrect data is output

 

 


> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Priority: Critical
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> excute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In out real world production environment, this lead incorrect data output
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to