Thanks for the replies, everyone. My responses are inline:
> About the configs, what do you think about using hints as mentioned in [1]?
@Aitozi: I think hints could be a good way to do this, similar to lookup
joins or the proposal in FLIP-313. One benefit of hints is that they allow
for the highest granularity of configuration because you can decide at
each and every call site just what parameters to use. The downside of
hints is that there's more syntax to learn and more verbosity. I'm
somewhat partial to a configuration like this with a class definition level
of granularity (similar to how metrics reporters are defined [1]):
table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
table.exec.async-scalar.myfunc.buffer-capacity: 10
...
As Timo mentioned, the downside to this is that there's not a nice static
way to do this at the moment, unless you extend ConfigOption. Ultimately it
would be good if lookup joins, async scalar functions, and other future
configurable UDFs shared the same methodology, but maybe a unified approach
is a follow-up discussion.
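To make that concrete, here is a rough Java sketch of what reading such
per-function keys could look like today; the option names and the `myfunc`
identifier just mirror the example keys above and are not an existing Flink
option set:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class PerFunctionOptionsSketch {

    // Hypothetical: assemble the per-function key from the registered function
    // name, mirroring the metrics-reporter-style keys above. Not an existing
    // Flink option set.
    private static ConfigOption<String> functionClass(String name) {
        return ConfigOptions.key("table.exec.async-scalar." + name + ".class")
                .stringType()
                .noDefaultValue();
    }

    private static ConfigOption<Integer> bufferCapacity(String name) {
        return ConfigOptions.key("table.exec.async-scalar." + name + ".buffer-capacity")
                .intType()
                .defaultValue(10);
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("table.exec.async-scalar.myfunc.class",
                "org.apache.flink.MyAsyncScalarFunction");
        conf.setInteger("table.exec.async-scalar.myfunc.buffer-capacity", 10);

        // The keys can only be built once the function name is known, which is
        // the "non-constant key space" limitation Timo points out below.
        System.out.println(conf.get(functionClass("myfunc")));
        System.out.println(conf.get(bufferCapacity("myfunc")));
    }
}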
> I'm just curious why you don't use conf (global) and query hints (individual
> async udx) to mark the output mode 'ordered' or 'unordered' like async lookup
> join [1] and async udtf [2], but chose to introduce a new enum in
> AsyncScalarFunction.
@Xuyang: I'm open to adding hints. I think the important part is that we
give the user some class-definition-level way to define whether ORDERED or
ALLOW_UNORDERED is most appropriate. I don't have a strong sense yet for
what would be most appropriately exposed as a FunctionRequirement vs. a
simple configuration/hint.
> What about throwing an exception to make it clear to users that using async
> scalar functions in this situation is problematic, instead of executing
> silently in sync mode? Because users may be confused about the final actual
> job graph.
@Xuyang: Would it make a difference if it were exposed by the explain
method (the operator having "syncMode" vs not)? I'd be fine to do it
either way -- certainly throwing an error is a bit simpler.
> You are right. Actually it should be the planner that fully decides
> whether ORDERED or UNORDERED is safe to do. For example, if the query is
> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
> operator is not allowed to produce unordered results. By global
> configuration, we can set ORDERED such that users don't get confused
> about the unordered output.
@Timo: Is there an easy way to determine if the output of an async function
would be problematic or not? If the input to the operator is append-only,
it seems fine, because this implies that each row is effectively
independent and ordering is unimportant. For upsert mode with +U rows, you
wouldn't want to swap order with other +U rows for the same key because the
last one should win. For -D or -U rows, you wouldn't want to swap with
other rows for the same key for similar reasons. Is it as simple as
looking at the changelog mode to determine whether it's safe to run async
functions UNORDERED? I had considered analyzing various query forms (join
vs aggregation vs whatever), but it seems like changelog mode could be
sufficient to understand what works and what would be an issue. Any code
pointers and explanation for similar analysis would be great to understand
this more.
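To make sure I'm describing the same check, here's a minimal sketch of the
kind of test I'd imagine the planner doing, assuming it has access to the
input's ChangelogMode (the helper class and method names are made up):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public final class UnorderedSafetyCheck {

    private UnorderedSafetyCheck() {}

    // Hypothetical helper: treat UNORDERED async execution as safe only if the
    // input is insert-only, i.e. rows are independent and no +U/-U/-D rows for
    // the same key can be reordered against each other.
    public static boolean isUnorderedSafe(ChangelogMode inputMode) {
        return inputMode.containsOnly(RowKind.INSERT);
    }

    public static void main(String[] args) {
        System.out.println(isUnorderedSafe(ChangelogMode.insertOnly())); // true
        System.out.println(isUnorderedSafe(ChangelogMode.all()));        // false
    }
}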
> The mode UNORDERED however should only have
> effect for these simple use cases and throw an exception if UNORDERED
> would mess up a changelog or other subsequent operators.
@Timo: Should we throw errors or run in sync mode? It seems like running
in sync mode is an option to ensure correctness in all changelog modes.
> Let's go with global configuration first and later introduce
> hints. I feel the more hints we introduce, the harder SQL queries get
> when maintaining them.
@Timo: That seems like a reasonable approach to me.
-Alan
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <twal...@apache.org> wrote:
1. Override the function `getRequirements` in `AsyncScalarFunction`
> If the user overrides `requirements()` to omit the `ORDERED`
> requirement, do we allow the operator to return out-of-order results
> or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> behavior (where we allow out-of-order only if it's deemed correct)?
You are right. Actually it should be the planner that fully decides
whether ORDERED or UNORDERED is safe to do. For example, if the query is
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
operator is not allowed to produce unordered results. By global
configuration, we can set ORDERED such that users don't get confused
about the unordered output. The mode UNORDERED however should only have
effect for these simple use cases and throw an exception if UNORDERED
would mess up a changelog or other subsequent operators.
2. In some scenarios, for semantic correctness, async operators must be
executed in sync mode.
> What about throwing an exception to make it clear to users that using
> async scalar functions
@Xuyang: A regular SQL user doesn't care whether the function is sync or
async. The planner should simply do its best to make the execution
performant. I would not throw an exception here. The more exceptions there
are, the more struggles and questions from the user. Conceptually, we
can also run async code synchronously, and that's why we should do so to
avoid errors.
3. Hints
@Aitozi: Let's go with global configuration first and later introduce
hints. I feel the more hints we introduce, the harder SQL queries get
when maintaining them.
Regards,
Timo
On 15.12.23 04:51, Xuyang wrote:
Hi, Alan. Thanks for driving this.
Using async to improve throughput has already been done for lookup join, and
the improvement is obvious, so I think it makes sense to support async scalar
functions as well. Big +1 for this FLIP.
I have some questions below.
1. Override the function `getRequirements` in `AsyncScalarFunction`
I'm just curious why you don't use conf (global) and query hints (individual
async udx) to mark the output mode 'ordered' or 'unordered' like async lookup
join [1] and async udtf [2], but chose to introduce a new enum in
AsyncScalarFunction.
2. In some scenarios, for semantic correctness, async operators must be
executed in sync mode.
What about throwing an exception to make it clear to users that using async
scalar functions in this situation is problematic, instead of executing
silently in sync mode? Because users may be confused about the final actual
job graph.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
--
Best!
Xuyang
On 2023-12-15 11:20:24, "Aitozi" <gjying1...@gmail.com> wrote:
Hi Alan,
Nice FLIP. I also explored leveraging the async table function [1] to
improve throughput before.
About the configs, what do you think about using hints as mentioned in [1]?
[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
Thanks,
Aitozi.
On Thu, Dec 14, 2023 at 17:29, Timo Walther <twal...@apache.org> wrote:
Hi Alan,
thanks for proposing this FLIP. It's a great addition to Flink and has
been requested multiple times. It will be particularly interesting for
accessing REST endpoints and other remote services.
Great that we can generalize and reuse parts of the Python planner rules
and code for this.
I have some feedback regarding the API:
1) Configuration
Configuration keys like
`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
are currently not supported in the configuration stack. The key space
should remain constant. Only a constant key space enables the use of the
ConfigOption class, which is required in the layered configuration. For
now I would suggest only allowing a global setting for buffer capacity,
timeout, and retry-strategy. We can later work on a per-function
configuration (potentially also needed for other use cases).
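For illustration, a constant key space could mean options roughly like the
following sketch (key names, defaults, and the string-based retry strategy
are placeholders, not decided options):

import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Placeholder names and defaults, only to illustrate a constant key space.
public class AsyncScalarGlobalOptionsSketch {

    public static final ConfigOption<Integer> BUFFER_CAPACITY =
            ConfigOptions.key("table.exec.async-scalar.buffer-capacity")
                    .intType()
                    .defaultValue(10)
                    .withDescription("Max number of in-flight async function calls.");

    public static final ConfigOption<Duration> TIMEOUT =
            ConfigOptions.key("table.exec.async-scalar.timeout")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(3))
                    .withDescription("Timeout for a single async function call.");

    public static final ConfigOption<String> RETRY_STRATEGY =
            ConfigOptions.key("table.exec.async-scalar.retry-strategy")
                    .stringType()
                    .defaultValue("NO_RETRY")
                    .withDescription("Retry strategy for failed async function calls.");
}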
2) Semantic declaration
Regarding
`table.exec.async-scalar.catalog.db.func-name.output-mode`:
this is a semantic property of a function and should be defined
per-function. It impacts the query result and potentially the behavior
of planner rules.
I see two options for this: either (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully self-contained
and sufficient for planning.
Thus, for `FunctionDefinition.getRequirements():
Set<FunctionRequirement>` we can add a new requirement `ORDERED`, which
should also be the default for AsyncScalarFunction. `getRequirements()`
can be overridden to return a set without this requirement if the user
intends to do this.
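As a sketch of the user-facing side, assuming this adds an `ORDERED`
constant to the existing `FunctionRequirement` enum and the proposed
`AsyncScalarFunction` base class (neither exists in Flink today), the
override could look roughly like this:

import java.util.Collections;
import java.util.Set;

import org.apache.flink.table.functions.FunctionRequirement;

// Sketch only: AsyncScalarFunction and FunctionRequirement.ORDERED are what
// this discussion proposes; they are not part of Flink yet.
public class UnorderedRemoteFunction extends AsyncScalarFunction {

    // The inherited default would be a set containing FunctionRequirement.ORDERED.
    // Returning a set without it tells the planner it may produce unordered
    // results where it decides that is safe.
    @Override
    public Set<FunctionRequirement> getRequirements() {
        return Collections.emptySet();
    }
}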
Thanks,
Timo
On 11.12.23 18:43, Piotr Nowojski wrote:
+1 to the idea, I don't have any comments.
Best,
Piotrek
On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg <asheinb...@confluent.io.invalid>
wrote:
> Nicely written and makes sense. The only feedback I have is around the
> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> will be generalized into RemoteCalcSplitRuleBase." This naming seems to
> imply/suggest that all Async functions are remote. I wonder if we can find
> another name which doesn't carry that connotation; maybe
> AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
> and Async functions seems reasonable.)
Thanks. That's fair. I agree that "Remote" isn't always accurate. I
believe that the python calls are also done asynchronously, so that might
be a reasonable name, so long as there's no confusion between the base and
async child class.
On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid>
wrote:
Hi Alan,
Nicely written and makes sense. The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase." This naming seems to
imply/suggest that all Async functions are remote. I wonder if we can find
another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)
Cheers,
Jim
On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:
I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
asynchronous scalar function support [1].
This feature proposes adding a new UDF type, AsyncScalarFunction, which is
invoked just like a normal ScalarFunction but is implemented with an
asynchronous eval method. I had brought this up, including the motivation,
in a previous discussion thread [2].
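To give a feel for the proposed API, here is a rough sketch under the FLIP's
assumptions (the AsyncScalarFunction base class and the
eval(CompletableFuture, ...) signature are the FLIP's proposal, not yet in
Flink, and the remote scoring call is purely illustrative):

import java.util.concurrent.CompletableFuture;

// Sketch only: AsyncScalarFunction and the eval signature below are what
// FLIP-400 proposes; they are not part of Flink yet.
public class RemoteScoreFunction extends AsyncScalarFunction {

    // Unlike ScalarFunction#eval, the async variant returns void and instead
    // completes the supplied future once the (possibly slow) remote call finishes.
    public void eval(CompletableFuture<Double> result, String userId) {
        CompletableFuture
                .supplyAsync(() -> callRemoteScoringService(userId))
                .whenComplete((score, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(score);
                    }
                });
    }

    private double callRemoteScoringService(String userId) {
        // placeholder for a real REST/RPC lookup
        return Math.abs(userId.hashCode() % 100);
    }
}

Registration and invocation would presumably look the same as for any other
scalar UDF, e.g. tableEnv.createTemporarySystemFunction("REMOTE_SCORE",
RemoteScoreFunction.class) and then SELECT REMOTE_SCORE(user_id) FROM users.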
The purpose is to achieve high-throughput scalar function UDFs while
allowing that an individual call may have high latency. It allows scaling
up the parallelism of just these calls without having to increase the
parallelism of the whole query (which could be rather resource
inefficient).
In practice, it should enable SQL integration with external services and
systems, which Flink has limited support for at the moment. It should also
allow easier integration with existing libraries which use asynchronous
APIs.
Looking forward to your feedback and suggestions.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
[2]
https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
Thanks,
Alan