[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786427#comment-17786427 ]
david radley edited comment on FLINK-33365 at 11/15/23 4:34 PM:
----------------------------------------------------------------

[~libenchao] I have moved the code from the AbstractDialog and pushed up the change. I could not see how to get a _PreparedStatement_ to be able to set the condition on. Please could you give me some pointers.

I have successfully tested using the supplied test tables:
 * other simple predicates work
 * multiple simple predicates work

I added extra tests to JdbcDynamicTableSourceITCase but cannot test those changes, as I get errors locally when running the tests even without my changes. Any pointers would be great. I am running on a Mac.

I wanted to test 2 lookup keys, so I created new tables d (on MariaDB) and e (on Paimon), set them up as shown below, and ran some joins with filters and a join with multiple keys. The results do not look right to me (but I may be misunderstanding) - WDYT?

select * from mariadb_catalog.menagerie.d ;
+----+--------------------------------+-------------+-------------+
| op |                             ip |        type |         age |
+----+--------------------------------+-------------+-------------+
| +I |                    10.10.10.10 |           1 |          30 |
| +I |                    10.10.10.10 |           2 |          40 |
| +I |                    10.10.10.10 |           2 |          50 |
| +I |                    10.10.10.10 |           3 |          50 |
+----+--------------------------------+-------------+-------------+
Received a total of 4 rows

Flink SQL> select * from e;
+----+--------------------------------+-------------+-------------------------+
| op |                             ip |         age |                proctime |
+----+--------------------------------+-------------+-------------------------+
| +I |                    10.10.10.10 |          40 | 2023-11-15 16:12:57.553 |
| +I |                    10.10.10.10 |          50 | 2023-11-15 16:12:57.554 |

Flink SQL> SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF e.proctime on d.type = 2 and d.age = 50 and e.ip = d.ip;
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op |                             ip |         age |                proctime |                            ip0 |        type |        age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| +I |                    10.10.10.10 |          40 | 2023-11-15 16:08:40.973 |                    10.10.10.10 |           2 |          50 |
| +I |                    10.10.10.10 |          50 | 2023-11-15 16:08:40.974 |                    10.10.10.10 |           2 |          50 |

Is this what you would expect?

Also I get

SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF e.proctime on e.age = d.age;
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op |                             ip |         age |                proctime |                            ip0 |        type |        age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
^CQuery terminated, received a total of 0 row

and

SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF e.proctime on e.ip = d.ip;
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op |                             ip |         age |                proctime |                            ip0 |        type |        age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
^CQuery terminated, received a total of 0 row

and

SELECT * FROM e left join mariadb_catalog.menagerie.d FOR SYSTEM_TIME AS OF e.proctime on e.age = d.age and d.ip = e.ip;
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
| op |                             ip |         age |                proctime |                            ip0 |        type |        age0 |
+----+--------------------------------+-------------+-------------------------+--------------------------------+-------------+-------------+
^CQuery terminated, received a total of 0 row

> Missing filter condition in execution plan containing lookup join with mysql
> jdbc connector
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-33365
> URL: https://issues.apache.org/jira/browse/FLINK-33365
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.18.0, 1.17.1
> Environment: Flink 1.17.1 & Flink 1.18.0 with
> flink-connector-jdbc-3.1.1-1.17.jar
> Reporter: macdoor615
> Assignee:
david radley
> Priority: Critical
> Attachments: flink-connector-jdbc-3.0.0-1.16.png,
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> Create a table in Flink with sql-client.sh:
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string,
>   proctime as proctime()
> )
> WITH (
>   'connector' = 'datagen'
> );{code}
> Create a table in MySQL:
> {code:java}
> create table b (
>   ip varchar(20),
>   type int
> ); {code}
>
> With Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*,
> execute in sql-client.sh:
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and
> a.ip = b.ip; {code}
> This gives the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]){code}
>
> Execute the same SQL in sql-client with Flink 1.17.1 / 1.18.0 and
> *flink-connector-jdbc-3.0.0-1.16.jar*.
> This gives the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) {code}
> With flink-connector-jdbc-3.1.1-1.17.jar, the condition is
> *lookup=[ip=ip]*
> With flink-connector-jdbc-3.0.0-1.16.jar, the condition is
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>
> In our real-world production environment, this leads to incorrect data output.
>

--
This message was sent by Atlassian Jira (v8.20.10#820010)