One more thing for discussion: in our internal implementation, we reuse the options `table.exec.async-lookup.buffer-capacity` and `table.exec.async-lookup.timeout` to configure the async UDTF. Do you think we should add two extra options to distinguish them from the lookup options, such as
`table.exec.async-udtf.buffer-capacity` and `table.exec.async-udtf.timeout`?

Best,
Aitozi.

On Mon, Jun 5, 2023 at 12:20 PM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
>
> > what is the difference between the RPC call or query you mentioned and the lookup in a very general way
>
> I think the RPC call or query service is quite similar to the lookup join, but a lookup join has to work with a `LookupTableSource`.
>
> Let's see how we can perform an async RPC call with a lookup join:
>
> (1) Implement an AsyncTableFunction with the RPC call logic.
> (2) Implement a `LookupTableSource` connector that runs the async UDTF defined in (1).
> (3) Define the DDL for this lookup table in SQL:
>
> CREATE TEMPORARY TABLE Customers (
>   id INT,
>   name STRING,
>   country STRING,
>   zip STRING
> ) WITH (
>   'connector' = 'custom'
> );
>
> (4) Run the query below:
>
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> ON o.customer_id = c.id;
>
> This example is from the docs <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>. You can imagine the lookup process as an async RPC call.
>
> Let's see how we can perform an async RPC call with a lateral join:
>
> (1) Implement an AsyncTableFunction with the RPC call logic.
> (2) Run the query:
>
> CREATE FUNCTION f1 AS '...';
>
> SELECT o.order_id, o.total, T.country, T.zip
> FROM Orders AS o, LATERAL TABLE(f1(o.order_id)) AS T(...);
>
> As you can see, the lateral join version is simpler and more intuitive for users: they do not have to wrap a LookupTableSource just to use an async UDTF.
>
> In the end, the user-defined async table function can also be seen as an enhancement of the current lateral table join, which only supports synchronous execution now.
>
> Best,
> Aitozi.
>
> On Fri, Jun 2, 2023 at 7:37 PM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Thanks for the update.
> Just out of curiosity, what is the difference between the RPC call or query you mentioned and the lookup, in a very general way? Since a lateral join is used in the FLIP, is there any special thought behind that? Sorry for asking so many questions; the FLIP contains limited information for understanding the motivation.
>
> Best regards,
> Jing
>
> On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
> I have updated the proposed changes in the FLIP. IMO, lookup has a clear async call requirement because it is an IO-heavy operator. In our usage, SQL users have logic that makes RPC calls or queries third-party services, which is also IO intensive. In these cases, we'd like to leverage async functions to improve throughput.
>
> Thanks,
> Aitozi.
>
> On Thu, Jun 1, 2023 at 10:55 PM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Sorry for the late reply. Would you like to add more details about the proposed changes to the FLIP too?
> I got your point, and it looks like a reasonable idea. However, since lookup has a clear async call requirement, are there any real use cases that need this change? This will help us understand the motivation. After all, lateral join and temporal lookup join [1] are quite different.
>
> Best regards,
> Jing
>
> [1] https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
>
> On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
> What do you think about it? Can we move this feature forward?
>
> Thanks,
> Aitozi.
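Step (1) in both of Aitozi's walkthroughs above is "Implement an AsyncTableFunction with RPC call logic". As a rough, JDK-only sketch of that step, the code below mimics the eval shape of Flink's AsyncTableFunction<T>, whose eval methods take a CompletableFuture<Collection<T>> as the first argument and complete it once the remote call answers. It deliberately does not depend on Flink: HypotheticalCustomerRpcClient, CustomerCountryAsyncFunction and the fake even/odd country lookup are illustrative assumptions. A real UDTF would extend org.apache.flink.table.functions.AsyncTableFunction<Row> and be registered with CREATE FUNCTION.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a non-blocking RPC client (not a Flink or library API).
class HypotheticalCustomerRpcClient {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Simulates a remote call that answers on a pool thread, not the caller's thread.
    // The even/odd mapping is fake data for illustration only.
    CompletableFuture<String> fetchCountry(int customerId) {
        return CompletableFuture.supplyAsync(() -> customerId % 2 == 0 ? "DE" : "CN", pool);
    }

    void close() {
        pool.shutdown();
    }
}

// Mirrors the eval shape of AsyncTableFunction<T>: the first argument is a future that the
// function completes with all output rows for one input, without blocking the caller.
class CustomerCountryAsyncFunction {
    private final HypotheticalCustomerRpcClient client = new HypotheticalCustomerRpcClient();

    public void eval(CompletableFuture<Collection<String>> result, Integer customerId) {
        client.fetchCountry(customerId).whenComplete((country, error) -> {
            if (error != null) {
                result.completeExceptionally(error); // propagate the RPC failure
            } else {
                result.complete(List.of(country)); // one output row for this input row
            }
        });
    }

    public void close() {
        client.close();
    }
}

class AsyncUdtfDemo {
    // Drives one eval call and waits for it; an async operator would not block like this.
    static Collection<String> call(int customerId) {
        CustomerCountryAsyncFunction fn = new CustomerCountryAsyncFunction();
        try {
            CompletableFuture<Collection<String>> result = new CompletableFuture<>();
            fn.eval(result, customerId);
            return result.join();
        } finally {
            fn.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(call(7)); // prints [CN]
    }
}
```

The point of this shape is that eval returns immediately and the result arrives later on another thread; only the small AsyncUdtfDemo driver waits on the future.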
>
> On Mon, May 29, 2023 at 9:56 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
>
> > "Do you mean to support the AsyncTableFunction beyond the LookupTableSource?"
>
> Yes, I mean to support the AsyncTableFunction beyond the LookupTableSource.
>
> The "AsyncTableFunction" is a function that can be executed asynchronously (with the AsyncWaitOperator). The async lookup join is one usage of it, so we don't have to bind the AsyncTableFunction to the LookupTableSource. If user-defined AsyncTableFunctions are supported, users can directly use the lateral table syntax to perform async operations.
>
> > "It would be better if you could elaborate the proposed changes wrt the CorrelatedCodeGenerator with more details"
>
> In the proposal, we use the lateral table syntax to support the async table function, so the planner will also translate this statement into a CommonExecCorrelate node, and the runtime code should therefore be generated in CorrelatedCodeGenerator. In CorrelatedCodeGenerator, we know whether the TableFunction's kind is `FunctionKind.TABLE` or `FunctionKind.ASYNC_TABLE`. For `FunctionKind.ASYNC_TABLE`, we can generate an AsyncWaitOperator to execute the async table function.
>
> Thanks,
> Aitozi.
>
> On Mon, May 29, 2023 at 3:22 AM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Thanks for the clarification. The naming "Lookup" might suggest using it for table lookups, but conceptually, what the eval() method does is get a collection of results (Row, RowData) for the given keys.
> How it is done depends on the implementation, e.g. you can implement your own source [1][2]. The example in the FLIP could be handled this way.
>
> Do you mean to support the AsyncTableFunction beyond the LookupTableSource? It would be better if you could elaborate on the proposed changes wrt the CorrelatedCodeGenerator in more detail. Thanks!
>
> Best regards,
> Jing
>
> [1] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> [2] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
>
> On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
> Thanks for your response. As stated in the FLIP, the purpose of this FLIP is to support user-defined async table functions. As described in the Flink documentation [1]:
>
> > Async table functions are special functions for table sources that perform a lookup.
>
> So end users cannot directly define and use async table functions now. A use case is reported in [2].
>
> So, in conclusion, no new interface is introduced, but we extend the existing ability to support user-defined async table functions.
>
> [1]: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
> [2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>
> Thanks.
> Aitozi.
>
> On Sat, May 27, 2023 at 6:40 AM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Thanks for your proposal. I am not quite sure if I understood your thoughts correctly. You described a special-case implementation of the AsyncTableFunction with no public API changes. Would you please elaborate on your purpose in writing a FLIP, according to the FLIP documentation [1]? Thanks!
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> Best regards,
> Jing
>
> On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:
> May I ask for some feedback :D
>
> Thanks,
> Aitozi
>
> On Tue, May 23, 2023 at 7:14 PM Aitozi <gjying1...@gmail.com> wrote:
> Just caught a use case report from Giannis Polyzos for this usage:
>
> https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>
> On Tue, May 23, 2023 at 5:45 PM Aitozi <gjying1...@gmail.com> wrote:
> Hi guys,
> I want to bring up a discussion about adding support for user-defined AsyncTableFunction in Flink.
> Currently, async table functions are special functions for table sources to perform async lookups. However, it's worth supporting user-defined async table functions, because end SQL users could then leverage them to perform async operations, which is useful for maximizing system throughput, especially in IO-bottleneck cases.
>
> You can find more details in [1].
>
> Looking forward to feedback.
>
> [1]: https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
>
> Thanks,
> Aitozi.
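The first message in this thread asks whether the async UDTF should get its own `table.exec.async-udtf.buffer-capacity` and `table.exec.async-udtf.timeout` options instead of reusing the async-lookup ones. In Flink's async I/O model, the capacity bounds how many requests may be in flight at once, and the timeout bounds how long a single request may take. The JDK-only sketch below models just those two knobs with a Semaphore and CompletableFuture.orTimeout (Java 9+). BoundedAsyncRunner and the "remote call" that never answers for negative ids are hypothetical; this is not how AsyncWaitOperator is implemented.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical JDK-only model of the two options; this is not Flink's AsyncWaitOperator.
// bufferCapacity caps how many requests may be in flight, timeout fails slow requests.
class BoundedAsyncRunner<I, O> {
    private final Semaphore inFlight;
    private final Duration timeout;
    private final Function<I, CompletableFuture<O>> asyncCall;

    BoundedAsyncRunner(int bufferCapacity, Duration timeout,
                       Function<I, CompletableFuture<O>> asyncCall) {
        this.inFlight = new Semaphore(bufferCapacity);
        this.timeout = timeout;
        this.asyncCall = asyncCall;
    }

    // Blocks the caller (backpressure) while bufferCapacity requests are still pending.
    CompletableFuture<O> submit(I input) throws InterruptedException {
        inFlight.acquire();
        CompletableFuture<O> call;
        try {
            call = asyncCall.apply(input);
        } catch (RuntimeException e) {
            inFlight.release(); // do not leak the slot if the call fails synchronously
            throw e;
        }
        // orTimeout completes the future with a TimeoutException if it takes too long.
        return call.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                   .whenComplete((value, error) -> inFlight.release());
    }
}

class BoundedAsyncDemo {
    // Returns "ok:<value>" or "timeout" per input, in input order.
    static List<String> run(List<Integer> ids) {
        // Hypothetical remote call: negative ids never answer, others answer immediately.
        Function<Integer, CompletableFuture<Integer>> remoteCall =
            id -> id < 0 ? new CompletableFuture<Integer>() : CompletableFuture.completedFuture(id * 10);
        // Capacity 2 and a 100 ms timeout stand in for the two proposed options.
        BoundedAsyncRunner<Integer, Integer> runner =
            new BoundedAsyncRunner<>(2, Duration.ofMillis(100), remoteCall);

        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (Integer id : ids) {
            try {
                pending.add(runner.submit(id).handle((value, error) -> {
                    if (error == null) {
                        return "ok:" + value;
                    }
                    Throwable cause = error instanceof CompletionException ? error.getCause() : error;
                    return cause instanceof TimeoutException ? "timeout" : "error";
                }));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : pending) {
            results.add(f.join());
        }
        return results;
    }

    public static void main(String[] args) {
        // The fourth submit waits until one of the two stuck requests times out.
        System.out.println(run(List.of(1, -1, -2, 3))); // prints [ok:10, timeout, timeout, ok:30]
    }
}
```

Because the capacity and the timeout shape throughput and failure behavior independently of whether the async call is a lookup or a general UDTF, separate option names would let users tune the two cases differently; whether that is worth the extra configuration surface is exactly the open question in the thread.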