One more thing for discussion:

In our internal implementation, we reuse the options
`table.exec.async-lookup.buffer-capacity` and
`table.exec.async-lookup.timeout` to configure
the async UDTF. Do you think we should add two extra options, distinct
from the lookup options, such as:

`table.exec.async-udtf.buffer-capacity`
`table.exec.async-udtf.timeout`
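
To make the question concrete, here is a minimal sketch of one possible
middle ground: dedicated async-udtf keys that fall back to the lookup keys
when unset. This is only an illustration, not our internal implementation.
The class `AsyncUdtfOptions`, its `resolve` method, the plain `Map` used as
configuration, and the millisecond timeout format are all assumptions made
for brevity; the two `async-udtf` keys are the ones proposed above.

```java
import java.time.Duration;
import java.util.Map;

/** Illustrative resolver for the async UDTF settings; hypothetical, not part of Flink. */
final class AsyncUdtfOptions {
    // Lookup keys that the internal implementation currently reuses.
    static final String LOOKUP_BUFFER_CAPACITY = "table.exec.async-lookup.buffer-capacity";
    static final String LOOKUP_TIMEOUT = "table.exec.async-lookup.timeout";

    // Keys proposed in this message; hypothetical until agreed on.
    static final String UDTF_BUFFER_CAPACITY = "table.exec.async-udtf.buffer-capacity";
    static final String UDTF_TIMEOUT = "table.exec.async-udtf.timeout";

    final int bufferCapacity;
    final Duration timeout;

    private AsyncUdtfOptions(int bufferCapacity, Duration timeout) {
        this.bufferCapacity = bufferCapacity;
        this.timeout = timeout;
    }

    /** Prefers the UDTF-specific key and falls back to the lookup key. */
    static AsyncUdtfOptions resolve(Map<String, String> conf) {
        int capacity = Integer.parseInt(
                firstPresent(conf, UDTF_BUFFER_CAPACITY, LOOKUP_BUFFER_CAPACITY));
        // Assumption for brevity: timeouts are given as plain milliseconds.
        long timeoutMs = Long.parseLong(
                firstPresent(conf, UDTF_TIMEOUT, LOOKUP_TIMEOUT));
        return new AsyncUdtfOptions(capacity, Duration.ofMillis(timeoutMs));
    }

    private static String firstPresent(
            Map<String, String> conf, String preferred, String fallback) {
        String value = conf.get(preferred);
        if (value == null) {
            value = conf.get(fallback);
        }
        if (value == null) {
            throw new IllegalArgumentException(
                    "Neither " + preferred + " nor " + fallback + " is set");
        }
        return value;
    }
}
```

With only the lookup keys set, `resolve` behaves like the current reuse
approach; setting an `async-udtf` key overrides just that one value for
the UDTF.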


Best,
Aitozi.



On Mon, Jun 5, 2023 at 12:20 Aitozi <gjying1...@gmail.com> wrote:

> Hi Jing,
>
>     > what is the difference between the RPC call or query you mentioned
>     > and the lookup in a very general way
>
> I think the RPC call or query service is quite similar to the lookup join.
> But lookup join should work
> with `LookupTableSource`.
>
> Let's see how we can perform an async RPC call with lookup join:
>
> (1) Implement an AsyncTableFunction with RPC call logic.
> (2) Implement a `LookupTableSource` connector that runs the async UDTF
> defined in (1).
> (3) Then define the DDL of this lookup table in SQL:
>
> CREATE TEMPORARY TABLE Customers (
>   id INT,
>   name STRING,
>   country STRING,
>   zip STRING
> ) WITH (
>   'connector' = 'custom'
> );
>
> (4) Run with the query as below:
>
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>     ON o.customer_id = c.id;
>
> This example is from the docs
> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>.
> You can think of the lookup process as an async RPC call.
>
> Let's see how we can perform an async RPC call with lateral join:
>
> (1) Implement an AsyncTableFunction with RPC call logic.
> (2) Run query with
>
> CREATE FUNCTION f1 AS '...';
>
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o,
>   LATERAL TABLE(f1(o.order_id)) AS c(...);
>
> As you can see, the lateral join version is simpler and more intuitive for
> users. Users do not have to wrap a
> LookupTableSource just to use an async UDTF.
>
> Finally, the user-defined async table function can also be seen as an
> enhancement of the current lateral table join,
> which only supports synchronous execution today.
>
> Best,
> Aitozi.
>
>
> On Fri, Jun 2, 2023 at 19:37 Jing Ge <j...@ververica.com.invalid> wrote:
>
>> Hi Aitozi,
>>
>> Thanks for the update. Just out of curiosity, what is the difference
>> between the RPC call or query you mentioned and the lookup in a very
>> general way? Since lateral join is used in the FLIP, is there any special
>> reasoning behind that? Sorry for asking so many questions. The FLIP contains
>> too little information to understand the motivation.
>>
>> Best regards,
>> Jing
>>
>> On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:
>>
>> > Hi Jing,
>> >     I have updated the proposed changes in the FLIP. IMO, lookup has a
>> > clear async call requirement because it is an IO-heavy operator. In our
>> > usage, SQL users have logic that makes RPC calls or queries third-party
>> > services, which is also IO-intensive. In these cases, we'd like to
>> > leverage the async function to improve throughput.
>> >
>> > Thanks,
>> > Aitozi.
>> >
>> > On Thu, Jun 1, 2023 at 22:55 Jing Ge <j...@ververica.com.invalid> wrote:
>> >
>> > > Hi Aitozi,
>> > >
>> > > Sorry for the late reply. Would you like to add the proposed changes
>> > > to the FLIP with more details too?
>> > > I got your point. It looks like a reasonable idea. However, since
>> > > lookup has its clear async call requirement, are there any real use
>> > > cases that need this change? This will help us understand the
>> > > motivation. After all, lateral join and temporal lookup join[1] are
>> > > quite different.
>> > >
>> > > Best regards,
>> > > Jing
>> > >
>> > >
>> > > [1] https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
>> > >
>> > > On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:
>> > >
>> > > > Hi Jing,
>> > > >     What do you think about it? Can we move forward with this feature?
>> > > >
>> > > > Thanks,
>> > > > Aitozi.
>> > > >
>> > > > On Mon, May 29, 2023 at 09:56 Aitozi <gjying1...@gmail.com> wrote:
>> > > >
>> > > > > Hi Jing,
>> > > > >     > "Do you mean to support the AsyncTableFunction beyond the
>> > > > >     > LookupTableSource?"
>> > > > > Yes, I mean to support the AsyncTableFunction beyond the
>> > > > > LookupTableSource.
>> > > > >
>> > > > > The "AsyncTableFunction" is a function that can be executed
>> > > > > asynchronously (with AsyncWaitOperator). The async lookup join is
>> > > > > just one usage of it, so we don't have to bind the
>> > > > > AsyncTableFunction to LookupTableSource. If user-defined
>> > > > > AsyncTableFunction is supported, users can directly use the
>> > > > > lateral table syntax to perform async operations.
>> > > > >
>> > > > > > "It would be better if you could elaborate the proposed changes
>> > > > > > wrt the CorrelatedCodeGenerator with more details"
>> > > > >
>> > > > > In the proposal, we use the lateral table syntax to support the
>> > > > > async table function, so the planner will also translate this
>> > > > > statement into a CommonExecCorrelate node, and the runtime code
>> > > > > should be generated in CorrelatedCodeGenerator.
>> > > > > In CorrelatedCodeGenerator, we will know whether the TableFunction's
>> > > > > kind is `FunctionKind.TABLE` or `FunctionKind.ASYNC_TABLE`.
>> > > > > For `FunctionKind.ASYNC_TABLE`, we can generate an AsyncWaitOperator
>> > > > > to execute the async table function.
>> > > > >
>> > > > >
>> > > > > Thanks,
>> > > > > Aitozi.
>> > > > >
>> > > > >
>> > > > > On Mon, May 29, 2023 at 03:22 Jing Ge <j...@ververica.com.invalid> wrote:
>> > > > >
>> > > > >> Hi Aitozi,
>> > > > >>
>> > > > >> Thanks for the clarification. The naming "Lookup" might suggest
>> > > > >> using it for table lookup. But conceptually what the eval() method
>> > > > >> will do is to get a collection of results (Row, RowData) from the
>> > > > >> given keys. How it will be done depends on the implementation, i.e.
>> > > > >> you can implement your own Source[1][2]. The example in the FLIP
>> > > > >> should be able to be handled in this way.
>> > > > >>
>> > > > >> Do you mean to support the AsyncTableFunction beyond the
>> > > > >> LookupTableSource? It would be better if you could elaborate the
>> > > > >> proposed changes wrt the CorrelatedCodeGenerator with more details.
>> > > > >> Thanks!
>> > > > >>
>> > > > >> Best regards,
>> > > > >> Jing
>> > > > >>
>> > > > >> [1] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
>> > > > >> [2] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
>> > > > >>
>> > > > >> On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:
>> > > > >>
>> > > > >> > Hi Jing,
>> > > > >> >     Thanks for your response. As stated in the FLIP, the purpose
>> > > > >> > of this FLIP is to support user-defined async table functions.
>> > > > >> > As described in the Flink documentation [1]:
>> > > > >> >
>> > > > >> > > Async table functions are special functions for table sources
>> > > > >> > > that perform a lookup.
>> > > > >> >
>> > > > >> > So end users cannot directly define and use async table functions
>> > > > >> > now. A use case is reported in [2].
>> > > > >> >
>> > > > >> > So, in conclusion, no new interface is introduced, but we extend
>> > > > >> > the ability to support user-defined async table functions.
>> > > > >> >
>> > > > >> > [1]: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
>> > > > >> > [2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>> > > > >> >
>> > > > >> > Thanks.
>> > > > >> > Aitozi.
>> > > > >> >
>> > > > >> >
>> > > > >> > On Sat, May 27, 2023 at 06:40 Jing Ge <j...@ververica.com.invalid> wrote:
>> > > > >> >
>> > > > >> > > Hi Aitozi,
>> > > > >> > >
>> > > > >> > > Thanks for your proposal. I am not quite sure if I understood
>> > > > >> > > your thoughts correctly. You described a special case
>> > > > >> > > implementation of the AsyncTableFunction with no public API
>> > > > >> > > changes. Would you please elaborate on your purpose of writing a
>> > > > >> > > FLIP according to the FLIP documentation[1]?
>> > > > >> > > Thanks!
>> > > > >> > >
>> > > > >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> > > > >> > >
>> > > > >> > > Best regards,
>> > > > >> > > Jing
>> > > > >> > >
>> > > > >> > > On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:
>> > > > >> > >
>> > > > >> > > > May I ask for some feedback  :D
>> > > > >> > > >
>> > > > >> > > > Thanks,
>> > > > >> > > > Aitozi
>> > > > >> > > >
>> > > > >> > > > On Tue, May 23, 2023 at 19:14 Aitozi <gjying1...@gmail.com> wrote:
>> > > > >> > > > >
>> > > > >> > > > > I just came across a use case report from Giannis Polyzos for
>> > > > >> > > > > this usage:
>> > > > >> > > > >
>> > > > >> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>> > > > >> > > > >
>> > > > >> > > > > On Tue, May 23, 2023 at 17:45 Aitozi <gjying1...@gmail.com> wrote:
>> > > > >> > > > > >
>> > > > >> > > > > > Hi guys,
>> > > > >> > > > > >     I want to bring up a discussion about adding support for
>> > > > >> > > > > > user-defined AsyncTableFunction in Flink.
>> > > > >> > > > > > Currently, async table functions are special functions for
>> > > > >> > > > > > table sources to perform async lookups. However, it's worth
>> > > > >> > > > > > supporting user-defined async table functions, because this
>> > > > >> > > > > > lets end SQL users perform async operations, which helps
>> > > > >> > > > > > maximize system throughput, especially in IO-bound cases.
>> > > > >> > > > > >
>> > > > >> > > > > > You can find some more detail in [1].
>> > > > >> > > > > >
>> > > > >> > > > > > Looking forward to feedback
>> > > > >> > > > > >
>> > > > >> > > > > >
>> > > > >> > > > > > [1]: https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
>> > > > >> > > > > >
>> > > > >> > > > > > Thanks,
>> > > > >> > > > > > Aitozi.
