[ 
https://issues.apache.org/jira/browse/FLINK-22955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365856#comment-17365856
 ] 

Cooper Luan edited comment on FLINK-22955 at 6/19/21, 6:33 AM:
---------------------------------------------------------------

It's OK to implement a LookupFunction in the style of JdbcRowDataLookupFunction/JdbcDynamicTableSource, because LookupFunction accepts eval(Object... keys).
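
For reference, a minimal sketch (not from the issue) of a synchronous lookup function in that style; the class name and the queryBackend helper are hypothetical placeholders:
{code:scala}
import scala.annotation.varargs

import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions.TableFunction

// Sketch only: mirrors the Java-style eval(Object... keys) of
// JdbcRowDataLookupFunction, so any combination of lookup keys
// (including pushed-down constants) matches a single signature.
class SyncLookupFunction1 extends TableFunction[RowData] {

  // @varargs emits a Java varargs bridge so the planner sees eval(Object...)
  @varargs
  def eval(keys: AnyRef*): Unit = {
    // queryBackend is a hypothetical placeholder for a point query
    // against an external store such as Redis/HBase
    queryBackend(keys).foreach(collect)
  }

  private def queryBackend(keys: Seq[AnyRef]): Seq[RowData] = Seq.empty
}
{code}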

 

AsyncLookupFunction does not accept eval(Object... keys), so we have to implement overloads such as
{code:scala}
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer)
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer)
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: Integer, c: String)
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer, b: String, c: String)
{code}
*But we don't know the user's schema in advance (for a lookup DB like Redis/HBase), which means we don't know which eval signatures to implement.*
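
To make the mismatch concrete: with the exec plan quoted below showing lookup=[age=10, id=a], the planner would effectively need a fixed-arity overload along these lines (a sketch only; the class name and the parameter order are assumptions):
{code:scala}
import java.lang.{Integer => JInteger}
import java.util.{Collection => JCollection, Collections}
import java.util.concurrent.CompletableFuture

import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions.AsyncTableFunction

// Sketch only: the extra `age` parameter exists solely because the planner
// pushed the constant filter into the lookup keys (lookup=[age=10, id=a]).
class AsyncLookupWithPushedFilter extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]],
           age: JInteger,
           a: JInteger): Unit = {
    // placeholder: complete with the matching rows from the external store
    resultFuture.complete(Collections.emptyList[RowData]())
  }
}
{code}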

 

However, there is a workaround: change
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
to
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE cast(age as bigint) = 30
{code}
so the optimizer will not push the filter down into the lookup keys.
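
One way to double-check the workaround is to explain the rewritten query and verify that the LookupJoin keeps only lookup=[id=a]; a rough sketch, assuming the tables and the v_vvv view from the issue description are already registered in the session:
{code:scala}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Sketch only: print the optimized plan for the rewritten query and check
// that the LookupJoin no longer lists the constant age among its lookup keys.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val plan = tEnv.explainSql(
  """SELECT a, b, id, name
    |FROM v_vvv
    |WHERE CAST(age AS BIGINT) = 30""".stripMargin)
println(plan)
{code}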

 

[~qingru zhang] 



> lookup join filter push down result to mismatch function signature
> ------------------------------------------------------------------
>
>                 Key: FLINK-22955
>                 URL: https://issues.apache.org/jira/browse/FLINK-22955
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.3, 1.13.1, 1.12.4
>         Environment: Flink 1.13.1
> how to reproduce: patch file attached
>            Reporter: Cooper Luan
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.5, 1.13.2
>
>         Attachments: 
> 0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch
>
>
> a SQL statement like this may result in a lookup function signature mismatch exception when the SQL is explained
> {code:sql}
> CREATE TEMPORARY VIEW v_vvv AS
> SELECT * FROM MyTable AS T
> JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
> ON T.a = D.id;
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 10;{code}
> the lookup function is
> {code:scala}
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
>   }
> }
> {code}
> exec plan is
> {code:java}
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
>    +- Calc(select=[a, b])
>       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> {code}
> the "lookup=[age=10, id=a]" result to mismatch signature mismatch
>  
> but if I add one more INSERT, it works well
> {code:sql}
> SELECT a,b,id,name
> FROM v_vvv
> WHERE age = 30
> {code}
> exec plan is
> {code:java}
> == Optimized Execution Plan ==
> LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 10)])
>    +- Reused(reference_id=[1])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
> +- Calc(select=[a, b, id, name], where=[(age = 30)])
>    +- Reused(reference_id=[1])
> {code}
> the LookupJoin node uses "lookup=[id=a]" (right), not "lookup=[age=10, id=a]" (wrong)
>  
> so, in "multi insert" case, planner works great
> in "single insert" case, planner throw exception



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
