[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784915#comment-17784915 ]
david radley edited comment on FLINK-33365 at 11/10/23 3:04 PM:
----------------------------------------------------------------

An update on what I have found: I have switched on DEBUG and put out the rules that are being driven for my recreation. I see:

{code}
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram [] - optimize time_indicator cost 1 ms.
optimize result:
FlinkLogicalSink(table=[*anonymous_collect$4*], fields=[ip, proctime, ip0, type])
+- FlinkLogicalCalc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
   +- FlinkLogicalJoin(condition=[=($0, $4)], joinType=[left])
      :- FlinkLogicalCalc(select=[ip, PROCTIME() AS proctime])
      :  +- FlinkLogicalTableSourceScan(table=[[paimon_catalog, default, a]], fields=[ip])
      +- FlinkLogicalSnapshot(period=[$cor0.proctime])
         +- FlinkLogicalCalc(select=[ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS ip0])
            +- FlinkLogicalTableSourceScan(table=[[mariadb_catalog, menagerie, c, filter=[=(type, 0)]]], fields=[ip, type])
{code}

The *filter=[=(type, 0)]* on the table source scan is removed in the next stage:

{code}
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram [] - optimize physical cost 3 ms.
optimize result:
Sink(table=[*anonymous_collect$4*], fields=[ip, proctime, ip0, type])
+- Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
   +- LookupJoin(table=[mariadb_catalog.menagerie.c], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS ip0])
      +- Calc(select=[ip, PROCTIME() AS proctime])
         +- TableSourceScan(table=[[paimon_catalog, default, a]], fields=[ip])
{code}

The *CAST(0 AS INTEGER)* is in the final Optimized Execution Plan we see in the explain. This cast is fine as long as the filter is there.
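To make concrete why losing that filter matters, here is an illustrative sketch (plain Python, not Flink code; the table contents and IPs are made up) of the left lookup join semantics with and without the `type = 0` predicate applied during the lookup:

```python
# Illustrative sketch only: simulates the semantics of the left lookup join
# in the plans above. Rows are invented; this is not Flink planner code.

# Probe side: rows arriving from table `a` (just an ip column).
probe_rows = [{"ip": "10.0.0.1"}, {"ip": "10.0.0.2"}]

# Dimension table `c` in MariaDB: (ip, type).
dim_rows = [
    {"ip": "10.0.0.1", "type": 0},
    {"ip": "10.0.0.2", "type": 1},  # should never join, because of type = 0
]

def left_lookup_join(probe, dim, keep):
    """Left join probe against dim on ip, keeping dim rows where keep(row) holds."""
    out = []
    for left in probe:
        matches = [d for d in dim if d["ip"] == left["ip"] and keep(d)]
        if matches:
            out.extend({"ip": left["ip"], "ip0": d["ip"], "type": d["type"]}
                       for d in matches)
        else:
            # LeftOuterJoin pads with NULLs when nothing matches.
            out.append({"ip": left["ip"], "ip0": None, "type": None})
    return out

# Correct plan: the type = 0 filter is applied during the lookup.
correct = left_lookup_join(probe_rows, dim_rows, keep=lambda d: d["type"] == 0)

# Broken plan: lookup=[ip=ip] only, the filter was optimized away.
broken = left_lookup_join(probe_rows, dim_rows, keep=lambda d: True)

print(correct)  # second row pads NULLs: no type = 0 match for 10.0.0.2
print(broken)   # second row wrongly joins the type = 1 dimension row
```

The two plans diverge exactly on probe rows whose only dimension matches fail the dropped predicate.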
I am not an expert at this, but the comments at the top of [CommonPhysicalLookupJoin.scala|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala] are correct and this filter should actually be in the lookup keys. The comment says:

{code}
 * For a lookup join query:
 *
 * <pre> SELECT T.id, T.content, D.age FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
 * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id WHERE D.name LIKE 'Jack%'
 * </pre>
 *
 * The LookupJoin physical node encapsulates the following RelNode tree:
 *
 * <pre>
 *      Join (l.name = r.name)
 *      /    \
 * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
 *             |
 *          DimTable (lookup-keys: age=11, id=l.id) (age, id, name)
 * </pre>
 *
 * The important member fields in LookupJoin:
 * <ul>
 * <li>allLookupKeys: [$0=11, $1=l.id] ($0 and $1 is the indexes of age and id in dim table)</li>
 * <li>remainingCondition: l.name=r.name</li>
 * </ul>
 *
 * The workflow of lookup join:
 *
 * 1) lookup records dimension table using the lookup-keys <br>
 * 2) project & filter on the lookup-ed records <br>
 * 3) join left input record and lookup-ed records <br>
 * 4) only outputs the rows which match to the remainingCondition <br>
{code}

I would have thought that a filter on a value on one of the sources could be pushed down as a filter to that source, rather than being in the lookup keys. Maybe that could be a subsequent optimization. I could be missing something.

was (Author: JIRAUSER300523):
An update on what I have found: I have switched on DEBUG and put out the rules that are being driven for my recreation. I see:

{code}
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram [] - optimize time_indicator cost 1 ms.
optimize result:
FlinkLogicalSink(table=[*anonymous_collect$4*], fields=[ip, proctime, ip0, type])
+- FlinkLogicalCalc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
   +- FlinkLogicalJoin(condition=[=($0, $4)], joinType=[left])
      :- FlinkLogicalCalc(select=[ip, PROCTIME() AS proctime])
      :  +- FlinkLogicalTableSourceScan(table=[[paimon_catalog, default, a]], fields=[ip])
      +- FlinkLogicalSnapshot(period=[$cor0.proctime])
         +- FlinkLogicalCalc(select=[ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS ip0])
            +- FlinkLogicalTableSourceScan(table=[[mariadb_catalog, menagerie, c, filter=[=(type, 0)]]], fields=[ip, type])
{code}

The *filter=[=(type, 0)]* on the table source scan is removed in the next stage:

{code}
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram [] - optimize physical cost 3 ms.
optimize result:
Sink(table=[*anonymous_collect$4*], fields=[ip, proctime, ip0, type])
+- Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
   +- LookupJoin(table=[mariadb_catalog.menagerie.c], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS ip0])
      +- Calc(select=[ip, PROCTIME() AS proctime])
         +- TableSourceScan(table=[[paimon_catalog, default, a]], fields=[ip])
{code}

The *CAST(0 AS INTEGER)* is in the final Optimized Execution Plan we see in the explain. This cast is fine as long as the filter is there. I am not an expert at this, but it seems to me that one of two things is happening: 1) this change to the graph is a valid optimization, but it is not being actioned properly when executed, such that the CAST(0 AS INTEGER) is ignored; or 2) the comments at the top of [CommonPhysicalLookupJoin.scala|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala] are correct and this filter should actually be in the lookup keys. The comment says:

{code}
 * For a lookup join query:
 *
 * <pre> SELECT T.id, T.content, D.age FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
 * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id WHERE D.name LIKE 'Jack%'
 * </pre>
 *
 * The LookupJoin physical node encapsulates the following RelNode tree:
 *
 * <pre>
 *      Join (l.name = r.name)
 *      /    \
 * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
 *             |
 *          DimTable (lookup-keys: age=11, id=l.id) (age, id, name)
 * </pre>
 *
 * The important member fields in LookupJoin:
 * <ul>
 * <li>allLookupKeys: [$0=11, $1=l.id] ($0 and $1 is the indexes of age and id in dim table)</li>
 * <li>remainingCondition: l.name=r.name</li>
 * </ul>
 *
 * The workflow of lookup join:
 *
 * 1) lookup records dimension table using the lookup-keys <br>
 * 2) project & filter on the lookup-ed records <br>
 * 3) join left input record and lookup-ed records <br>
 * 4) only outputs the rows which match to the remainingCondition <br>
{code}

> Missing filter condition in execution plan containing lookup join with mysql jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>            Priority: Critical
>         Attachments: flink-connector-jdbc-3.0.0-1.16.png, flink-connector-jdbc-3.1.1-1.17.png
>
> Create a table in Flink with sql-client.sh:
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string,
>   proctime as proctime()
> )
> WITH (
>   'connector' = 'datagen'
> );{code}
> Create a table in mysql:
> {code:java}
> create table b (
>   ip varchar(20),
>   type int
> ); {code}
>
> With Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*, execute in
> sql-client.sh:
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a
> left join bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime
> on b.type = 0 and a.ip = b.ip; {code}
> and get the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]){code}
>
> Execute the same sql in sql-client with Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.0.0-1.16.jar* and get the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) {code}
>
> With flink-connector-jdbc-3.1.1-1.17.jar, the lookup condition is *lookup=[ip=ip]*.
> With flink-connector-jdbc-3.0.0-1.16.jar, the lookup condition is *lookup=[type=0, ip=ip], where=[(type = 0)]*.
>
> In our real-world production environment, this leads to incorrect data output.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
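[Editorial addendum] The "incorrect data output" the reporter describes is easy to miss because both plans in the report project the folded constant CAST(0 AS INTEGER) AS type. The following sketch (plain Python with made-up rows; not Flink code) shows how a dimension row with type = 1 still joins under the broken lookup=[ip=ip] plan and is then emitted with type = 0, so the wrong output looks plausible:

```python
# Illustrative sketch only: shows the lost lookup filter combining with the
# folded constant in select=[..., CAST(0 AS INTEGER) AS type, ...].
# The row contents are invented for illustration.

dim_b = [{"ip": "1.2.3.4", "type": 1}]  # only row in b; should NOT match type = 0
probe = {"ip": "1.2.3.4"}               # one probe row from table a

def run(filter_kept: bool):
    # Lookup on ip; apply type = 0 during the lookup only if the plan kept it.
    matches = [r for r in dim_b
               if r["ip"] == probe["ip"] and (r["type"] == 0 or not filter_kept)]
    if not matches:
        return {"ip": probe["ip"], "ip0": None, "type": None}  # left-join NULL padding
    # The select list projects the folded constant 0, not the stored value.
    return {"ip": probe["ip"], "ip0": matches[0]["ip"], "type": 0}

good = run(filter_kept=True)    # lookup=[type=0, ip=ip], where=[(type = 0)]
bad = run(filter_kept=False)    # lookup=[ip=ip] only, filter lost

print(good)  # {'ip': '1.2.3.4', 'ip0': None, 'type': None}
print(bad)   # {'ip': '1.2.3.4', 'ip0': '1.2.3.4', 'type': 0}  (plausible but wrong)
```

Because the constant projection masks the stored type value, nothing in the bad row itself reveals that it should never have joined.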