Should I post this on the dev mailing list instead?

On Mon, 7 Jun 2021 at 18:56 Luan Cooper <gc.su...@gmail.com> wrote:

> Hi
>
> We're using multiple sinks in SQL with a view. The test case is:
>
> """scala
>   @Test
>   def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
>     createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
>
>     util.addTable(
>       """
>         |CREATE TEMPORARY VIEW v_vvv AS
>         |SELECT * FROM MyTable AS T
>         |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
>         |ON T.a = D.id
>         |""".stripMargin)
>
>     val stmtSet = util.tableEnv.createStatementSet()
>
>     val appendSink1 = util.createRetractTableSink(
>       Array("a", "b","id","name"),
>       Array(INT, STRING, INT, STRING))
>
>     util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
>       "appendSink1", appendSink1)
>     stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery(
>       """
>         |SELECT a,b,id,name FROM v_vvv
>         |WHERE age = 10
>         |""".stripMargin))
>
>     stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery(
>       """
>         |SELECT a,b,id,name FROM v_vvv
>         |WHERE age = 30
>         |""".stripMargin))
>
> //    util.verifyPlan(stmtSet)
>     util.verifyExecPlan(stmtSet)
>   }
>
> class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
>   def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {
>   }
> }
> """
>
> the optimized exec plan is
>
> """
> Calc(select=[a, b])(reuse_id=[1])
> +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
>    +- Reused(reference_id=[1])
>
> LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
> +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = 30)], select=[a, b, id, name])
>    +- Reused(reference_id=[1])
> """
>
> *I have 2 questions:*
> *1. The lookup table function executes twice, which is very expensive.*
> *2. The age filter is pushed down to LookupJoin as lookup=[age=10, id=a],
> which results in a function signature mismatch (exception follows below).*
>
> org.apache.flink.table.api.ValidationException: Could not find an
> implementation method 'eval' in class
> 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for
> function 'default_catalog.default_database.LookupTableAsync1, source:
> [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following
> signature:
> void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, java.lang.Integer)
>
> Is the optimizer wrong, or am I?
>
> Cooper.Luan
>
>

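For reference, the signature mismatch in question 2 can be reproduced without Flink. This is only a minimal sketch, assuming the planner looks up `eval` reflectively by parameter types (inferred from the exception text, not confirmed here). `OneKeyLookup`, `TwoKeyLookup` and `EvalSignatureCheck` are hypothetical stand-ins and are not Flink classes.

```scala
import java.util.concurrent.CompletableFuture
import java.util.{Collection => JCollection}
import scala.util.Try

// Hypothetical stand-in for AsyncTableFunction1: accepts one lookup key (id).
class OneKeyLookup {
  def eval(resultFuture: CompletableFuture[JCollection[AnyRef]], id: Integer): Unit =
    resultFuture.complete(java.util.Collections.emptyList[AnyRef]())
}

// Hypothetical variant that also accepts the pushed-down `age` key.
// The order in which the two keys would be passed is an assumption.
class TwoKeyLookup {
  def eval(resultFuture: CompletableFuture[JCollection[AnyRef]], age: Integer, id: Integer): Unit =
    resultFuture.complete(java.util.Collections.emptyList[AnyRef]())
}

object EvalSignatureCheck {
  // True if `cls` has a public eval(CompletableFuture, Integer, ..., Integer)
  // with exactly `keyCount` Integer parameters after the future.
  def hasEval(cls: Class[_], keyCount: Int): Boolean = {
    val params: Array[Class[_]] =
      Array[Class[_]](classOf[CompletableFuture[_]]) ++
        Array.fill[Class[_]](keyCount)(classOf[Integer])
    Try(cls.getMethod("eval", params: _*)).isSuccess
  }

  def main(args: Array[String]): Unit = {
    // lookup=[age=10, id=a] implies two keys, as in the exception above.
    println(hasEval(classOf[OneKeyLookup], 2)) // prints false
    println(hasEval(classOf[TwoKeyLookup], 2)) // prints true
  }
}
```

Whether adding such an overload is the right fix, or whether the filter should not end up in the lookup keys at all, is exactly the open question above.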