[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785321#comment-17785321 ]
Sergey Nuyanzin edited comment on FLINK-33365 at 11/12/23 10:20 PM:
--------------------------------------------------------------------
It looks like the filter push-down implementation from FLINK-16024 is incomplete, at least for the {noformat}FOR SYSTEM_TIME AS OF{noformat} case.

Test case to reproduce: add this to {{org.apache.flink.connector.jdbc.table.JdbcTablePlanTest}}
{code:java}
@Test
public void issue33365() {
    String dataId =
            TestValuesTableFactory.registerData(
                    Arrays.asList(
                            Row.of(1L, "Alice"),
                            Row.of(1L, "Alice"),
                            Row.of(2L, "Bob"),
                            Row.of(3L, "Charlie")));
    util.tableEnv()
            .executeSql(
                    String.format(
                            "CREATE TABLE value_source (\n"
                                    + "`id` BIGINT,\n"
                                    + "`name` STRING,\n"
                                    + "`proctime` AS PROCTIME()\n"
                                    + ") WITH (\n"
                                    + "'connector' = 'values', \n"
                                    + "'data-id' = '%s')",
                            dataId));
    util.verifyExecPlan(
            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
                    + " AS S JOIN jdbc for system_time as of S.proctime AS D ON S.id = D.id and D.decimal_col = 0.0");
}
{code}
Then compare the plans before and after commit [https://github.com/apache/flink/pull/20140] (or the same commit in the JDBC connector, https://github.com/apache/flink-connector-jdbc/commit/3e3b40e8cfadfc16a8ab74d4ef6a3ab3ceafa57b): the results differ.

At the same time there are several workarounds, such as adding something semantically meaningless (a no-op calculation, a cast, or some other function call), e.g.
slightly changed queries behave as expected, because math operations, casts, and other function calls are not (yet) pushed down. Note, however, that in Flink 1.18 math operations may be simplified by the newer Calcite version (CALCITE-4420), so this needs to be checked case by case. For example:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col
FROM value_source AS S
JOIN jdbc FOR SYSTEM_TIME AS OF S.proctime AS D
  ON S.id = D.id AND D.decimal_col = 0.0 + 0
{code}
or with casting:
{code:sql}
SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col
FROM value_source AS S
JOIN jdbc FOR SYSTEM_TIME AS OF S.proctime AS D
  ON S.id = D.id AND D.decimal_col = CAST(0.0 AS DECIMAL)
{code}
[~macdoor615] that might help you with your queries.
[~qingwei91] could you have a look here please, since you are aware of the current implementation?
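Why these rewrites work can be illustrated with a small, dependency-free sketch. This is NOT Flink's planner code; it is a hypothetical model (all names here are invented) of a push-down step that only accepts bare {{column = literal}} comparisons, so any predicate wrapped in arithmetic or a CAST is kept out of the pushed-down set and remains a residual filter:

```java
// Hypothetical, simplified model of filter push-down classification.
// Illustrates why `decimal_col = 0.0` is pushed down while
// `decimal_col = 0.0 + 0` or `decimal_col = CAST(0.0 AS DECIMAL)` is not.
public class PushDownSketch {
    sealed interface Expr permits Column, Literal, Eq, Add, Cast {}
    record Column(String name) implements Expr {}
    record Literal(Object value) implements Expr {}
    record Eq(Expr left, Expr right) implements Expr {}
    record Add(Expr left, Expr right) implements Expr {}
    record Cast(Expr inner, String type) implements Expr {}

    // Only a bare `column = literal` comparison qualifies for push-down;
    // everything else must stay as a residual filter after the lookup.
    static boolean canPushDown(Expr e) {
        return e instanceof Eq eq
                && eq.left() instanceof Column
                && eq.right() instanceof Literal;
    }

    public static void main(String[] args) {
        Expr plain = new Eq(new Column("decimal_col"), new Literal(0.0));
        Expr withAdd = new Eq(new Column("decimal_col"),
                new Add(new Literal(0.0), new Literal(0)));
        Expr withCast = new Eq(new Column("decimal_col"),
                new Cast(new Literal(0.0), "DECIMAL"));

        System.out.println(canPushDown(plain));    // true
        System.out.println(canPushDown(withAdd));  // false
        System.out.println(canPushDown(withCast)); // false
    }
}
```

This also shows why the workaround is fragile: if the planner (e.g. a newer Calcite) constant-folds {{0.0 + 0}} back to {{0.0}} before this step, the predicate becomes pushable again.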
> Missing filter condition in execution plan containing lookup join with mysql jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>      Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>        Environment: Flink 1.17.1 & Flink 1.18.0 with flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>           Priority: Critical
>      Attachments: flink-connector-jdbc-3.0.0-1.16.png, flink-connector-jdbc-3.1.1-1.17.png
>
> Create the table in Flink with sql-client.sh:
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string,
>   proctime as proctime()
> )
> WITH (
>   'connector' = 'datagen'
> );{code}
> Create the table in MySQL:
> {code:java}
> create table b (
>   ip varchar(20),
>   type int
> ); {code}
>
> With Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*, execute in sql-client.sh:
> {code:java}
> explain SELECT * FROM
> default_catalog.default_database.a left join bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and a.ip = b.ip; {code}
> This produces the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]){code}
>
> Executing the same SQL in sql-client with Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.0.0-1.16.jar* gives the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) {code}
>
> With flink-connector-jdbc-3.1.1-1.17.jar the lookup condition is *lookup=[ip=ip]*.
> With flink-connector-jdbc-3.0.0-1.16.jar the condition is *lookup=[type=0, ip=ip], where=[(type = 0)]*.
>
> In our real-world production environment this leads to incorrect data output.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
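To illustrate the impact of the dropped condition, here is a hypothetical, dependency-free simulation (not connector code; all names are invented): a lookup into table {{b}} with and without the {{type = 0}} condition applied as a residual filter returns different row sets for the same input, which is exactly the data-correctness problem reported above.

```java
import java.util.List;
import java.util.stream.Collectors;

public class LookupJoinSketch {
    record B(String ip, int type) {}

    // Simulated lookup: find rows in `b` matching the join key, then
    // optionally apply the `type = 0` condition as a residual filter.
    static List<B> lookup(List<B> table, String ip, boolean applyTypeFilter) {
        return table.stream()
                .filter(r -> r.ip().equals(ip))
                .filter(r -> !applyTypeFilter || r.type() == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<B> b = List.of(new B("10.0.0.1", 0), new B("10.0.0.1", 1));
        // Correct plan (condition kept): only the type = 0 row survives.
        System.out.println(lookup(b, "10.0.0.1", true).size());  // 1
        // Buggy plan (condition dropped): both rows are returned.
        System.out.println(lookup(b, "10.0.0.1", false).size()); // 2
    }
}
```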