Hi Jing,

The performance test has been added to the FLIP.
As far as I know, the lookup join can return multiple rows; that depends on whether the join key is the primary key of the external database. `lookup` [1] returns a collection of joined results, and each of them is collected.

[1]: https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52

Thanks,
Aitozi.

On Fri, Jun 9, 2023 at 5:05 PM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Thanks for the feedback. Looking forward to the performance tests.
>
> Afaik, lookup returns one row for each key [1] [2]. Conceptually, the lookup function is used to enrich column(s) from the dimension table. If there is more than one row for a given key, there is no way to know which row will be used to enrich the key.
>
> [1] https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> [2] https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
>
> Best regards,
> Jing

On Fri, Jun 9, 2023 at 5:18 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
>
> Thanks for your good questions. I have added the example to the FLIP.
>
> > Only one row for each lookup
>
> lookup can also return multiple rows, depending on the query result. [1]
>
> [1]: https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
>
> > If we use async calls with lateral join, my gut feeling is that we might have many more async calls than lookup join. I am not really sure if we will be facing potential issues in this case or not.
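The messages above debate whether a lookup yields one row or several per key. Flink's `LookupFunction#lookup(RowData keyRow)` is declared to return a `Collection<RowData>`, and the function's `eval` collects every element of that collection. So a join key that is not unique in the external table can produce several joined rows. The JDK-only sketch below models that contract, using plain Java collections in place of `RowData`. The `CUSTOMER_ZIPS` table and its values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** JDK-only model of a lookup join; it does not use Flink's classes. */
final class LookupJoinModel {

    // Hypothetical dimension table keyed by customer_id. The key is NOT unique:
    // customer 2 has two zip codes, so one lookup key matches two rows.
    static final Map<Integer, List<String>> CUSTOMER_ZIPS = Map.of(
            1, List.of("10115"),
            2, List.of("94105", "10001"));

    // Mirrors the shape of LookupFunction#lookup(RowData keyRow): one key in,
    // a collection of matching rows out (possibly empty, possibly several).
    static Collection<String> lookup(int customerId) {
        return CUSTOMER_ZIPS.getOrDefault(customerId, List.of());
    }

    // Inner lookup join: every matching row is collected as its own output row;
    // an order without a match produces nothing.
    static List<String> join(List<Integer> orderCustomerIds) {
        List<String> joined = new ArrayList<>();
        for (int customerId : orderCustomerIds) {
            for (String zip : lookup(customerId)) {
                joined.add(customerId + "," + zip);
            }
        }
        return joined;
    }
}
```

With this data, `join(List.of(1, 2, 3))` yields `1,10115`, `2,94105` and `2,10001`. Customer 2 matches two rows, and customer 3 matches none, so the inner join emits nothing for it.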
>
> IMO, the work pattern is similar to the lookup function: for each row from the left table, the eval method is evaluated once, so the number of async calls will not change. The maximum number of calls in flight is limited by the async operator's buffer capacity, which is controlled by the option.
>
> BTW, I updated the FLIP regarding the naming of these options; you can refer to the "ConfigOption" and "Rejected Alternatives" sections.
>
> Finally, for the performance evaluation, I'd like to run some tests and will add the results to the FLIP doc.
>
> Thanks,
> Aitozi.

On Fri, Jun 9, 2023 at 7:23 AM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Thanks for the clarification. The code example looks interesting. I would suggest adding it to the FLIP. The description with code examples will help readers understand the motivation and how to use the feature. Afaiac, it is a valid feature for Flink users.
>
> As we know, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME AS OF, which is also used in your code example. Temporal join performs the lookup based on the processing time match. Only one row for each lookup (afaiu, I need to check the source code to double-confirm) will be returned for further enrichment. On the other hand, lateral join will have sub-queries correlated with every individual value of the reference table from the preceding part of the query, and each sub-query will return multiple rows. If we use async calls with lateral join, my gut feeling is that we might have many more async calls than lookup join. I am not really sure if we will be facing potential issues in this case or not. Possible issues I can think of now include too many RPC calls, too many async calls being processed, and a sub-query returning a table that might be (too) big and cause performance issues. I would suggest preparing some use cases and running some performance tests to check it. These are my concerns about using async calls with lateral join, and I'd like to share them with you. I'm happy to discuss and hear different opinions; hopefully the discussion will help me understand it more deeply. Please correct me if I am wrong.
>
> Best regards,
> Jing

On Thu, Jun 8, 2023 at 7:22 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Mason,
>
> Thanks for your input. I think if we support user-defined async table functions, users will be able to use them to hold a batch of data and then handle it all at once in the customized function.
>
> AsyncSink is meant for the sink operator. I have not figured out how to integrate it in this case.
>
> Thanks,
> Aitozi.

On Thu, Jun 8, 2023 at 12:40 PM Mason Chen <mas.chen6...@gmail.com> wrote:
> Hi Aitozi,
>
> I think it makes sense to make it easier for SQL users to make RPCs. Do you think your proposal can extend to the ability to batch data for the RPC? This is another common strategy to increase throughput. Also, have you considered solving this a bit differently by leveraging Flink's AsyncSink?
>
> Best,
> Mason

On Mon, Jun 5, 2023 at 1:50 AM Aitozi <gjying1...@gmail.com> wrote:
> One more thing for discussion:
>
> In our internal implementation, we reuse the options `table.exec.async-lookup.buffer-capacity` and `table.exec.async-lookup.timeout` to configure the async UDTF.
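The two options `table.exec.async-lookup.buffer-capacity` and `table.exec.async-lookup.timeout` correspond to the two limits of Flink's async operator:

- The buffer capacity bounds how many asynchronous calls may be in flight at once; the operator back-pressures when it is full.
- The timeout fails a call that does not complete in time.

The sketch below models both limits with plain JDK concurrency (`Semaphore`, and `CompletableFuture#orTimeout`, which needs Java 9+). It is not Flink's `AsyncWaitOperator`, and `fakeRpc` and all numbers are hypothetical. It also reflects Aitozi's June 9 point that `eval` runs once per input row, so the number of calls equals the number of rows regardless of the capacity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only model of an async operator with a buffer capacity and a per-call timeout. */
final class BoundedAsyncModel {
    final AtomicInteger calls = new AtomicInteger();        // eval invocations
    final AtomicInteger inFlight = new AtomicInteger();     // calls not yet completed
    final AtomicInteger maxInFlight = new AtomicInteger();  // peak of inFlight

    private final ExecutorService rpcPool = Executors.newCachedThreadPool();
    private final Semaphore capacity;   // stands in for the buffer-capacity option
    private final long timeoutMillis;   // stands in for the timeout option

    BoundedAsyncModel(int bufferCapacity, long timeoutMillis) {
        this.capacity = new Semaphore(bufferCapacity);
        this.timeoutMillis = timeoutMillis;
    }

    // Hypothetical IO-bound RPC: waits latencyMillis, then returns a row for the key.
    private CompletableFuture<String> fakeRpc(int key, long latencyMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(latencyMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "row-" + key;
        }, rpcPool);
    }

    // One async call per input row; the caller blocks (back-pressure) while
    // bufferCapacity calls are already in flight.
    List<CompletableFuture<String>> processAll(List<Integer> rows, long latencyMillis) {
        List<CompletableFuture<String>> results = new ArrayList<>();
        for (int key : rows) {
            capacity.acquireUninterruptibly();
            calls.incrementAndGet();
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            results.add(fakeRpc(key, latencyMillis)
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)  // fail slow calls
                    .whenComplete((row, error) -> {
                        inFlight.decrementAndGet();
                        capacity.release();                           // free a buffer slot
                    }));
        }
        return results;
    }

    // Waits for every call and counts those that finished before the timeout.
    static long countSuccesses(List<CompletableFuture<String>> results) {
        return results.stream()
                .map(f -> f.handle((row, error) -> error == null).join())
                .filter(ok -> ok)
                .count();
    }

    void shutdown() {
        rpcPool.shutdownNow();
    }
}
```

For instance, take `new BoundedAsyncModel(3, 1_000)` with 8 rows of 20 ms latency: `calls` ends at 8, and `maxInFlight` never exceeds 3. With a 50 ms timeout and 500 ms latency, every call times out instead.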
> Do you think we should add two extra options, such as the following, to distinguish them from the lookup options?
>
> `table.exec.async-udtf.buffer-capacity`
> `table.exec.async-udtf.timeout`
>
> Best,
> Aitozi.

On Mon, Jun 5, 2023 at 12:20 PM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
>
> > what is the difference between the RPC call or query you mentioned and the lookup in a very general way
>
> I think the RPC call or query service is quite similar to the lookup join, but a lookup join has to work with a `LookupTableSource`.
>
> Let's see how we can perform an async RPC call with a lookup join:
>
> (1) Implement an AsyncTableFunction with the RPC call logic.
> (2) Implement a `LookupTableSource` connector that runs the async UDTF defined in (1).
> (3) Define the DDL of this lookup table in SQL:
>
>     CREATE TEMPORARY TABLE Customers (
>       id INT,
>       name STRING,
>       country STRING,
>       zip STRING
>     ) WITH (
>       'connector' = 'custom'
>     );
>
> (4) Run the query below:
>
>     SELECT o.order_id, o.total, c.country, c.zip
>     FROM Orders AS o
>     JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>     ON o.customer_id = c.id;
>
> This example is from the doc: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join
>
> You can imagine the lookup process as an async RPC call process.
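Step (1), implementing an AsyncTableFunction with the RPC call logic, is shared by the lookup-join and lateral-join recipes. Flink's `AsyncTableFunction` Javadoc describes an `eval` method whose first parameter is a `CompletableFuture<Collection<T>>`, followed by the function's own arguments. `eval` should start the request and complete the future from the callback rather than block.

The sketch below follows that shape but leaves out the `AsyncTableFunction<Row>` base class, so it compiles with the JDK alone. `Customer`, `CustomerRpcClient` and its data are hypothetical stand-ins for a real row type and a real non-blocking client.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical row type standing in for Flink's Row / RowData.
final class Customer {
    final int id;
    final String country;
    final String zip;

    Customer(int id, String country, String zip) {
        this.id = id;
        this.country = country;
        this.zip = zip;
    }
}

// Hypothetical non-blocking RPC client; the map stands in for a remote service.
final class CustomerRpcClient {
    private final Map<Integer, Customer> backend = Map.of(
            1, new Customer(1, "DE", "10115"),
            2, new Customer(2, "US", "94105"));

    CompletableFuture<Customer> fetch(int id) {
        return CompletableFuture.supplyAsync(() -> backend.get(id)); // null when unknown
    }
}

// Shape of a user-defined async table function. In a Flink job this class would
// extend AsyncTableFunction<Row>; the base class is left out to stay JDK-only.
final class CustomerLookupFunction {
    private final CustomerRpcClient client = new CustomerRpcClient();

    // Flink's convention: the result future comes first, then the call arguments.
    // eval does not block; it completes the future from the RPC callback.
    public void eval(CompletableFuture<Collection<Customer>> result, Integer id) {
        client.fetch(id).whenComplete((customer, error) -> {
            if (error != null) {
                result.completeExceptionally(error);       // surface RPC failures
            } else if (customer == null) {
                result.complete(Collections.emptyList());  // no match: zero rows
            } else {
                result.complete(List.of(customer));        // one matching row
            }
        });
    }
}
```

Calling `eval(future, 1)` completes the future with one `Customer`. An unknown id completes it with an empty collection, which an inner join turns into zero output rows.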
>
> Let's see how we can perform an async RPC call with a lateral join:
>
> (1) Implement an AsyncTableFunction with the RPC call logic.
> (2) Run the query:
>
>     CREATE FUNCTION f1 AS '...';
>
>     SELECT o.order_id, o.total, c.country, c.zip
>     FROM Orders AS o, LATERAL TABLE(f1(o.order_id)) AS c(...);
>
> As you can see, the lateral join version is simpler and more intuitive for users: they do not have to wrap the async UDTF in a LookupTableSource just to use it.
>
> Finally, we can also see that the user-defined async table function is an enhancement of the current lateral table join, which only supports synchronous lateral joins now.
>
> Best,
> Aitozi.

On Fri, Jun 2, 2023 at 7:37 PM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Thanks for the update. Just out of curiosity, what is the difference between the RPC call or query you mentioned and the lookup in a very general way? Since a lateral join is used in the FLIP, is there any special thought behind that? Sorry for asking so many questions; the FLIP contains limited information to understand the motivation.
>
> Best regards,
> Jing

On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
>
> I have updated the proposed changes in the FLIP. IMO, lookup has a clear async call requirement because it is an IO-heavy operator.
> In our usage, SQL users have logic that makes RPC calls or queries third-party services, which is also IO intensive. In these cases, we'd like to leverage the async function to improve the throughput.
>
> Thanks,
> Aitozi.

On Thu, Jun 1, 2023 at 10:55 PM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Sorry for the late reply. Would you like to add the proposed changes to the FLIP in more detail too?
>
> I got your point. It looks like a rational idea. However, since lookup has its clear async call requirement, are there any real use cases that need this change? This will help us understand the motivation. After all, lateral join and temporal lookup join [1] are quite different.
>
> Best regards,
> Jing
>
> [1] https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54

On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
>
> What do you think about it? Can we move this feature forward?
>
> Thanks,
> Aitozi.
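The `LATERAL TABLE` query in Aitozi's June 5 message joins each `Orders` row with whatever rows `f1` emits for it:

- The function is evaluated once per left row.
- An order can yield several rows.
- With the comma (inner) form, an order for which `f1` emits nothing is dropped. Flink's `LEFT JOIN LATERAL TABLE(...) ON TRUE` form keeps such an order, padded with nulls.

The JDK-only sketch below models those semantics synchronously for clarity. `f1`, `RPC_BACKEND` and the data are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** JDK-only model of Orders JOIN LATERAL TABLE(f1(order_id)); not Flink code. */
final class LateralJoinModel {

    // Hypothetical backend behind f1: order_id -> zero, one, or several rows.
    static final Map<Integer, List<String>> RPC_BACKEND =
            Map.of(10, List.of("DE"), 11, List.of("US", "CA"));

    // Hypothetical table function f1.
    static List<String> f1(Integer orderId) {
        return RPC_BACKEND.getOrDefault(orderId, List.of());
    }

    // Calls the table function once per left row and emits one output row per
    // produced row. keepUnmatched = true models LEFT JOIN LATERAL TABLE(...) ON TRUE,
    // which pads an order without results with null.
    static List<String> lateralJoin(List<Integer> orderIds,
                                    Function<Integer, List<String>> tableFunction,
                                    boolean keepUnmatched) {
        List<String> out = new ArrayList<>();
        for (Integer orderId : orderIds) {
            List<String> produced = tableFunction.apply(orderId);
            if (produced.isEmpty() && keepUnmatched) {
                out.add(orderId + ",null");
            }
            for (String value : produced) {
                out.add(orderId + "," + value);
            }
        }
        return out;
    }
}
```

Here order 11 yields two rows and order 12 yields none. As a result, `lateralJoin(List.of(10, 11, 12), LateralJoinModel::f1, false)` returns `10,DE`, `11,US` and `11,CA`.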
On Mon, May 29, 2023 at 9:56 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
>
> > "Do you mean to support the AsyncTableFunction beyond the LookupTableSource?"
>
> Yes, I mean to support the AsyncTableFunction beyond the LookupTableSource.
>
> An "AsyncTableFunction" is a function that can be executed asynchronously (with the AsyncWaitOperator). The async lookup join is one usage of it, so we don't have to bind the AsyncTableFunction to a LookupTableSource. If user-defined AsyncTableFunctions are supported, users can directly use the lateral table syntax to perform async operations.
>
> > "It would be better if you could elaborate the proposed changes wrt the CorrelatedCodeGenerator with more details"
>
> In the proposal, we use the lateral table syntax to support the async table function, so the planner will also translate this statement into a CommonExecCorrelate node, and the runtime code should be generated in CorrelatedCodeGenerator.
> In CorrelatedCodeGenerator, we know whether the TableFunction's kind is `FunctionKind.TABLE` or `FunctionKind.ASYNC_TABLE`. For `FunctionKind.ASYNC_TABLE`, we can generate an AsyncWaitOperator to execute the async table function.
>
> Thanks,
> Aitozi.

On Mon, May 29, 2023 at 3:22 AM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Thanks for the clarification. The name "Lookup" might suggest using it for table lookups, but conceptually what the eval() method does is get a collection of results (Row, RowData) for the given keys. How it is done depends on the implementation, i.e. you can implement your own source [1][2]. The example in the FLIP could be handled this way.
>
> Do you mean to support the AsyncTableFunction beyond the LookupTableSource? It would be better if you could elaborate on the proposed changes wrt the CorrelatedCodeGenerator in more detail. Thanks!
>
> Best regards,
> Jing
>
> [1] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> [2] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49

On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:
> Hi Jing,
>
> Thanks for your response. As stated in the FLIP, the purpose of this FLIP is to support user-defined async table functions. As described in the Flink documentation [1]:
>
> > Async table functions are special functions for table sources that perform a lookup.
>
> So end users cannot directly define and use async table functions now.
> A use case is reported in [2].
>
> So, in conclusion, no new interface is introduced, but we extend Flink to support user-defined async table functions.
>
> [1]: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
> [2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>
> Thanks.
> Aitozi.

On Sat, May 27, 2023 at 6:40 AM Jing Ge <j...@ververica.com.invalid> wrote:
> Hi Aitozi,
>
> Thanks for your proposal. I am not quite sure if I understood your thoughts correctly. You described a special-case implementation of the AsyncTableFunction with no public API changes. Would you please elaborate on your purpose in writing a FLIP, according to the FLIP documentation [1]? Thanks!
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> Best regards,
> Jing

On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:
> May I ask for some feedback :D
>
> Thanks,
> Aitozi

On Tue, May 23, 2023 at 7:14 PM Aitozi <gjying1...@gmail.com> wrote:
> Just caught a use case reported by Giannis Polyzos for this usage: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b

On Tue, May 23, 2023 at 5:45 PM Aitozi <gjying1...@gmail.com> wrote:
> Hi guys,
>
> I want to bring up a discussion about adding support for user-defined AsyncTableFunction in Flink. Currently, async table functions are special functions for table sources to perform async lookups.
However, it's worth to > support > > > the > > > > > user > > > > > > >> > > defined > > > > > > >> > > > >> async > > > > > > >> > > > >> > > > > > table function. > > > > > > >> > > > >> > > > > > Because, in this way, the end SQL user can > > > > leverage > > > > > > it > > > > > > >> to > > > > > > >> > > > >> perform > > > > > > >> > > > >> > the > > > > > > >> > > > >> > > > > > async operation > > > > > > >> > > > >> > > > > > which is useful to maximum the system > > > throughput > > > > > > >> > especially > > > > > > >> > > > for > > > > > > >> > > > >> IO > > > > > > >> > > > >> > > > > > bottleneck case. > > > > > > >> > > > >> > > > > > > > > > > > >> > > > >> > > > > > You can find some more detail in [1]. > > > > > > >> > > > >> > > > > > > > > > > > >> > > > >> > > > > > Looking forward to feedback > > > > > > >> > > > >> > > > > > > > > > > > >> > > > >> > > > > > > > > > > > >> > > > >> > > > > > [1]: > > > > > > >> > > > >> > > > > > > > > > >> > > > >> > > > > > > > > >> > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction > > > > > > >> > > > >> > > > > > > > > > > > >> > > > >> > > > > > Thanks, > > > > > > >> > > > >> > > > > > Aitozi. > > > > > > >> > > > >> > > > > > > > > > >> > > > >> > > > > > > > > >> > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > >