[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798091#comment-17798091 ]

david radley commented on FLINK-33365:
--------------------------------------

[~libenchao] [~Sergey Nuyanzin] Bringing the discussion back onto the issue. I
have many of the scenarios working, including the OR scenario. There are
scenarios, like the one in
[https://lists.apache.org/thread/7r49c5ffohz1oqm3oz2g60g5lnovq0l2], where nulls
come out in the lookup join result set if the JDBC lookup source has rows that
do not match the filter. The same behaviour occurs with predicate pushdown
disabled and with my current patch (once I push it up).

On the discussion thread, [~ruanhang1993] agreed that the nulls are not correct.

The way I read this, unless there is a reason we should see these nulls, there
is an issue with lookup joins in certain scenarios, and it does not appear to
be JDBC related. Are we OK if I make this fix behave the same as when predicate
pushdown is not enabled, and raise a separate, non-JDBC issue for the lookup
join problem?
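
A toy model may make the proposal concrete. The sketch below is a hypothetical
in-memory left outer lookup join (`lookup_join_left` is an illustrative name,
not a Flink or JDBC connector API, and the rows are made up). It evaluates the
`type = 0` predicate either inside the lookup, as a pushed-down filter would, or
as a residual join condition after the lookup, roughly modelling the
non-pushdown path, and checks that both produce the same rows. Dropping the
predicate entirely, as the 3.1.1 plan's `lookup=[ip=ip]` does, lets the
`type = 1` row through.

```python
from typing import Callable, Optional

Row = dict


def lookup_join_left(
    probe_rows: list,
    table: list,
    key: str,
    pushed_filter: Optional[Callable[[Row], bool]] = None,
    residual: Optional[Callable[[Row, Row], bool]] = None,
) -> list:
    """Toy left outer lookup join (hypothetical helper, not a Flink API).

    pushed_filter: predicate evaluated inside the lookup, as if pushed into
        the source query (e.g. ... WHERE ip = ? AND type = 0).
    residual: predicate evaluated on each (left, right) candidate after the
        lookup, as part of the join condition.
    """
    out = []
    for left in probe_rows:
        # Equality lookup on the join key, like lookup=[ip=ip].
        candidates = [r for r in table if r[key] == left[key]]
        if pushed_filter is not None:
            candidates = [r for r in candidates if pushed_filter(r)]
        if residual is not None:
            candidates = [r for r in candidates if residual(left, r)]
        if candidates:
            out.extend((left, r) for r in candidates)
        else:
            # Left outer join: keep the probe row, pad the right side with None.
            out.append((left, None))
    return out


# Hypothetical sample data: one probe ip with two rows in b, one with none.
a = [{"ip": "10.0.0.1"}, {"ip": "10.0.0.9"}]
b = [{"ip": "10.0.0.1", "type": 0}, {"ip": "10.0.0.1", "type": 1}]

pushed = lookup_join_left(a, b, "ip", pushed_filter=lambda right: right["type"] == 0)
not_pushed = lookup_join_left(a, b, "ip", residual=lambda left, right: right["type"] == 0)
dropped = lookup_join_left(a, b, "ip")  # filter lost entirely

assert pushed == not_pushed
print(len(pushed), len(dropped))  # 2 3
```

The design choice this illustrates is that pushdown should only change where the
predicate is evaluated, never which rows come out.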

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> Create a table in Flink with sql-client.sh:
> {code:sql}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> Create a table in MySQL:
> {code:sql}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> With Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*,
> execute in sql-client.sh:
> {code:sql}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> This produces the following execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> Executing the same SQL in sql-client with Flink 1.17.1 / 1.18.0 and
> *flink-connector-jdbc-3.0.0-1.16.jar* produces this execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> With flink-connector-jdbc-3.1.1-1.17.jar, the lookup condition is
> *lookup=[ip=ip]*; the *type = 0* predicate no longer appears as a lookup key
> or filter in the plan.
> With flink-connector-jdbc-3.0.0-1.16.jar, the lookup condition is
> *lookup=[type=0, ip=ip], where=[(type = 0)]*.
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  
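
To see the effect of losing the predicate outside Flink, the same join can be
approximated with Python's built-in `sqlite3` module. This is a stand-in under
stated assumptions: SQLite replaces MySQL, table `a` is a static table rather
than a datagen source, there is no `FOR SYSTEM_TIME AS OF` processing-time
lookup, and the sample rows are hypothetical. The first query keeps both
predicates from the report's ON clause; the second keeps only `a.ip = b.ip`,
which is what `lookup=[ip=ip]` amounts to.

```python
import sqlite3

# In-memory stand-ins (assumption) for the datagen table a and MySQL table b.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE a (ip TEXT);
    CREATE TABLE b (ip VARCHAR(20), type INT);
    INSERT INTO a VALUES ('10.0.0.1');
    INSERT INTO b VALUES ('10.0.0.1', 0), ('10.0.0.1', 1);
    """
)

# Join condition as written in the report: both predicates in the ON clause.
expected = conn.execute(
    "SELECT a.ip, b.ip, b.type FROM a LEFT JOIN b "
    "ON b.type = 0 AND a.ip = b.ip ORDER BY b.type"
).fetchall()

# Only the key equality survives, as with lookup=[ip=ip].
filter_dropped = conn.execute(
    "SELECT a.ip, b.ip, b.type FROM a LEFT JOIN b "
    "ON a.ip = b.ip ORDER BY b.type"
).fetchall()

conn.close()
print(expected)        # [('10.0.0.1', '10.0.0.1', 0)]
print(filter_dropped)  # [('10.0.0.1', '10.0.0.1', 0), ('10.0.0.1', '10.0.0.1', 1)]
```

The second query returns an extra row from `b` with `type = 1`. Since the 3.1.1
plan also projects `CAST(0 AS INTEGER) AS type`, in Flink such a row would
presumably be reported with `type` 0, which would make the wrong output hard to
spot.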



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
