Zach created FLINK-38605:
----------------------------

             Summary: Feature: Stateless Ordered Wait 
                 Key: FLINK-38605
                 URL: https://issues.apache.org/jira/browse/FLINK-38605
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream
            Reporter: Zach
         Attachments: image-2025-10-31-10-43-34-532.png, 
image-2025-10-31-10-57-10-971.png

Today, Flink's built-in async operations require storing 
[operator state|https://github.com/apache/flink/blob/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L226]. 
This happens at the 
[AsyncWaitOperator|https://github.com/apache/flink/blob/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java] 
level, meaning it applies to ALL async functions using the built-in Flink 
async solution. This is not user-defined state; it is fundamental to the core 
workings of every Flink async function today, whether those functions use 
additional user state or not. 
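For context, this is roughly how the built-in async path is used today (the 
function and helper names below are illustrative, not taken from any particular 
pipeline). Every Row handed to asyncInvoke sits in the AsyncWaitOperator's 
queue until the async call completes, and that queue is what gets snapshotted 
into operator state:

{code:java}
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

// Illustrative async enrichment: while the lookup is running, the input Row is
// held in the AsyncWaitOperator's internal queue.
public class EnrichAsyncFunction implements AsyncFunction<Row, Row> {

    @Override
    public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> lookupExternally(input))
                .thenAccept(out -> resultFuture.complete(Collections.singleton(out)));
    }

    private Row lookupExternally(Row input) {
        return input; // placeholder for a real database / LLM / API call
    }
}

// Wiring it in via the built-in "ordered wait" path:
// DataStream<Row> enriched = AsyncDataStream.orderedWait(
//         input, new EnrichAsyncFunction(), 30, TimeUnit.SECONDS, 100);
{code}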

In my experience in enterprise, storing this kind of state is disruptive to 
users, especially cross-functional users who just want to build data pipelines 
without in-depth Flink experience. My claim is that a version of async can be 
implemented without it. The issues with the current approach in large 
enterprise applications with cross-functional data pipeline partners are:
 * *Schema Can't Change* (see the RowTypeInfo sketch after this list)
 ** The current async state implementation requires storing entire records in 
Flink state
 ** Records in my enterprise are often 
[Flink Rows|https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/types/Row.html] 
or strongly typed columnar records. 
 ** These rows carry a 
[Flink RowTypeInfo|https://nightlies.apache.org/flink/flink-docs-release-2.1/api/java/org/apache/flink/api/java/typeutils/RowTypeInfo.html], 
i.e. information about each column, its type, and how to serialize and 
deserialize it 
 ** Users and cross-functional partners make schema (RowTypeInfo) changes all 
the time when building data pipelines. Making a schema change without a state 
migration or replay results in an ambiguous "serialization failed" or 
"deserialization failed" error message 
 ** Users don't understand what's different. If they'd like to do a database 
read, hit a Large Language Model (LLM), or do anything else stateful, it's not 
clear to them why they'd need to handle schema migrations differently. The 
operations seem innocuous and don't intuitively suggest significant 
restrictions on future pipeline changes
 * *Managing state is complicated*
 ** State requires assigning a unique, unchanging operator ID to the operator 
so that the state knows where to restore. 
 ** Large state, especially in external file system state storage, is 
expensive. Storing entire rows in state can grow large relatively fast. State 
gets written on every checkpoint, and in exactly-once mode this means large 
processing delays. 
 ** Users don't expect their seemingly simple async function to cause 
processing delays or long checkpoints
 * *There is risk of data loss* 
 ** If a user changes an async operation, say they change the API endpoint 
they'd like to hit, what do we do with existing state? What if the pipeline 
hasn't run for a week, so the data in state is a week old? Do we try to restore 
it, meaning old data will be passed to the new endpoint? Do we allow 
non-restored state and change the operator UID to drop the old data, resulting 
in data loss? 
 ** We could ask users, but many are cross-functional and aren't familiar with 
the inner workings of Flink. If we asked them whether they'd like to restore 
old state, they'd be confused why such a question is even necessary to hit a 
database. 
 * *Starting over is expensive* 
 ** One way to fix the problem with state and schema changes is to discard all 
progress and start reading again from the beginning. This way all stored state 
will contain records with the correct schema 
 ** Starting over is very expensive for users, especially with large data or 
with high real-time uptime requirements 
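 

To make the schema problem concrete, here is a small illustration (the field 
names are made up). The RowTypeInfo effectively defines the serializer for the 
queued records, so state written under the old schema can no longer be read 
once the job is resubmitted with the new one:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class SchemaChangeIllustration {

    public static void main(String[] args) {
        // Original schema the pipeline was checkpointed with (illustrative fields).
        RowTypeInfo oldSchema = new RowTypeInfo(
                new TypeInformation<?>[] {Types.STRING, Types.LONG},
                new String[] {"user_id", "event_time"});

        // A cross-functional partner adds a column. Queued records in operator
        // state were written with oldSchema's serializer, so restoring under
        // newSchema surfaces as an opaque serialization/deserialization error.
        RowTypeInfo newSchema = new RowTypeInfo(
                new TypeInformation<?>[] {Types.STRING, Types.LONG, Types.STRING},
                new String[] {"user_id", "event_time", "country"});

        System.out.println(oldSchema + " -> " + newSchema);
    }
}
{code}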

 

Given these constraints, I'd like to propose a *Stateless Async* operator that 
achieves async processing similar to the existing implementation, but without 
storing any built-in state. This solution runs in my enterprise's local fork 
and has proven effective.
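
To make the user-facing shape concrete, it could look something like the 
following (the method name statelessOrderedWait is purely hypothetical and does 
not exist in Flink today; EnrichAsyncFunction is the illustrative function from 
the sketch above):

{code:java}
// Purely hypothetical API surface, for illustration only; neither the method
// name nor the final signature exists in Flink. The idea is an orderedWait-style
// call whose operator drains its queue before checkpoint barriers instead of
// snapshotting the queue into operator state.
// DataStream<Row> enriched = AsyncDataStream.statelessOrderedWait(
//         input, new EnrichAsyncFunction(), 30, TimeUnit.SECONDS, 100);
{code}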

 

*Why the existing solution needs state:*

Today, the existing solution processes elements by adding them [to a 
queue|https://github.com/apache/flink/blame/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L256],
 then storing the [queue in Flink 
state|https://github.com/apache/flink/blame/release-2.1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L300].
 

Here is the flow today:

!image-2025-10-31-10-43-34-532.png! As you can see, Flink state is needed today 
because records can be *in-progress* (in the queue and being executed in a 
separate thread), meaning they are not recorded anywhere else where standard 
Flink can make data guarantees like exactly-once. 

 

When the record is in progress in the queue, from Flink's perspective that data 
is *not*:
 * In a preceding input or output buffer.
 ** The record has been read out of the preceding buffers and the 
"processElement" call has succeeded (meaning we stored the record in the queue 
and called an async invoke)
 ** If the data were in a buffer *before* the operator, the checkpoint barrier 
would not have arrived at this operator until we finished processing the row. 
However, since we technically did finish processing the row (processElement 
returned), we can receive a checkpoint barrier before we are done processing 
all rows that came before it 
 * In a downstream buffer or downstream operator 
 ** The record is in progress in a separate thread and has not been collected 
yet

For these reasons, in the existing implementation, we can't allow the 
checkpoint barrier to be processed without storing all in-progress records in 
state; otherwise we risk a checkpoint succeeding, an async call failing, and 
having no easy way to backtrack and reprocess the data that failed. 
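
As a highly simplified sketch of that idea, and explicitly not the actual 
AsyncWaitOperator code (which works at the operator level on stream elements; 
the state name and record schema below are made up), this is roughly what 
"store the in-flight queue in operator state" amounts to:

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.types.Row;

// Simplified sketch, NOT the real AsyncWaitOperator: in-flight records are
// copied into operator state on every checkpoint so they can be re-invoked if
// an async call fails after the checkpoint succeeds. Note that the record
// schema is baked into the state descriptor, which is why schema changes break
// restores.
public class InFlightQueueSnapshotSketch implements CheckpointedFunction {

    private final Deque<Row> inFlight = new ArrayDeque<>();
    private transient ListState<Row> inFlightState;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Every checkpoint rewrites the whole in-flight queue into state.
        inFlightState.update(new ArrayList<>(inFlight));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        inFlightState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("in-flight-records",
                        Types.ROW(Types.STRING, Types.LONG))); // schema fixed here
        for (Row recovered : inFlightState.get()) {
            inFlight.add(recovered); // re-invoked after restore
        }
    }
}
{code}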

 

*My proposal:*

The current approach prevents us from losing data after a checkpoint by storing 
all in progress data in state. There is another way to prevent losing data that 
does not involve state, and that is halting the checkpoint until the queue 
empties. Here is my proposed flow:

!image-2025-10-31-10-57-10-971.png! Why this works:
 * Either the entire queue succeeds in a timely manner, or we fail the 
checkpoint 
 * If the entire queue succeeds, it does so before the checkpoint barrier gets 
processed. So all records will be in a downstream operator or a downstream 
buffer BEFORE the checkpoint barrier, and since all downstream operators also 
ensure all their records are processed before accepting a checkpoint barrier 
(this is standard Flink behavior), we are sure they will reach the sinks of the 
pipeline before this checkpoint is completed by every task. 
 * If any record in the queue fails (after the async retry policy) or times 
out, the entire checkpoint fails and we start over reading from the sources at 
the next record after the last successful checkpoint, so we are guaranteed to 
process each record into the output exactly once (if enabled). 
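
Here is the promised minimal, framework-free sketch of the drain-before-barrier 
mechanism (plain Java, no Flink APIs; in a real operator this wait would happen 
before the barrier is forwarded downstream, e.g. in a pre-barrier hook):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Framework-free sketch of the proposed mechanism: instead of snapshotting the
// in-flight queue, block the checkpoint until every pending async call has
// completed; if anything fails or times out, fail the checkpoint so the source
// replays from the previous successful one.
public class StatelessOrderedWaitSketch {

    private final Deque<CompletableFuture<?>> inFlight = new ArrayDeque<>();

    /** Called per record: kick off the async work and remember the future. */
    public void onRecord(CompletableFuture<?> asyncResult) {
        inFlight.add(asyncResult);
    }

    /**
     * Called when a checkpoint barrier arrives (in a real operator, before the
     * barrier is forwarded downstream). Either the whole queue drains in time,
     * or the thrown exception fails the checkpoint and triggers replay.
     */
    public void drainBeforeBarrier(long timeoutMillis) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!inFlight.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("In-flight async calls did not finish in time");
            }
            // get() rethrows any async failure, which fails the checkpoint.
            inFlight.peek().get(remaining, TimeUnit.MILLISECONDS);
            inFlight.poll();
        }
    }
}
{code}

Failing the drain simply fails the checkpoint, so recovery falls back to the 
standard replay-from-last-checkpoint path; nothing about this queue ever needs 
to be serialized.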

How this solves the problems:
 * Users can change the schema (e.g. add or remove a column) because the queue 
lives in Java memory, doesn't require serialization, and its schema is 
determined at runtime on each startup. There are no records in state that can 
have a different schema than the new one, since there is no state 
 * No state means no complex state management, and no long checkpoint times due 
to storing state in blob storage 

Cons of this approach:
 * Large duration outliers are more expensive. If one request takes much longer 
than the rest, it can hold up the entire pipeline, waiting for that one request 
to succeed before the checkpoint barrier can pass. In practice, most enterprise 
apps don't tend to have these outliers, but some might and should probably keep 
using the existing async solution. 
 * More redundant computation of async requests. Currently, one failing request 
does not cause all the ones nearby to be re-computed. In the proposed approach, 
any single failure (after all retries fail) in a checkpoint will cause all 
records in that checkpoint to be recomputed. This is usually not an issue, 
since records can already get reprocessed on failure in normal Flink execution.  

 

Final tentative recommendation:

Leave the existing async as an option, and implement a new stateless async. 
Stateless async will have different tradeoffs, but is arguably better for large 
enterprises with cross-functional partners building data pipelines. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
