Hi Aitozi,

Thanks for taking care of that part. I have no other concerns.
Best regards,
Jing

On Mon, Jun 12, 2023 at 5:38 PM Aitozi <gjying1...@gmail.com> wrote:

> BTW, if there are no other blocking issues or comments, I would like to start a VOTE in another thread this Wednesday, 6.14.
>
> Thanks,
> Aitozi.
>
> Aitozi <gjying1...@gmail.com> wrote on Mon, Jun 12, 2023 at 23:34:
>
> > Hi Jing,
> > Thanks for your explanation. I get your point now.
> >
> > For the performance part, I think it's a good idea to test the case that returns a big table. Memory consumption should be watched there, because in ordered mode the head element in the buffer may affect the total memory consumption.
> >
> > Thanks,
> > Aitozi.
> >
> > Jing Ge <j...@ververica.com.invalid> wrote on Mon, Jun 12, 2023 at 20:28:
> >
> >> Hi Aitozi,
> >>
> >> Which key is used for the lookup is not an issue; only one row is required for each key in order to enrich it. True, it depends on the implementation whether multiple rows or a single row is returned for each key. However, for the lookup & enrichment scenario, one row per key is recommended; otherwise, as I mentioned previously, enrichment won't work.
> >>
> >> I am a little bit concerned about returning a big table for each key, since the async call will take longer to return and need more memory. The performance tests should cover this scenario. This is not a blocking issue for this FLIP.
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi <gjying1...@gmail.com> wrote:
> >>
> >> > Hi Jing,
> >> > I mean the join key does not have to be the primary key or a unique index of the database. In this situation, we may get multiple rows for one join key. I think that's why LookupFunction#lookup returns a collection of RowData.
> >> >
> >> > BTW, I think the behavior of lookup join will not affect the semantics of the async udtf.
> >> > We use the async TableFunction here, and the table function can collect multiple rows.
> >> >
> >> > Thanks,
> >> > Aitozi.
> >> >
> >> > Jing Ge <j...@ververica.com.invalid> wrote on Sat, Jun 10, 2023 at 00:15:
> >> >
> >> > > Hi Aitozi,
> >> > >
> >> > > The keyRow used in this case contains all keys [1].
> >> > >
> >> > > Best regards,
> >> > > Jing
> >> > >
> >> > > [1] https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> >> > >
> >> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi <gjying1...@gmail.com> wrote:
> >> > >
> >> > > > Hi Jing,
> >> > > >
> >> > > > The performance test has been added to the FLIP.
> >> > > >
> >> > > > As far as I know, the lookup join can return multiple rows; it depends on whether the join key is the primary key of the external database or not. The `lookup` method [1] returns a collection of joined results, and each of them will be collected.
> >> > > >
> >> > > > [1]: https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> >> > > >
> >> > > > Thanks,
> >> > > > Aitozi.
> >> > > >
> >> > > > Jing Ge <j...@ververica.com.invalid> wrote on Fri, Jun 9, 2023 at 17:05:
> >> > > >
> >> > > > > Hi Aitozi,
> >> > > > >
> >> > > > > Thanks for the feedback. Looking forward to the performance tests.
> >> > > > >
> >> > > > > Afaik, lookup returns one row for each key [1] [2]. Conceptually, the lookup function is used to enrich column(s) from the dimension table.
> >> > > > > If, for the given key, there is more than one row, there is no way to know which row should be used to enrich the key.
> >> > > > >
> >> > > > > [1] https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> >> > > > > [2] https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> >> > > > >
> >> > > > > Best regards,
> >> > > > > Jing
> >> > > > >
> >> > > > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hi Jing,
> >> > > > > > Thanks for your good questions. I have added the example to the FLIP.
> >> > > > > >
> >> > > > > > > Only one row for each lookup
> >> > > > > >
> >> > > > > > A lookup can also return multiple rows, based on the query result [1].
> >> > > > > >
> >> > > > > > [1]: https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> >> > > > > >
> >> > > > > > > If we use async calls with lateral join, my gut feeling is that we might have many more async calls than lookup join. I am not really sure if we will be facing potential issues in this case or not.
> >> > > > > >
> >> > > > > > IMO, the work pattern is similar to the lookup function: for each row from the left table, it evaluates the eval method once, so the number of async calls will not change. The maximum number of calls in flight is limited by the async operator's buffer capacity, which is controlled by the option.
> >> > > > > >
> >> > > > > > BTW, for the naming of these options, I have updated the FLIP; you can refer to the "ConfigOption" and "Rejected Alternatives" sections.
> >> > > > > >
> >> > > > > > Finally, for the performance evaluation, I'd like to run some tests and will add the results to the FLIP doc.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Aitozi.
> >> > > > > >
> >> > > > > > Jing Ge <j...@ververica.com.invalid> wrote on Fri, Jun 9, 2023 at 07:23:
> >> > > > > >
> >> > > > > > > Hi Aitozi,
> >> > > > > > >
> >> > > > > > > Thanks for the clarification. The code example looks interesting. I would suggest adding it to the FLIP. A description with code examples will help readers understand the motivation and how to use it. Afaiac, it is a valid feature for Flink users.
> >> > > > > > >
> >> > > > > > > As we know, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME AS OF, which is also used in your code example. Temporal join performs the lookup based on the processing-time match. Only one row for each lookup (afaiu; I need to check the source code to double confirm) will be returned for further enrichment.
> >> > > > > > > On the other hand, lateral join will have sub-queries correlated with every individual value of the reference table from the preceding part of the query, and each sub-query will return multiple rows. If we use async calls with lateral join, my gut feeling is that we might have many more async calls than lookup join. I am not really sure if we will be facing potential issues in this case or not. Possible issues I can think of now: too many RPC calls, too many async calls being processed, and the sub-query returning a table that might be (too) big, which might cause performance issues. I would suggest preparing some use cases and running some performance tests to check it. These are my concerns about using async calls with lateral join that I'd like to share with you. I am happy to discuss them and hear different opinions; hopefully the discussion will help me understand it more deeply. Please correct me if I am wrong.
> >> > > > > > >
> >> > > > > > > Best regards,
> >> > > > > > > Jing
> >> > > > > > >
> >> > > > > > > On Thu, Jun 8, 2023 at 7:22 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > > > >
> >> > > > > > > > Hi Mason,
> >> > > > > > > > Thanks for your input.
> >> > > > > > > > I think if we support the user-defined async table function, users will be able to use it to hold a batch of data and then handle it all at once in the customized function.
> >> > > > > > > >
> >> > > > > > > > AsyncSink is meant for the sink operator. I have not figured out how to integrate it in this case.
> >> > > > > > > >
> >> > > > > > > > Thanks,
> >> > > > > > > > Aitozi.
> >> > > > > > > >
> >> > > > > > > > Mason Chen <mas.chen6...@gmail.com> wrote on Thu, Jun 8, 2023 at 12:40:
> >> > > > > > > >
> >> > > > > > > > > Hi Aitozi,
> >> > > > > > > > >
> >> > > > > > > > > I think it makes sense to make it easier for SQL users to make RPCs. Do you think your proposal can extend to the ability to batch data for the RPC? This is also another common strategy to increase throughput. Also, have you considered solving this a bit differently by leveraging Flink's AsyncSink?
> >> > > > > > > > >
> >> > > > > > > > > Best,
> >> > > > > > > > > Mason
> >> > > > > > > > >
> >> > > > > > > > > On Mon, Jun 5, 2023 at 1:50 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > > > > > >
> >> > > > > > > > > > One more thing for discussion:
> >> > > > > > > > > >
> >> > > > > > > > > > In our internal implementation, we reuse the options `table.exec.async-lookup.buffer-capacity` and `table.exec.async-lookup.timeout` to configure the async udtf.
> >> > > > > > > > > > Do you think we should add two extra options to distinguish them from the lookup options, such as the following?
> >> > > > > > > > > >
> >> > > > > > > > > > `table.exec.async-udtf.buffer-capacity`
> >> > > > > > > > > > `table.exec.async-udtf.timeout`
> >> > > > > > > > > >
> >> > > > > > > > > > Best,
> >> > > > > > > > > > Aitozi.
> >> > > > > > > > > >
> >> > > > > > > > > > Aitozi <gjying1...@gmail.com> wrote on Mon, Jun 5, 2023 at 12:20:
> >> > > > > > > > > >
> >> > > > > > > > > > > Hi Jing,
> >> > > > > > > > > > >
> >> > > > > > > > > > > > what is the difference between the RPC call or query you mentioned and the lookup in a very general way
> >> > > > > > > > > > >
> >> > > > > > > > > > > I think the RPC call or query service is quite similar to the lookup join. But lookup join has to work with `LookupTableSource`.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Let's see how we can perform an async RPC call with lookup join:
> >> > > > > > > > > > >
> >> > > > > > > > > > > (1) Implement an AsyncTableFunction with RPC call logic.
> >> > > > > > > > > > > (2) Implement a `LookupTableSource` connector that runs with the async udtf defined in (1).
> >> > > > > > > > > > > (3) Then define the DDL of this lookup table in SQL:
> >> > > > > > > > > > >
> >> > > > > > > > > > > CREATE TEMPORARY TABLE Customers (
> >> > > > > > > > > > >   id INT,
> >> > > > > > > > > > >   name STRING,
> >> > > > > > > > > > >   country STRING,
> >> > > > > > > > > > >   zip STRING
> >> > > > > > > > > > > ) WITH (
> >> > > > > > > > > > >   'connector' = 'custom'
> >> > > > > > > > > > > );
> >> > > > > > > > > > >
> >> > > > > > > > > > > (4) Run the query as below:
> >> > > > > > > > > > >
> >> > > > > > > > > > > SELECT o.order_id, o.total, c.country, c.zip
> >> > > > > > > > > > > FROM Orders AS o
> >> > > > > > > > > > >   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> >> > > > > > > > > > >     ON o.customer_id = c.id;
> >> > > > > > > > > > >
> >> > > > > > > > > > > This example is from the doc <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>. You can imagine the lookup process as an async RPC call process.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Let's see how we can perform an async RPC call with lateral join:
> >> > > > > > > > > > >
> >> > > > > > > > > > > (1) Implement an AsyncTableFunction with RPC call logic.
> >> > > > > > > > > > > (2) Run the query with
> >> > > > > > > > > > >
> >> > > > > > > > > > > Create function f1 as '...';
> >> > > > > > > > > > >
> >> > > > > > > > > > > SELECT o.order_id, o.total, c.country, c.zip FROM Orders lateral table (f1(order_id)) as T(...);
> >> > > > > > > > > > >
> >> > > > > > > > > > > As you can see, the lateral join version is simpler and more intuitive for users. Users do not have to wrap a LookupTableSource just to use an async udtf.
> >> > > > > > > > > > >
> >> > > > > > > > > > > In the end, we can also see that the user-defined async table function is an enhancement of the current lateral table join, which only supports sync lateral join now.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Best,
> >> > > > > > > > > > > Aitozi.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Jing Ge <j...@ververica.com.invalid> wrote on Fri, Jun 2, 2023 at 19:37:
> >> > > > > > > > > > >
> >> > > > > > > > > > >> Hi Aitozi,
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> Thanks for the update. Just out of curiosity, what is the difference between the RPC call or query you mentioned and the lookup in a very general way? Since lateral join is used in the FLIP, is there any special thought behind that? Sorry for asking so many questions. The FLIP contains limited information to understand the motivation.
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> Best regards,
> >> > > > > > > > > > >> Jing
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> > Hi Jing,
> >> > > > > > > > > > >> > I have updated the proposed changes in the FLIP. IMO, lookup has a clear async call requirement because it is an IO-heavy operator. In our usage, SQL users have logic that makes RPC calls or queries a third-party service, which is also IO intensive. In these cases, we'd like to leverage the async function to improve the throughput.
> >> > > > > > > > > > >> >
> >> > > > > > > > > > >> > Thanks,
> >> > > > > > > > > > >> > Aitozi.
> >> > > > > > > > > > >> >
> >> > > > > > > > > > >> > Jing Ge <j...@ververica.com.invalid> wrote on Thu, Jun 1, 2023 at 22:55:
> >> > > > > > > > > > >> >
> >> > > > > > > > > > >> > > Hi Aitozi,
> >> > > > > > > > > > >> > >
> >> > > > > > > > > > >> > > Sorry for the late reply. Would you like to add the proposed changes, with more details, to the FLIP too?
> >> > > > > > > > > > >> > > I got your point. It looks like a rational idea.
> >> > > > > > > > > > >> > > However, since lookup has its clear async call requirement, are there any real use cases that need this change? This will help us understand the motivation. After all, lateral join and temporal lookup join [1] are quite different.
> >> > > > > > > > > > >> > >
> >> > > > > > > > > > >> > > Best regards,
> >> > > > > > > > > > >> > > Jing
> >> > > > > > > > > > >> > >
> >> > > > > > > > > > >> > > [1] https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
> >> > > > > > > > > > >> > >
> >> > > > > > > > > > >> > > On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > > > > > > > >> > >
> >> > > > > > > > > > >> > > > Hi Jing,
> >> > > > > > > > > > >> > > > What do you think about it? Can we move this feature forward?
> >> > > > > > > > > > >> > > >
> >> > > > > > > > > > >> > > > Thanks,
> >> > > > > > > > > > >> > > > Aitozi.
> >> > > > > > > > > > >> > > >
> >> > > > > > > > > > >> > > > Aitozi <gjying1...@gmail.com> wrote on Mon, May 29, 2023 at 09:56:
> >> > > > > > > > > > >> > > >
> >> > > > > > > > > > >> > > > > Hi Jing,
> >> > > > > > > > > > >> > > > >
> >> > > > > > > > > > >> > > > > > "Do you mean to support the AsyncTableFunction beyond the LookupTableSource?"
> >> > > > > > > > > > >> > > > >
> >> > > > > > > > > > >> > > > > Yes, I mean to support the AsyncTableFunction beyond the LookupTableSource.
> >> > > > > > > > > > >> > > > >
> >> > > > > > > > > > >> > > > > The "AsyncTableFunction" is a function that can be executed asynchronously (with AsyncWaitOperator). The async lookup join is one usage of it. So, we don't have to bind the AsyncTableFunction to LookupTableSource. If user-defined AsyncTableFunction is supported, users can directly use the lateral table syntax to perform async operations.
> >> > > > > > > > > > >> > > > >
> >> > > > > > > > > > >> > > > > > "It would be better if you could elaborate the proposed changes wrt the CorrelatedCodeGenerator with more details"
> >> > > > > > > > > > >> > > > >
> >> > > > > > > > > > >> > > > > In the proposal, we use the lateral table syntax to support the async table function, so the planner will also translate this statement to a CommonExecCorrelate node. The runtime code should therefore be generated in CorrelatedCodeGenerator. In CorrelatedCodeGenerator, we will know whether the TableFunction's kind is `FunctionKind.TABLE` or `FunctionKind.ASYNC_TABLE`. For `FunctionKind.ASYNC_TABLE` we can generate an AsyncWaitOperator to execute the async table function.
> >> > > > > > > > > > >> > > > >
> >> > > > > > > > > > >> > > > > Thanks,
> >> > > > > > > > > > >> > > > > Aitozi.
> >> > > > > > > > > > >> > > > >
> >> > > > > > > > > > >> > > > > Jing Ge <j...@ververica.com.invalid> wrote on Mon, May 29, 2023 at 03:22:
> >> > > > > > > > > > >> > > > >
> >> > > > > > > > > > >> > > > >> Hi Aitozi,
> >> > > > > > > > > > >> > > > >>
> >> > > > > > > > > > >> > > > >> Thanks for the clarification.
> >> > > > > > > > > > >> > > > >> The naming "Lookup" might suggest using it for table lookup. But conceptually, what the eval() method does is get a collection of results (Row, RowData) for the given keys. How that is done depends on the implementation, i.e. you can implement your own Source [1][2]. The example in the FLIP should be able to be handled in this way.
> >> > > > > > > > > > >> > > > >>
> >> > > > > > > > > > >> > > > >> Do you mean to support the AsyncTableFunction beyond the LookupTableSource? It would be better if you could elaborate the proposed changes wrt the CorrelatedCodeGenerator with more details. Thanks!
> >> > > > > > > > > > >> > > > >>
> >> > > > > > > > > > >> > > > >> Best regards,
> >> > > > > > > > > > >> > > > >> Jing
> >> > > > > > > > > > >> > > > >>
> >> > > > > > > > > > >> > > > >> [1] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> >> > > > > > > > > > >> > > > >> [2] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
> >> > > > > > > > > > >> > > > >>
> >> > > > > > > > > > >> > > > >> On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > > > > > > > >> > > > >>
> >> > > > > > > > > > >> > > > >> > Hi Jing,
> >> > > > > > > > > > >> > > > >> > Thanks for your response. As stated in the FLIP, its purpose is to support user-defined async table functions.
> >> > > > > > > > > > >> > > > >> > As described in the Flink documentation [1]:
> >> > > > > > > > > > >> > > > >> >
> >> > > > > > > > > > >> > > > >> > > Async table functions are special functions for table sources that perform a lookup.
> >> > > > > > > > > > >> > > > >> >
> >> > > > > > > > > > >> > > > >> > So end users cannot directly define and use an async table function now. A use case is reported in [2].
> >> > > > > > > > > > >> > > > >> >
> >> > > > > > > > > > >> > > > >> > So, in conclusion, no new interface is introduced, but we extend the ability to support user-defined async table functions.
> >> > > > > > > > > > >> > > > >> >
> >> > > > > > > > > > >> > > > >> > [1]: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
> >> > > > > > > > > > >> > > > >> > [2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> >> > > > > > > > > > >> > > > >> >
> >> > > > > > > > > > >> > > > >> > Thanks.
> >> > > > > > > > > > >> > > > >> > Aitozi.
> >> > > > > > > > > > >> > > > >> >
> >> > > > > > > > > > >> > > > >> > Jing Ge <j...@ververica.com.invalid> wrote on Sat, May 27, 2023 at 06:40:
> >> > > > > > > > > > >> > > > >> >
> >> > > > > > > > > > >> > > > >> > > Hi Aitozi,
> >> > > > > > > > > > >> > > > >> > >
> >> > > > > > > > > > >> > > > >> > > Thanks for your proposal. I am not quite sure if I understood your thoughts correctly. You described a special-case implementation of the AsyncTableFunction with no public API changes. Would you please elaborate on your purpose of writing a FLIP according to the FLIP documentation [1]? Thanks!
> >> > > > > > > > > > >> > > > >> > >
> >> > > > > > > > > > >> > > > >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >> > > > > > > > > > >> > > > >> > >
> >> > > > > > > > > > >> > > > >> > > Best regards,
> >> > > > > > > > > > >> > > > >> > > Jing
> >> > > > > > > > > > >> > > > >> > >
> >> > > > > > > > > > >> > > > >> > > On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > > > > > > > >> > > > >> > >
> >> > > > > > > > > > >> > > > >> > > > May I ask for some feedback :D
> >> > > > > > > > > > >> > > > >> > > >
> >> > > > > > > > > > >> > > > >> > > > Thanks,
> >> > > > > > > > > > >> > > > >> > > > Aitozi
> >> > > > > > > > > > >> > > > >> > > >
> >> > > > > > > > > > >> > > > >> > > > Aitozi <gjying1...@gmail.com> wrote on Tue, May 23, 2023 at 19:14:
> >> > > > > > > > > > >> > > > >> > > >
> >> > > > > > > > > > >> > > > >> > > > > Just caught a use case report from Giannis Polyzos for this usage:
> >> > > > > > > > > > >> > > > >> > > > >
> >> > > > > > > > > > >> > > > >> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> >> > > > > > > > > > >> > > > >> > > > >
> >> > > > > > > > > > >> > > > >> > > > > Aitozi <gjying1...@gmail.com> wrote on Tue, May 23, 2023 at 17:45:
> >> > > > > > > > > > >> > > > >> > > > >
> >> > > > > > > > > > >> > > > >> > > > > > Hi guys,
> >> > > > > > > > > > >> > > > >> > > > > > I want to bring up a discussion about adding support for user-defined AsyncTableFunction in Flink.
> >> > > > > > > > > > >> > > > >> > > > > > Currently, async table functions are special functions for table sources to perform async lookup. However, it's worth supporting user-defined async table functions, because in this way the end SQL user can leverage them to perform async operations, which is useful for maximizing system throughput, especially in IO-bottlenecked cases.
> >> > > > > > > > > > >> > > > >> > > > > >
> >> > > > > > > > > > >> > > > >> > > > > > You can find some more detail in [1].
> >> > > > > > > > > > >> > > > >> > > > > >
> >> > > > > > > > > > >> > > > >> > > > > > Looking forward to feedback.
> >> > > > > > > > > > >> > > > >> > > > > >
> >> > > > > > > > > > >> > > > >> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
> >> > > > > > > > > > >> > > > >> > > > > >
> >> > > > > > > > > > >> > > > >> > > > > > Thanks,
> >> > > > > > > > > > >> > > > >> > > > > > Aitozi.