Thanks for that feedback Lincoln,

Only one question with the async `timeout` parameter[1](since I
> haven't seen the POC code), current description is: 'The time which can
> pass before a restart strategy is triggered',
> but in the previous flip-232[2] and flip-234[3], in retry scenario, this
> timeout is the total time, do we keep the behavior of the parameter
> consistent?

That's a good catch.  I was intending to use *AsyncWaitOperator*, and to
pass this timeout directly.  Looking through the code a bit, it appears
that it doesn't restart the timer on a retry, and this timeout is total, as
you're saying.  I do intend on being consistent with the other FLIPs and
retaining this behavior, so I will update the wording on my FLIP to reflect
that.

-Alan

On Wed, Dec 20, 2023 at 1:36 AM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> +1 for this useful feature!
> Hope this reply isn't too late. Agree that we start with global
> async-scalar configuration and ordered mode first.
>
> @Alan Only one question with the async `timeout` parameter[1](since I
> haven't seen the POC code), current description is: 'The time which can
> pass before a restart strategy is triggered',
> but in the previous flip-232[2] and flip-234[3], in retry scenario, this
> timeout is the total time, do we keep the behavior of the parameter
> consistent?
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
> [3]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
>
> Best,
> Lincoln Lee
>
>
> Alan Sheinberg <asheinb...@confluent.io.invalid> 于2023年12月20日周三 08:41写道:
>
> > Thanks for the comments Timo.
> >
> >
> > > Can you remove the necessary parts? Esp.:
> >
> >      @Override
> > >      public Set<FunctionRequirement> getRequirements() {
> > >          return Collections.singleton(FunctionRequirement.ORDERED);
> > >      }
> >
> >
> > I removed this section from the FLIP since presumably, there's no use in
> > adding to the public API if it's ignored, with handling just ORDERED for
> > the first version.  I'm not sure how quickly I'll want to add UNORDERED
> > support, but I guess I can always do another FLIP.
> >
> > Otherwise I have no objections to start a VOTE soonish. If others are
> > > fine as well?
> >
> > That would be great.  Any areas that people are interested in discussing
> > further before a vote?
> >
> > -Alan
> >
> > On Tue, Dec 19, 2023 at 5:49 AM Timo Walther <twal...@apache.org> wrote:
> >
> > >  > I would be totally fine with the first version only having ORDERED
> > >  > mode. For a v2, we could attempt to do the next most conservative
> > >  > thing
> > >
> > > Sounds good to me.
> > >
> > > I also cheked AsyncWaitOperator and could not find n access of
> > > StreamRecord's timestamp but only watermarks. But as we said, let's
> > > focus on ORDERED first.
> > >
> > > Can you remove the necessary parts? Esp.:
> > >
> > >      @Override
> > >      public Set<FunctionRequirement> getRequirements() {
> > >          return Collections.singleton(FunctionRequirement.ORDERED);
> > >      }
> > >
> > > Otherwise I have no objections to start a VOTE soonish. If others are
> > > fine as well?
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > > On 19.12.23 07:32, Alan Sheinberg wrote:
> > > > Thanks for the helpful comments, Xuyang and Timo.
> > > >
> > > > @Timo, @Alan: IIUC, there seems to be something wrong here. Take
> kafka
> > as
> > > >> source and mysql as sink as an example.
> > > >> Although kafka is an append-only source, one of its fields is used
> as
> > pk
> > > >> when writing to mysql. If async udx is executed
> > > >>   in an unordered mode, there may be problems with the data in mysql
> > in
> > > the
> > > >> end. In this case, we need to ensure that
> > > >> the sink-based pk is in order actually.
> > > >
> > > >
> > > > @Xuyang: That's a great point.  If some node downstream of my
> operator
> > > > cares about ordering, there's no way for it to reconstruct the
> original
> > > > ordering of the rows as they were input to my operator.  So even if
> > they
> > > > want to preserve ordering by key, the order in which they see it may
> > > > already be incorrect.  Somehow I thought that maybe the analysis of
> the
> > > > changelog mode at a given operator was aware of downstream
> operations,
> > > but
> > > > it seems not.
> > > >
> > > > Clear "no" on this. Changelog semantics make the planner complex and
> we
> > > >> need to be careful. Therefore I would strongly suggest we introduce
> > > >> ORDERED and slowly enable UNORDERED whenever we see a good fit for
> it
> > in
> > > >> plans with appropriate planner rules that guard it.
> > > >
> > > >
> > > > @Timo: The better I understand the complexity, the more I agree with
> > > this.
> > > > I would be totally fine with the first version only having ORDERED
> > mode.
> > > > For a v2, we could attempt to do the next most conservative thing and
> > > only
> > > > allow UNORDERED when the whole graph is in *INSERT *changelog mode.
> > The
> > > > next best type of optimization might understand what's the key
> required
> > > > downstream, and allow breaking the original order only between
> > unrelated
> > > > keys, but maintaining it between rows of the same key.  Of course if
> > the
> > > > key used downstream is computed in some manner, that makes it all the
> > > > harder to know this beforehand.
> > > >
> > > > So unordering should be fine *within* watermarks. This is also what
> > > >> watermarks are good for, a trade-off between strict ordering and
> > making
> > > >> progress. The async operator from DataStream API also supports this
> > if I
> > > >> remember correctly. However, it assumes a timestamp is present in
> > > >> StreamRecord on which it can work. But this is not the case within
> the
> > > >> SQL engine.
> > > >
> > > >
> > > > *AsyncWaitOperator* and *UnorderedStreamElementQueue* (the
> > > implementations
> > > > I plan on using) seem to support exactly this behavior.  I don't
> think
> > it
> > > > makes assumptions about the record's timestamp, but just preserves
> > > whatever
> > > > the input order is w.r.t watermarks.  I'd be curious to understand
> the
> > > > timestamp use in more detail and see if it's required with the
> > mentioned
> > > > classes.
> > > >
> > > > TLDR: Let's focus on ORDERED first.
> > > >
> > > >
> > > > I'm more than happy to start here and we can consider UNORDERED as a
> > > > followup.  Then maybe we consider only INSERT mode graphs and ones
> > where
> > > we
> > > > can solve the watermark constraints.
> > > >
> > > > Thanks,
> > > > Alan
> > > >
> > > >
> > > > On Mon, Dec 18, 2023 at 2:36 AM Timo Walther <twal...@apache.org>
> > wrote:
> > > >
> > > >> Hi Xuyang and Alan,
> > > >>
> > > >> thanks for this productive discussion.
> > > >>
> > > >>   > Would it make a difference if it were exposed by the explain
> > > >>
> > > >> @Alan: I think this is great idea. +1 on exposing the sync/async
> > > >> behavior thought EXPLAIN.
> > > >>
> > > >>
> > > >>   > Is there an easy way to determine if the output of an async
> > function
> > > >>   > would be problematic or not?
> > > >>
> > > >> Clear "no" on this. Changelog semantics make the planner complex and
> > we
> > > >> need to be careful. Therefore I would strongly suggest we introduce
> > > >> ORDERED and slowly enable UNORDERED whenever we see a good fit for
> it
> > in
> > > >> plans with appropriate planner rules that guard it.
> > > >>
> > > >>   > If the input to the operator is append-only, it seems fine,
> > because
> > > >>   > this implies that each row is effectively independent and
> ordering
> > > is
> > > >>   > unimportant.
> > > >>
> > > >> As @Xuyang pointed out, it's not only the input that decides whether
> > > >> append-only is safe. It's also the subsequent operators in the
> > pipeline.
> > > >> The example of Xuyang is a good one, when the sink operates in
> upsert
> > > >> mode. Append-only source, append-only operators, and append-only
> sink
> > > >> are safer.
> > > >>
> > > >> However, even in this combination, a row is not fully "independent"
> > > >> there are still watermarks flowing between rows:
> > > >>
> > > >> R(5), W(4), R(3), R(4), R(2), R(1), W(0)
> > > >>
> > > >> So unordering should be fine *within* watermarks. This is also what
> > > >> watermarks are good for, a trade-off between strict ordering and
> > making
> > > >> progress. The async operator from DataStream API also supports this
> > if I
> > > >> remember correctly. However, it assumes a timestamp is present in
> > > >> StreamRecord on which it can work. But this is not the case within
> the
> > > >> SQL engine.
> > > >>
> > > >> TLDR: Let's focus on ORDERED first.
> > > >>
> > > >> If we want to use UNORDERED, I would suggest to check the input
> > operator
> > > >> for exactly 1 time attribute column. If there is exactly 1 time
> > > >> attribute column, we could insert it into the StreamRecord and allow
> > > >> UNORDERED mode. If this condition is not met, we go with ORDERED.
> > > >>
> > > >> Regards,
> > > >> Timo
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On 18.12.23 07:05, Xuyang wrote:
> > > >>> Hi, Alan and Timo. Thanks for your reply.
> > > >>>> Would it make a difference if it were exposed by the explain
> > > >>>> method (the operator having "syncMode" vs not)?
> > > >>> @Alan: I think this is a good way to tell the user what mode these
> > > async
> > > >> udx are currently in.
> > > >>>> A regular SQL user doesn't care whether the function is sync or
> > async.
> > > >>> @Timo: I agree that the planner should throw as few exceptions as
> > > >> possible rather than confusing users. So I think
> > > >>> it is a good way to expose syncMode through explain syntax.
> > > >>>> If the input to the operator is append-only, it seems fine,
> > > >>>> because this implies that each row is effectively independent and
> > > >> ordering is unimportant.
> > > >>>
> > > >>>
> > > >>>> For example, if the query is > an append-only `SELECT FUNC(c) FROM
> > t`,
> > > >>>> I don't see a reason why the > operator is not allowed to produce
> > > >> unordered results.
> > > >>>
> > > >>>
> > > >>> @Timo, @Alan: IIUC, there seems to be something wrong here. Take
> > kafka
> > > >> as source and mysql as sink as an example.
> > > >>> Although kafka is an append-only source, one of its fields is used
> as
> > > pk
> > > >> when writing to mysql. If async udx is executed
> > > >>>    in an unordered mode, there may be problems with the data in
> mysql
> > > in
> > > >> the end. In this case, we need to ensure that
> > > >>> the sink-based pk is in order actually.
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>>
> > > >>>       Best!
> > > >>>       Xuyang
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> At 2023-12-16 03:33:55, "Alan Sheinberg" <asheinb...@confluent.io
> > > .INVALID>
> > > >> wrote:
> > > >>>> Thanks for the replies everyone.  My responses are inline:
> > > >>>>
> > > >>>> About the configs, what do you think using hints as mentioned in
> > [1].
> > > >>>>
> > > >>>> @Aitozi: I think hints could be a good way to do this, similar to
> > > lookup
> > > >>>> joins or the proposal in FLIP-313.  One benefit of hints is that
> > they
> > > >> allow
> > > >>>> for the highest granularity of configuration because you can
> decide
> > at
> > > >>>> each and every call site just what parameters to use.  The
> downside
> > of
> > > >>>> hints is that there's more syntax to learn and more verbosity.
> I'm
> > > >>>> somewhat partial to a configuration like this with a class
> > definition
> > > >> level
> > > >>>> of granularity (similar to how metrics reporters are defined [1]):
> > > >>>>
> > > >>>> table.exec.async-scalar.myfunc.class:
> > > >> org.apache.flink.MyAsyncScalarFunction
> > > >>>> table.exec.async-scalar.myfunc.buffer-capacity: 10
> > > >>>> ...
> > > >>>>
> > > >>>> As Timo mentioned, the downside to this is that there's not a nice
> > > >> static
> > > >>>> way to do this at the moment, unless you extend ConfigOption.  It
> > > would
> > > >> be
> > > >>>> good ultimately if Lookup joins, async scalar functions, and other
> > > >> future
> > > >>>> configurable UDFs shared the same methodology, but maybe a unified
> > > >> approach
> > > >>>> is a followup discussion.
> > > >>>>
> > > >>>> I’m just curious why you don’t use conf(global) and query
> > > >> hint(individual
> > > >>>>> async udx) to mark the output
> > > >>>>> mode 'order' or 'unorder' like async look join [1] and async
> > udtf[2],
> > > >> but
> > > >>>>> chose to introduce a new enum
> > > >>>>> in AsyncScalarFunction.
> > > >>>>
> > > >>>>
> > > >>>> @Xuyang: I'm open to adding hints. I think the important part is
> > that
> > > we
> > > >>>> have some method for the user to have a class definition level way
> > to
> > > >>>> define whether ORDERED or ALLOW_UNORDERED is most appropriate.  I
> > > don't
> > > >>>> have a strong sense yet for what would be most appropriately
> exposed
> > > as
> > > >> a
> > > >>>> FunctionRequirement vs a simple configuration/hint.
> > > >>>>
> > > >>>> What about throwing an exception to make it clear to users that
> > using
> > > >> async
> > > >>>>> scalar functions in this situation
> > > >>>>> is problematic instead of executing silently in sync mode?
> Because
> > > >> users
> > > >>>>> may be confused about
> > > >>>>> the final actual job graph.
> > > >>>>
> > > >>>>
> > > >>>> @Xuyang: Would it make a difference if it were exposed by the
> > explain
> > > >>>> method (the operator having "syncMode" vs not)?  I'd be fine to do
> > it
> > > >>>> either way -- certainly throwing an error is a bit simpler.
> > > >>>>
> > > >>>> You are right. Actually it should be the planner that fully
> decides
> > > >>>>> whether ORDERED or UNORDERED is safe to do. For example, if the
> > query
> > > >> is
> > > >>>>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why
> > the
> > > >>>>> operator is not allowed to produce unordered results. By global
> > > >>>>> configuration, we can set ORDERED such that users don't get
> > confused
> > > >>>>> about the unordered output.
> > > >>>>
> > > >>>>
> > > >>>> @Timo: Is there an easy way to determine if the output of an async
> > > >> function
> > > >>>> would be problematic or not?  If the input to the operator is
> > > >> append-only,
> > > >>>> it seems fine, because this implies that each row is effectively
> > > >>>> independent and ordering is unimportant. For upsert mode with +U
> > rows,
> > > >> you
> > > >>>> wouldn't want to swap order with other +U rows for the same key
> > > because
> > > >> the
> > > >>>> last one should win.  For -D or -U rows, you wouldn't want to swap
> > > with
> > > >>>> other rows for the same key for similar reasons.  Is it as simple
> as
> > > >>>> looking for the changlelog mode to determine whether it's safe to
> > run
> > > >> async
> > > >>>> functions UNORDERED?  I had considered analyzing various query
> forms
> > > >> (join
> > > >>>> vs aggregation vs whatever), but it seems like changelog mode
> could
> > be
> > > >>>> sufficient to understand what works and what would be an issue.
> Any
> > > >> code
> > > >>>> pointers and explanation for similar analysis would be great to
> > > >> understand
> > > >>>> this more.
> > > >>>>
> > > >>>> The mode UNORDERED however should only have
> > > >>>>> effect for these simply use cases and throw an exception if
> > UNORDERED
> > > >>>>> would mess up a changelog or other subsequent operators.
> > > >>>>
> > > >>>> @Timo: Should we throw errors or run in sync mode?  It seems like
> > > >> running
> > > >>>> in sync mode is an option to ensure correctness in all changelog
> > > modes.
> > > >>>>
> > > >>>> Let's go with global configuration first and later introduce
> > > >>>>> hints. I feel the more hints we introduce, the harder SQL queries
> > get
> > > >>>>> when maintaining them.
> > > >>>>
> > > >>>> @Timo: That seems like a reasonable approach to me.
> > > >>>>
> > > >>>> -Alan
> > > >>>>
> > > >>>> [1]
> > > >>>>
> > > >>
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
> > > >>>>
> > > >>>> On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <twal...@apache.org>
> > > >> wrote:
> > > >>>>
> > > >>>>> 1. Override the function `getRequirements` in
> `AsyncScalarFunction`
> > > >>>>>
> > > >>>>>    > If the user overrides `requirements()` to omit the `ORDERED`
> > > >>>>>    > requirement, do we allow the operator to return out-of-order
> > > >> results
> > > >>>>>    > or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED`
> > type
> > > >>>>>    > behavior (where we allow out-of-order only if it's deemed
> > > correct)?
> > > >>>>>
> > > >>>>> You are right. Actually it should be the planner that fully
> decides
> > > >>>>> whether ORDERED or UNORDERED is safe to do. For example, if the
> > query
> > > >> is
> > > >>>>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why
> > the
> > > >>>>> operator is not allowed to produce unordered results. By global
> > > >>>>> configuration, we can set ORDERED such that users don't get
> > confused
> > > >>>>> about the unordered output. The mode UNORDERED however should
> only
> > > have
> > > >>>>> effect for these simply use cases and throw an exception if
> > UNORDERED
> > > >>>>> would mess up a changelog or other subsequent operators.
> > > >>>>>
> > > >>>>> 2. In some scenarios with semantic correctness, async operators
> > must
> > > be
> > > >>>>> executed in sync mode.
> > > >>>>>
> > > >>>>>    > What about throwing an exception to make it clear to users
> > that
> > > >> using
> > > >>>>> async scalar functions
> > > >>>>>
> > > >>>>> @Xuyang: A regular SQL user doesn't care whether the function is
> > sync
> > > >> or
> > > >>>>> async. The planner should simply give its best to make the
> > execution
> > > >>>>> performant. I would not throw an exception here. There more
> > > exceptions
> > > >>>>> the, the more struggles and questions from the user.
> Conceptually,
> > we
> > > >>>>> can run async code also sync, and that's why we should also do it
> > to
> > > >>>>> avoid errors.
> > > >>>>>
> > > >>>>> 3. Hints
> > > >>>>>
> > > >>>>> @Aitozi: Let's go with global configuration first and later
> > introduce
> > > >>>>> hints. I feel the more hints we introduce, the harder SQL queries
> > get
> > > >>>>> when maintaining them.
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Timo
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> On 15.12.23 04:51, Xuyang wrote:
> > > >>>>>> Hi, Alan. Thanks for driving this.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Using async to improve throughput has been done on look join,
> and
> > > the
> > > >>>>> improvement
> > > >>>>>> effect is obvious, so I think it makes sense to support async
> > scalar
> > > >>>>> function.  Big +1 for this flip.
> > > >>>>>> I have some questions below.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> 1. Override the function `getRequirements` in
> > `AsyncScalarFunction`
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> I’m just curious why you don’t use conf(global) and query
> > > >>>>> hint(individual async udx) to mark the output
> > > >>>>>> mode 'order' or 'unorder' like async look join [1] and async
> > > udtf[2],
> > > >>>>> but chose to introduce a new enum
> > > >>>>>> in AsyncScalarFunction.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> 2. In some scenarios with semantic correctness, async operators
> > must
> > > >> be
> > > >>>>> executed in sync mode.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> What about throwing an exception to make it clear to users that
> > > using
> > > >>>>> async scalar functions in this situation
> > > >>>>>> is problematic instead of executing silently in sync mode?
> Because
> > > >> users
> > > >>>>> may be confused about
> > > >>>>>> the final actual job graph.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> [1]
> > > >>>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
> > > >>>>>> [2]
> > > >>>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>>
> > > >>>>>>        Best!
> > > >>>>>>        Xuyang
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> 在 2023-12-15 11:20:24,"Aitozi" <gjying1...@gmail.com> 写道:
> > > >>>>>>> Hi Alan,
> > > >>>>>>>       Nice FLIP, I also explore leveraging the async table
> > > >> function[1] to
> > > >>>>>>> improve the throughput before.
> > > >>>>>>>
> > > >>>>>>> About the configs, what do you think using hints as mentioned
> in
> > > [1].
> > > >>>>>>>
> > > >>>>>>> [1]:
> > > >>>>>>>
> > > >>>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Aitozi.
> > > >>>>>>>
> > > >>>>>>> Timo Walther <twal...@apache.org> 于2023年12月14日周四 17:29写道:
> > > >>>>>>>
> > > >>>>>>>> Hi Alan,
> > > >>>>>>>>
> > > >>>>>>>> thanks for proposing this FLIP. It's a great addition to Flink
> > and
> > > >> has
> > > >>>>>>>> been requested multiple times. It will be in particular
> > > interesting
> > > >> for
> > > >>>>>>>> accessing REST endpoints and other remote services.
> > > >>>>>>>>
> > > >>>>>>>> Great that we can generalize and reuse parts of the Python
> > planner
> > > >>>>> rules
> > > >>>>>>>> and code for this.
> > > >>>>>>>>
> > > >>>>>>>> I have some feedback regarding the API:
> > > >>>>>>>>
> > > >>>>>>>> 1) Configuration
> > > >>>>>>>>
> > > >>>>>>>> Configuration keys like
> > > >>>>>>>>
> > > >>>>>>>> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
> > > >>>>>>>>
> > > >>>>>>>> are currently not supported in the configuration stack. The
> key
> > > >> space
> > > >>>>>>>> should remain constant. Only a constant key space enables the
> > use
> > > of
> > > >>>>> the
> > > >>>>>>>> ConfigOption class which is required in the layered
> > configuration.
> > > >> For
> > > >>>>>>>> now I would suggest to only allow a global setting for buffer
> > > >> capacity,
> > > >>>>>>>> timeout, and retry-strategy. We can later work on a
> per-function
> > > >>>>>>>> configuration (potentially also needed for other use cases).
> > > >>>>>>>>
> > > >>>>>>>> 2) Semantical declaration
> > > >>>>>>>>
> > > >>>>>>>> Regarding
> > > >>>>>>>>
> > > >>>>>>>> `table.exec.async-scalar.catalog.db.func-name.output-mode`
> > > >>>>>>>>
> > > >>>>>>>> this is a semantical property of a function and should be
> > defined
> > > >>>>>>>> per-function. It impacts the query result and potentially the
> > > >> behavior
> > > >>>>>>>> of planner rules.
> > > >>>>>>>>
> > > >>>>>>>> I see two options for this either: (a) an additional method in
> > > >>>>>>>> AsyncScalarFunction or (b) adding this to the function's
> > > >> requirements.
> > > >>>>> I
> > > >>>>>>>> vote for (b), because a FunctionDefinition should be fully
> self
> > > >>>>>>>> contained and sufficient for planning.
> > > >>>>>>>>
> > > >>>>>>>> Thus, for `FunctionDefinition.getRequirements():
> > > >>>>>>>> Set<FunctionRequirement>` we can add a new requirement
> `ORDERED`
> > > >> which
> > > >>>>>>>> should also be the default for AsyncScalarFunction.
> > > >> `getRequirements()`
> > > >>>>>>>> can be overwritten and return a set without this requirement
> if
> > > the
> > > >>>>> user
> > > >>>>>>>> intents to do this.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Timo
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 11.12.23 18:43, Piotr Nowojski wrote:
> > > >>>>>>>>> +1 to the idea, I don't have any comments.
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Piotrek
> > > >>>>>>>>>
> > > >>>>>>>>> czw., 7 gru 2023 o 07:15 Alan Sheinberg <
> > asheinb...@confluent.io
> > > >>>>>>>> .invalid>
> > > >>>>>>>>> napisał(a):
> > > >>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Nicely written and makes sense.  The only feedback I have
> is
> > > >> around
> > > >>>>> the
> > > >>>>>>>>>>> naming of the generalization, e.g. "Specifically,
> > > >>>>>>>> PythonCalcSplitRuleBase
> > > >>>>>>>>>>> will be generalized into RemoteCalcSplitRuleBase."  This
> > naming
> > > >>>>> seems
> > > >>>>>>>> to
> > > >>>>>>>>>>> imply/suggest that all Async functions are remote.  I
> wonder
> > if
> > > >> we
> > > >>>>> can
> > > >>>>>>>>>> find
> > > >>>>>>>>>>> another name which doesn't carry that connotation; maybe
> > > >>>>>>>>>>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which
> > > handles
> > > >>>>>>>> Python
> > > >>>>>>>>>>> and Async functions seems reasonable.)
> > > >>>>>>>>>>>
> > > >>>>>>>>>> Thanks.  That's fair.  I agree that "Remote" isn't always
> > > >> accurate.
> > > >>>>> I
> > > >>>>>>>>>> believe that the python calls are also done asynchronously,
> so
> > > >> that
> > > >>>>>>>> might
> > > >>>>>>>>>> be a reasonable name, so long as there's no confusion
> between
> > > the
> > > >>>>> base
> > > >>>>>>>> and
> > > >>>>>>>>>> async child class.
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes
> > > >>>>> <jhug...@confluent.io.invalid
> > > >>>>>>>>>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi Alan,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Nicely written and makes sense.  The only feedback I have
> is
> > > >> around
> > > >>>>> the
> > > >>>>>>>>>>> naming of the generalization, e.g. "Specifically,
> > > >>>>>>>> PythonCalcSplitRuleBase
> > > >>>>>>>>>>> will be generalized into RemoteCalcSplitRuleBase."  This
> > naming
> > > >>>>> seems
> > > >>>>>>>> to
> > > >>>>>>>>>>> imply/suggest that all Async functions are remote.  I
> wonder
> > if
> > > >> we
> > > >>>>> can
> > > >>>>>>>>>> find
> > > >>>>>>>>>>> another name which doesn't carry that connotation; maybe
> > > >>>>>>>>>>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which
> > > handles
> > > >>>>>>>> Python
> > > >>>>>>>>>>> and Async functions seems reasonable.)
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Cheers,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Jim
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
> > > >>>>>>>>>>> <asheinb...@confluent.io.invalid> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> I'd like to start a discussion of FLIP-400:
> > > AsyncScalarFunction
> > > >> for
> > > >>>>>>>>>>>> asynchronous scalar function support [1]
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> This feature proposes adding a new UDF type
> > > AsyncScalarFunction
> > > >>>>> which
> > > >>>>>>>>>> is
> > > >>>>>>>>>>>> invoked just like a normal ScalarFunction, but is
> > implemented
> > > >> with
> > > >>>>> an
> > > >>>>>>>>>>>> asynchronous eval method.  I had brought this up including
> > the
> > > >>>>>>>>>> motivation
> > > >>>>>>>>>>>> in a previous discussion thread [2].
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> The purpose is to achieve high throughput scalar function
> > UDFs
> > > >>>>> while
> > > >>>>>>>>>>>> allowing that an individual call may have high latency.
> It
> > > >> allows
> > > >>>>>>>>>>> scaling
> > > >>>>>>>>>>>> up the parallelism of just these calls without having to
> > > >> increase
> > > >>>>> the
> > > >>>>>>>>>>>> parallelism of the whole query (which could be rather
> > resource
> > > >>>>>>>>>>>> inefficient).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> In practice, it should enable SQL integration with
> external
> > > >>>>> services
> > > >>>>>>>>>> and
> > > >>>>>>>>>>>> systems, which Flink has limited support for at the
> moment.
> > It
> > > >>>>> should
> > > >>>>>>>>>>> also
> > > >>>>>>>>>>>> allow easier integration with existing libraries which use
> > > >>>>>>>> asynchronous
> > > >>>>>>>>>>>> APIs.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Looking forward to your feedback and suggestions.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> [1]
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > > >>>>>>>>>>>> <
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> [2]
> > > >>>>> https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> > > >>>>>>>>>>>> <
> > > >> https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>> Alan
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>
> > > >>>>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>

Reply via email to