should I post in Dev user list ? On Mon, 7 Jun 2021 at 18:56 Luan Cooper <gc.su...@gmail.com> wrote:
> Hi > > We're using multi sink in sql with view, the TestCase is > > """java > @Test > def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = { > createLookupTable("LookupTableAsync1", new AsyncTableFunction1) > > util.addTable( > """ > |CREATE TEMPORARY VIEW v_vvv AS > |SELECT * FROM MyTable AS T > |JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D > |ON T.a = D.id > |""".stripMargin) > > val stmtSet = util.tableEnv.createStatementSet() > > val appendSink1 = util.createRetractTableSink( > Array("a", "b","id","name"), > Array(INT, STRING, INT, STRING)) > > util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( > "appendSink1", appendSink1) > stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery(""" > |SELECT > a,b,id,name FROM v_vvv > |WHERE age = > 10 > """.stripMargin)) > > stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery(""" > |SELECT > a,b,id,name FROM v_vvv > |WHERE age = > 30 > > """.stripMargin)) > > // util.verifyPlan(stmtSet) > util.verifyExecPlan(stmtSet) > } > > class AsyncTableFunction1 extends AsyncTableFunction[RowData] { > def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: > Integer): Unit = { > } > } > """ > > the optimized exec plan is > > """ > Calc(select=[a, b])(reuse_id=[1]) > +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], > fields=[a, b, c, proctime, rowtime]) > > LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], > fields=[a, b, id, name]) > +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], > joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = > 10)], select=[a, b, id, name]) > +- Reused(reference_id=[1]) > > LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], > fields=[a, b, id, name]) > +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], > joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age = > 30)], select=[a, b, id, name]) > +- Reused(reference_id=[1]) > """ > > *I have 2 questions* > *1. The lookup table function execute twice, which is very expensive* > *2. the age filter is push down to LookupJoin with lookup=[age=10, id=a], > which result to function signature mismatch (exception follows blow)* > > org.apache.flink.table.api.ValidationException: Could not find an > implementation method 'eval' in class > 'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for > function 'default_catalog.default_database.LookupTableAsync1, source: > [TestInvalidTemporalTable(id, name, age, ts)]' that matches the following > signature: > void eval(java.util.concurrent.CompletableFuture, java.lang.Integer, > java.lang.Integer) > > Is the optimizer wrong or I'm wrong ? > > Cooper.Luan > >