I'm delighted to see the progress on this. This is going to be a major
enabler for some important use cases.

The proposed simplifications (global config and ordered mode) for V1 make a
lot of sense to me. +1

David

On Wed, Dec 20, 2023 at 12:31 PM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:

> Thanks for that feedback Lincoln,
>
> Only one question with the async `timeout` parameter[1](since I
> > haven't seen the POC code), current description is: 'The time which can
> > pass before a restart strategy is triggered',
> > but in the previous flip-232[2] and flip-234[3], in retry scenario, this
> > timeout is the total time, do we keep the behavior of the parameter
> > consistent?
>
> That's a good catch.  I was intending to use *AsyncWaitOperator*, and to
> pass this timeout directly.  Looking through the code a bit, it appears
> that it doesn't restart the timer on a retry, and this timeout is total, as
> you're saying.  I do intend on being consistent with the other FLIPs and
> retaining this behavior, so I will update the wording on my FLIP to reflect
> that.
>
> -Alan
>
> On Wed, Dec 20, 2023 at 1:36 AM Lincoln Lee <lincoln.8...@gmail.com>
> wrote:
>
> > +1 for this useful feature!
> > Hope this reply isn't too late. Agree that we start with global
> > async-scalar configuration and ordered mode first.
> >
> > @Alan Only one question with the async `timeout` parameter[1](since I
> > haven't seen the POC code), current description is: 'The time which can
> > pass before a restart strategy is triggered',
> > but in the previous flip-232[2] and flip-234[3], in retry scenario, this
> > timeout is the total time, do we keep the behavior of the parameter
> > consistent?
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > [2]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> > [3]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Alan Sheinberg <asheinb...@confluent.io.invalid> 于2023年12月20日周三 08:41写道:
> >
> > > Thanks for the comments Timo.
> > >
> > >
> > > > Can you remove the necessary parts? Esp.:
> > >
> > >      @Override
> > > >      public Set<FunctionRequirement> getRequirements() {
> > > >          return Collections.singleton(FunctionRequirement.ORDERED);
> > > >      }
> > >
> > >
> > > I removed this section from the FLIP since presumably, there's no use
> in
> > > adding to the public API if it's ignored, with handling just ORDERED
> for
> > > the first version.  I'm not sure how quickly I'll want to add UNORDERED
> > > support, but I guess I can always do another FLIP.
> > >
> > > Otherwise I have no objections to start a VOTE soonish. If others are
> > > > fine as well?
> > >
> > > That would be great.  Any areas that people are interested in
> discussing
> > > further before a vote?
> > >
> > > -Alan
> > >
> > > On Tue, Dec 19, 2023 at 5:49 AM Timo Walther <twal...@apache.org>
> wrote:
> > >
> > > >  > I would be totally fine with the first version only having ORDERED
> > > >  > mode. For a v2, we could attempt to do the next most conservative
> > > >  > thing
> > > >
> > > > Sounds good to me.
> > > >
> > > > I also cheked AsyncWaitOperator and could not find n access of
> > > > StreamRecord's timestamp but only watermarks. But as we said, let's
> > > > focus on ORDERED first.
> > > >
> > > > Can you remove the necessary parts? Esp.:
> > > >
> > > >      @Override
> > > >      public Set<FunctionRequirement> getRequirements() {
> > > >          return Collections.singleton(FunctionRequirement.ORDERED);
> > > >      }
> > > >
> > > > Otherwise I have no objections to start a VOTE soonish. If others are
> > > > fine as well?
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > > On 19.12.23 07:32, Alan Sheinberg wrote:
> > > > > Thanks for the helpful comments, Xuyang and Timo.
> > > > >
> > > > > @Timo, @Alan: IIUC, there seems to be something wrong here. Take
> > kafka
> > > as
> > > > >> source and mysql as sink as an example.
> > > > >> Although kafka is an append-only source, one of its fields is used
> > as
> > > pk
> > > > >> when writing to mysql. If async udx is executed
> > > > >>   in an unordered mode, there may be problems with the data in
> mysql
> > > in
> > > > the
> > > > >> end. In this case, we need to ensure that
> > > > >> the sink-based pk is in order actually.
> > > > >
> > > > >
> > > > > @Xuyang: That's a great point.  If some node downstream of my
> > operator
> > > > > cares about ordering, there's no way for it to reconstruct the
> > original
> > > > > ordering of the rows as they were input to my operator.  So even if
> > > they
> > > > > want to preserve ordering by key, the order in which they see it
> may
> > > > > already be incorrect.  Somehow I thought that maybe the analysis of
> > the
> > > > > changelog mode at a given operator was aware of downstream
> > operations,
> > > > but
> > > > > it seems not.
> > > > >
> > > > > Clear "no" on this. Changelog semantics make the planner complex
> and
> > we
> > > > >> need to be careful. Therefore I would strongly suggest we
> introduce
> > > > >> ORDERED and slowly enable UNORDERED whenever we see a good fit for
> > it
> > > in
> > > > >> plans with appropriate planner rules that guard it.
> > > > >
> > > > >
> > > > > @Timo: The better I understand the complexity, the more I agree
> with
> > > > this.
> > > > > I would be totally fine with the first version only having ORDERED
> > > mode.
> > > > > For a v2, we could attempt to do the next most conservative thing
> and
> > > > only
> > > > > allow UNORDERED when the whole graph is in *INSERT *changelog mode.
> > > The
> > > > > next best type of optimization might understand what's the key
> > required
> > > > > downstream, and allow breaking the original order only between
> > > unrelated
> > > > > keys, but maintaining it between rows of the same key.  Of course
> if
> > > the
> > > > > key used downstream is computed in some manner, that makes it all
> the
> > > > > harder to know this beforehand.
> > > > >
> > > > > So unordering should be fine *within* watermarks. This is also what
> > > > >> watermarks are good for, a trade-off between strict ordering and
> > > making
> > > > >> progress. The async operator from DataStream API also supports
> this
> > > if I
> > > > >> remember correctly. However, it assumes a timestamp is present in
> > > > >> StreamRecord on which it can work. But this is not the case within
> > the
> > > > >> SQL engine.
> > > > >
> > > > >
> > > > > *AsyncWaitOperator* and *UnorderedStreamElementQueue* (the
> > > > implementations
> > > > > I plan on using) seem to support exactly this behavior.  I don't
> > think
> > > it
> > > > > makes assumptions about the record's timestamp, but just preserves
> > > > whatever
> > > > > the input order is w.r.t watermarks.  I'd be curious to understand
> > the
> > > > > timestamp use in more detail and see if it's required with the
> > > mentioned
> > > > > classes.
> > > > >
> > > > > TLDR: Let's focus on ORDERED first.
> > > > >
> > > > >
> > > > > I'm more than happy to start here and we can consider UNORDERED as
> a
> > > > > followup.  Then maybe we consider only INSERT mode graphs and ones
> > > where
> > > > we
> > > > > can solve the watermark constraints.
> > > > >
> > > > > Thanks,
> > > > > Alan
> > > > >
> > > > >
> > > > > On Mon, Dec 18, 2023 at 2:36 AM Timo Walther <twal...@apache.org>
> > > wrote:
> > > > >
> > > > >> Hi Xuyang and Alan,
> > > > >>
> > > > >> thanks for this productive discussion.
> > > > >>
> > > > >>   > Would it make a difference if it were exposed by the explain
> > > > >>
> > > > >> @Alan: I think this is great idea. +1 on exposing the sync/async
> > > > >> behavior thought EXPLAIN.
> > > > >>
> > > > >>
> > > > >>   > Is there an easy way to determine if the output of an async
> > > function
> > > > >>   > would be problematic or not?
> > > > >>
> > > > >> Clear "no" on this. Changelog semantics make the planner complex
> and
> > > we
> > > > >> need to be careful. Therefore I would strongly suggest we
> introduce
> > > > >> ORDERED and slowly enable UNORDERED whenever we see a good fit for
> > it
> > > in
> > > > >> plans with appropriate planner rules that guard it.
> > > > >>
> > > > >>   > If the input to the operator is append-only, it seems fine,
> > > because
> > > > >>   > this implies that each row is effectively independent and
> > ordering
> > > > is
> > > > >>   > unimportant.
> > > > >>
> > > > >> As @Xuyang pointed out, it's not only the input that decides
> whether
> > > > >> append-only is safe. It's also the subsequent operators in the
> > > pipeline.
> > > > >> The example of Xuyang is a good one, when the sink operates in
> > upsert
> > > > >> mode. Append-only source, append-only operators, and append-only
> > sink
> > > > >> are safer.
> > > > >>
> > > > >> However, even in this combination, a row is not fully
> "independent"
> > > > >> there are still watermarks flowing between rows:
> > > > >>
> > > > >> R(5), W(4), R(3), R(4), R(2), R(1), W(0)
> > > > >>
> > > > >> So unordering should be fine *within* watermarks. This is also
> what
> > > > >> watermarks are good for, a trade-off between strict ordering and
> > > making
> > > > >> progress. The async operator from DataStream API also supports
> this
> > > if I
> > > > >> remember correctly. However, it assumes a timestamp is present in
> > > > >> StreamRecord on which it can work. But this is not the case within
> > the
> > > > >> SQL engine.
> > > > >>
> > > > >> TLDR: Let's focus on ORDERED first.
> > > > >>
> > > > >> If we want to use UNORDERED, I would suggest to check the input
> > > operator
> > > > >> for exactly 1 time attribute column. If there is exactly 1 time
> > > > >> attribute column, we could insert it into the StreamRecord and
> allow
> > > > >> UNORDERED mode. If this condition is not met, we go with ORDERED.
> > > > >>
> > > > >> Regards,
> > > > >> Timo
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> On 18.12.23 07:05, Xuyang wrote:
> > > > >>> Hi, Alan and Timo. Thanks for your reply.
> > > > >>>> Would it make a difference if it were exposed by the explain
> > > > >>>> method (the operator having "syncMode" vs not)?
> > > > >>> @Alan: I think this is a good way to tell the user what mode
> these
> > > > async
> > > > >> udx are currently in.
> > > > >>>> A regular SQL user doesn't care whether the function is sync or
> > > async.
> > > > >>> @Timo: I agree that the planner should throw as few exceptions as
> > > > >> possible rather than confusing users. So I think
> > > > >>> it is a good way to expose syncMode through explain syntax.
> > > > >>>> If the input to the operator is append-only, it seems fine,
> > > > >>>> because this implies that each row is effectively independent
> and
> > > > >> ordering is unimportant.
> > > > >>>
> > > > >>>
> > > > >>>> For example, if the query is > an append-only `SELECT FUNC(c)
> FROM
> > > t`,
> > > > >>>> I don't see a reason why the > operator is not allowed to
> produce
> > > > >> unordered results.
> > > > >>>
> > > > >>>
> > > > >>> @Timo, @Alan: IIUC, there seems to be something wrong here. Take
> > > kafka
> > > > >> as source and mysql as sink as an example.
> > > > >>> Although kafka is an append-only source, one of its fields is
> used
> > as
> > > > pk
> > > > >> when writing to mysql. If async udx is executed
> > > > >>>    in an unordered mode, there may be problems with the data in
> > mysql
> > > > in
> > > > >> the end. In this case, we need to ensure that
> > > > >>> the sink-based pk is in order actually.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> --
> > > > >>>
> > > > >>>       Best!
> > > > >>>       Xuyang
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> At 2023-12-16 03:33:55, "Alan Sheinberg" <
> asheinb...@confluent.io
> > > > .INVALID>
> > > > >> wrote:
> > > > >>>> Thanks for the replies everyone.  My responses are inline:
> > > > >>>>
> > > > >>>> About the configs, what do you think using hints as mentioned in
> > > [1].
> > > > >>>>
> > > > >>>> @Aitozi: I think hints could be a good way to do this, similar
> to
> > > > lookup
> > > > >>>> joins or the proposal in FLIP-313.  One benefit of hints is that
> > > they
> > > > >> allow
> > > > >>>> for the highest granularity of configuration because you can
> > decide
> > > at
> > > > >>>> each and every call site just what parameters to use.  The
> > downside
> > > of
> > > > >>>> hints is that there's more syntax to learn and more verbosity.
> > I'm
> > > > >>>> somewhat partial to a configuration like this with a class
> > > definition
> > > > >> level
> > > > >>>> of granularity (similar to how metrics reporters are defined
> [1]):
> > > > >>>>
> > > > >>>> table.exec.async-scalar.myfunc.class:
> > > > >> org.apache.flink.MyAsyncScalarFunction
> > > > >>>> table.exec.async-scalar.myfunc.buffer-capacity: 10
> > > > >>>> ...
> > > > >>>>
> > > > >>>> As Timo mentioned, the downside to this is that there's not a
> nice
> > > > >> static
> > > > >>>> way to do this at the moment, unless you extend ConfigOption.
> It
> > > > would
> > > > >> be
> > > > >>>> good ultimately if Lookup joins, async scalar functions, and
> other
> > > > >> future
> > > > >>>> configurable UDFs shared the same methodology, but maybe a
> unified
> > > > >> approach
> > > > >>>> is a followup discussion.
> > > > >>>>
> > > > >>>> I’m just curious why you don’t use conf(global) and query
> > > > >> hint(individual
> > > > >>>>> async udx) to mark the output
> > > > >>>>> mode 'order' or 'unorder' like async look join [1] and async
> > > udtf[2],
> > > > >> but
> > > > >>>>> chose to introduce a new enum
> > > > >>>>> in AsyncScalarFunction.
> > > > >>>>
> > > > >>>>
> > > > >>>> @Xuyang: I'm open to adding hints. I think the important part is
> > > that
> > > > we
> > > > >>>> have some method for the user to have a class definition level
> way
> > > to
> > > > >>>> define whether ORDERED or ALLOW_UNORDERED is most appropriate.
> I
> > > > don't
> > > > >>>> have a strong sense yet for what would be most appropriately
> > exposed
> > > > as
> > > > >> a
> > > > >>>> FunctionRequirement vs a simple configuration/hint.
> > > > >>>>
> > > > >>>> What about throwing an exception to make it clear to users that
> > > using
> > > > >> async
> > > > >>>>> scalar functions in this situation
> > > > >>>>> is problematic instead of executing silently in sync mode?
> > Because
> > > > >> users
> > > > >>>>> may be confused about
> > > > >>>>> the final actual job graph.
> > > > >>>>
> > > > >>>>
> > > > >>>> @Xuyang: Would it make a difference if it were exposed by the
> > > explain
> > > > >>>> method (the operator having "syncMode" vs not)?  I'd be fine to
> do
> > > it
> > > > >>>> either way -- certainly throwing an error is a bit simpler.
> > > > >>>>
> > > > >>>> You are right. Actually it should be the planner that fully
> > decides
> > > > >>>>> whether ORDERED or UNORDERED is safe to do. For example, if the
> > > query
> > > > >> is
> > > > >>>>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason
> why
> > > the
> > > > >>>>> operator is not allowed to produce unordered results. By global
> > > > >>>>> configuration, we can set ORDERED such that users don't get
> > > confused
> > > > >>>>> about the unordered output.
> > > > >>>>
> > > > >>>>
> > > > >>>> @Timo: Is there an easy way to determine if the output of an
> async
> > > > >> function
> > > > >>>> would be problematic or not?  If the input to the operator is
> > > > >> append-only,
> > > > >>>> it seems fine, because this implies that each row is effectively
> > > > >>>> independent and ordering is unimportant. For upsert mode with +U
> > > rows,
> > > > >> you
> > > > >>>> wouldn't want to swap order with other +U rows for the same key
> > > > because
> > > > >> the
> > > > >>>> last one should win.  For -D or -U rows, you wouldn't want to
> swap
> > > > with
> > > > >>>> other rows for the same key for similar reasons.  Is it as
> simple
> > as
> > > > >>>> looking for the changlelog mode to determine whether it's safe
> to
> > > run
> > > > >> async
> > > > >>>> functions UNORDERED?  I had considered analyzing various query
> > forms
> > > > >> (join
> > > > >>>> vs aggregation vs whatever), but it seems like changelog mode
> > could
> > > be
> > > > >>>> sufficient to understand what works and what would be an issue.
> > Any
> > > > >> code
> > > > >>>> pointers and explanation for similar analysis would be great to
> > > > >> understand
> > > > >>>> this more.
> > > > >>>>
> > > > >>>> The mode UNORDERED however should only have
> > > > >>>>> effect for these simply use cases and throw an exception if
> > > UNORDERED
> > > > >>>>> would mess up a changelog or other subsequent operators.
> > > > >>>>
> > > > >>>> @Timo: Should we throw errors or run in sync mode?  It seems
> like
> > > > >> running
> > > > >>>> in sync mode is an option to ensure correctness in all changelog
> > > > modes.
> > > > >>>>
> > > > >>>> Let's go with global configuration first and later introduce
> > > > >>>>> hints. I feel the more hints we introduce, the harder SQL
> queries
> > > get
> > > > >>>>> when maintaining them.
> > > > >>>>
> > > > >>>> @Timo: That seems like a reasonable approach to me.
> > > > >>>>
> > > > >>>> -Alan
> > > > >>>>
> > > > >>>> [1]
> > > > >>>>
> > > > >>
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
> > > > >>>>
> > > > >>>> On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <
> twal...@apache.org>
> > > > >> wrote:
> > > > >>>>
> > > > >>>>> 1. Override the function `getRequirements` in
> > `AsyncScalarFunction`
> > > > >>>>>
> > > > >>>>>    > If the user overrides `requirements()` to omit the
> `ORDERED`
> > > > >>>>>    > requirement, do we allow the operator to return
> out-of-order
> > > > >> results
> > > > >>>>>    > or should it fall back on
> `AsyncOutputMode.ALLOW_UNORDERED`
> > > type
> > > > >>>>>    > behavior (where we allow out-of-order only if it's deemed
> > > > correct)?
> > > > >>>>>
> > > > >>>>> You are right. Actually it should be the planner that fully
> > decides
> > > > >>>>> whether ORDERED or UNORDERED is safe to do. For example, if the
> > > query
> > > > >> is
> > > > >>>>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason
> why
> > > the
> > > > >>>>> operator is not allowed to produce unordered results. By global
> > > > >>>>> configuration, we can set ORDERED such that users don't get
> > > confused
> > > > >>>>> about the unordered output. The mode UNORDERED however should
> > only
> > > > have
> > > > >>>>> effect for these simply use cases and throw an exception if
> > > UNORDERED
> > > > >>>>> would mess up a changelog or other subsequent operators.
> > > > >>>>>
> > > > >>>>> 2. In some scenarios with semantic correctness, async operators
> > > must
> > > > be
> > > > >>>>> executed in sync mode.
> > > > >>>>>
> > > > >>>>>    > What about throwing an exception to make it clear to users
> > > that
> > > > >> using
> > > > >>>>> async scalar functions
> > > > >>>>>
> > > > >>>>> @Xuyang: A regular SQL user doesn't care whether the function
> is
> > > sync
> > > > >> or
> > > > >>>>> async. The planner should simply give its best to make the
> > > execution
> > > > >>>>> performant. I would not throw an exception here. There more
> > > > exceptions
> > > > >>>>> the, the more struggles and questions from the user.
> > Conceptually,
> > > we
> > > > >>>>> can run async code also sync, and that's why we should also do
> it
> > > to
> > > > >>>>> avoid errors.
> > > > >>>>>
> > > > >>>>> 3. Hints
> > > > >>>>>
> > > > >>>>> @Aitozi: Let's go with global configuration first and later
> > > introduce
> > > > >>>>> hints. I feel the more hints we introduce, the harder SQL
> queries
> > > get
> > > > >>>>> when maintaining them.
> > > > >>>>>
> > > > >>>>> Regards,
> > > > >>>>> Timo
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On 15.12.23 04:51, Xuyang wrote:
> > > > >>>>>> Hi, Alan. Thanks for driving this.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Using async to improve throughput has been done on look join,
> > and
> > > > the
> > > > >>>>> improvement
> > > > >>>>>> effect is obvious, so I think it makes sense to support async
> > > scalar
> > > > >>>>> function.  Big +1 for this flip.
> > > > >>>>>> I have some questions below.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> 1. Override the function `getRequirements` in
> > > `AsyncScalarFunction`
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> I’m just curious why you don’t use conf(global) and query
> > > > >>>>> hint(individual async udx) to mark the output
> > > > >>>>>> mode 'order' or 'unorder' like async look join [1] and async
> > > > udtf[2],
> > > > >>>>> but chose to introduce a new enum
> > > > >>>>>> in AsyncScalarFunction.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> 2. In some scenarios with semantic correctness, async
> operators
> > > must
> > > > >> be
> > > > >>>>> executed in sync mode.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> What about throwing an exception to make it clear to users
> that
> > > > using
> > > > >>>>> async scalar functions in this situation
> > > > >>>>>> is problematic instead of executing silently in sync mode?
> > Because
> > > > >> users
> > > > >>>>> may be confused about
> > > > >>>>>> the final actual job graph.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> [1]
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
> > > > >>>>>> [2]
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> --
> > > > >>>>>>
> > > > >>>>>>        Best!
> > > > >>>>>>        Xuyang
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> 在 2023-12-15 11:20:24,"Aitozi" <gjying1...@gmail.com> 写道:
> > > > >>>>>>> Hi Alan,
> > > > >>>>>>>       Nice FLIP, I also explore leveraging the async table
> > > > >> function[1] to
> > > > >>>>>>> improve the throughput before.
> > > > >>>>>>>
> > > > >>>>>>> About the configs, what do you think using hints as mentioned
> > in
> > > > [1].
> > > > >>>>>>>
> > > > >>>>>>> [1]:
> > > > >>>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> > > > >>>>>>>
> > > > >>>>>>> Thanks,
> > > > >>>>>>> Aitozi.
> > > > >>>>>>>
> > > > >>>>>>> Timo Walther <twal...@apache.org> 于2023年12月14日周四 17:29写道:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Alan,
> > > > >>>>>>>>
> > > > >>>>>>>> thanks for proposing this FLIP. It's a great addition to
> Flink
> > > and
> > > > >> has
> > > > >>>>>>>> been requested multiple times. It will be in particular
> > > > interesting
> > > > >> for
> > > > >>>>>>>> accessing REST endpoints and other remote services.
> > > > >>>>>>>>
> > > > >>>>>>>> Great that we can generalize and reuse parts of the Python
> > > planner
> > > > >>>>> rules
> > > > >>>>>>>> and code for this.
> > > > >>>>>>>>
> > > > >>>>>>>> I have some feedback regarding the API:
> > > > >>>>>>>>
> > > > >>>>>>>> 1) Configuration
> > > > >>>>>>>>
> > > > >>>>>>>> Configuration keys like
> > > > >>>>>>>>
> > > > >>>>>>>>
> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
> > > > >>>>>>>>
> > > > >>>>>>>> are currently not supported in the configuration stack. The
> > key
> > > > >> space
> > > > >>>>>>>> should remain constant. Only a constant key space enables
> the
> > > use
> > > > of
> > > > >>>>> the
> > > > >>>>>>>> ConfigOption class which is required in the layered
> > > configuration.
> > > > >> For
> > > > >>>>>>>> now I would suggest to only allow a global setting for
> buffer
> > > > >> capacity,
> > > > >>>>>>>> timeout, and retry-strategy. We can later work on a
> > per-function
> > > > >>>>>>>> configuration (potentially also needed for other use cases).
> > > > >>>>>>>>
> > > > >>>>>>>> 2) Semantical declaration
> > > > >>>>>>>>
> > > > >>>>>>>> Regarding
> > > > >>>>>>>>
> > > > >>>>>>>> `table.exec.async-scalar.catalog.db.func-name.output-mode`
> > > > >>>>>>>>
> > > > >>>>>>>> this is a semantical property of a function and should be
> > > defined
> > > > >>>>>>>> per-function. It impacts the query result and potentially
> the
> > > > >> behavior
> > > > >>>>>>>> of planner rules.
> > > > >>>>>>>>
> > > > >>>>>>>> I see two options for this either: (a) an additional method
> in
> > > > >>>>>>>> AsyncScalarFunction or (b) adding this to the function's
> > > > >> requirements.
> > > > >>>>> I
> > > > >>>>>>>> vote for (b), because a FunctionDefinition should be fully
> > self
> > > > >>>>>>>> contained and sufficient for planning.
> > > > >>>>>>>>
> > > > >>>>>>>> Thus, for `FunctionDefinition.getRequirements():
> > > > >>>>>>>> Set<FunctionRequirement>` we can add a new requirement
> > `ORDERED`
> > > > >> which
> > > > >>>>>>>> should also be the default for AsyncScalarFunction.
> > > > >> `getRequirements()`
> > > > >>>>>>>> can be overwritten and return a set without this requirement
> > if
> > > > the
> > > > >>>>> user
> > > > >>>>>>>> intents to do this.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> Thanks,
> > > > >>>>>>>> Timo
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On 11.12.23 18:43, Piotr Nowojski wrote:
> > > > >>>>>>>>> +1 to the idea, I don't have any comments.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Best,
> > > > >>>>>>>>> Piotrek
> > > > >>>>>>>>>
> > > > >>>>>>>>> czw., 7 gru 2023 o 07:15 Alan Sheinberg <
> > > asheinb...@confluent.io
> > > > >>>>>>>> .invalid>
> > > > >>>>>>>>> napisał(a):
> > > > >>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Nicely written and makes sense.  The only feedback I have
> > is
> > > > >> around
> > > > >>>>> the
> > > > >>>>>>>>>>> naming of the generalization, e.g. "Specifically,
> > > > >>>>>>>> PythonCalcSplitRuleBase
> > > > >>>>>>>>>>> will be generalized into RemoteCalcSplitRuleBase."  This
> > > naming
> > > > >>>>> seems
> > > > >>>>>>>> to
> > > > >>>>>>>>>>> imply/suggest that all Async functions are remote.  I
> > wonder
> > > if
> > > > >> we
> > > > >>>>> can
> > > > >>>>>>>>>> find
> > > > >>>>>>>>>>> another name which doesn't carry that connotation; maybe
> > > > >>>>>>>>>>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which
> > > > handles
> > > > >>>>>>>> Python
> > > > >>>>>>>>>>> and Async functions seems reasonable.)
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>> Thanks.  That's fair.  I agree that "Remote" isn't always
> > > > >> accurate.
> > > > >>>>> I
> > > > >>>>>>>>>> believe that the python calls are also done
> asynchronously,
> > so
> > > > >> that
> > > > >>>>>>>> might
> > > > >>>>>>>>>> be a reasonable name, so long as there's no confusion
> > between
> > > > the
> > > > >>>>> base
> > > > >>>>>>>> and
> > > > >>>>>>>>>> async child class.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes
> > > > >>>>> <jhug...@confluent.io.invalid
> > > > >>>>>>>>>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi Alan,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Nicely written and makes sense.  The only feedback I have
> > is
> > > > >> around
> > > > >>>>> the
> > > > >>>>>>>>>>> naming of the generalization, e.g. "Specifically,
> > > > >>>>>>>> PythonCalcSplitRuleBase
> > > > >>>>>>>>>>> will be generalized into RemoteCalcSplitRuleBase."  This
> > > naming
> > > > >>>>> seems
> > > > >>>>>>>> to
> > > > >>>>>>>>>>> imply/suggest that all Async functions are remote.  I
> > wonder
> > > if
> > > > >> we
> > > > >>>>> can
> > > > >>>>>>>>>> find
> > > > >>>>>>>>>>> another name which doesn't carry that connotation; maybe
> > > > >>>>>>>>>>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which
> > > > handles
> > > > >>>>>>>> Python
> > > > >>>>>>>>>>> and Async functions seems reasonable.)
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Jim
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
> > > > >>>>>>>>>>> <asheinb...@confluent.io.invalid> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> I'd like to start a discussion of FLIP-400:
> > > > AsyncScalarFunction
> > > > >> for
> > > > >>>>>>>>>>>> asynchronous scalar function support [1]
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> This feature proposes adding a new UDF type
> > > > AsyncScalarFunction
> > > > >>>>> which
> > > > >>>>>>>>>> is
> > > > >>>>>>>>>>>> invoked just like a normal ScalarFunction, but is
> > > implemented
> > > > >> with
> > > > >>>>> an
> > > > >>>>>>>>>>>> asynchronous eval method.  I had brought this up
> including
> > > the
> > > > >>>>>>>>>> motivation
> > > > >>>>>>>>>>>> in a previous discussion thread [2].
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> The purpose is to achieve high throughput scalar
> function
> > > UDFs
> > > > >>>>> while
> > > > >>>>>>>>>>>> allowing that an individual call may have high latency.
> > It
> > > > >> allows
> > > > >>>>>>>>>>> scaling
> > > > >>>>>>>>>>>> up the parallelism of just these calls without having to
> > > > >> increase
> > > > >>>>> the
> > > > >>>>>>>>>>>> parallelism of the whole query (which could be rather
> > > resource
> > > > >>>>>>>>>>>> inefficient).
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> In practice, it should enable SQL integration with
> > external
> > > > >>>>> services
> > > > >>>>>>>>>> and
> > > > >>>>>>>>>>>> systems, which Flink has limited support for at the
> > moment.
> > > It
> > > > >>>>> should
> > > > >>>>>>>>>>> also
> > > > >>>>>>>>>>>> allow easier integration with existing libraries which
> use
> > > > >>>>>>>> asynchronous
> > > > >>>>>>>>>>>> APIs.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Looking forward to your feedback and suggestions.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> [1]
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > > > >>>>>>>>>>>> <
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> [2]
> > > > >>>>>
> https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> > > > >>>>>>>>>>>> <
> > > > >> https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>> Alan
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>

Reply via email to