+1 for this useful feature! I hope this reply isn't too late. I agree that we should start with the global async-scalar configuration and the ordered mode first.
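Since the thread converges on ORDERED mode for the first version, here is a minimal sketch of what "ordered" means for async results: calls may complete in any order, but results are handed downstream in the order the calls were issued. It is plain Java on `CompletableFuture`; `OrderedResultQueue` is a hypothetical name, and this is not Flink's `AsyncWaitOperator`, only an illustration of the same ordering contract as we understand it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch of ORDERED emission: async results may complete out of order,
// but are passed downstream strictly in the order the calls were issued.
// Not thread-safe; drain() is meant to be called from a single thread.
final class OrderedResultQueue<T> {
    private final Deque<CompletableFuture<T>> pending = new ArrayDeque<>();
    private final Consumer<T> downstream;

    OrderedResultQueue(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Registers a new call in input order and returns the future its result will complete.
    CompletableFuture<T> add() {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.addLast(future);
        return future;
    }

    // Emits the longest prefix of completed calls and stops at the first unfinished one.
    // join() would throw for an exceptionally completed future; error handling is omitted.
    void drain() {
        while (!pending.isEmpty() && pending.peekFirst().isDone()) {
            downstream.accept(pending.pollFirst().join());
        }
    }

    // Number of calls issued but not yet emitted.
    int inFlight() {
        return pending.size();
    }

    // Demo: three calls complete in the order 3, 1, 2, yet are emitted as a, b, c.
    static List<String> demo() {
        List<String> emitted = new ArrayList<>();
        OrderedResultQueue<String> queue = new OrderedResultQueue<>(emitted::add);
        CompletableFuture<String> first = queue.add();
        CompletableFuture<String> second = queue.add();
        CompletableFuture<String> third = queue.add();
        third.complete("c");
        queue.drain(); // nothing emitted: the first call is still pending
        first.complete("a");
        queue.drain(); // emits "a"; "b" is still pending, so "c" waits
        second.complete("b");
        queue.drain(); // emits "b", then "c"
        return emitted;
    }
}
```

Because `drain()` stops at the first unfinished call, one slow early call holds back every later result; that head-of-line blocking is the cost of ORDERED mode discussed below.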
@Alan I have only one question, about the async `timeout` parameter [1] (I haven't seen the POC code). Its current description is "The time which can pass before a restart strategy is triggered", but in the previous FLIP-232 [2] and FLIP-234 [3], in the retry scenario this timeout is the total time, including retries. Do we keep the behavior of the parameter consistent?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems

Best,
Lincoln Lee

Alan Sheinberg <asheinb...@confluent.io.invalid> wrote on Wed, Dec 20, 2023 at 08:41:
> Thanks for the comments Timo.
>
> > Can you remove the necessary parts? Esp.:
> >
> > @Override
> > public Set<FunctionRequirement> getRequirements() {
> >     return Collections.singleton(FunctionRequirement.ORDERED);
> > }
>
> I removed this section from the FLIP since presumably there's no use in adding it to the public API if it's ignored, given that the first version only handles ORDERED. I'm not sure how quickly I'll want to add UNORDERED support, but I guess I can always do another FLIP.
>
> > Otherwise I have no objections to start a VOTE soonish. If others are
> > fine as well?
>
> That would be great. Are there any areas that people are interested in discussing further before a vote?
>
> -Alan
>
> On Tue, Dec 19, 2023 at 5:49 AM Timo Walther <twal...@apache.org> wrote:
> >
> > > I would be totally fine with the first version only having ORDERED
> > > mode. For a v2, we could attempt to do the next most conservative
> > > thing
> >
> > Sounds good to me.
> >
> > I also checked AsyncWaitOperator and could not find any access to the StreamRecord's timestamp, only to watermarks. But as we said, let's focus on ORDERED first.
> >
> > Can you remove the necessary parts?
> > Esp.:
> >
> > @Override
> > public Set<FunctionRequirement> getRequirements() {
> >     return Collections.singleton(FunctionRequirement.ORDERED);
> > }
> >
> > Otherwise I have no objections to start a VOTE soonish. If others are fine as well?
> >
> > Regards,
> > Timo
> >
> > On 19.12.23 07:32, Alan Sheinberg wrote:
> > > Thanks for the helpful comments, Xuyang and Timo.
> > >
> > >> @Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as source and MySQL as sink as an example.
> > >> Although Kafka is an append-only source, one of its fields is used as the pk when writing to MySQL. If the async UDX is executed in unordered mode, the data in MySQL may end up incorrect. In this case, we actually need to ensure that rows stay in order per sink primary key.
> > >
> > > @Xuyang: That's a great point. If some node downstream of my operator cares about ordering, there's no way for it to reconstruct the original ordering of the rows as they were input to my operator. So even if they want to preserve ordering by key, the order in which they see it may already be incorrect. Somehow I thought that maybe the analysis of the changelog mode at a given operator was aware of downstream operations, but it seems not.
> > >
> > >> Clear "no" on this. Changelog semantics make the planner complex and we need to be careful. Therefore I would strongly suggest we introduce ORDERED and slowly enable UNORDERED whenever we see a good fit for it in plans with appropriate planner rules that guard it.
> > >
> > > @Timo: The better I understand the complexity, the more I agree with this. I would be totally fine with the first version only having ORDERED mode. For a v2, we could attempt to do the next most conservative thing and only allow UNORDERED when the whole graph is in *INSERT* changelog mode.
> > > The next best type of optimization might understand which key is required downstream, and allow breaking the original order only between unrelated keys, while maintaining it between rows of the same key. Of course, if the key used downstream is computed in some manner, that makes it all the harder to know this beforehand.
> > >
> > >> So unordering should be fine *within* watermarks. This is also what watermarks are good for, a trade-off between strict ordering and making progress. The async operator from DataStream API also supports this if I remember correctly. However, it assumes a timestamp is present in StreamRecord on which it can work. But this is not the case within the SQL engine.
> > >
> > > *AsyncWaitOperator* and *UnorderedStreamElementQueue* (the implementations I plan on using) seem to support exactly this behavior. I don't think they make assumptions about the record's timestamp; they just preserve whatever the input order is w.r.t. watermarks. I'd be curious to understand the timestamp use in more detail and see if it's required with the mentioned classes.
> > >
> > >> TLDR: Let's focus on ORDERED first.
> > >
> > > I'm more than happy to start here, and we can consider UNORDERED as a followup. Then maybe we consider only INSERT mode graphs and ones where we can solve the watermark constraints.
> > >
> > > Thanks,
> > > Alan
> > >
> > > On Mon, Dec 18, 2023 at 2:36 AM Timo Walther <twal...@apache.org> wrote:
> > >
> > >> Hi Xuyang and Alan,
> > >>
> > >> thanks for this productive discussion.
> > >>
> > >> > Would it make a difference if it were exposed by the explain
> > >>
> > >> @Alan: I think this is a great idea. +1 on exposing the sync/async behavior through EXPLAIN.
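Xuyang's Kafka-to-MySQL example and Alan's idea of breaking the order only between unrelated keys can be checked with a toy model. The sketch below is plain Java with hypothetical names (`UpsertOrdering`, `Row`); it treats the sink as a map where the last write per primary key wins, and it ignores deletes and retractions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an append-only stream written to an upsert sink keyed by
// primary key: the last write per key wins. Only upserts are modeled, no deletes.
final class UpsertOrdering {
    record Row(String pk, String value) {}

    // Final sink table after applying the rows in the given order.
    static Map<String, String> applyUpsert(List<Row> rows) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Row row : rows) {
            table.put(row.pk(), row.value());
        }
        return table;
    }

    // True if `output` contains the same rows per key as `input`, in the same relative
    // order within every key, i.e. only rows with different keys were swapped.
    static boolean preservesPerKeyOrder(List<Row> input, List<Row> output) {
        return groupByKey(input).equals(groupByKey(output));
    }

    private static Map<String, List<Row>> groupByKey(List<Row> rows) {
        Map<String, List<Row>> byKey = new LinkedHashMap<>();
        for (Row row : rows) {
            byKey.computeIfAbsent(row.pk(), k -> new ArrayList<>()).add(row);
        }
        return byKey;
    }
}
```

With input `[(k1,v1), (k2,w1), (k1,v2)]`, moving `(k2,w1)` to the front leaves the final table at `k1=v2, k2=w1`, while swapping the two `k1` rows leaves `k1=v1`, which is the incorrect final state Xuyang describes.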
> > >>
> > >> > Is there an easy way to determine if the output of an async function
> > >> > would be problematic or not?
> > >>
> > >> Clear "no" on this. Changelog semantics make the planner complex and we need to be careful. Therefore I would strongly suggest we introduce ORDERED and slowly enable UNORDERED whenever we see a good fit for it in plans with appropriate planner rules that guard it.
> > >>
> > >> > If the input to the operator is append-only, it seems fine, because
> > >> > this implies that each row is effectively independent and ordering is
> > >> > unimportant.
> > >>
> > >> As @Xuyang pointed out, it's not only the input that decides whether append-only is safe. It's also the subsequent operators in the pipeline. Xuyang's example, where the sink operates in upsert mode, is a good one. An append-only source, append-only operators, and an append-only sink are safer.
> > >>
> > >> However, even in this combination, a row is not fully "independent": there are still watermarks flowing between rows:
> > >>
> > >> R(5), W(4), R(3), R(4), R(2), R(1), W(0)
> > >>
> > >> So unordering should be fine *within* watermarks. This is also what watermarks are good for, a trade-off between strict ordering and making progress. The async operator from DataStream API also supports this if I remember correctly. However, it assumes a timestamp is present in StreamRecord on which it can work. But this is not the case within the SQL engine.
> > >>
> > >> TLDR: Let's focus on ORDERED first.
> > >>
> > >> If we want to use UNORDERED, I would suggest checking the input operator for exactly 1 time attribute column. If there is exactly 1 time attribute column, we could insert it into the StreamRecord and allow UNORDERED mode. If this condition is not met, we go with ORDERED.
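Timo's point that unordering is only safe *within* watermarks can be illustrated with a small sketch: records between two watermarks may be emitted in completion order, but no record crosses a watermark. We read the example sequence right to left as arrival order (W(0) first); that reading is our assumption. `Element` and `WatermarkBoundedReorder` are hypothetical names; Alan's description of `UnorderedStreamElementQueue` suggests similar segment-wise behavior, but this sketch is not based on its source.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stream element: a record R(ts) or a watermark W(ts).
// completionRank simulates when a record's async call finishes (lower = earlier).
record Element(boolean isWatermark, long ts, int completionRank) {
    static Element rec(long ts, int completionRank) {
        return new Element(false, ts, completionRank);
    }

    static Element wm(long ts) {
        return new Element(true, ts, 0);
    }

    @Override
    public String toString() {
        return (isWatermark ? "W(" : "R(") + ts + ")";
    }
}

final class WatermarkBoundedReorder {
    // Emits the records of each watermark-delimited segment in completion order,
    // but never lets a record overtake a watermark or a watermark overtake a record.
    static List<Element> emit(List<Element> arrival) {
        List<Element> out = new ArrayList<>();
        List<Element> segment = new ArrayList<>();
        for (Element e : arrival) {
            if (e.isWatermark()) {
                flush(segment, out); // every record that arrived before the watermark goes first
                out.add(e);
            } else {
                segment.add(e);
            }
        }
        flush(segment, out);
        return out;
    }

    private static void flush(List<Element> segment, List<Element> out) {
        segment.sort(Comparator.comparingInt(Element::completionRank));
        out.addAll(segment);
        segment.clear();
    }
}
```

For arrival `W(0), R(1), R(2), R(4), R(3), W(4), R(5)` with R(3) finishing first and R(1) last, the output is `W(0), R(3), R(2), R(4), R(1), W(4), R(5)`: the order inside the first segment changes, while both watermarks keep their positions.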
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >> On 18.12.23 07:05, Xuyang wrote:
> > >>> Hi, Alan and Timo. Thanks for your reply.
> > >>>> Would it make a difference if it were exposed by the explain
> > >>>> method (the operator having "syncMode" vs not)?
> > >>> @Alan: I think this is a good way to tell the user what mode these async UDXs are currently in.
> > >>>> A regular SQL user doesn't care whether the function is sync or async.
> > >>> @Timo: I agree that the planner should throw as few exceptions as possible rather than confusing users. So I think exposing syncMode through the EXPLAIN syntax is a good way to do it.
> > >>>> If the input to the operator is append-only, it seems fine,
> > >>>> because this implies that each row is effectively independent and ordering is unimportant.
> > >>>
> > >>>> For example, if the query is an append-only `SELECT FUNC(c) FROM t`,
> > >>>> I don't see a reason why the operator is not allowed to produce unordered results.
> > >>>
> > >>> @Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as source and MySQL as sink as an example.
> > >>> Although Kafka is an append-only source, one of its fields is used as the pk when writing to MySQL. If the async UDX is executed in unordered mode, the data in MySQL may end up incorrect. In this case, we actually need to ensure that rows stay in order per sink primary key.
> > >>>
> > >>> --
> > >>>
> > >>> Best!
> > >>> Xuyang
> > >>>
> > >>> On 2023-12-16 03:33:55, "Alan Sheinberg" <asheinb...@confluent.io.INVALID> wrote:
> > >>>> Thanks for the replies everyone. My responses are inline:
> > >>>>
> > >>>>> About the configs, what do you think about using hints as mentioned in [1]?
> > >>>>
> > >>>> @Aitozi: I think hints could be a good way to do this, similar to lookup joins or the proposal in FLIP-313. One benefit of hints is that they allow for the highest granularity of configuration because you can decide at each and every call site just what parameters to use. The downside of hints is that there's more syntax to learn and more verbosity. I'm somewhat partial to a configuration like this with a class-definition level of granularity (similar to how metrics reporters are defined [1]):
> > >>>>
> > >>>> table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
> > >>>> table.exec.async-scalar.myfunc.buffer-capacity: 10
> > >>>> ...
> > >>>>
> > >>>> As Timo mentioned, the downside to this is that there's not a nice static way to do this at the moment, unless you extend ConfigOption. It would be good ultimately if lookup joins, async scalar functions, and other future configurable UDFs shared the same methodology, but maybe a unified approach is a followup discussion.
> > >>>>
> > >>>>> I'm just curious why you don't use conf (global) and query hint (individual async UDX) to mark the output mode 'order' or 'unorder' like async lookup join [1] and async UDTF [2], but chose to introduce a new enum in AsyncScalarFunction.
> > >>>>
> > >>>> @Xuyang: I'm open to adding hints. I think the important part is that the user has some class-definition-level way to define whether ORDERED or ALLOW_UNORDERED is most appropriate. I don't have a strong sense yet for what would be most appropriately exposed as a FunctionRequirement vs a simple configuration/hint.
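Alan's example configuration includes `buffer-capacity: 10`, and Timo later suggests a global buffer capacity. We read this (an assumption; the thread does not define it) as the maximum number of async calls in flight per operator, similar to the `capacity` argument of DataStream's `AsyncDataStream.orderedWait`/`unorderedWait`. The plain-Java sketch below enforces such a bound with a `Semaphore`; `BoundedInFlight` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a buffer capacity: at most `capacity` async calls in flight.
final class BoundedInFlight {
    // Issues `calls` simulated async calls and returns the highest number observed in flight.
    static int run(int calls, int capacity) throws InterruptedException {
        Semaphore permits = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(calls);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            for (int i = 0; i < calls; i++) {
                permits.acquire(); // the caller blocks (backpressure) while the buffer is full
                pool.execute(() -> {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(10); // stand-in for a slow remote call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                        done.countDown();
                    }
                });
            }
            done.await();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

Even though the pool has 8 threads, `run(20, 3)` never observes more than 3 calls in flight, because a task only starts counting after its permit was acquired and stops counting before releasing it.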
> > >>>>
> > >>>>> What about throwing an exception to make it clear to users that using async scalar functions in this situation is problematic, instead of executing silently in sync mode? Users may be confused about the final actual job graph.
> > >>>>
> > >>>> @Xuyang: Would it make a difference if it were exposed by the explain method (the operator having "syncMode" vs not)? I'd be fine to do it either way; certainly throwing an error is a bit simpler.
> > >>>>
> > >>>>> You are right. Actually it should be the planner that fully decides whether ORDERED or UNORDERED is safe to do. For example, if the query is an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the operator is not allowed to produce unordered results. By global configuration, we can set ORDERED such that users don't get confused about the unordered output.
> > >>>>
> > >>>> @Timo: Is there an easy way to determine if the output of an async function would be problematic or not? If the input to the operator is append-only, it seems fine, because this implies that each row is effectively independent and ordering is unimportant. For upsert mode with +U rows, you wouldn't want to swap order with other +U rows for the same key because the last one should win. For -D or -U rows, you wouldn't want to swap with other rows for the same key for similar reasons. Is it as simple as looking at the changelog mode to determine whether it's safe to run async functions UNORDERED? I had considered analyzing various query forms (join vs aggregation vs whatever), but it seems like the changelog mode could be sufficient to understand what works and what would be an issue.
> > >>>> Any code pointers and explanations of similar analysis would help me understand this better.
> > >>>>
> > >>>>> The mode UNORDERED however should only have effect for these simple use cases and throw an exception if UNORDERED would mess up a changelog or other subsequent operators.
> > >>>>
> > >>>> @Timo: Should we throw errors or run in sync mode? It seems like running in sync mode is an option to ensure correctness in all changelog modes.
> > >>>>
> > >>>>> Let's go with global configuration first and later introduce hints. I feel the more hints we introduce, the harder SQL queries get when maintaining them.
> > >>>>
> > >>>> @Timo: That seems like a reasonable approach to me.
> > >>>>
> > >>>> -Alan
> > >>>>
> > >>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
> > >>>>
> > >>>> On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <twal...@apache.org> wrote:
> > >>>>
> > >>>>> 1. Override the function `getRequirements` in `AsyncScalarFunction`
> > >>>>>
> > >>>>> > If the user overrides `requirements()` to omit the `ORDERED`
> > >>>>> > requirement, do we allow the operator to return out-of-order results
> > >>>>> > or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> > >>>>> > behavior (where we allow out-of-order only if it's deemed correct)?
> > >>>>>
> > >>>>> You are right. Actually it should be the planner that fully decides whether ORDERED or UNORDERED is safe to do. For example, if the query is an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the operator is not allowed to produce unordered results. By global configuration, we can set ORDERED such that users don't get confused about the unordered output.
> > >>>>> The mode UNORDERED however should only have effect for these simple use cases and throw an exception if UNORDERED would mess up a changelog or other subsequent operators.
> > >>>>>
> > >>>>> 2. In some scenarios, to preserve semantic correctness, async operators must be executed in sync mode.
> > >>>>>
> > >>>>> > What about throwing an exception to make it clear to users that using
> > >>>>> > async scalar functions
> > >>>>>
> > >>>>> @Xuyang: A regular SQL user doesn't care whether the function is sync or async. The planner should simply do its best to make the execution performant. I would not throw an exception here. The more exceptions, the more struggles and questions from the user. Conceptually, we can also run async code synchronously, and that's why we should do so to avoid errors.
> > >>>>>
> > >>>>> 3. Hints
> > >>>>>
> > >>>>> @Aitozi: Let's go with global configuration first and later introduce hints. I feel the more hints we introduce, the harder SQL queries get when maintaining them.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Timo
> > >>>>>
> > >>>>> On 15.12.23 04:51, Xuyang wrote:
> > >>>>>> Hi, Alan. Thanks for driving this.
> > >>>>>>
> > >>>>>> Using async to improve throughput has already been done for lookup join, and the improvement is obvious, so I think it makes sense to support async scalar functions. Big +1 for this FLIP.
> > >>>>>> I have some questions below.
> > >>>>>>
> > >>>>>> 1. Override the function `getRequirements` in `AsyncScalarFunction`
> > >>>>>>
> > >>>>>> I'm just curious why you don't use conf (global) and query hint (individual async UDX) to mark the output mode 'order' or 'unorder' like async lookup join [1] and async UDTF [2], but chose to introduce a new enum in AsyncScalarFunction.
> > >>>>>>
> > >>>>>> 2. In some scenarios, to preserve semantic correctness, async operators must be executed in sync mode.
> > >>>>>>
> > >>>>>> What about throwing an exception to make it clear to users that using async scalar functions in this situation is problematic, instead of executing silently in sync mode? Users may be confused about the final actual job graph.
> > >>>>>>
> > >>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
> > >>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> > >>>>>>
> > >>>>>> --
> > >>>>>>
> > >>>>>> Best!
> > >>>>>> Xuyang
> > >>>>>>
> > >>>>>> On 2023-12-15 11:20:24, "Aitozi" <gjying1...@gmail.com> wrote:
> > >>>>>>> Hi Alan,
> > >>>>>>> Nice FLIP. I also explored leveraging the async table function [1] to improve throughput before.
> > >>>>>>>
> > >>>>>>> About the configs, what do you think about using hints as mentioned in [1]?
> > >>>>>>>
> > >>>>>>> [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Aitozi.
> > >>>>>>>
> > >>>>>>> Timo Walther <twal...@apache.org> wrote on Thu, Dec 14, 2023 at 17:29:
> > >>>>>>>
> > >>>>>>>> Hi Alan,
> > >>>>>>>>
> > >>>>>>>> thanks for proposing this FLIP. It's a great addition to Flink and has been requested multiple times. It will be particularly interesting for accessing REST endpoints and other remote services.
> > >>>>>>>>
> > >>>>>>>> Great that we can generalize and reuse parts of the Python planner rules and code for this.
> > >>>>>>>>
> > >>>>>>>> I have some feedback regarding the API:
> > >>>>>>>>
> > >>>>>>>> 1) Configuration
> > >>>>>>>>
> > >>>>>>>> Configuration keys like
> > >>>>>>>>
> > >>>>>>>> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
> > >>>>>>>>
> > >>>>>>>> are currently not supported in the configuration stack. The key space should remain constant. Only a constant key space enables the use of the ConfigOption class, which is required in the layered configuration. For now I would suggest only allowing a global setting for buffer capacity, timeout, and retry-strategy. We can later work on a per-function configuration (potentially also needed for other use cases).
> > >>>>>>>>
> > >>>>>>>> 2) Semantic declaration
> > >>>>>>>>
> > >>>>>>>> Regarding
> > >>>>>>>>
> > >>>>>>>> `table.exec.async-scalar.catalog.db.func-name.output-mode`
> > >>>>>>>>
> > >>>>>>>> this is a semantic property of a function and should be defined per function. It impacts the query result and potentially the behavior of planner rules.
> > >>>>>>>>
> > >>>>>>>> I see two options for this: either (a) an additional method in AsyncScalarFunction or (b) adding this to the function's requirements.
> > >>>>>>>> I vote for (b), because a FunctionDefinition should be fully self-contained and sufficient for planning.
> > >>>>>>>>
> > >>>>>>>> Thus, for `FunctionDefinition.getRequirements(): Set<FunctionRequirement>` we can add a new requirement `ORDERED`, which should also be the default for AsyncScalarFunction. `getRequirements()` can be overridden to return a set without this requirement if the user intends to do this.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Timo
> > >>>>>>>>
> > >>>>>>>> On 11.12.23 18:43, Piotr Nowojski wrote:
> > >>>>>>>>> +1 to the idea, I don't have any comments.
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Piotrek
> > >>>>>>>>>
> > >>>>>>>>> On Thu, Dec 7, 2023 at 07:15 Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:
> > >>>>>>>>>
> > >>>>>>>>>>> Nicely written and makes sense. The only feedback I have is around the naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase will be generalized into RemoteCalcSplitRuleBase." This naming seems to imply/suggest that all Async functions are remote. I wonder if we can find another name which doesn't carry that connotation; maybe AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python and Async functions seems reasonable.)
> > >>>>>>>>>>>
> > >>>>>>>>>> Thanks. That's fair. I agree that "Remote" isn't always accurate.
> > >>>>>>>>>> I believe that the Python calls are also done asynchronously, so that might be a reasonable name, so long as there's no confusion between the base and async child class.
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Alan,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Nicely written and makes sense. The only feedback I have is around the naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase will be generalized into RemoteCalcSplitRuleBase." This naming seems to imply/suggest that all Async functions are remote. I wonder if we can find another name which doesn't carry that connotation; maybe AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python and Async functions seems reasonable.)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Jim
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for asynchronous scalar function support [1]
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> This feature proposes adding a new UDF type AsyncScalarFunction which is invoked just like a normal ScalarFunction, but is implemented with an asynchronous eval method. I had brought this up including the motivation in a previous discussion thread [2].
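To make "invoked just like a normal ScalarFunction, but implemented with an asynchronous eval method" concrete without pulling in Flink, here is a plain-Java sketch. `SlowLookupFunction` and its `eval(CompletableFuture<String>, String)` signature are hypothetical and only mimic the idea of an eval method that completes a future instead of returning a value; see the FLIP page [1] for the actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical async scalar function shape (plain Java, no Flink dependency):
// eval() returns immediately and completes `result` later from another thread.
final class SlowLookupFunction {
    private final ExecutorService io = Executors.newFixedThreadPool(4);

    public void eval(CompletableFuture<String> result, String key) {
        io.execute(() -> {
            try {
                Thread.sleep(50); // stand-in for a high-latency call to an external service
                result.complete(key.toUpperCase(Locale.ROOT));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                result.completeExceptionally(e);
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        });
    }

    public void close() throws InterruptedException {
        io.shutdown();
        io.awaitTermination(5, TimeUnit.SECONDS);
    }

    // Issues all calls without waiting in between, then collects the results in call order.
    static List<String> callAll(SlowLookupFunction fn, List<String> keys) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String key : keys) {
            CompletableFuture<String> future = new CompletableFuture<>();
            fn.eval(future, key);
            futures.add(future);
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join());
        }
        return results;
    }
}
```

Because the three calls in `callAll(fn, List.of("a", "b", "c"))` run on the pool concurrently, collecting them takes roughly one call's latency rather than three, which is the throughput argument Alan makes next.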
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The purpose is to achieve high-throughput scalar function UDFs while allowing an individual call to have high latency. It allows scaling up the parallelism of just these calls without having to increase the parallelism of the whole query (which could be rather resource-inefficient).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> In practice, it should enable SQL integration with external services and systems, which Flink has limited support for at the moment. It should also allow easier integration with existing libraries which use asynchronous APIs.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Looking forward to your feedback and suggestions.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>> Alan
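Lincoln's question about the async `timeout` parameter is whether the value bounds one attempt or the whole call including retries, which is how he describes FLIP-232 and FLIP-234. A deterministic plain-Java sketch shows why the two readings diverge once retries happen; the class and method names are hypothetical and the attempt durations are simulated.

```java
// Hypothetical comparison of two `timeout` semantics for an async call with retries.
// attemptMillis[i] is the simulated duration of attempt i; every attempt except the
// last fails and triggers a retry. Both methods return the end-to-end latency of a
// successful call, or -1 if a timeout fires first. For simplicity, an attempt that
// times out is not retried.
final class TimeoutSemantics {
    // Per-attempt reading: each attempt on its own must finish within the timeout.
    static long perAttemptTimeout(long[] attemptMillis, long timeoutMillis) {
        long elapsed = 0;
        for (long duration : attemptMillis) {
            if (duration > timeoutMillis) {
                return -1; // this single attempt exceeded the timeout
            }
            elapsed += duration;
        }
        return elapsed;
    }

    // Total reading: the whole call, including all retries, must finish within the timeout.
    static long totalTimeout(long[] attemptMillis, long timeoutMillis) {
        long elapsed = 0;
        for (long duration : attemptMillis) {
            elapsed += duration;
            if (elapsed > timeoutMillis) {
                return -1; // the overall budget is exhausted
            }
        }
        return elapsed;
    }
}
```

With three 400 ms attempts and a 1000 ms timeout, the per-attempt reading succeeds after 1200 ms while the total reading gives up; only the total reading caps end-to-end latency, which is why keeping the semantics consistent with FLIP-232/234 matters.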