[ https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365856#comment-17365856 ]
Cooper Luan edited comment on FLINK-22955 at 6/19/21, 6:33 AM:
---------------------------------------------------------------

it's ok to implement a LookupFunction like JdbcRowDataLookupFunction/JdbcDynamicTableSource, because LookupFunction accepts eval(Object... keys).

AsyncLookupFunction does not accept eval(Object... keys), so we would have to implement
{code:scala}
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer)
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer)
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer, c: String)
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: String, c: String)
{code}
*but we don't know the user's schema (in a lookup db like redis/hbase), which means we don't know how to implement the eval function.*

However, there is a workaround: just change
{code:sql}
SELECT a,b,id,name FROM v_vvv WHERE age = 30
{code}
to
{code:sql}
SELECT a,b,id,name FROM v_vvv WHERE cast(age as bigint) = 30
{code}
so the optimizer won't push the filter down. [~qingru zhang]

> lookup join filter push down result to mismatch function signature
> ------------------------------------------------------------------
>
>                 Key: FLINK-22955
>                 URL: https://issues.apache.org/jira/browse/FLINK-22955
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.3, 1.13.1, 1.12.4
>         Environment: Flink 1.13.1
> how to reproduce: patch file attached
>            Reporter: Cooper Luan
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.5, 1.13.2
>
>         Attachments: 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
>
> a SQL statement like this may result in a lookup function signature mismatch exception when the SQL is explained
> {code:sql}
> CREATE TEMPORARY VIEW v_vvv AS
> SELECT * FROM MyTable AS T
> JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
> ON T.a = D.id;
>
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 10;
> {code}
> the lookup function is
> {code:scala}
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
>   }
> }
> {code}
> exec plan is
> {code:java}
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
>    +- Calc(select=[a, b])
>       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
> the "lookup=[age=10, id=a]" results in a signature mismatch
>
> but if I add 1 more insert, it works well
> {code:sql}
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 30
> {code}
> exec plan is
> {code:java}
> == Optimized Execution Plan ==
> LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 10)])
>    +- Reused(reference_id=[1])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 30)])
>    +- Reused(reference_id=[1])
> {code}
> the LookupJoin node uses "lookup=[id=a]" (right), not "lookup=[age=10, id=a]" (wrong)
>
> so, in the "multi insert" case, the planner works great
> in the "single insert" case, the planner throws an exception

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
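To make the comment's point concrete, below is a minimal, Flink-free sketch of why the async variant forces one typed eval overload per pushed-down key combination. AsyncLookupSketch, the String "rows", and both eval signatures are hypothetical stand-ins for Flink's AsyncTableFunction/RowData (not the real API), chosen only so the snippet compiles without Flink on the classpath:

```scala
import java.util.concurrent.CompletableFuture
import java.util.{Collections, Collection => JCollection}

// Hypothetical stand-in for an async lookup function: with no
// eval(Object... keys) fallback available, every key combination the
// planner may push down needs its own statically typed overload --
// which cannot be enumerated when the lookup store's schema
// (e.g. redis/hbase) is not known up front.
class AsyncLookupSketch {
  // overload for the plain join key: id
  def eval(resultFuture: CompletableFuture[JCollection[String]], id: Integer): Unit =
    resultFuture.complete(Collections.singletonList(s"row-for-id-$id"))

  // extra overload needed once the planner also pushes the filter key: age
  def eval(resultFuture: CompletableFuture[JCollection[String]], id: Integer, age: Integer): Unit =
    resultFuture.complete(Collections.singletonList(s"row-for-id-$id-age-$age"))
}
```

When the filter `age = 10` is pushed into the lookup (the `lookup=[age=10, id=a]` plan above), the two-key overload is required; a function that only defines the one-key overload fails to resolve, which is the reported signature mismatch.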