Hi Xuyang and Alan,

thanks for this productive discussion.

> Would it make a difference if it were exposed by the explain

@Alan: I think this is a great idea. +1 on exposing the sync/async behavior through EXPLAIN.


> Is there an easy way to determine if the output of an async function
> would be problematic or not?

Clear "no" on this. Changelog semantics make the planner complex and we need to be careful. Therefore I would strongly suggest we introduce ORDERED and slowly enable UNORDERED whenever we see a good fit for it in plans with appropriate planner rules that guard it.

> If the input to the operator is append-only, it seems fine, because
> this implies that each row is effectively independent and ordering is
> unimportant.

As @Xuyang pointed out, it's not only the input that decides whether unordered execution is safe. It's also the subsequent operators in the pipeline. Xuyang's example, where the sink operates in upsert mode, is a good one. A pipeline with an append-only source, append-only operators, and an append-only sink is safer.

However, even in this combination, a row is not fully "independent": there are still watermarks flowing between rows:

R(5), W(4), R(3), R(4), R(2), R(1), W(0)

So reordering should be fine *within* watermarks, i.e. among the rows between two consecutive watermarks. This is also what watermarks are good for: a trade-off between strict ordering and making progress. The async operator from the DataStream API also supports this, if I remember correctly. However, it assumes that a timestamp is present in the StreamRecord on which it can work. This is not the case within the SQL engine.
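
A minimal sketch of that constraint, in plain Java with no Flink dependency. The names `WatermarkBoundedReorder`, `Element`, `completionRank`, and `emit` are hypothetical and exist only for illustration; this is not how the DataStream async operator is implemented. Rows may be emitted in whatever order their async results complete, but no row may overtake a watermark.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class WatermarkBoundedReorder {

    /** A stream element: a row or a watermark. Hypothetical model, not Flink's StreamElement. */
    static final class Element {
        final boolean isWatermark;
        final long timestamp;
        final int completionRank; // simulated order in which the async result completes

        private Element(boolean isWatermark, long timestamp, int completionRank) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
            this.completionRank = completionRank;
        }

        static Element row(long timestamp, int completionRank) {
            return new Element(false, timestamp, completionRank);
        }

        static Element watermark(long timestamp) {
            return new Element(true, timestamp, 0);
        }

        @Override
        public String toString() {
            return (isWatermark ? "W(" : "R(") + timestamp + ")";
        }
    }

    /** Emits rows in completion order, but only within the segment between two watermarks. */
    static List<Element> emit(List<Element> arrivalOrder) {
        List<Element> out = new ArrayList<>();
        List<Element> segment = new ArrayList<>();
        for (Element e : arrivalOrder) {
            if (e.isWatermark) {
                flush(segment, out); // all earlier rows must leave before the watermark
                out.add(e);
            } else {
                segment.add(e);
            }
        }
        flush(segment, out);
        return out;
    }

    private static void flush(List<Element> segment, List<Element> out) {
        segment.sort(Comparator.comparingInt((Element e) -> e.completionRank));
        out.addAll(segment);
        segment.clear();
    }
}
```

For the arrival order W(0), R(1), R(2), R(3), W(4), R(5), where R(2) completes first and R(1) last, `emit` yields W(0), R(2), R(3), R(1), W(4), R(5): rows are reordered between the two watermarks, but none of them crosses W(4).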

TLDR: Let's focus on ORDERED first.

If we want to use UNORDERED, I would suggest checking the input operator for exactly 1 time attribute column. If there is exactly 1 time attribute column, we could insert it into the StreamRecord and allow UNORDERED mode. If this condition is not met, we go with ORDERED.
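
The check could look roughly like the following sketch. `OutputModeChooser`, `Column`, and `choose` are hypothetical names; a real planner rule would inspect the input's row type rather than a hand-built list.

```java
import java.util.List;

final class OutputModeChooser {

    enum OutputMode { ORDERED, UNORDERED }

    /** Hypothetical column descriptor; a real planner would inspect the logical row type. */
    static final class Column {
        final String name;
        final boolean isTimeAttribute;

        Column(String name, boolean isTimeAttribute) {
            this.name = name;
            this.isTimeAttribute = isTimeAttribute;
        }
    }

    /** UNORDERED only if the input has exactly one time attribute column; ORDERED otherwise. */
    static OutputMode choose(List<Column> inputColumns) {
        long timeAttributes = inputColumns.stream().filter(c -> c.isTimeAttribute).count();
        return timeAttributes == 1 ? OutputMode.UNORDERED : OutputMode.ORDERED;
    }
}
```

With zero or with two time attribute columns there is no single unambiguous timestamp to put into the StreamRecord, so the sketch falls back to ORDERED in both cases.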

Regards,
Timo




On 18.12.23 07:05, Xuyang wrote:
Hi, Alan and Timo. Thanks for your reply.

> Would it make a difference if it were exposed by the explain
> method (the operator having "syncMode" vs not)?

@Alan: I think this is a good way to tell the user what mode these async udx
are currently in.

> A regular SQL user doesn't care whether the function is sync or async.

@Timo: I agree that the planner should throw as few exceptions as possible
rather than confusing users. So I think exposing syncMode through the explain
syntax is a good approach.

> If the input to the operator is append-only, it seems fine,
> because this implies that each row is effectively independent and ordering is
> unimportant.

> For example, if the query is an append-only `SELECT FUNC(c) FROM t`,
> I don't see a reason why the operator is not allowed to produce unordered
> results.


@Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as the
source and MySQL as the sink as an example.
Although Kafka is an append-only source, one of its fields is used as the pk when
writing to MySQL. If the async udx is executed in unordered mode, the data in
MySQL may end up wrong. In this case, we actually need to ensure that rows stay
in order for each sink pk.
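
A small simulation of this problem, in plain Java with an in-memory map standing in for the MySQL table. `UpsertSinkOrderingDemo`, `Row`, and `writeToUpsertSink` are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UpsertSinkOrderingDemo {

    /** An append-only input row whose key field is used as the sink's primary key. */
    static final class Row {
        final String key;
        final String value;

        Row(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Applies rows to an upsert sink: the last row written for a key wins. */
    static Map<String, String> writeToUpsertSink(List<Row> rows) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Row r : rows) {
            table.put(r.key, r.value);
        }
        return table;
    }

    public static void main(String[] args) {
        Row first = new Row("user-1", "v1");
        Row second = new Row("user-1", "v2");
        // Ordered execution: the newer value v2 is what remains in the table.
        System.out.println(writeToUpsertSink(List.of(first, second))); // {user-1=v2}
        // Unordered execution where the second call finishes first: the stale v1 wins.
        System.out.println(writeToUpsertSink(List.of(second, first))); // {user-1=v1}
    }
}
```

Both inputs contain the same append-only rows; only the emission order differs, and the final table content differs with it.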



--

     Best!
     Xuyang





At 2023-12-16 03:33:55, "Alan Sheinberg" <asheinb...@confluent.io.INVALID> 
wrote:
Thanks for the replies everyone.  My responses are inline:

> About the configs, what do you think using hints as mentioned in [1].

@Aitozi: I think hints could be a good way to do this, similar to lookup
joins or the proposal in FLIP-313.  One benefit of hints is that they allow
for the highest granularity of configuration because you can decide at
each and every call site just what parameters to use.  The downside of
hints is that there's more syntax to learn and more verbosity.  I'm
somewhat partial to a configuration like this with a class definition level
of granularity (similar to how metrics reporters are defined [1]):

table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
table.exec.async-scalar.myfunc.buffer-capacity: 10
...

As Timo mentioned, the downside to this is that there's not a nice static
way to do this at the moment, unless you extend ConfigOption.  It would be
good ultimately if Lookup joins, async scalar functions, and other future
configurable UDFs shared the same methodology, but maybe a unified approach
is a followup discussion.

> I’m just curious why you don’t use conf(global) and query hint(individual
> async udx) to mark the output
> mode 'order' or 'unorder' like async lookup join [1] and async udtf[2], but
> chose to introduce a new enum
> in AsyncScalarFunction.


@Xuyang: I'm open to adding hints. I think the important part is that we
have some method for the user to have a class definition level way to
define whether ORDERED or ALLOW_UNORDERED is most appropriate.  I don't
have a strong sense yet for what would be most appropriately exposed as a
FunctionRequirement vs a simple configuration/hint.

> What about throwing an exception to make it clear to users that using async
> scalar functions in this situation
> is problematic instead of executing silently in sync mode? Because users
> may be confused about
> the final actual job graph.


@Xuyang: Would it make a difference if it were exposed by the explain
method (the operator having "syncMode" vs not)?  I'd be fine to do it
either way -- certainly throwing an error is a bit simpler.

> You are right. Actually it should be the planner that fully decides
> whether ORDERED or UNORDERED is safe to do. For example, if the query is
> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
> operator is not allowed to produce unordered results. By global
> configuration, we can set ORDERED such that users don't get confused
> about the unordered output.


@Timo: Is there an easy way to determine if the output of an async function
would be problematic or not?  If the input to the operator is append-only,
it seems fine, because this implies that each row is effectively
independent and ordering is unimportant. For upsert mode with +U rows, you
wouldn't want to swap order with other +U rows for the same key because the
last one should win.  For -D or -U rows, you wouldn't want to swap with
other rows for the same key for similar reasons.  Is it as simple as
looking at the changelog mode to determine whether it's safe to run async
functions UNORDERED?  I had considered analyzing various query forms (join
vs aggregation vs whatever), but it seems like changelog mode could be
sufficient to understand what works and what would be an issue.  Any code
pointers and explanation for similar analysis would be great to understand
this more.

> The mode UNORDERED however should only have
> effect for these simple use cases and throw an exception if UNORDERED
> would mess up a changelog or other subsequent operators.

@Timo: Should we throw errors or run in sync mode?  It seems like running
in sync mode is an option to ensure correctness in all changelog modes.

> Let's go with global configuration first and later introduce
> hints. I feel the more hints we introduce, the harder SQL queries get
> when maintaining them.

@Timo: That seems like a reasonable approach to me.

-Alan

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/

On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <twal...@apache.org> wrote:

1. Override the function `getRequirements` in `AsyncScalarFunction`

  > If the user overrides `requirements()` to omit the `ORDERED`
  > requirement, do we allow the operator to return out-of-order results
  > or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
  > behavior (where we allow out-of-order only if it's deemed correct)?

You are right. Actually it should be the planner that fully decides
whether ORDERED or UNORDERED is safe to do. For example, if the query is
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
operator is not allowed to produce unordered results. By global
configuration, we can set ORDERED such that users don't get confused
about the unordered output. The mode UNORDERED however should only have
effect for these simple use cases and throw an exception if UNORDERED
would mess up a changelog or other subsequent operators.

2. In some scenarios with semantic correctness, async operators must be
executed in sync mode.

  > What about throwing an exception to make it clear to users that using
  > async scalar functions

@Xuyang: A regular SQL user doesn't care whether the function is sync or
async. The planner should simply do its best to make the execution
performant. I would not throw an exception here. The more exceptions
there are, the more struggles and questions from the user. Conceptually, we
can also run async code synchronously, and that's what we should do to
avoid errors.

3. Hints

@Aitozi: Let's go with global configuration first and later introduce
hints. I feel the more hints we introduce, the harder SQL queries get
when maintaining them.

Regards,
Timo




On 15.12.23 04:51, Xuyang wrote:
Hi, Alan. Thanks for driving this.


Using async to improve throughput has already been done for lookup join, and the
improvement is obvious, so I think it makes sense to support async scalar
functions.  Big +1 for this FLIP.
I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I’m just curious why you don’t use conf (global) and query
hint (individual async udx) to mark the output
mode 'order' or 'unorder' like async lookup join [1] and async udtf [2],
but chose to introduce a new enum
in AsyncScalarFunction.


2. In some scenarios with semantic correctness, async operators must be
executed in sync mode.


What about throwing an exception to make it clear to users that using
async scalar functions in this situation
is problematic instead of executing silently in sync mode? Because users
may be confused about
the final actual job graph.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction









--

      Best!
      Xuyang





At 2023-12-15 11:20:24, "Aitozi" <gjying1...@gmail.com> wrote:
Hi Alan,
     Nice FLIP. I have also explored leveraging the async table function [1] to
improve throughput before.

About the configs, what do you think about using hints as mentioned in [1]?

[1]:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.

Timo Walther <twal...@apache.org> wrote on Thu, Dec 14, 2023 at 17:29:

Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has
been requested multiple times. It will be in particular interesting for
accessing REST endpoints and other remote services.

Great that we can generalize and reuse parts of the Python planner rules
and code for this.

I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space
should remain constant. Only a constant key space enables the use of the
ConfigOption class which is required in the layered configuration. For
now I would suggest to only allow a global setting for buffer capacity,
timeout, and retry-strategy. We can later work on a per-function
configuration (potentially also needed for other use cases).

2) Semantical declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantical property of a function and should be defined
per-function. It impacts the query result and potentially the behavior
of planner rules.

I see two options for this: either (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully self-contained
and sufficient for planning.

Thus, for `FunctionDefinition.getRequirements():
Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
should also be the default for AsyncScalarFunction. `getRequirements()`
can be overridden to return a set without this requirement if the user
intends to do this.
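
A rough sketch of option (b), using stand-in types so that it compiles without Flink on the classpath. The nested `FunctionRequirement` and `AsyncScalarFunction` below are simplified placeholders for the real or proposed Flink classes, `ORDERED` is the requirement proposed here and may not exist under that name, and `MyUnorderedFunction` is a hypothetical user function.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

final class OrderedRequirementSketch {

    /** Stand-in for Flink's FunctionRequirement; ORDERED is the value proposed in this thread. */
    enum FunctionRequirement { ORDERED }

    /** Stand-in for the proposed AsyncScalarFunction base class: ORDERED by default. */
    static class AsyncScalarFunction {
        public Set<FunctionRequirement> getRequirements() {
            return Collections.unmodifiableSet(EnumSet.of(FunctionRequirement.ORDERED));
        }
    }

    /** A user function that opts out of ORDERED by overriding getRequirements(). */
    static class MyUnorderedFunction extends AsyncScalarFunction {
        @Override
        public Set<FunctionRequirement> getRequirements() {
            return Collections.emptySet();
        }
    }
}
```

The planner would then only need the function definition itself to see whether out-of-order results are acceptable for that function.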


Thanks,
Timo




On 11.12.23 18:43, Piotr Nowojski wrote:
+1 to the idea, I don't have any comments.

Best,
Piotrek

On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg <asheinb...@confluent.io.invalid>
wrote:


> Nicely written and makes sense.  The only feedback I have is around the
> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> imply/suggest that all Async functions are remote.  I wonder if we can find
> another name which doesn't carry that connotation; maybe
> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> and Async functions seems reasonable.)

Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
believe that the python calls are also done asynchronously, so that might
be a reasonable name, so long as there's no confusion between the base and
async child class.

On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid>
wrote:

Hi Alan,

Nicely written and makes sense.  The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
imply/suggest that all Async functions are remote.  I wonder if we can find
another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)

Cheers,

Jim

On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:

I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
asynchronous scalar function support [1]

This feature proposes adding a new UDF type AsyncScalarFunction which is
invoked just like a normal ScalarFunction, but is implemented with an
asynchronous eval method.  I had brought this up including the motivation
in a previous discussion thread [2].

The purpose is to achieve high throughput scalar function UDFs while
allowing that an individual call may have high latency.  It allows scaling
up the parallelism of just these calls without having to increase the
parallelism of the whole query (which could be rather resource
inefficient).

In practice, it should enable SQL integration with external services and
systems, which Flink has limited support for at the moment. It should also
allow easier integration with existing libraries which use asynchronous
APIs.
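
To give a rough feel for what an asynchronous eval method could look like, here is a self-contained sketch in plain Java. It assumes that eval receives a `CompletableFuture` to complete later; that shape, and the names `AsyncEvalSketch`, `RemoteLookupFunction`, and `slowRemoteCall`, are assumptions made for illustration and are not the FLIP's final API. The class deliberately does not extend any Flink type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncEvalSketch {

    /** Hypothetical async UDF: eval() returns immediately and completes the future later. */
    static final class RemoteLookupFunction {
        private final ExecutorService pool = Executors.newFixedThreadPool(4);

        public void eval(CompletableFuture<String> result, String input) {
            pool.execute(() -> {
                try {
                    result.complete(slowRemoteCall(input));
                } catch (Exception e) {
                    result.completeExceptionally(e);
                }
            });
        }

        /** Simulates a high-latency call to an external service. */
        private static String slowRemoteCall(String input) throws InterruptedException {
            TimeUnit.MILLISECONDS.sleep(50);
            return input.toUpperCase();
        }

        void close() {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        RemoteLookupFunction fn = new RemoteLookupFunction();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        fn.eval(a, "foo"); // both calls are in flight at the same time
        fn.eval(b, "bar");
        System.out.println(a.join() + " " + b.join()); // prints "FOO BAR"
        fn.close();
    }
}
```

Because both calls overlap, the two 50 ms waits run concurrently instead of back to back, which is the throughput benefit described above.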

Looking forward to your feedback and suggestions.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support

[2]
https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs

Thanks,
Alan








