[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786427#comment-17786427
 ] 

david radley edited comment on FLINK-33365 at 11/15/23 4:34 PM:
----------------------------------------------------------------

[~libenchao] 

I have moved the code from the AbstractDialect and pushed up the change. I could 
not see how to get a _PreparedStatement_ on which to set the condition. 
Please could you give me some pointers?
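
For reference, a minimal sketch of one way extra filter conditions could be carried on a lookup query: append one '?' placeholder per filter column after the lookup keys, then bind the values by position with the standard java.sql.PreparedStatement.setObject. The class and method names below (LookupQuerySketch, buildLookupSql, bind) are hypothetical and are not the connector's actual API; how the connector obtains its statement is not shown here.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration only; not part of flink-connector-jdbc. */
final class LookupQuerySketch {

    /** Builds a parameterised SELECT: one '?' per lookup key, then one per filter column. */
    static String buildLookupSql(String table, List<String> selectFields,
                                 List<String> keyFields, List<String> filterFields) {
        List<String> conditions = new ArrayList<>();
        for (String key : keyFields) {
            conditions.add(key + " = ?");
        }
        for (String filter : filterFields) {
            conditions.add(filter + " = ?");
        }
        String sql = "SELECT " + String.join(", ", selectFields) + " FROM " + table;
        if (!conditions.isEmpty()) {
            sql += " WHERE " + String.join(" AND ", conditions);
        }
        return sql;
    }

    /** Binds key values first, then filter values, in the same order as the placeholders. */
    static void bind(PreparedStatement stmt, List<Object> keyValues, List<Object> filterValues)
            throws SQLException {
        int index = 1; // JDBC parameter indexes start at 1
        for (Object value : keyValues) {
            stmt.setObject(index++, value);
        }
        for (Object value : filterValues) {
            stmt.setObject(index++, value);
        }
    }

    public static void main(String[] args) {
        // Table b from the issue description: lookup key ip, filter on type.
        System.out.println(buildLookupSql("b", Arrays.asList("ip", "type"),
                Arrays.asList("ip"), Arrays.asList("type")));
        // prints: SELECT ip, type FROM b WHERE ip = ? AND type = ?
    }
}
```

For the query in the issue description, bind would be called with the current ip value as the only key value and 0 as the only filter value.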

 

I have successfully tested using the supplied test tables:
 * other simple predicates work
 * multiple simple predicates work

 

I added extra tests to JdbcDynamicTableSourceITCase but cannot test those 
changes, as I get errors locally when running the tests even without my 
changes. Any pointers would be great; I am running on a Mac.

 

I wanted to test two lookup keys, so I created a new table d (on MariaDB) and 
a table e (on Paimon), then ran some joins with filters and a join with 
multiple keys. The results do not look right to me (but I may be 
misunderstanding). WDYT?

select * from mariadb_catalog.menagerie.d ;

+----+--------------------------------+-------------+-------------+
| op |                             ip |        type |         age |
+----+--------------------------------+-------------+-------------+
| +I |                    10.10.10.10 |           1 |          30 |
| +I |                    10.10.10.10 |           2 |          40 |
| +I |                    10.10.10.10 |           2 |          50 |
| +I |                    10.10.10.10 |           3 |          50 |
+----+--------------------------------+-------------+-------------+

Received a total of 4 rows

 

Flink SQL> select * from e;

+----+--------------------------------+-------------+-------------------------+
| op |                             ip |         age |                proctime |
+----+--------------------------------+-------------+-------------------------+
| +I |                    10.10.10.10 |          40 | 2023-11-15 16:12:57.553 |
| +I |                    10.10.10.10 |          50 | 2023-11-15 16:12:57.554 |

 

Flink SQL> SELECT * FROM e left join mariadb_catalog.menagerie.d FOR 
SYSTEM_TIME AS OF e.proctime on d.type = 2 and d.age = 50 and e.ip = d.ip;

+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op |                             ip |         age |                proctime |                            ip0 |        type |        age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| +I |                    10.10.10.10 |          40 | 2023-11-15 16:08:40.973 |                    10.10.10.10 |           2 |          50 |
| +I |                    10.10.10.10 |          50 | 2023-11-15 16:08:40.974 |                    10.10.10.10 |           2 |          50 |

 

Is this what you would expect? 
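
As a cross-check, here is a minimal in-memory sketch of plain LEFT JOIN semantics for the query above, assuming d and e contain exactly the rows shown (proctime is omitted). It is not Flink code, and the class name LookupJoinFilterSketch is hypothetical. Under these assumptions each row of e matches only the d row with type = 2 and age = 50, which agrees with the two rows printed by the SQL client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory model of the query above; not Flink code. */
final class LookupJoinFilterSketch {

    /** One row of table d as shown above. */
    static final class DRow {
        final String ip; final int type; final int age;
        DRow(String ip, int type, int age) { this.ip = ip; this.type = type; this.age = age; }
    }

    /** One row of table e as shown above (proctime omitted). */
    static final class ERow {
        final String ip; final int age;
        ERow(String ip, int age) { this.ip = ip; this.age = age; }
    }

    static final List<DRow> D = Arrays.asList(
            new DRow("10.10.10.10", 1, 30),
            new DRow("10.10.10.10", 2, 40),
            new DRow("10.10.10.10", 2, 50),
            new DRow("10.10.10.10", 3, 50));

    static final List<ERow> E = Arrays.asList(
            new ERow("10.10.10.10", 40),
            new ERow("10.10.10.10", 50));

    /** LEFT JOIN e -> d ON d.type = 2 AND d.age = 50 AND e.ip = d.ip. */
    static List<String> leftJoin(List<ERow> probe, List<DRow> dim) {
        List<String> out = new ArrayList<>();
        for (ERow e : probe) {
            boolean matched = false;
            for (DRow d : dim) {
                if (d.type == 2 && d.age == 50 && e.ip.equals(d.ip)) {
                    out.add(e.ip + "," + e.age + "," + d.ip + "," + d.type + "," + d.age);
                    matched = true;
                }
            }
            if (!matched) {
                // A left join keeps the probe row and pads the right side with nulls.
                out.add(e.ip + "," + e.age + ",null,null,null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (String row : leftJoin(E, D)) {
            System.out.println(row);
        }
        // prints:
        // 10.10.10.10,40,10.10.10.10,2,50
        // 10.10.10.10,50,10.10.10.10,2,50
    }
}
```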

 

Also I get

SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF 
e.proctime on  e.age = d.age;

+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op |                             ip |         age |                proctime |                            ip0 |        type |        age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+

^CQuery terminated, received a total of 0 row

and 

SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF 
e.proctime on e.ip = d.ip;

+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op |                             ip |         age |                proctime |                            ip0 |        type |        age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+

^CQuery terminated, received a total of 0 row

and

SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF 
e.proctime on  e.age = d.age and d.ip = e.ip;

+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op |                             ip |         age |                proctime |                            ip0 |        type |        age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+

^CQuery terminated, received a total of 0 row
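
For comparison, a minimal sketch that counts the rows plain LEFT JOIN semantics would produce for the three join conditions above, assuming d and e hold exactly the rows shown earlier. It is an in-memory model, not Flink code; MultiKeyJoinSketch and leftJoinRowCount are hypothetical names. It says nothing about why the client printed 0 rows. It only shows that the join conditions themselves would not yield an empty result on this data, since a left join emits at least one row per row of e.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical in-memory model of the three queries above; not Flink code. */
final class MultiKeyJoinSketch {

    static final class DRow {
        final String ip; final int type; final int age;
        DRow(String ip, int type, int age) { this.ip = ip; this.type = type; this.age = age; }
    }

    static final class ERow {
        final String ip; final int age;
        ERow(String ip, int age) { this.ip = ip; this.age = age; }
    }

    static final List<DRow> D = Arrays.asList(
            new DRow("10.10.10.10", 1, 30),
            new DRow("10.10.10.10", 2, 40),
            new DRow("10.10.10.10", 2, 50),
            new DRow("10.10.10.10", 3, 50));

    static final List<ERow> E = Arrays.asList(
            new ERow("10.10.10.10", 40),
            new ERow("10.10.10.10", 50));

    /** Counts LEFT JOIN output rows: one per match, or one null-padded row when nothing matches. */
    static int leftJoinRowCount(BiPredicate<ERow, DRow> on) {
        int rows = 0;
        for (ERow e : E) {
            int matches = 0;
            for (DRow d : D) {
                if (on.test(e, d)) {
                    matches++;
                }
            }
            rows += Math.max(matches, 1);
        }
        return rows;
    }

    public static void main(String[] args) {
        // on e.age = d.age
        System.out.println(leftJoinRowCount((e, d) -> e.age == d.age)); // 3
        // on e.ip = d.ip
        System.out.println(leftJoinRowCount((e, d) -> e.ip.equals(d.ip))); // 8
        // on e.age = d.age and d.ip = e.ip
        System.out.println(leftJoinRowCount((e, d) -> e.age == d.age && d.ip.equals(e.ip))); // 3
    }
}
```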

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>            Priority: Critical
>         Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> execute the same SQL in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar, the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar, the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
