[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798131#comment-17798131 ]
Benchao Li edited comment on FLINK-33365 at 12/18/23 11:09 AM: --------------------------------------------------------------- [~davidradl] I didn't see a problem in the example you gave in this thread: https://lists.apache.org/thread/7r49c5ffohz1oqm3oz2g60g5lnovq0l2. The result is correct since you are using 'LEFT JOIN', for the records from left table which does not have matching rows, it should emit the line, and pad nulls for fields in right table. Am I missing something? The behavior should be the same when pushdown is enabled or not as you stated previously. was (Author: libenchao): [~davidradl] I didn't see a problem in the example you gave in this thread: https://lists.apache.org/thread/7r49c5ffohz1oqm3oz2g60g5lnovq0l2. The result is correct since you are using 'LEFT JOIN', for the records from left table which does not have matching rows, it should emit the line, and pad nulls for fields in right table. Am I missing something? The behavior should be the same when pushdown is enabled or not as you stated in previously. > Missing filter condition in execution plan containing lookup join with mysql > jdbc connector > ------------------------------------------------------------------------------------------- > > Key: FLINK-33365 > URL: https://issues.apache.org/jira/browse/FLINK-33365 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC > Affects Versions: 1.18.0, 1.17.1 > Environment: Flink 1.17.1 & Flink 1.18.0 with > flink-connector-jdbc-3.1.1-1.17.jar > Reporter: macdoor615 > Assignee: david radley > Priority: Critical > Labels: pull-request-available > Attachments: flink-connector-jdbc-3.0.0-1.16.png, > flink-connector-jdbc-3.1.1-1.17.png > > > create table in flink with sql-client.sh > {code:java} > CREATE TABLE default_catalog.default_database.a ( > ip string, > proctime as proctime() > ) > WITH ( > 'connector' = 'datagen' > );{code} > create table in mysql > {code:java} > create table b ( > ip varchar(20), > type int > ); {code} > > Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar* > excute in sql-client.sh > {code:java} > explain SELECT * FROM default_catalog.default_database.a left join > bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and > a.ip = b.ip; {code} > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip > AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]){code} > > excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and > *flink-connector-jdbc-3.0.0-1.16.jar* > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 > AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]) {code} > with flink-connector-jdbc-3.1.1-1.17.jar, the condition is > *lookup=[ip=ip]* > with flink-connector-jdbc-3.0.0-1.16.jar , the condition is > *lookup=[type=0, ip=ip], where=[(type = 0)]* > > In out real world production environment, this lead incorrect data output > > -- This message was sent by Atlassian Jira (v8.20.10#820010)