I'm delighted to see the progress on this. This is going to be a major enabler for some important use cases.
The proposed simplifications (global config and ordered mode) for V1 make a lot of sense to me. +1

David

On Wed, Dec 20, 2023 at 12:31 PM Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:

Thanks for that feedback Lincoln,

> Only one question with the async `timeout` parameter[1] (since I
> haven't seen the POC code), current description is: 'The time which can
> pass before a restart strategy is triggered',
> but in the previous FLIP-232[2] and FLIP-234[3], in retry scenario, this
> timeout is the total time, do we keep the behavior of the parameter
> consistent?

That's a good catch. I was intending to use *AsyncWaitOperator*, and to pass this timeout directly. Looking through the code a bit, it appears that it doesn't restart the timer on a retry, and this timeout is total, as you're saying. I do intend on being consistent with the other FLIPs and retaining this behavior, so I will update the wording on my FLIP to reflect that.

-Alan

On Wed, Dec 20, 2023 at 1:36 AM Lincoln Lee <lincoln.8...@gmail.com> wrote:

+1 for this useful feature!
Hope this reply isn't too late. Agree that we start with global async-scalar configuration and ordered mode first.

@Alan Only one question with the async `timeout` parameter[1] (since I haven't seen the POC code), current description is: 'The time which can pass before a restart strategy is triggered', but in the previous FLIP-232[2] and FLIP-234[3], in retry scenario, this timeout is the total time, do we keep the behavior of the parameter consistent?
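To make the timeout question concrete: with retries enabled, a "total" timeout is one budget shared by every attempt, while a per-attempt timeout restarts on each retry. The following is a minimal, self-contained Java model of the two readings, not Flink code. The class and method names (`TimeoutSemantics`, `completionTimeMs`) are hypothetical, attempt durations are simulated, and every attempt except the last is assumed to fail.

```java
import java.util.List;

/**
 * Hypothetical model (not Flink code) of two readings of an async `timeout`
 * when retries are enabled. Durations are simulated in milliseconds, and every
 * attempt except the last is assumed to fail and trigger a retry.
 */
final class TimeoutSemantics {

    /**
     * Returns the time at which the call succeeds, or -1 if it times out.
     *
     * @param attemptDurationsMs how long each attempt takes, in order
     * @param timeoutMs          the configured timeout
     * @param totalBudget        true: one timer for all attempts, not restarted on
     *                           retry; false: the timer restarts for each attempt
     */
    static long completionTimeMs(List<Long> attemptDurationsMs, long timeoutMs, boolean totalBudget) {
        if (attemptDurationsMs.isEmpty()) {
            throw new IllegalArgumentException("need at least one attempt");
        }
        long elapsed = 0;
        for (long duration : attemptDurationsMs) {
            // Budget for this attempt: what is left of the shared budget, or a
            // fresh full timeout if the timer restarts on every retry.
            long budget = totalBudget ? timeoutMs - elapsed : timeoutMs;
            if (duration > budget) {
                return -1; // timed out during this attempt
            }
            elapsed += duration;
        }
        return elapsed; // the last attempt succeeded in time
    }

    public static void main(String[] args) {
        List<Long> attempts = List.of(40L, 40L, 40L);
        System.out.println(completionTimeMs(attempts, 100, true));  // -1
        System.out.println(completionTimeMs(attempts, 100, false)); // 120
    }
}
```

With three 40 ms attempts and a 100 ms timeout, the total-budget reading gives up during the third attempt, while the per-attempt reading lets the call finish at 120 ms, longer than the configured timeout. Per Alan's reply, the FLIP keeps the total-budget behavior of FLIP-232 and FLIP-234.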

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems

Best,
Lincoln Lee

On Wed, Dec 20, 2023 at 08:41, Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:

Thanks for the comments Timo.

> Can you remove the necessary parts? Esp.:
>
> @Override
> public Set<FunctionRequirement> getRequirements() {
>     return Collections.singleton(FunctionRequirement.ORDERED);
> }

I removed this section from the FLIP since, presumably, there's no use in adding to the public API if it's ignored, with handling just ORDERED for the first version. I'm not sure how quickly I'll want to add UNORDERED support, but I guess I can always do another FLIP.

> Otherwise I have no objections to start a VOTE soonish. If others are
> fine as well?

That would be great. Any areas that people are interested in discussing further before a vote?

-Alan

On Tue, Dec 19, 2023 at 5:49 AM Timo Walther <twal...@apache.org> wrote:

> I would be totally fine with the first version only having ORDERED
> mode. For a v2, we could attempt to do the next most conservative
> thing

Sounds good to me.

I also checked AsyncWaitOperator and could not find an access of StreamRecord's timestamp but only watermarks. But as we said, let's focus on ORDERED first.

Can you remove the necessary parts? Esp.:

@Override
public Set<FunctionRequirement> getRequirements() {
    return Collections.singleton(FunctionRequirement.ORDERED);
}

Otherwise I have no objections to start a VOTE soonish. If others are fine as well?

Regards,
Timo

On 19.12.23 07:32, Alan Sheinberg wrote:

Thanks for the helpful comments, Xuyang and Timo.

> @Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as
> source and MySQL as sink as an example.
> Although Kafka is an append-only source, one of its fields is used as pk
> when writing to MySQL. If async udx is executed
> in an unordered mode, there may be problems with the data in MySQL in the
> end. In this case, we need to ensure that
> the sink-based pk is in order actually.

@Xuyang: That's a great point. If some node downstream of my operator cares about ordering, there's no way for it to reconstruct the original ordering of the rows as they were input to my operator. So even if they want to preserve ordering by key, the order in which they see it may already be incorrect. Somehow I thought that maybe the analysis of the changelog mode at a given operator was aware of downstream operations, but it seems not.

> Clear "no" on this. Changelog semantics make the planner complex and we
> need to be careful. Therefore I would strongly suggest we introduce
> ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
> plans with appropriate planner rules that guard it.

@Timo: The better I understand the complexity, the more I agree with this.
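Xuyang's Kafka-to-MySQL example can be reduced to a few lines. The sketch below is a hypothetical illustration, not Flink code: `UpsertOrdering` and `applyUpserts` are made-up names, the sink is modelled as a map in which the last write per primary key wins, and the two rows stand for async-function results that share the same key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration (not Flink code) of an append-only source feeding
 * an upsert sink keyed on one column: the sink keeps the last value written
 * per key, so the order in which async results are emitted decides the outcome.
 */
final class UpsertOrdering {

    /** Applies (key, value) rows in emission order; the last write per key wins. */
    static Map<String, String> applyUpserts(List<Map.Entry<String, String>> rowsInEmissionOrder) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> row : rowsInEmissionOrder) {
            table.put(row.getKey(), row.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        Map.Entry<String, String> older = Map.entry("user-1", "v1");
        Map.Entry<String, String> newer = Map.entry("user-1", "v2"); // later in the source

        // ORDERED: results leave the async operator in input order.
        System.out.println(applyUpserts(List.of(older, newer))); // {user-1=v2}

        // UNORDERED: the call for `newer` happened to complete first,
        // so the stale value overwrites it in the sink.
        System.out.println(applyUpserts(List.of(newer, older))); // {user-1=v1}
    }
}
```

Once the two results have been swapped, nothing downstream can tell which one is newer, which is why the thread settles on ORDERED unless a planner rule can prove that reordering is harmless.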

I would be totally fine with the first version only having ORDERED mode. For a v2, we could attempt to do the next most conservative thing and only allow UNORDERED when the whole graph is in *INSERT* changelog mode. The next best type of optimization might understand what's the key required downstream, and allow breaking the original order only between unrelated keys, but maintaining it between rows of the same key. Of course, if the key used downstream is computed in some manner, that makes it all the harder to know this beforehand.

> So unordering should be fine *within* watermarks. This is also what
> watermarks are good for, a trade-off between strict ordering and making
> progress. The async operator from DataStream API also supports this if I
> remember correctly. However, it assumes a timestamp is present in
> StreamRecord on which it can work. But this is not the case within the
> SQL engine.

*AsyncWaitOperator* and *UnorderedStreamElementQueue* (the implementations I plan on using) seem to support exactly this behavior. I don't think it makes assumptions about the record's timestamp, but just preserves whatever the input order is w.r.t. watermarks. I'd be curious to understand the timestamp use in more detail and see if it's required with the mentioned classes.

> TLDR: Let's focus on ORDERED first.

I'm more than happy to start here and we can consider UNORDERED as a followup. Then maybe we consider only INSERT mode graphs and ones where we can solve the watermark constraints.
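The watermark-bounded behavior attributed above to *AsyncWaitOperator* and *UnorderedStreamElementQueue* can be simulated roughly as follows. This is a hypothetical model, not those classes: `WatermarkBoundedUnordered` and `emit` are made-up names, completion order inside a segment is approximated by sorting on a simulated latency (assuming all calls in a segment start at about the same time), and Timo's sequence R(5), W(4), R(3), R(4), R(2), R(1), W(0) from his Dec 18 message is read right to left as arrival order, which is an assumption.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

/**
 * Hypothetical simulation (not Flink's AsyncWaitOperator) of UNORDERED output
 * bounded by watermarks. Elements are strings: "R(n)" is a record and "W(n)"
 * a watermark. Completion order inside a segment is approximated by sorting on
 * a simulated latency, assuming all calls in the segment start together.
 */
final class WatermarkBoundedUnordered {

    static List<String> emit(List<String> arrival, ToIntFunction<String> latencyMs) {
        List<String> out = new ArrayList<>();
        List<String> segment = new ArrayList<>(); // records since the last watermark
        for (String element : arrival) {
            if (element.startsWith("W")) {
                flush(segment, latencyMs, out);
                out.add(element); // forwarded only after every earlier record
            } else {
                segment.add(element);
            }
        }
        flush(segment, latencyMs, out); // records after the last watermark
        return out;
    }

    /** Emits buffered records fastest-first; List.sort is stable, so ties keep input order. */
    private static void flush(List<String> segment, ToIntFunction<String> latencyMs, List<String> out) {
        segment.sort(Comparator.comparingInt(latencyMs));
        out.addAll(segment);
        segment.clear();
    }

    public static void main(String[] args) {
        // Arrival order, reading the thread's example right to left (an assumption).
        List<String> arrival = List.of("W(0)", "R(1)", "R(2)", "R(4)", "R(3)", "W(4)", "R(5)");
        Map<String, Integer> latency = Map.of("R(1)", 30, "R(2)", 10, "R(4)", 20, "R(3)", 5, "R(5)", 1);
        System.out.println(emit(arrival, r -> latency.get(r)));
        // [W(0), R(3), R(2), R(4), R(1), W(4), R(5)]
    }
}
```

Records overtake each other only between two watermarks: W(4) is forwarded after all four records that arrived before it, and R(5) stays behind W(4) even though its call is the fastest.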

Thanks,
Alan

On Mon, Dec 18, 2023 at 2:36 AM Timo Walther <twal...@apache.org> wrote:

Hi Xuyang and Alan,

thanks for this productive discussion.

> Would it make a difference if it were exposed by the explain

@Alan: I think this is a great idea. +1 on exposing the sync/async behavior through EXPLAIN.

> Is there an easy way to determine if the output of an async function
> would be problematic or not?

Clear "no" on this. Changelog semantics make the planner complex and we need to be careful. Therefore I would strongly suggest we introduce ORDERED and slowly enable UNORDERED whenever we see a good fit for it in plans with appropriate planner rules that guard it.

> If the input to the operator is append-only, it seems fine, because
> this implies that each row is effectively independent and ordering is
> unimportant.

As @Xuyang pointed out, it's not only the input that decides whether append-only is safe. It's also the subsequent operators in the pipeline. The example of Xuyang is a good one, when the sink operates in upsert mode. Append-only source, append-only operators, and append-only sink are safer.

However, even in this combination, a row is not fully "independent"; there are still watermarks flowing between rows:

R(5), W(4), R(3), R(4), R(2), R(1), W(0)

So unordering should be fine *within* watermarks. This is also what watermarks are good for, a trade-off between strict ordering and making progress. The async operator from DataStream API also supports this if I remember correctly. However, it assumes a timestamp is present in StreamRecord on which it can work. But this is not the case within the SQL engine.

TLDR: Let's focus on ORDERED first.

If we want to use UNORDERED, I would suggest checking the input operator for exactly 1 time attribute column. If there is exactly 1 time attribute column, we could insert it into the StreamRecord and allow UNORDERED mode. If this condition is not met, we go with ORDERED.

Regards,
Timo

On 18.12.23 07:05, Xuyang wrote:

Hi, Alan and Timo. Thanks for your reply.

> Would it make a difference if it were exposed by the explain
> method (the operator having "syncMode" vs not)?

@Alan: I think this is a good way to tell the user what mode these async udx are currently in.

> A regular SQL user doesn't care whether the function is sync or async.

@Timo: I agree that the planner should throw as few exceptions as possible rather than confusing users. So I think it is a good way to expose syncMode through explain syntax.

> If the input to the operator is append-only, it seems fine,
> because this implies that each row is effectively independent and ordering is unimportant.

> For example, if the query is an append-only `SELECT FUNC(c) FROM t`,
> I don't see a reason why the operator is not allowed to produce unordered results.

@Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as source and MySQL as sink as an example. Although Kafka is an append-only source, one of its fields is used as pk when writing to MySQL. If async udx is executed in an unordered mode, there may be problems with the data in MySQL in the end. In this case, we need to ensure that the sink-based pk is in order actually.

--
Best!
Xuyang

At 2023-12-16 03:33:55, "Alan Sheinberg" <asheinb...@confluent.io.INVALID> wrote:

Thanks for the replies everyone. My responses are inline:

> About the configs, what do you think of using hints as mentioned in [1]?

@Aitozi: I think hints could be a good way to do this, similar to lookup joins or the proposal in FLIP-313. One benefit of hints is that they allow for the highest granularity of configuration because you can decide at each and every call site just what parameters to use. The downside of hints is that there's more syntax to learn and more verbosity. I'm somewhat partial to a configuration like this with a class definition level of granularity (similar to how metrics reporters are defined [1]):

table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
table.exec.async-scalar.myfunc.buffer-capacity: 10
...

As Timo mentioned, the downside to this is that there's not a nice static way to do this at the moment, unless you extend ConfigOption. It would be good ultimately if lookup joins, async scalar functions, and other future configurable UDFs shared the same methodology, but maybe a unified approach is a followup discussion.

> I'm just curious why you don't use conf(global) and query hint(individual
> async udx) to mark the output mode 'order' or 'unorder' like async lookup
> join [1] and async udtf[2], but chose to introduce a new enum
> in AsyncScalarFunction.

@Xuyang: I'm open to adding hints. I think the important part is that we have some method for the user to have a class definition level way to define whether ORDERED or ALLOW_UNORDERED is most appropriate. I don't have a strong sense yet for what would be most appropriately exposed as a FunctionRequirement vs a simple configuration/hint.

> What about throwing an exception to make it clear to users that using async
> scalar functions in this situation is problematic instead of executing
> silently in sync mode? Because users may be confused about
> the final actual job graph.

@Xuyang: Would it make a difference if it were exposed by the explain method (the operator having "syncMode" vs not)? I'd be fine to do it either way -- certainly throwing an error is a bit simpler.

> You are right. Actually it should be the planner that fully decides
> whether ORDERED or UNORDERED is safe to do. For example, if the query is
> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
> operator is not allowed to produce unordered results. By global
> configuration, we can set ORDERED such that users don't get confused
> about the unordered output.

@Timo: Is there an easy way to determine if the output of an async function would be problematic or not? If the input to the operator is append-only, it seems fine, because this implies that each row is effectively independent and ordering is unimportant. For upsert mode with +U rows, you wouldn't want to swap order with other +U rows for the same key because the last one should win. For -D or -U rows, you wouldn't want to swap with other rows for the same key for similar reasons. Is it as simple as looking at the changelog mode to determine whether it's safe to run async functions UNORDERED? I had considered analyzing various query forms (join vs aggregation vs whatever), but it seems like changelog mode could be sufficient to understand what works and what would be an issue. Any code pointers and explanation for similar analysis would be great to understand this more.

> The mode UNORDERED however should only have
> effect for these simple use cases and throw an exception if UNORDERED
> would mess up a changelog or other subsequent operators.

@Timo: Should we throw errors or run in sync mode? It seems like running in sync mode is an option to ensure correctness in all changelog modes.

> Let's go with global configuration first and later introduce
> hints. I feel the more hints we introduce, the harder SQL queries get
> when maintaining them.

@Timo: That seems like a reasonable approach to me.

-Alan

[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/

On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <twal...@apache.org> wrote:

1. Override the function `getRequirements` in `AsyncScalarFunction`

> If the user overrides `requirements()` to omit the `ORDERED`
> requirement, do we allow the operator to return out-of-order results
> or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> behavior (where we allow out-of-order only if it's deemed correct)?

You are right. Actually it should be the planner that fully decides whether ORDERED or UNORDERED is safe to do. For example, if the query is an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the operator is not allowed to produce unordered results. By global configuration, we can set ORDERED such that users don't get confused about the unordered output. The mode UNORDERED however should only have effect for these simple use cases and throw an exception if UNORDERED would mess up a changelog or other subsequent operators.

2. In some scenarios with semantic correctness, async operators must be executed in sync mode.

> What about throwing an exception to make it clear to users that using
> async scalar functions

@Xuyang: A regular SQL user doesn't care whether the function is sync or async. The planner should simply give its best to make the execution performant. I would not throw an exception here. The more exceptions, the more struggles and questions from the user. Conceptually, we can also run async code synchronously, and that's why we should also do it to avoid errors.

3. Hints

@Aitozi: Let's go with global configuration first and later introduce hints. I feel the more hints we introduce, the harder SQL queries get when maintaining them.

Regards,
Timo

On 15.12.23 04:51, Xuyang wrote:

Hi, Alan. Thanks for driving this.

Using async to improve throughput has been done on lookup join, and the improvement effect is obvious, so I think it makes sense to support async scalar function. Big +1 for this FLIP.
I have some questions below.

1. Override the function `getRequirements` in `AsyncScalarFunction`

I'm just curious why you don't use conf(global) and query hint(individual async udx) to mark the output mode 'order' or 'unorder' like async lookup join [1] and async udtf[2], but chose to introduce a new enum in AsyncScalarFunction.

2. In some scenarios with semantic correctness, async operators must be executed in sync mode.

What about throwing an exception to make it clear to users that using async scalar functions in this situation is problematic instead of executing silently in sync mode? Because users may be confused about the final actual job graph.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

--
Best!
Xuyang

On 2023-12-15 11:20:24, "Aitozi" <gjying1...@gmail.com> wrote:

Hi Alan,
Nice FLIP, I also explored leveraging the async table function[1] to improve the throughput before.

About the configs, what do you think of using hints as mentioned in [1]?

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.

On Thu, Dec 14, 2023 at 17:29, Timo Walther <twal...@apache.org> wrote:

Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has been requested multiple times. It will be particularly interesting for accessing REST endpoints and other remote services.

Great that we can generalize and reuse parts of the Python planner rules and code for this.

I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space should remain constant. Only a constant key space enables the use of the ConfigOption class, which is required in the layered configuration. For now I would suggest only allowing a global setting for buffer capacity, timeout, and retry-strategy. We can later work on a per-function configuration (potentially also needed for other use cases).
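As a rough illustration of what a single global buffer capacity bounds when combined with ORDERED output, here is a minimal Java sketch. It is loosely modelled on the `capacity` parameter of the DataStream API's async I/O (`AsyncDataStream.orderedWait`), but `OrderedAsyncBuffer` and its methods are hypothetical names, not Flink's internal queue. At most `capacity` calls are in flight, and results are released only from the head, so output order matches input order even when later calls finish first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch (not Flink's internal queue) of ORDERED emission with a
 * bounded number of in-flight async calls, i.e. a global buffer capacity.
 */
final class OrderedAsyncBuffer<T> {
    private final int capacity;
    private final Deque<CompletableFuture<T>> inFlight = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>();

    OrderedAsyncBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    /** Adds a call; when the buffer is full, first waits for the oldest call. */
    void submit(CompletableFuture<T> call) {
        while (inFlight.size() >= capacity) {
            emitted.add(inFlight.removeFirst().join()); // back-pressure on the head
        }
        inFlight.addLast(call);
        drainCompletedHead();
    }

    /** Releases finished results from the head only, so output order equals input order. */
    void drainCompletedHead() {
        while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
            emitted.add(inFlight.removeFirst().join());
        }
    }

    /** Results released so far. */
    List<T> emittedSoFar() {
        return new ArrayList<>(emitted);
    }

    /** Waits for all remaining calls, e.g. at end of input, and returns every result. */
    List<T> finish() {
        while (!inFlight.isEmpty()) {
            emitted.add(inFlight.removeFirst().join());
        }
        return emittedSoFar();
    }

    public static void main(String[] args) {
        OrderedAsyncBuffer<Integer> buffer = new OrderedAsyncBuffer<>(2);
        for (int i = 0; i < 5; i++) {
            int n = i;
            // Later inputs finish sooner, so completion order differs from input order.
            buffer.submit(CompletableFuture.supplyAsync(() -> {
                sleepMs(50 - 10 * n);
                return n;
            }));
        }
        System.out.println(buffer.finish()); // [0, 1, 2, 3, 4]
    }

    private static void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A single global capacity like this needs only one constant key, which fits Timo's point about keeping the key space constant; a per-function capacity would need the dynamic keys the configuration stack does not support today.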

2) Semantic declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantic property of a function and should be defined per function. It impacts the query result and potentially the behavior of planner rules.

I see two options for this: either (a) an additional method in AsyncScalarFunction or (b) adding this to the function's requirements. I vote for (b), because a FunctionDefinition should be fully self-contained and sufficient for planning.

Thus, for `FunctionDefinition.getRequirements(): Set<FunctionRequirement>` we can add a new requirement `ORDERED`, which should also be the default for AsyncScalarFunction. `getRequirements()` can be overridden and return a set without this requirement if the user intends to do this.

Thanks,
Timo

On 11.12.23 18:43, Piotr Nowojski wrote:

+1 to the idea, I don't have any comments.

Best,
Piotrek

On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:

> Nicely written and makes sense. The only feedback I have is around the
> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> will be generalized into RemoteCalcSplitRuleBase." This naming seems to
> imply/suggest that all Async functions are remote. I wonder if we can find
> another name which doesn't carry that connotation; maybe
> AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
> and Async functions seems reasonable.)

Thanks. That's fair. I agree that "Remote" isn't always accurate. I believe that the Python calls are also done asynchronously, so that might be a reasonable name, so long as there's no confusion between the base and async child class.

On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid> wrote:

Hi Alan,

Nicely written and makes sense. The only feedback I have is around the naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase will be generalized into RemoteCalcSplitRuleBase." This naming seems to imply/suggest that all Async functions are remote. I wonder if we can find another name which doesn't carry that connotation; maybe AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python and Async functions seems reasonable.)

Cheers,

Jim

On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:

I'd like to start a discussion of FLIP-400: AsyncScalarFunction for asynchronous scalar function support [1]

This feature proposes adding a new UDF type AsyncScalarFunction which is invoked just like a normal ScalarFunction, but is implemented with an asynchronous eval method. I had brought this up including the motivation in a previous discussion thread [2].

The purpose is to achieve high throughput scalar function UDFs while allowing that an individual call may have high latency. It allows scaling up the parallelism of just these calls without having to increase the parallelism of the whole query (which could be rather resource inefficient).

In practice, it should enable SQL integration with external services and systems, which Flink has limited support for at the moment. It should also allow easier integration with existing libraries which use asynchronous APIs.

Looking forward to your feedback and suggestions.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
[2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs

Thanks,
Alan
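The original proposal above describes a UDF whose eval method completes its result asynchronously, so high per-call latency need not limit throughput. The following is a minimal sketch of that idea using only the JDK. `SlowLookupFunction` is a hypothetical class that does not extend Flink's `AsyncScalarFunction`, its `eval(CompletableFuture<String>, Integer)` method only mirrors the general shape of a future-completing eval (an assumption about the final API), and the remote call is simulated with a sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for an async scalar UDF (it does not extend Flink's API):
 * eval returns immediately and completes the future it is handed later, so one
 * slow call does not block the next. The remote lookup is simulated by a sleep.
 */
final class SlowLookupFunction {
    private final ExecutorService executor;
    private final long latencyMs;

    SlowLookupFunction(ExecutorService executor, long latencyMs) {
        this.executor = executor;
        this.latencyMs = latencyMs;
    }

    void eval(CompletableFuture<String> result, Integer id) {
        executor.execute(() -> {
            try {
                Thread.sleep(latencyMs); // stands in for a REST call or similar
                result.complete("name-" + id);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                result.completeExceptionally(e);
            }
        });
    }

    /** Starts every call before waiting on any, so their latencies overlap. */
    static List<String> lookupAll(SlowLookupFunction fn, List<Integer> ids) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Integer id : ids) {
            CompletableFuture<String> future = new CompletableFuture<>();
            fn.eval(future, id);
            futures.add(future);
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join()); // collected in input order
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            SlowLookupFunction fn = new SlowLookupFunction(pool, 100);
            long start = System.nanoTime();
            List<String> names = lookupAll(fn, List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            // Ten 100 ms calls overlap, so elapsedMs is typically close to 100, not 1000.
            System.out.println(names + " after ~" + elapsedMs + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

Only the async calls need the extra concurrency here (the thread pool), which mirrors the proposal's goal of scaling up just these calls instead of the parallelism of the whole query.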