Zach created FLINK-38605:
----------------------------
Summary: Feature: Stateless Ordered Wait
Key: FLINK-38605
URL: https://issues.apache.org/jira/browse/FLINK-38605
Project: Flink
Issue Type: New Feature
Components: API / DataStream
Environment: *
Reporter: Zach
Attachments: image-2025-10-31-10-43-34-532.png,
image-2025-10-31-10-57-10-971.png
Today, Flink's built-in async operations require storing [operator
state|https://github.com/apache/flink/blob/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L226].
This happens at the
[AsyncWaitOperator|https://github.com/apache/flink/blob/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java]
level, meaning it applies to ALL async functions using the built-in Flink
async solution. This is not user-defined state and is fundamental to the core
workings of all Flink async functions today, whether those functions use
additional user state or not.
In my enterprise experience, storing this kind of state is disruptive to
users, especially cross-functional users who just want to build data pipelines
without in-depth Flink experience. My claim is that a version of async can be
implemented without this built-in state. The issues with the current approach
in large enterprise applications with cross-functional data pipeline partners are:
* *Schema Can't Change*
** The current async state implementation requires storing entire records in
Flink state
** Records in my enterprise are often [Flink
Rows|https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/types/Row.html] or
strongly typed columnar storage.
** These rows have [Flink
RowTypeInfo|https://nightlies.apache.org/flink/flink-docs-release-2.1/api/java/org/apache/flink/api/java/typeutils/RowTypeInfo.html],
i.e. information about each column, its type, and how to serialize and
deserialize it.
** Users and cross-functional partners make schema (RowTypeInfo) changes all
the time while building data pipelines. Making a schema change without a state
migration or replay results in an ambiguous "serialization failed" or
"deserialization failed" error message (a short code sketch after this list
illustrates the setup).
** Users don't understand what's different. If they'd like to do a database
read, hit a Large Language Model (LLM), or do anything else stateful, it's not
clear to them why they'd need to handle schema migrations differently. The
operations seem innocuous and don't intuitively suggest significant
restrictions on future pipeline changes.
* *Managing state is complicated*
** State requires assigning a unique, unchanging operator ID to the
operator so that state knows where to restore.
** Large state, especially in external file system state storage, is
expensive. Storing entire rows in state can grow to a large size relatively
fast. State gets written every checkpoint, and in exactly-once mode this means
large processing delays.
** Users don't expect their seemingly simple async function to cause
processing delays or long checkpoints
* *There is risk of data loss*
** If a user changes an async operation, say they change the API endpoint
they'd like to hit, what do we do with existing state? What if the pipeline hasn't
run for a week, so the data in state is a week old? Do we try to restore it,
meaning old data will be passed to the new endpoint? Do we allow non-restored
state and change the operator UID to drop the old data, resulting in data loss?
** We could ask users, but many are cross-functional and aren't familiar with
the inner workings of Flink. If we asked them whether they'd like to restore old
state, they'd be confused why such a question is even necessary just to hit a
database.
* *Starting over is expensive*
** One way to fix the problem with state and schema changes is to discard all
progress and start reading over from the beginning. This way all stored state
will contain records with the correct schema
** Starting over is very expensive for users, especially with large data
volumes or with strict real-time uptime requirements
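To make the schema problem concrete, here is a minimal, hypothetical example of the kind of innocuous-looking user code involved (the class name, field names, and the enrichment logic are illustrative only, not taken from any real pipeline). Nothing in it mentions state, yet the in-flight Rows are snapshotted by the built-in async operator with this RowTypeInfo's serializer, so a later column change breaks the restore:
{code:java}
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.types.Row;

public class InnocuousAsyncEnrichment {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical pipeline schema. Cross-functional partners add or remove fields
        // here all the time; doing so changes the serializer of everything typed by it.
        RowTypeInfo schema = new RowTypeInfo(
                new TypeInformation<?>[] {Types.STRING, Types.INT},
                new String[] {"userId", "score"});

        DataStream<Row> input = env
                .fromData(Row.of("alice", 1), Row.of("bob", 2))
                .returns(schema);

        // Nothing below looks stateful to the user, yet the built-in AsyncWaitOperator
        // snapshots every in-flight Row into operator state using `schema`'s serializer,
        // so restoring a checkpoint taken with the old schema fails after a column change.
        DataStream<Row> enriched = AsyncDataStream
                .orderedWait(
                        input,
                        new RichAsyncFunction<Row, Row>() {
                            @Override
                            public void asyncInvoke(Row row, ResultFuture<Row> resultFuture) {
                                // Stand-in for a database read, HTTP call, LLM request, etc.
                                CompletableFuture
                                        .supplyAsync(() -> Row.of(
                                                ((String) row.getField(0)).toUpperCase(),
                                                (Integer) row.getField(1) + 1))
                                        .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
                            }
                        },
                        30_000, TimeUnit.MILLISECONDS)
                .returns(schema);

        enriched.print();
        env.execute("innocuous-async-enrichment");
    }
}
{code}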
Given these constraints, I'd like to propose a *Stateless Async* operator that
achieves similar async processing to the existing async implementation, but
without storing any built-in state. This solution works in my enterprise's
local fork and has proven effective.
*Why the existing solution needs state:*
Today, the existing solution processes elements by adding them [to a
queue|https://github.com/apache/flink/blame/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L256],
then storing the [queue in Flink
state|https://github.com/apache/flink/blame/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L300].
Here is the flow today:
!image-2025-10-31-10-43-34-532.png! As you can see, Flink state is needed today
because records can be *in-progress* (in the queue and being executed in a
separate thread), meaning they are not recorded anywhere else where standard
Flink can make data guarantees such as exactly-once.
When a record is in progress in the queue, from Flink's perspective that data
is *not*:
* In a preceding input or output buffer.
** The record has been read out of the preceding buffers and the
"processElement" call has succeeded (meaning we stored the record in the queue
and called an async invoke)
** If the data were in a buffer *before* the operator, the checkpoint barrier
would not arrive at this operator until we finished processing the row.
However, since we technically did finish processing the row (processElement
returned), we can receive a checkpoint barrier before we are done processing
all rows that came before it
* In a downstream buffer or downstream operator
** The record is in progress in a separate thread and has not been collected
yet
For these reasons, in the existing implementation, we can't allow the
checkpoint barrier to be processed without storing all in-progress records in
state; otherwise we risk the checkpoint succeeding, an async call failing, and
having no easy way to backtrack and reprocess the data that failed.
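To make the pattern visible, here is a deliberately simplified sketch of it. This is NOT the actual AsyncWaitOperator code linked above, which works at the operator level; the class name, the String payloads, and the stand-in async call are illustrative, and result emission is elided.
{code:java}
import java.util.ArrayList;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * NOT the real AsyncWaitOperator: a deliberately simplified sketch of the same
 * pattern, written as a ProcessFunction + CheckpointedFunction so the state
 * handling is easy to see. Each element goes into an in-flight queue and is
 * handed to another thread; snapshotState() persists whatever is still queued,
 * which is exactly the built-in operator-state requirement described above.
 */
public class StatefulAsyncPatternSketch
        extends ProcessFunction<String, String> implements CheckpointedFunction {

    private transient Deque<String> inFlight;        // records handed to the async thread
    private transient ListState<String> queueState;  // operator state holding the queue

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        inFlight = new ConcurrentLinkedDeque<>();
        queueState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("in-flight-records", Types.STRING));
        // On restore, previously in-flight records come back from state and must be
        // deserialized with whatever schema/serializer they were written with.
        for (String restored : queueState.get()) {
            inFlight.add(restored);
        }
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        inFlight.add(value);
        CompletableFuture
                .supplyAsync(() -> value + "-enriched")         // stand-in for the external call
                .thenAccept(result -> inFlight.remove(value));  // only now is the record "done"
        // NOTE: emitting the result back into the pipeline is elided to keep the sketch short.
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // The crux: anything still in flight has to be written into Flink state, because
        // the checkpoint barrier may pass before the async calls for earlier records finish.
        queueState.update(new ArrayList<>(inFlight));
    }
}
{code}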
*My proposal:*
The current approach prevents us from losing data after a checkpoint by storing
all in-progress data in state. There is another way to prevent losing data that
does not involve state: halting the checkpoint until the queue empties (a
minimal sketch follows the bullet list below). Here is my proposed flow:
!image-2025-10-31-10-57-10-971.png! Why does this work?
* Either the entire queue succeeds in a timely manner, or we fail the
checkpoint
* If the entire queue succeeds, it does so before the checkpoint barrier gets
processed. So all records will be in a downstream operator or a downstream
buffer BEFORE the checkpoint barrier, and since all downstream operators also
ensure all their records are processed before accepting a checkpoint barrier
(this is standard Flink behavior), we are sure they will reach the sinks of the
pipeline before this checkpoint is completed by every task.
* If any record in the queue fails (after the async retry policy) or times out,
the entire checkpoint fails and we start over reading from the sources at the
next record after the last successful checkpoint, so we are guaranteed to
process each record into the output exactly once (if enabled).
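For illustration, here is a minimal sketch of the same idea at the operator level. This is NOT our fork's actual implementation; the class name, the drain timeout, and the stand-in async call are assumptions, and ordered result emission is elided. The key point is that the in-flight queue lives only in heap memory and is drained in prepareSnapshotPreBarrier(), i.e. before the operator forwards the barrier downstream:
{code:java}
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Minimal sketch of the proposed stateless variant (NOT the implementation in our
 * fork, and deliberately simplified): in-flight futures are tracked only in heap
 * memory, and prepareSnapshotPreBarrier() drains them before the barrier is sent
 * downstream, failing the checkpoint if they do not drain in time.
 */
public class StatelessOrderedWaitSketch
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private static final long DRAIN_TIMEOUT_MS = 60_000; // assumption: bounded drain wait

    private transient Queue<CompletableFuture<String>> inFlight;

    @Override
    public void open() throws Exception {
        super.open();
        inFlight = new ConcurrentLinkedQueue<>();
    }

    @Override
    public void processElement(StreamRecord<String> element) {
        // Stand-in for the external async call (DB lookup, HTTP, LLM, ...).
        CompletableFuture<String> pending =
                CompletableFuture.supplyAsync(() -> element.getValue() + "-enriched");
        inFlight.add(pending);
        // Emission of results back into the pipeline (ordered, via the mailbox) is
        // elided to keep the sketch short; the point is the checkpoint handling.
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // The crux of the proposal: before the barrier is emitted downstream, wait
        // until every queued async call has completed, so all results sit in front
        // of the barrier. If they cannot drain in time, throw, which fails the
        // checkpoint and replays from the previous one. No operator state is
        // registered at all.
        long deadline = System.currentTimeMillis() + DRAIN_TIMEOUT_MS;
        CompletableFuture<String> pending;
        while ((pending = inFlight.poll()) != null) {
            long remaining = Math.max(1, deadline - System.currentTimeMillis());
            pending.get(remaining, TimeUnit.MILLISECONDS);
        }
    }
}
{code}
Such an operator could be wired into a pipeline with DataStream#transform(...); an AsyncWaitOperator-style variant would drain its existing in-flight queue the same way instead of snapshotting it.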
How does this solve the problems?
* Users can change the schema (for example, add or remove a column) because the
schema of the queue held in Java memory doesn't require serialization and is
determined at runtime on each startup. There are no records in state that can
have a different schema than the new one, since there is no state
* No state means no complex state management, and no long checkpoint times due
to storing state in blob store
Cons of this approach:
* Large duration outliers are more expensive. If one request takes much longer
than the rest, it can hold up the entire pipeline, waiting for that one request
to succeed so the checkpoint barrier can pass. In practice, most enterprise
apps don't tend to have these outliers, but some might and should probably keep
using the existing async solution.
* More redundant computation of async requests. Currently, one failing request
does not cause all the ones near it to be recomputed. In the proposed approach,
any single failure (after all retries are exhausted) within a checkpoint will
cause all records in that checkpoint to be recomputed. This is usually not an
issue, since records can already be retried in normal Flink execution.
Final tentative recommendation:
Leave the existing async as an option, and implement a new stateless async.
Stateless async will have different tradeoffs, but is arguably better for large
enterprises with cross-functional partners building data pipelines.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)