Hi Xuyang and Alan,
thanks for this productive discussion.
> Would it make a difference if it were exposed by the explain
@Alan: I think this is a great idea. +1 on exposing the sync/async
behavior through EXPLAIN.
> Is there an easy way to determine if the output of an async function
> would be problematic or not?
Clear "no" on this. Changelog semantics make the planner complex and we
need to be careful. Therefore I would strongly suggest we introduce
ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
plans with appropriate planner rules that guard it.
> If the input to the operator is append-only, it seems fine, because
> this implies that each row is effectively independent and ordering is
> unimportant.
As @Xuyang pointed out, it's not only the input that decides whether
append-only is safe. It's also the subsequent operators in the pipeline.
The example of Xuyang is a good one, when the sink operates in upsert
mode. Append-only source, append-only operators, and append-only sink
are safer.
However, even in this combination, a row is not fully "independent",
because there are still watermarks flowing between rows:
R(5), W(4), R(3), R(4), R(2), R(1), W(0)
So unordering should be fine *within* watermarks. This is also what
watermarks are good for, a trade-off between strict ordering and making
progress. The async operator from DataStream API also supports this if I
remember correctly. However, it assumes a timestamp is present in
StreamRecord on which it can work. But this is not the case within the
SQL engine.
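To illustrate the "within watermarks" point, here is a minimal Java sketch; it is not Flink code. It assumes the sequence above is read right to left in arrival order, and it encodes elements as plain strings ("R5" for a row, "W4" for a watermark). The class and method names are hypothetical. It splits the stream into groups of rows that sit between two watermarks; an unordered async operator could reorder rows inside a group, but never across a watermark.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkSegments {
    /**
     * Splits a stream (given in arrival order) into groups of rows that are
     * separated by watermarks. Tokens starting with "W" are watermarks,
     * everything else is treated as a row. Empty groups are skipped.
     */
    static List<List<String>> segments(List<String> arrivalOrder) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String element : arrivalOrder) {
            if (element.startsWith("W")) {
                // A watermark closes the current group of rows.
                if (!current.isEmpty()) {
                    result.add(current);
                    current = new ArrayList<>();
                }
            } else {
                current.add(element);
            }
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

For the sequence above this yields two groups, {R1, R2, R4, R3} and {R5}; only the order inside each group would be free.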
TLDR: Let's focus on ORDERED first.
If we want to use UNORDERED, I would suggest checking the input operator
for exactly 1 time attribute column. If there is exactly 1 time
attribute column, we could insert it into the StreamRecord and allow
UNORDERED mode. If this condition is not met, we go with ORDERED.
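The rule suggested above could be sketched roughly like this. It is only an illustration in plain Java: the `OutputMode` enum and the boolean-per-column input are assumptions made for the example, not planner APIs.

```java
import java.util.List;

final class OutputModeChooser {
    /** Hypothetical stand-in for the two modes discussed in this thread. */
    enum OutputMode { ORDERED, UNORDERED }

    /**
     * Takes one flag per input column (true = time attribute) and allows
     * UNORDERED only if exactly one column is a time attribute.
     */
    static OutputMode choose(List<Boolean> isTimeAttribute) {
        long timeAttributes = isTimeAttribute.stream().filter(flag -> flag).count();
        return timeAttributes == 1 ? OutputMode.UNORDERED : OutputMode.ORDERED;
    }
}
```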
Regards,
Timo
On 18.12.23 07:05, Xuyang wrote:
Hi, Alan and Timo. Thanks for your reply.
> Would it make a difference if it were exposed by the explain
> method (the operator having "syncMode" vs not)?
@Alan: I think this is a good way to tell the user what mode these async
udx are currently in.
> A regular SQL user doesn't care whether the function is sync or async.
@Timo: I agree that the planner should throw as few exceptions as
possible rather than confusing users. So I think
it is a good way to expose syncMode through explain syntax.
> If the input to the operator is append-only, it seems fine,
> because this implies that each row is effectively independent and
> ordering is unimportant.
> For example, if the query is an append-only `SELECT FUNC(c) FROM t`,
> I don't see a reason why the operator is not allowed to produce
> unordered results.
@Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka
as the source and MySQL as the sink as an example.
Although Kafka is an append-only source, one of its fields is used as the
primary key when writing to MySQL. If the async udx is executed in
unordered mode, the data in MySQL may end up wrong. In this case, we
actually need to preserve the order per sink primary key.
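A small plain-Java sketch of the problem, with a map standing in for the MySQL table keyed by primary key. All names and values here are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UpsertSinkDemo {
    /**
     * Applies rows of the form {primaryKey, value} to an upsert table.
     * The last write for a key wins, as with an upsert into a keyed table.
     */
    static Map<String, String> applyUpserts(List<String[]> rows) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] row : rows) {
            table.put(row[0], row[1]);
        }
        return table;
    }
}
```

If two append-only rows with the same key arrive as (k1, v1), (k1, v2) the table ends with v2; if the async operator emits them swapped, the table ends with the stale v1.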
--
Best!
Xuyang
At 2023-12-16 03:33:55, "Alan Sheinberg" <asheinb...@confluent.io.INVALID>
wrote:
Thanks for the replies everyone. My responses are inline:
> About the configs, what do you think about using hints as mentioned in [1]?
@Aitozi: I think hints could be a good way to do this, similar to lookup
joins or the proposal in FLIP-313. One benefit of hints is that they allow
for the highest granularity of configuration because you can decide at
each and every call site just what parameters to use. The downside of
hints is that there's more syntax to learn and more verbosity. I'm
somewhat partial to a configuration like this with a class definition
level of granularity (similar to how metrics reporters are defined [1]):
table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
table.exec.async-scalar.myfunc.buffer-capacity: 10
...
As Timo mentioned, the downside to this is that there's not a nice static
way to do this at the moment, unless you extend ConfigOption. It would be
good ultimately if lookup joins, async scalar functions, and other future
configurable UDFs shared the same methodology, but maybe a unified approach
is a followup discussion.
> I'm just curious why you don't use conf (global) and query hint (individual
> async udx) to mark the output mode 'order' or 'unorder' like async lookup
> join [1] and async udtf [2], but chose to introduce a new enum
> in AsyncScalarFunction.
@Xuyang: I'm open to adding hints. I think the important part is that we
have some method for the user to have a class definition level way to
define whether ORDERED or ALLOW_UNORDERED is most appropriate. I don't
have a strong sense yet for what would be most appropriately exposed as a
FunctionRequirement vs a simple configuration/hint.
> What about throwing an exception to make it clear to users that using
> async scalar functions in this situation is problematic instead of
> executing silently in sync mode? Because users may be confused about
> the final actual job graph.
@Xuyang: Would it make a difference if it were exposed by the explain
method (the operator having "syncMode" vs not)? I'd be fine to do it
either way -- certainly throwing an error is a bit simpler.
> You are right. Actually it should be the planner that fully decides
> whether ORDERED or UNORDERED is safe to do. For example, if the query is
> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
> operator is not allowed to produce unordered results. By global
> configuration, we can set ORDERED such that users don't get confused
> about the unordered output.
@Timo: Is there an easy way to determine if the output of an async function
would be problematic or not? If the input to the operator is append-only,
it seems fine, because this implies that each row is effectively
independent and ordering is unimportant. For upsert mode with +U rows, you
wouldn't want to swap order with other +U rows for the same key because the
last one should win. For -D or -U rows, you wouldn't want to swap with
other rows for the same key for similar reasons. Is it as simple as
looking at the changelog mode to determine whether it's safe to run async
functions UNORDERED? I had considered analyzing various query forms (join
vs aggregation vs whatever), but it seems like changelog mode could be
sufficient to understand what works and what would be an issue. Any code
pointers and explanation for similar analysis would be great to understand
this more.
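As a rough sketch of the check being asked about, in plain Java: the `Kind` enum is a hypothetical stand-in for the +I, -U, +U and -D change kinds, and the `sinkUpsertsByKey` flag is an assumption added to reflect the upsert-sink case raised in the replies. It should be read as a necessary condition only; it ignores watermarks and other downstream operators.

```java
import java.util.EnumSet;
import java.util.Set;

final class ChangelogCheck {
    /** Hypothetical stand-in for the change kinds +I, -U, +U, -D. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** True if the input produces inserts and nothing else (append-only). */
    static boolean isInsertOnly(Set<Kind> inputKinds) {
        return inputKinds.equals(EnumSet.of(Kind.INSERT));
    }

    /**
     * Conservative check: UNORDERED is considered only for append-only input
     * whose result is not later upserted by key in the sink.
     */
    static boolean mayRunUnordered(Set<Kind> inputKinds, boolean sinkUpsertsByKey) {
        return isInsertOnly(inputKinds) && !sinkUpsertsByKey;
    }
}
```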
> The mode UNORDERED however should only have
> effect for these simple use cases and throw an exception if UNORDERED
> would mess up a changelog or other subsequent operators.
@Timo: Should we throw errors or run in sync mode? It seems like running
in sync mode is an option to ensure correctness in all changelog modes.
> Let's go with global configuration first and later introduce
> hints. I feel the more hints we introduce, the harder SQL queries get
> when maintaining them.
@Timo: That seems like a reasonable approach to me.
-Alan
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <twal...@apache.org>
wrote:
1. Override the function `getRequirements` in `AsyncScalarFunction`
> If the user overrides `requirements()` to omit the `ORDERED`
> requirement, do we allow the operator to return out-of-order results
> or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> behavior (where we allow out-of-order only if it's deemed correct)?
You are right. Actually it should be the planner that fully decides
whether ORDERED or UNORDERED is safe to do. For example, if the query is
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
operator is not allowed to produce unordered results. By global
configuration, we can set ORDERED such that users don't get confused
about the unordered output. The mode UNORDERED however should only have
effect for these simple use cases and throw an exception if UNORDERED
would mess up a changelog or other subsequent operators.
2. In some scenarios with semantic correctness, async operators must be
executed in sync mode.
> What about throwing an exception to make it clear to users that using
> async scalar functions
@Xuyang: A regular SQL user doesn't care whether the function is sync or
async. The planner should simply do its best to make the execution
performant. I would not throw an exception here. The more exceptions
there are, the more struggles and questions from users. Conceptually, we
can also run async code synchronously, and that's what we should do to
avoid errors.
3. Hints
@Aitozi: Let's go with global configuration first and later introduce
hints. I feel the more hints we introduce, the harder SQL queries get
when maintaining them.
Regards,
Timo
On 15.12.23 04:51, Xuyang wrote:
Hi, Alan. Thanks for driving this.
Using async to improve throughput has already been done for lookup join,
and the improvement is obvious, so I think it makes sense to support async
scalar functions. Big +1 for this FLIP.
I have some questions below.
1. Override the function `getRequirements` in `AsyncScalarFunction`
I'm just curious why you don't use conf (global) and query hint (individual
async udx) to mark the output mode 'order' or 'unorder' like async lookup
join [1] and async udtf [2], but chose to introduce a new enum
in AsyncScalarFunction.
2. In some scenarios with semantic correctness, async operators must be
executed in sync mode.
What about throwing an exception to make it clear to users that using
async scalar functions in this situation is problematic instead of
executing silently in sync mode? Because users may be confused about
the final actual job graph.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
--
Best!
Xuyang
At 2023-12-15 11:20:24, "Aitozi" <gjying1...@gmail.com> wrote:
Hi Alan,
Nice FLIP. I have also explored leveraging the async table function [1] to
improve throughput before.
About the configs, what do you think about using hints as mentioned in [1]?
[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
Thanks,
Aitozi.
On Thu, Dec 14, 2023 at 17:29, Timo Walther <twal...@apache.org> wrote:
Hi Alan,
thanks for proposing this FLIP. It's a great addition to Flink and has
been requested multiple times. It will be in particular interesting for
accessing REST endpoints and other remote services.
Great that we can generalize and reuse parts of the Python planner rules
and code for this.
I have some feedback regarding the API:
1) Configuration
Configuration keys like
`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
are currently not supported in the configuration stack. The key space
should remain constant. Only a constant key space enables the use of the
ConfigOption class, which is required in the layered configuration. For
now I would suggest only allowing a global setting for buffer capacity,
timeout, and retry-strategy. We can later work on a per-function
configuration (potentially also needed for other use cases).
2) Semantic declaration
Regarding `table.exec.async-scalar.catalog.db.func-name.output-mode`:
this is a semantic property of a function and should be defined
per-function. It impacts the query result and potentially the behavior
of planner rules.
I see two options for this: either (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully
self-contained and sufficient for planning.
Thus, for `FunctionDefinition.getRequirements():
Set<FunctionRequirement>` we can add a new requirement `ORDERED`, which
should also be the default for AsyncScalarFunction. `getRequirements()`
can be overridden to return a set without this requirement if the user
intends to do this.
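Option (b) could look roughly like the following. This is a self-contained plain-Java sketch with hypothetical stand-in types (`Requirement`, `AsyncFunctionBase`); it does not use the real Flink classes and only shows the shape of the idea: `ORDERED` is the default, and a function opts out by overriding `getRequirements()`.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

final class RequirementsSketch {
    /** Hypothetical stand-in for a FunctionRequirement with the proposed ORDERED value. */
    enum Requirement { ORDERED }

    /** Hypothetical base class: async functions require ordered output by default. */
    static class AsyncFunctionBase {
        Set<Requirement> getRequirements() {
            return EnumSet.of(Requirement.ORDERED);
        }
    }

    /** A function whose author declares that result order does not matter. */
    static class OrderInsensitiveFunction extends AsyncFunctionBase {
        @Override
        Set<Requirement> getRequirements() {
            return Collections.emptySet();
        }
    }

    /** What a planner rule might ask before considering unordered execution. */
    static boolean mustPreserveOrder(AsyncFunctionBase function) {
        return function.getRequirements().contains(Requirement.ORDERED);
    }
}
```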
Thanks,
Timo
On 11.12.23 18:43, Piotr Nowojski wrote:
+1 to the idea, I don't have any comments.
Best,
Piotrek
On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg <asheinb...@confluent.io.invalid>
wrote:
> Nicely written and makes sense. The only feedback I have is around the
> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> will be generalized into RemoteCalcSplitRuleBase." This naming seems to
> imply/suggest that all Async functions are remote. I wonder if we can find
> another name which doesn't carry that connotation; maybe
> AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
> and Async functions seems reasonable.)
Thanks. That's fair. I agree that "Remote" isn't always accurate. I
believe that the python calls are also done asynchronously, so that might
be a reasonable name, so long as there's no confusion between the base and
async child class.
On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid>
wrote:
Hi Alan,
Nicely written and makes sense. The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase." This naming seems to
imply/suggest that all Async functions are remote. I wonder if we can find
another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)
Cheers,
Jim
On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:
I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
asynchronous scalar function support [1]
This feature proposes adding a new UDF type AsyncScalarFunction which is
invoked just like a normal ScalarFunction, but is implemented with an
asynchronous eval method. I had brought this up including the motivation
in a previous discussion thread [2].
The purpose is to achieve high throughput scalar function UDFs while
allowing that an individual call may have high latency. It allows scaling
up the parallelism of just these calls without having to increase the
parallelism of the whole query (which could be rather resource
inefficient).
In practice, it should enable SQL integration with external services and
systems, which Flink has limited support for at the moment. It should also
allow easier integration with existing libraries which use asynchronous
APIs.
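For readers new to the idea, an asynchronous eval method could look roughly like the sketch below. This is plain Java without Flink dependencies. The shape (a future passed in as the first argument and completed later) is an assumption for illustration, the thread itself does not spell out the signature, and the "remote call" is simulated.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncEvalSketch {
    /**
     * Hypothetical async eval: instead of returning a value, it completes
     * the given future once the (simulated) remote call has finished.
     */
    static void eval(CompletableFuture<String> result, String input) {
        CompletableFuture
            .supplyAsync(() -> "echo:" + input) // stand-in for a slow remote call
            .whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            });
    }
}
```

Because the caller is not blocked while the future is pending, many such calls can be in flight at once, which is where the throughput gain would come from.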
Looking forward to your feedback and suggestions.
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
[2]
https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
Thanks,
Alan