Cooper Luan created FLINK-22955:
-----------------------------------

             Summary: lookup join filter push down result to mismatch function signature
                 Key: FLINK-22955
                 URL: https://issues.apache.org/jira/browse/FLINK-22955
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.4, 1.13.1, 1.11.3
         Environment: Flink 1.13.1
how to reproduce: patch file attached
            Reporter: Cooper Luan
         Attachments: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch


A SQL statement like the following may fail with a lookup function signature mismatch exception when the SQL is explained:
{code:sql}
CREATE TEMPORARY VIEW v_vvv AS
SELECT * FROM MyTable AS T
JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.a = D.id;

SELECT a,b,id,name FROM v_vvv WHERE age = 10;{code}
The lookup function is:
{code:scala}
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
  }
}{code}
The exec plan is:
{code:java}
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
   +- Calc(select=[a, b])
      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
The "lookup=[age=10, id=a]" key list is what causes the signature mismatch: the filter "age = 10" has been pushed into the lookup keys, but the function's eval method accepts only one key (a).

However, if I add one more insert, it works well:
{code:sql}
SELECT a,b,id,name FROM v_vvv WHERE age = 30
{code}
The exec plan is:
{code:java}
== Optimized Execution Plan ==
LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 10)])
   +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 30)])
   +- Reused(reference_id=[1])
{code}
Here the LookupJoin node uses "lookup=[id=a]" (right), not "lookup=[age=10, id=a]" (wrong).

So in the "multi insert" case the planner works correctly, while in the "single insert" case the planner throws an exception.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)