Hi Feng,
    Thanks for your question. We do not provide a way to switch a UDTF
between sync and async execution, so there should be no thread-safety
problem here.
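
To make that concrete, here is a minimal sketch of the idea. It is not
Flink code: `FunctionKind`, `SyncUdtf`, `AsyncUdtf` and `planFor` are
hypothetical stand-ins, and the two labels returned by `planFor` are only
descriptive. It assumes, as discussed further down this thread, that the
kind of a function follows from the base class it extends, so one
execution path is picked per function and nothing toggles it at runtime.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class UdtfKindSketch {
    // Hypothetical mirror of the two function kinds mentioned in this thread.
    enum FunctionKind { TABLE, ASYNC_TABLE }

    // Stand-in for a sync table function: rows are returned on the calling thread.
    abstract static class SyncUdtf {
        final FunctionKind kind() { return FunctionKind.TABLE; }
        abstract Collection<String> eval(String key);
    }

    // Stand-in for an async table function: rows arrive by completing a future.
    abstract static class AsyncUdtf {
        final FunctionKind kind() { return FunctionKind.ASYNC_TABLE; }
        abstract void eval(CompletableFuture<Collection<String>> result, String key);
    }

    // The execution path is chosen once from the kind; the labels are descriptive only.
    static String planFor(FunctionKind kind) {
        return kind == FunctionKind.ASYNC_TABLE ? "AsyncWaitOperator" : "Correlate";
    }

    public static void main(String[] args) throws Exception {
        SyncUdtf sync = new SyncUdtf() {
            @Override
            Collection<String> eval(String key) {
                return Arrays.asList(key + "-a", key + "-b");
            }
        };
        AsyncUdtf async = new AsyncUdtf() {
            @Override
            void eval(CompletableFuture<Collection<String>> result, String key) {
                // Complete the future from another thread, as an RPC callback would.
                CompletableFuture.runAsync(
                        () -> result.complete(Arrays.asList(key + "-a", key + "-b")));
            }
        };

        System.out.println(sync.kind() + " -> " + planFor(sync.kind()));
        System.out.println(async.kind() + " -> " + planFor(async.kind()));

        CompletableFuture<Collection<String>> future = new CompletableFuture<>();
        async.eval(future, "k1");
        System.out.println("sync rows:  " + sync.eval("k1"));
        System.out.println("async rows: " + future.get());
    }
}
```

Because `kind()` is final in each base class, a given function object can
only ever take one of the two paths in this sketch.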

Best,
Aitozi

On Tue, Jun 13, 2023 at 10:31, Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Aitozi, We do need to remind users about thread safety issues. Thank you
> for your efforts on this FLIP. I have no further questions.
> Best, Feng
>
>
> On Tue, Jun 13, 2023 at 6:05 AM Jing Ge <j...@ververica.com.invalid>
> wrote:
>
> > Hi Aitozi,
> >
> > Thanks for taking care of that part. I have no other concern.
> >
> > Best regards,
> > Jing
> >
> >
> > On Mon, Jun 12, 2023 at 5:38 PM Aitozi <gjying1...@gmail.com> wrote:
> >
> > > BTW, if there are no other blocking issues or comments, I would like to
> > > start a VOTE in another thread this Wednesday (6.14).
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > > On Mon, Jun 12, 2023 at 23:34, Aitozi <gjying1...@gmail.com> wrote:
> > >
> > > > Hi, Jing,
> > > >     Thanks for your explanation. I get your point now.
> > > >
> > > > For the performance part, I think it's a good idea to test the case of
> > > > returning a big table. Memory consumption is a point to take care of,
> > > > because in ordered mode the head element in the buffer may hold back
> > > > the results queued behind it and so affect the total memory consumption.
> > > >
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > >
> > > >
> > > > On Mon, Jun 12, 2023 at 20:28, Jing Ge <j...@ververica.com.invalid> wrote:
> > > >
> > > >> Hi Aitozi,
> > > >>
> > > > >> Which key will be used for lookup is not an issue; only one row will be
> > > > >> required for each key in order to enrich it. True, it depends on the
> > > > >> implementation whether multiple rows or a single row will be returned
> > > > >> for each key. However, for the lookup & enrichment scenario, one row per
> > > > >> key is recommended; otherwise, as I mentioned previously, enrichment
> > > > >> won't work.
> > > > >>
> > > > >> I am a little bit concerned about returning a big table for each key,
> > > > >> since the async call will take longer to return and need more memory.
> > > > >> The performance tests should cover this scenario. This is not a blocking
> > > > >> issue for this FLIP.
> > > >>
> > > >> Best regards,
> > > >> Jing
> > > >>
> > > > >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi <gjying1...@gmail.com> wrote:
> > > >>
> > > >> > Hi Jing,
> > > > >> >     I mean that the join key is not necessarily the primary key or a
> > > > >> > unique index of the database. In this situation, we may query multiple
> > > > >> > rows for one join key. I think that's why LookupFunction#lookup
> > > > >> > returns a collection of RowData.
> > > > >> >
> > > > >> > BTW, I think the behavior of lookup join will not affect the semantics
> > > > >> > of the async UDTF. We use the Async TableFunction here, and the table
> > > > >> > function can collect multiple rows.
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Aitozi.
> > > >> >
> > > >> >
> > > >> >
> > > > >> > On Sat, Jun 10, 2023 at 00:15, Jing Ge <j...@ververica.com.invalid> wrote:
> > > >> >
> > > >> > > Hi Aitozi,
> > > >> > >
> > > >> > > The keyRow used in this case contains all keys[1].
> > > >> > >
> > > >> > > Best regards,
> > > >> > > Jing
> > > >> > >
> > > >> > > [1]
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > >> > >
> > > >> > >
> > > > >> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi <gjying1...@gmail.com> wrote:
> > > >> > >
> > > >> > > > Hi Jing,
> > > >> > > >
> > > > >> > > >      The performance test has been added to the FLIP.
> > > > >> > > >
> > > > >> > > >      As far as I know, the lookup join can return multiple rows; it
> > > > >> > > > depends on whether the join key is the primary key of the external
> > > > >> > > > database or not. `lookup` [1] returns a collection of joined
> > > > >> > > > results, and each of them will be collected.
> > > >> > > >
> > > >> > > >
> > > >> > > > [1]:
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> > > >> > > >
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Aitozi.
> > > >> > > >
> > > > >> > > > On Fri, Jun 9, 2023 at 17:05, Jing Ge <j...@ververica.com.invalid> wrote:
> > > >> > > >
> > > >> > > > > Hi Aitozi,
> > > >> > > > >
> > > > >> > > > > Thanks for the feedback. Looking forward to the performance tests.
> > > > >> > > > >
> > > > >> > > > > Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
> > > > >> > > > > lookup function is used to enrich column(s) from the dimension table.
> > > > >> > > > > If there is more than one row for a given key, there will be no way
> > > > >> > > > > to know which row will be used to enrich the key.
> > > >> > > > >
> > > >> > > > > [1]
> > > >> > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > >> > > > > [2]
> > > >> > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> > > >> > > > >
> > > >> > > > > Best regards,
> > > >> > > > > Jing
> > > >> > > > >
> > > > >> > > > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > >
> > > >> > > > > > Hi Jing
> > > > >> > > > > >     Thanks for your good questions. I have added the example to the
> > > > >> > > > > > FLIP.
> > > > >> > > > > >
> > > > >> > > > > > > Only one row for each lookup
> > > > >> > > > > > Lookup can also return multiple rows, based on the query result. [1]
> > > >> > > > > >
> > > >> > > > > > [1]:
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> > > >> > > > > >
> > > > >> > > > > > > If we use async calls with lateral join, my gut feeling is
> > > > >> > > > > > > that we might have many more async calls than lookup join. I am not
> > > > >> > > > > > > really sure if we will be facing potential issues in this case or not.
> > > > >> > > > > >
> > > > >> > > > > > IMO, the work pattern is similar to the lookup function: for each row
> > > > >> > > > > > from the left table, the eval method is evaluated once, so the number
> > > > >> > > > > > of async calls will not change. The maximum number of calls in flight
> > > > >> > > > > > is limited by the async operator's buffer capacity, which is
> > > > >> > > > > > controlled by the option.
> > > > >> > > > > >
> > > > >> > > > > > BTW, for the naming of these options, I have updated the FLIP; you can
> > > > >> > > > > > refer to the sections "ConfigOption" and "Rejected Alternatives".
> > > > >> > > > > >
> > > > >> > > > > > Finally, for the performance evaluation, I'd like to do some tests and
> > > > >> > > > > > will add the results to the FLIP doc.
> > > >> > > > > >
> > > >> > > > > > Thanks,
> > > >> > > > > > Aitozi.
> > > >> > > > > >
> > > >> > > > > >
> > > > >> > > > > > On Fri, Jun 9, 2023 at 07:23, Jing Ge <j...@ververica.com.invalid> wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Aitozi,
> > > >> > > > > > >
> > > > >> > > > > > > Thanks for the clarification. The code example looks interesting. I
> > > > >> > > > > > > would suggest adding it to the FLIP. A description with code examples
> > > > >> > > > > > > will help readers understand the motivation and how to use it. Afaiac,
> > > > >> > > > > > > it is a valid feature for Flink users.
> > > > >> > > > > > >
> > > > >> > > > > > > As we know, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME
> > > > >> > > > > > > AS OF, which is also used in your code example. Temporal join performs
> > > > >> > > > > > > the lookup based on the processing time match. Only one row for each
> > > > >> > > > > > > lookup (afaiu, I need to check the source code to double confirm) will
> > > > >> > > > > > > be returned for further enrichment. On the other hand, lateral join
> > > > >> > > > > > > will have sub-queries correlated with every individual value of the
> > > > >> > > > > > > reference table from the preceding part of the query, and each
> > > > >> > > > > > > sub-query will return multiple rows. If we use async calls with lateral
> > > > >> > > > > > > join, my gut feeling is that we might have many more async calls than
> > > > >> > > > > > > lookup join. I am not really sure if we will be facing potential issues
> > > > >> > > > > > > in this case or not. Possible issues I can think of now: too many RPC
> > > > >> > > > > > > calls, too many async calls being processed, and the sub-query
> > > > >> > > > > > > returning a table which might be (too) big and might cause performance
> > > > >> > > > > > > issues. I would suggest preparing some use cases and running some
> > > > >> > > > > > > performance tests to check it. These are my concerns about using async
> > > > >> > > > > > > calls with lateral join that I'd like to share with you. I am happy to
> > > > >> > > > > > > discuss and hear different opinions; hopefully the discussion will help
> > > > >> > > > > > > me understand it more deeply. Please correct me if I am wrong.
> > > >> > > > > > >
> > > >> > > > > > > Best regards,
> > > >> > > > > > > Jing
> > > >> > > > > > >
> > > >> > > > > > >
> > > > >> > > > > > > On Thu, Jun 8, 2023 at 7:22 AM Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Hi Mason,
> > > > >> > > > > > > >     Thanks for your input. I think if we support the user-defined
> > > > >> > > > > > > > async table function, users will be able to use it to hold a batch of
> > > > >> > > > > > > > data and then handle it all at once in the customized function.
> > > > >> > > > > > > >
> > > > >> > > > > > > > AsyncSink is meant for the sink operator. I have not figured out how
> > > > >> > > > > > > > to integrate it in this case.
> > > > >> > > > > > > >
> > > > >> > > > > > > > Thanks,
> > > > >> > > > > > > > Aitozi.
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > > >> > > > > > > > On Thu, Jun 8, 2023 at 12:40, Mason Chen <mas.chen6...@gmail.com> wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hi Aitozi,
> > > >> > > > > > > > >
> > > > >> > > > > > > > > I think it makes sense to make it easier for SQL users to make RPCs.
> > > > >> > > > > > > > > Do you think your proposal can extend to the ability to batch data
> > > > >> > > > > > > > > for the RPC? This is another common strategy to increase throughput.
> > > > >> > > > > > > > > Also, have you considered solving this a bit differently by
> > > > >> > > > > > > > > leveraging Flink's AsyncSink?
> > > >> > > > > > > > >
> > > >> > > > > > > > > Best,
> > > >> > > > > > > > > Mason
> > > >> > > > > > > > >
> > > > >> > > > > > > > > On Mon, Jun 5, 2023 at 1:50 AM Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > One more thing for discussion:
> > > >> > > > > > > > > >
> > > > >> > > > > > > > > > In our internal implementation, we reuse the options
> > > > >> > > > > > > > > > `table.exec.async-lookup.buffer-capacity` and
> > > > >> > > > > > > > > > `table.exec.async-lookup.timeout` to configure the async UDTF. Do
> > > > >> > > > > > > > > > you think we should add two extra options to distinguish them from
> > > > >> > > > > > > > > > the lookup options, such as the following?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > `table.exec.async-udtf.buffer-capacity`
> > > > >> > > > > > > > > > `table.exec.async-udtf.timeout`
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Best,
> > > >> > > > > > > > > > Aitozi.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > > >> > > > > > > > > > On Mon, Jun 5, 2023 at 12:20, Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > Hi Jing,
> > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >     > what is the difference between the RPC call or query you
> > > > >> > > > > > > > > > >     > mentioned and the lookup in a very general way
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > I think the RPC call or query service is quite similar to the
> > > > >> > > > > > > > > > > lookup join. But lookup join has to work with a
> > > > >> > > > > > > > > > > `LookupTableSource`.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Let's see how we can perform an async RPC call with lookup join:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > (1) Implement an AsyncTableFunction with the RPC call logic.
> > > > >> > > > > > > > > > > (2) Implement a `LookupTableSource` connector that runs with the
> > > > >> > > > > > > > > > > async UDTF defined in (1).
> > > > >> > > > > > > > > > > (3) Then define the DDL of this lookup table in SQL:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > CREATE TEMPORARY TABLE Customers (
> > > >> > > > > > > > > > >   id INT,
> > > >> > > > > > > > > > >   name STRING,
> > > >> > > > > > > > > > >   country STRING,
> > > >> > > > > > > > > > >   zip STRING
> > > >> > > > > > > > > > > ) WITH (
> > > >> > > > > > > > > > >   'connector' = 'custom'
> > > >> > > > > > > > > > > );
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > (4) Run with the query as below:
> > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > SELECT o.order_id, o.total, c.country, c.zip
> > > > >> > > > > > > > > > > FROM Orders AS o
> > > > >> > > > > > > > > > >   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > >> > > > > > > > > > >     ON o.customer_id = c.id;
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > This example is from the doc
> > > > >> > > > > > > > > > > <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > > >> > > > > > > > > > > You can imagine the lookup process as an async RPC call process.
> > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Let's see how we can perform an async RPC call with lateral join:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > (1) Implement an AsyncTableFunction with the RPC call logic.
> > > > >> > > > > > > > > > > (2) Run the query with
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > CREATE FUNCTION f1 AS '...';
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > SELECT o.order_id, o.total, c.country, c.zip
> > > > >> > > > > > > > > > > FROM Orders, LATERAL TABLE (f1(order_id)) AS T(...);
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > As you can see, the lateral join version is simpler and more
> > > > >> > > > > > > > > > > intuitive for users. Users do not have to wrap a
> > > > >> > > > > > > > > > > LookupTableSource just for the purpose of using an async UDTF.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Finally, we can also see the user-defined async table function as
> > > > >> > > > > > > > > > > an enhancement of the current lateral table join, which only
> > > > >> > > > > > > > > > > supports sync lateral join now.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Best,
> > > >> > > > > > > > > > > Aitozi.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > On Fri, Jun 2, 2023 at 19:37, Jing Ge <j...@ververica.com.invalid> wrote:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > >> Hi Aitozi,
> > > >> > > > > > > > > > >>
> > > > >> > > > > > > > > > >> Thanks for the update. Just out of curiosity, what is the
> > > > >> > > > > > > > > > >> difference between the RPC call or query you mentioned and the
> > > > >> > > > > > > > > > >> lookup, in a very general way? Lateral join is used in the FLIP;
> > > > >> > > > > > > > > > >> is there any special thought behind that? Sorry for asking so many
> > > > >> > > > > > > > > > >> questions. The FLIP contains limited information to understand the
> > > > >> > > > > > > > > > >> motivation.
> > > >> > > > > > > > > > >>
> > > >> > > > > > > > > > >> Best regards,
> > > >> > > > > > > > > > >> Jing
> > > >> > > > > > > > > > >>
> > > > >> > > > > > > > > > >> On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > > > > > > > >>
> > > >> > > > > > > > > > >> > Hi Jing,
> > > > >> > > > > > > > > > >> >     I have updated the proposed changes in the FLIP. IMO, lookup
> > > > >> > > > > > > > > > >> > has its clear async call requirement because it is an IO-heavy
> > > > >> > > > > > > > > > >> > operator. In our usage, SQL users have logic to make RPC calls
> > > > >> > > > > > > > > > >> > or query third-party services, which is also IO intensive. In
> > > > >> > > > > > > > > > >> > these cases, we'd like to leverage the async function to improve
> > > > >> > > > > > > > > > >> > the throughput.
> > > >> > > > > > > > > > >> >
> > > >> > > > > > > > > > >> > Thanks,
> > > >> > > > > > > > > > >> > Aitozi.
> > > >> > > > > > > > > > >> >
> > > > >> > > > > > > > > > >> > On Thu, Jun 1, 2023 at 22:55, Jing Ge <j...@ververica.com.invalid> wrote:
> > > >> > > > > > > > > > >> >
> > > >> > > > > > > > > > >> > > Hi Aitozi,
> > > >> > > > > > > > > > >> > >
> > > > >> > > > > > > > > > >> > > Sorry for the late reply. Would you like to add the proposed
> > > > >> > > > > > > > > > >> > > changes, with more details, to the FLIP too?
> > > > >> > > > > > > > > > >> > > I got your point. It looks like a rational idea. However, since
> > > > >> > > > > > > > > > >> > > lookup has its clear async call requirement, are there any real
> > > > >> > > > > > > > > > >> > > use cases that need this change? This will help us understand
> > > > >> > > > > > > > > > >> > > the motivation. After all, lateral join and temporal lookup
> > > > >> > > > > > > > > > >> > > join [1] are quite different.
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> > > Best regards,
> > > >> > > > > > > > > > >> > > Jing
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> > > [1]
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> >
> > > >> > > > > > > > > > >>
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
> > > >> > > > > > > > > > >> > >
> > > > >> > > > > > > > > > >> > > On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> > > > Hi Jing,
> > > > >> > > > > > > > > > >> > > >     What do you think about it? Can we move forward with this
> > > > >> > > > > > > > > > >> > > > feature?
> > > >> > > > > > > > > > >> > > >
> > > >> > > > > > > > > > >> > > > Thanks,
> > > >> > > > > > > > > > >> > > > Aitozi.
> > > >> > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > >> > > > On Mon, May 29, 2023 at 09:56, Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > >> > > > > Hi Jing,
> > > > >> > > > > > > > > > >> > > > >     > "Do you mean to support the AsyncTableFunction beyond the
> > > > >> > > > > > > > > > >> > > > >     > LookupTableSource?"
> > > > >> > > > > > > > > > >> > > > > Yes, I mean to support the AsyncTableFunction beyond the
> > > > >> > > > > > > > > > >> > > > > LookupTableSource.
> > > > >> > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > >> > > > > The "AsyncTableFunction" is a function with the ability to
> > > > >> > > > > > > > > > >> > > > > be executed asynchronously (with AsyncWaitOperator). The
> > > > >> > > > > > > > > > >> > > > > async lookup join is one usage of it, so we don't have to
> > > > >> > > > > > > > > > >> > > > > bind the AsyncTableFunction to LookupTableSource. If
> > > > >> > > > > > > > > > >> > > > > user-defined AsyncTableFunction is supported, users can
> > > > >> > > > > > > > > > >> > > > > directly use the lateral table syntax to perform async
> > > > >> > > > > > > > > > >> > > > > operations.
> > > > >> > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > >> > > > > > "It would be better if you could elaborate the proposed
> > > > >> > > > > > > > > > >> > > > > > changes wrt the CorrelatedCodeGenerator with more details"
> > > > >> > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > >> > > > > In the proposal, we use the lateral table syntax to support
> > > > >> > > > > > > > > > >> > > > > the async table function, so the planner will also translate
> > > > >> > > > > > > > > > >> > > > > this statement to a CommonExecCorrelate node, and the runtime
> > > > >> > > > > > > > > > >> > > > > code should be generated in CorrelatedCodeGenerator.
> > > > >> > > > > > > > > > >> > > > > In CorrelatedCodeGenerator, we will know whether the
> > > > >> > > > > > > > > > >> > > > > TableFunction's kind is `FunctionKind.TABLE` or
> > > > >> > > > > > > > > > >> > > > > `FunctionKind.ASYNC_TABLE`. For `FunctionKind.ASYNC_TABLE`
> > > > >> > > > > > > > > > >> > > > > we can generate an AsyncWaitOperator to execute the async
> > > > >> > > > > > > > > > >> > > > > table function.
> > > >> > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > >> > > > > Thanks,
> > > >> > > > > > > > > > >> > > > > Aitozi.
> > > >> > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > >> > > > > On Mon, May 29, 2023 at 03:22, Jing Ge <j...@ververica.com.invalid> wrote:
> > > >> > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > >> > > > >> Hi Aitozi,
> > > >> > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > >> > > > >> Thanks for the clarification. The naming "Lookup" might
> > > > >> > > > > > > > > > >> > > > >> suggest using it for table lookup. But conceptually, what
> > > > >> > > > > > > > > > >> > > > >> the eval() method does is get a collection of results (Row,
> > > > >> > > > > > > > > > >> > > > >> RowData) for the given keys. How it is done depends on the
> > > > >> > > > > > > > > > >> > > > >> implementation, i.e. you can implement your own
> > > > >> > > > > > > > > > >> > > > >> Source [1][2]. The example in the FLIP should be able to be
> > > > >> > > > > > > > > > >> > > > >> handled in this way.
> > > > >> > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > >> > > > >> Do you mean to support the AsyncTableFunction beyond the
> > > > >> > > > > > > > > > >> > > > >> LookupTableSource? It would be better if you could elaborate
> > > > >> > > > > > > > > > >> > > > >> the proposed changes wrt the CorrelatedCodeGenerator with
> > > > >> > > > > > > > > > >> > > > >> more details. Thanks!
> > > >> > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > >> > > > >> Best regards,
> > > >> > > > > > > > > > >> > > > >> Jing
> > > >> > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > >> > > > >> [1]
> > > >> > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > >> > > >
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> >
> > > >> > > > > > > > > > >>
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> > > >> > > > > > > > > > >> > > > >> [2]
> > > >> > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > >> > > >
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> >
> > > >> > > > > > > > > > >>
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
> > > >> > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > >> > > > >> On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > >> > > > >> > Hi Jing,
> > > > >> > > > > > > > > > >> > > > >> >     Thanks for your response. As stated in the FLIP, the
> > > > >> > > > > > > > > > >> > > > >> > purpose of this FLIP is to support user-defined async table
> > > > >> > > > > > > > > > >> > > > >> > functions. As described in the Flink documentation [1]:
> > > > >> > > > > > > > > > >> > > > >> >
> > > > >> > > > > > > > > > >> > > > >> > > Async table functions are special functions for table
> > > > >> > > > > > > > > > >> > > > >> > > sources that perform a lookup.
> > > > >> > > > > > > > > > >> > > > >> >
> > > > >> > > > > > > > > > >> > > > >> > So end users cannot directly define and use an async table
> > > > >> > > > > > > > > > >> > > > >> > function now. A use case is reported in [2].
> > > > >> > > > > > > > > > >> > > > >> >
> > > > >> > > > > > > > > > >> > > > >> > So, in conclusion, no new interface is introduced, but we
> > > > >> > > > > > > > > > >> > > > >> > extend the ability to support user-defined async table
> > > > >> > > > > > > > > > >> > > > >> > functions.
> > > >> > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > >> > > > >> > [1]:
> > > >> > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > >> > > >
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > > > > >> >
> > > >> > > > > > > > > > >>
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
> > > >> > > > > > > > > > >> > > > >> > [2]:
> > > >> > > > > > > > > > >> > >
> > > >> > > > > > > >
> > > >> > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> > > >> > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > >> > > > >> > Thanks.
> > > >> > > > > > > > > > >> > > > >> > Aitozi.
> > > >> > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > >> > > > >> > Jing Ge <j...@ververica.com.invalid> wrote on Sat, May 27, 2023 at 06:40:
> > > >> > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > >> > > > >> > > Hi Aitozi,
> > > >> > > > > > > > > > >> > > > >> > >
> > > >> > > > > > > > > > >> > > > >> > > Thanks for your proposal. I am not quite sure if I understood
> > > >> > > > > > > > > > >> > > > >> > > your thoughts correctly. You described a special case
> > > >> > > > > > > > > > >> > > > >> > > implementation of the AsyncTableFunction with no public API
> > > >> > > > > > > > > > >> > > > >> > > changes. Would you please elaborate on your purpose of writing
> > > >> > > > > > > > > > >> > > > >> > > a FLIP according to the FLIP documentation [1]? Thanks!
> > > >> > > > > > > > > > >> > > > >> > >
> > > >> > > > > > > > > > >> > > > >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > >> > > > > > > > > > >> > > > >> > >
> > > >> > > > > > > > > > >> > > > >> > > Best regards,
> > > >> > > > > > > > > > >> > > > >> > > Jing
> > > >> > > > > > > > > > >> > > > >> > >
> > > >> > > > > > > > > > >> > > > >> > > On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:
> > > >> > > > > > > > > > >> > > > >> > >
> > > >> > > > > > > > > > >> > > > >> > > > May I ask for some feedback  :D
> > > >> > > > > > > > > > >> > > > >> > > >
> > > >> > > > > > > > > > >> > > > >> > > > Thanks,
> > > >> > > > > > > > > > >> > > > >> > > > Aitozi
> > > >> > > > > > > > > > >> > > > >> > > >
> > > >> > > > > > > > > > >> > > > >> > > > Aitozi <gjying1...@gmail.com> wrote on Tue, May 23, 2023 at 19:14:
> > > >> > > > > > > > > > >> > > > >> > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > Just caught a user case report from Giannis Polyzos for this usage:
> > > >> > > > > > > > > > >> > > > >> > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> > > >> > > > > > > > > > >> > > > >> > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > Aitozi <gjying1...@gmail.com> wrote on Tue, May 23, 2023 at 17:45:
> > > >> > > > > > > > > > >> > > > >> > > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > > Hi guys,
> > > >> > > > > > > > > > >> > > > >> > > > > >     I want to bring up a discussion about adding support for
> > > >> > > > > > > > > > >> > > > >> > > > > > User Defined AsyncTableFunction in Flink.
> > > >> > > > > > > > > > >> > > > >> > > > > > Currently, async table functions are special functions for
> > > >> > > > > > > > > > >> > > > >> > > > > > table sources to perform async lookup. However, it is worth
> > > >> > > > > > > > > > >> > > > >> > > > > > supporting user-defined async table functions, because in
> > > >> > > > > > > > > > >> > > > >> > > > > > this way the end SQL user can leverage them to perform async
> > > >> > > > > > > > > > >> > > > >> > > > > > operations, which is useful for maximizing system throughput,
> > > >> > > > > > > > > > >> > > > >> > > > > > especially in IO-bottlenecked cases.
> > > >> > > > > > > > > > >> > > > >> > > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > > You can find some more detail in [1].
> > > >> > > > > > > > > > >> > > > >> > > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > > Looking forward to feedback
> > > >> > > > > > > > > > >> > > > >> > > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > > [1]: https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
> > > >> > > > > > > > > > >> > > > >> > > > > >
> > > >> > > > > > > > > > >> > > > >> > > > > > Thanks,
> > > >> > > > > > > > > > >> > > > >> > > > > > Aitozi.
