Hi
We're using multiple sinks in SQL together with a view. The test case is:
"""java
// Reproducer: a lookup (temporal) join is wrapped in a temporary view, and two
// INSERTs with different `age` filters read that view within one statement set.
@Test
def testJoinTemporalTableWithViewWithFilterPushDown(): Unit = {
// Presumably registers the lookup table "LookupTableAsync1" backed by
// AsyncTableFunction1 (createLookupTable is not shown here -- confirm).
createLookupTable("LookupTableAsync1", new AsyncTableFunction1)
// View v_vvv: MyTable joined to LookupTableAsync1 as of T.proctime, on T.a = D.id.
util.addTable(
"""
|CREATE TEMPORARY VIEW v_vvv AS
|SELECT * FROM MyTable AS T
|JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
|ON T.a = D.id
|""".stripMargin)
// Both INSERTs below are added to this one statement set, which is then
// handed as a whole to verifyExecPlan.
val stmtSet = util.tableEnv.createStatementSet()
// NOTE(review): the sink is named "appendSink1" but is built with
// createRetractTableSink -- confirm which kind of sink is intended.
val appendSink1 = util.createRetractTableSink(
Array("a", "b","id","name"),
Array(INT, STRING, INT, STRING))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
"appendSink1", appendSink1)
// First insert: columns a, b, id, name of the view, filtered on age = 10.
stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT
a,b,id,name FROM v_vvv
|WHERE age =
10
""".stripMargin))
// Second insert: same projection, filtered on age = 30.
// NOTE(review): this also targets "appendSink1", while the plan quoted in this
// report shows `appendSink2`; no "appendSink2" is registered in this snippet.
stmtSet.addInsert("appendSink1", util.tableEnv.sqlQuery("""
|SELECT
a,b,id,name FROM v_vvv
|WHERE age =
30
""".stripMargin))
// util.verifyPlan(stmtSet)
// Verify the exec plan produced for the whole statement set.
util.verifyExecPlan(stmtSet)
}
// Stub async lookup function used by the reproducer. Its only `eval` takes the
// result future plus a single Integer key and has an empty body. There is no
// overload accepting two Integer keys, which is the signature named in the
// ValidationException quoted in this report.
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
def eval(resultFuture: CompletableFuture[JCollection[RowData]], a:
Integer): Unit = {
}
}
"""
The optimized exec plan is:
"""
Calc(select=[a, b])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`],
fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1],
joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age =
10)], select=[a, b, id, name])
+- Reused(reference_id=[1])
LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`],
fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1],
joinType=[InnerJoin], async=[true], lookup=[age=30, id=a], where=[(age =
30)], select=[a, b, id, name])
+- Reused(reference_id=[1])
"""
*I have 2 questions:*
*1. The lookup table function is executed twice, which is very expensive.*
*2. The age filter is pushed down into the LookupJoin as lookup=[age=10, id=a],
which results in a function signature mismatch (the exception follows below):*
org.apache.flink.table.api.ValidationException: Could not find an
implementation method 'eval' in class
'org.apache.flink.table.planner.plan.utils.AsyncTableFunction1' for
function 'default_catalog.default_database.LookupTableAsync1, source:
[TestInvalidTemporalTable(id, name, age, ts)]' that matches the following
signature:
void eval(java.util.concurrent.CompletableFuture, java.lang.Integer,
java.lang.Integer)
Is the optimizer wrong, or am I doing something wrong?
Cooper.Luan