GitHub user Sxnan created a discussion: Support Fine-Grain Durable Execution
## Motivation
### Async Execution
Flink Agents currently supports async execution for Python Actions. With
`execute_async`, Actions can submit functions to a thread pool when handling
high-latency I/O, allowing the operator to yield execution. This enables
non-blocking progress within the same key sequence and parallel execution
across multiple Actions. The current Python async execution API is as follows:
```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Tuple


class RunnerContext(ABC):
    @abstractmethod
    def execute_async(self, func: Callable[[Any], Any], *args: Tuple[Any, ...],
                      **kwargs: Dict[str, Any]) -> Any:
        """Asynchronously execute the provided function. Access to memory is
        prohibited within the function.

        Parameters
        ----------
        func : Callable
            The function to be executed asynchronously.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns:
        -------
        Any
            The result of the function.
        """
```
Through this API, an Action is divided into multiple code blocks (i.e., a
single function call wrapped by `execute` or `execute_async`); the code block
becomes the minimum granularity for Action execution and scheduling.
### Per-Action State Consistency
Per-Action State Consistency works by writing an Action's Memory changes to the
ActionStateStore after the Action completes. When a job fails over and recovers
from a checkpoint, it can replay data from the ActionStateStore to avoid
re-executing Actions that already completed.
### The Problem
The minimum granularity of these two mechanisms is inconsistent: recovery and
deduplication operate at the "Action" level, while execution and scheduling
operate at the "code block" level. When an async code block has completed but
the Action hasn't finished and a failover occurs, that code block will be
re-executed upon recovery, causing:
* Duplicate external calls (potential side effects)
* Unnecessary waiting and costs
To address this, we want to reduce the granularity of consistency and recovery
to the "code block" level. This introduces fine-grained durable execution:
record and persist the return result of each call to `execute_async` (and
synchronous `execute`); during job recovery, if the same call is encountered
(same function and arguments, same call order), directly return the stored
result to avoid re-execution. This mechanism requires calls to be deterministic
(same inputs and order) and that the function body doesn't access Memory,
ensuring results can be cached and replayed.
## Public Interface
### Update RunnerContext Interface
This section enhances the existing `execute_async` and introduces the `execute`
interface for `RunnerContext`: when a job recovers from a checkpoint and
encounters the "same call" as before the failure (same function and arguments,
same call order, same key and sequence number), it directly returns the
historical result to avoid re-execution.
The core difference between the two interfaces is the **execution mode**:
* `execute_async` is an **asynchronous method** that submits the function to a
thread pool and yields execution, allowing the operator to process other events
while waiting.
* `execute` is a **synchronous method** that runs the function in the current
thread, blocking the operator until it returns. The return value is available
as soon as the call returns.
```python
class FlinkRunnerContext(RunnerContext):
    @override
    def execute_async(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Asynchronously execute the provided function. Access to memory
        is prohibited within the function. The result of the function will be
        stored and returned when the same execute_async call is made again
        during job recovery. The arguments and the result must be serializable.

        The action that calls this API should be deterministic, meaning that
        it will always make the execute_async call with the same arguments and
        in the same order during job recovery. Otherwise, the behavior is
        undefined.

        Parameters
        ----------
        func : Callable
            The function to be executed asynchronously.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns:
        -------
        Any
            The result of the function.
        """

    @override
    def execute(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Synchronously execute the provided function. Access to memory is
        prohibited within the function. The result of the function will be
        stored and returned when the same execute call is made again during
        job recovery. The arguments and the result must be serializable.

        The function is executed synchronously in the current thread, blocking
        the operator until completion.

        The action that calls this API should be deterministic, meaning that
        it will always make the execute call with the same arguments and in the
        same order during job recovery. Otherwise, the behavior is undefined.

        Parameters
        ----------
        func : Callable
            The function to be executed.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns:
        -------
        Any
            The result of the function.
        """
```
#### Key Points
* `execute` and `execute_async` have different execution modes, but **identical
persistence semantics**: results from both calls are cached, and during
recovery, they are matched by call order and the cached result is returned
directly.
* Calls must be **deterministic**: given the same input, the call order and
arguments of `execute` / `execute_async` must be consistent. If inconsistency
is detected during recovery, the framework clears the cache and re-executes.
* Upon first completion, the return value (or exception) is persisted; when hit
during recovery, it's returned directly (or the same exception is re-raised),
without re-executing the function body.
* **Accessing Memory is prohibited** within the function body; Memory changes
are persisted by the framework after the Action completes.
* Currently, code block level automatic retry is not provided; users need to
handle it in their defined function body. Future plans include providing
framework-level retry mechanisms (see [Future Work](#future-work)).
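
Until framework-level retry exists, the retry loop has to live inside the function passed to `execute` / `execute_async`. The following is a minimal sketch of such a loop, not part of Flink Agents: `with_retry`, its parameters, and the exponential backoff schedule are all hypothetical names and choices made for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retry(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds or max_attempts is exhausted.

    Hypothetical user-side helper; the delay doubles after every failure.
    The sleep function is injectable so the loop can be tested without waiting.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last exception
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")
```

Inside a code block this would be used as, for example, `with_retry(lambda: requests.get(url).json())`. Because the loop runs entirely within the function body, only the final result (or the last exception) is what the framework would see and persist for that call.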
#### Example Explanation
The following example demonstrates code block partitioning and recovery
behavior:
```python
@action(InputEvent)
@staticmethod
async def process(event: Event, ctx: RunnerContext):
    user_id = event.input

    # ════════════════════════════════════════════════════════════════════
    # Code Block 1: functionId="fetch_user_profile", argsDigest=hash(user_id)
    # - First execution: call fetch_user_profile → persist result to
    #   callRecords[0]
    # - On recovery: validate functionId/argsDigest → if hit, skip execution,
    #   return cached result
    # ════════════════════════════════════════════════════════════════════
    def fetch_user_profile(uid: str) -> dict:
        print(f"Fetching profile for {uid}")  # Will not execute if recovery hits cache
        return requests.get(f"https://api.example.com/users/{uid}").json()

    profile = await ctx.execute_async(fetch_user_profile, user_id)

    # The following code is outside the code block, will be re-executed on
    # recovery
    print(f"Got profile: {profile['name']}")  # Will execute again on recovery

    # ════════════════════════════════════════════════════════════════════
    # Code Block 2: functionId="compute_score", argsDigest=hash(profile)
    # - First execution: call compute_score → persist result to callRecords[1]
    # - On recovery: validate functionId/argsDigest → if hit, skip execution,
    #   return cached result
    # ════════════════════════════════════════════════════════════════════
    def compute_score(p: dict) -> int:
        print(f"Computing score for {p['name']}")  # Will not execute if recovery hits cache
        return p['activity_count'] * 10 + p['follower_count']

    score = ctx.execute(compute_score, profile)

    # The following code is outside the code block, will be re-executed on
    # recovery
    print(f"Computed score: {score}")  # Will execute again on recovery

    # ════════════════════════════════════════════════════════════════════
    # Memory changes: not part of code block, will be re-executed on recovery
    # - First execution: write to Memory → persisted by framework after Action
    #   completes
    # - On recovery: framework first replays Memory to the state at Action
    #   start, then re-executes the following code
    # ════════════════════════════════════════════════════════════════════
    ctx.short_term_memory.set("last_user", user_id)  # Will be re-executed on recovery
    ctx.short_term_memory.set("score", score)  # Will be re-executed on recovery
    ctx.send_event(OutputEvent(output=score))
```
**Summary**:
- Function calls wrapped by `execute` / `execute_async` are code blocks;
results are persisted, and on recovery, if hit, execution is skipped
- Code outside code blocks (including Memory changes) will be re-executed on
recovery
## Implementation Details
### State Model
This section defines the storage structure for function calls in ActionState.
* Introduce `CallRecord` to record the execution result of function calls.
```java
// NEW: embedded record for fine-grained calls
public static final class CallRecord {
    private final String functionId;  // module+qualname or Java signature
    private final String argsDigest;  // stable digest of serialized args
    private byte[] resultPayload;     // serialized return value (nullable)
    private byte[] exceptionPayload;  // serialized exception info (nullable)
}
```
During recovery, success/failure is determined by whether `exceptionPayload` is
null: if `exceptionPayload != null`, throw the exception; otherwise, return
`resultPayload`.
* Add `callRecords` and `completed` fields to `ActionState`.
```java
public class ActionState {
    ...
    // NEW: fine-grained durable execution call records
    private final List<CallRecord> callRecords;
    // NEW: indicates whether the Action is completed
    private boolean completed = false;
}
```
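
The proposal describes `functionId` as "module+qualname" and `argsDigest` as a "stable digest of serialized args" without fixing an encoding. As a rough illustration of what the Python side might compute, the sketch below assumes JSON-serializable arguments and SHA-256; both choices, and the helper names `function_id` and `args_digest`, are assumptions rather than part of the design.

```python
import hashlib
import json
from typing import Any, Callable, Dict, Tuple


def function_id(func: Callable[..., Any]) -> str:
    """Identify a function by its module and qualified name."""
    return f"{func.__module__}.{func.__qualname__}"


def args_digest(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> str:
    """Hash the arguments in a canonical JSON form.

    sort_keys=True keeps the digest independent of dict insertion order,
    which is what makes it stable across the original run and the replay.
    Assumes every argument is JSON-serializable.
    """
    canonical = json.dumps(
        {"args": list(args), "kwargs": kwargs},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Note that for a function defined inside an Action, `__qualname__` includes the enclosing function (for example `process.<locals>.fetch_user_profile`), so the identifier stays distinct between Actions that reuse the same inner function name.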
### Persistence Timing
To persist results after each code block completes, the timing of ActionState
writes needs to be adjusted:
| Timing | Content Written | `completed` | `callRecords` |
|--------|----------------|-------------|---------------|
| After each code block completes | `callRecords` update | `false` | Retained |
| After Action completes | `memoryUpdates` + `outputEvents` | `true` | Cleared |
* **When code block completes**: Write ActionState containing the new
`CallRecord` to ActionStateStore (e.g., Kafka).
* **When Action completes**: Set `completed = true` and write the final Memory
changes and Output Events together. Also **clear `callRecords`**, since the
Action is complete and recovery will skip the entire Action, so code block
level recovery information is no longer needed. This reduces storage space and
serialization overhead.
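
The two write points can be sketched as follows. This is an in-memory stand-in that only mirrors the table above: `InMemoryActionStateStore`, `on_code_block_complete`, and `on_action_complete` are hypothetical names, and the real ActionStateStore and ActionState are Java classes with a different API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

StateKey = Tuple[str, int, str]  # (key, sequenceNumber, actionName)


@dataclass
class ActionState:
    call_records: List[Any] = field(default_factory=list)
    memory_updates: List[Any] = field(default_factory=list)
    output_events: List[Any] = field(default_factory=list)
    completed: bool = False


class InMemoryActionStateStore:
    """Stand-in for the real ActionStateStore (e.g., Kafka)."""

    def __init__(self) -> None:
        self._states: Dict[StateKey, ActionState] = {}
        self.write_count = 0

    def put(self, state_key: StateKey, state: ActionState) -> None:
        self._states[state_key] = state
        self.write_count += 1

    def get(self, state_key: StateKey) -> Optional[ActionState]:
        return self._states.get(state_key)


def on_code_block_complete(store, state_key, state, record) -> None:
    """Write after each code block: callRecords grows, completed stays False."""
    state.call_records.append(record)
    store.put(state_key, state)


def on_action_complete(store, state_key, state, memory_updates, output_events) -> None:
    """Final write: memory updates and output events, completed=True, records cleared."""
    state.memory_updates = list(memory_updates)
    state.output_events = list(output_events)
    state.completed = True
    state.call_records.clear()
    store.put(state_key, state)
```

An Action with N code blocks therefore produces N + 1 writes in this sketch, which is the extra cost the design accepts in exchange for finer-grained recovery.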
### Recovery Logic
The current implementation determines whether an Action needs execution by
checking whether an ActionState exists. Since the new design also writes
ActionState after each code block completes, existence alone no longer implies
completion, so the check needs to be modified:
```plaintext
ActionState actionState = actionStateStore.get(key, seqNum, action, event);
if (actionState != null && actionState.isCompleted()) {
    // Action completed, skip execution, directly replay Memory and OutputEvents
} else {
    // Action not completed or ActionState doesn't exist
    // Re-execute Action, use callRecords to skip completed code blocks
}
```
* **ActionState exists and `completed = true`**: Action completed, skip
execution, directly replay Memory and OutputEvents.
* **ActionState exists but `completed = false`**: Action execution was
interrupted (failover), re-execute Action. During execution, if an existing
`CallRecord` is encountered, validate and return the cached result, skipping
actual execution.
* **ActionState doesn't exist**: First execution, normal flow.
### Write and Recovery Matching
* Write: When a call completes (success or failure), record the corresponding
  `CallRecord` and persist `resultPayload` or `exceptionPayload`.
* Hit: During recovery, query the corresponding `ActionState` by `(key,
  sequenceNumber, actionName)`, and get the `CallRecord` by the current call's
  order position (the Nth submission corresponds to the Nth record); after a hit,
  first validate that `functionId` and `argsDigest` match:
  * Validation fails: Determined as non-deterministic call order, clear this
    `CallRecord` and subsequent records, output WARN log, then re-execute
  * Validation passes: Check if `exceptionPayload` is null
    * `exceptionPayload == null`: Directly return the decoded
      `resultPayload`
    * `exceptionPayload != null`: Decode and throw the equivalent exception
* Miss: Execute normally and add a new record upon completion.
* Error handling: Deserialization failures or content inconsistencies should
  fail fast, outputting diagnostic information including the call sequence number
  and validation digest.
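
The write, hit, and miss rules above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the proposed implementation: `ReplayingExecutor` is a hypothetical class, `pickle` stands in for whatever serializer the framework uses, and the `repr`-based digest is a placeholder for a real stable digest.

```python
import logging
import pickle
from dataclasses import dataclass
from typing import Any, Callable, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class CallRecord:
    function_id: str
    args_digest: str
    result_payload: Optional[bytes] = None
    exception_payload: Optional[bytes] = None


class ReplayingExecutor:
    """Replays recorded calls by position; executes and records on a miss."""

    def __init__(self, records: List[CallRecord]) -> None:
        self.records = records  # records recovered from ActionState
        self.call_index = 0     # order position of the next call

    def execute(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        fid = f"{func.__module__}.{func.__qualname__}"
        digest = repr((args, sorted(kwargs.items())))  # placeholder digest
        index = self.call_index
        self.call_index += 1

        if index < len(self.records):
            record = self.records[index]
            if record.function_id == fid and record.args_digest == digest:
                # Hit: replay the stored outcome without running func.
                if record.exception_payload is not None:
                    raise pickle.loads(record.exception_payload)
                return pickle.loads(record.result_payload)
            # Validation failed: drop this record and all later ones.
            logger.warning("Non-deterministic call at index %d, re-executing", index)
            del self.records[index:]

        # Miss: execute and record the outcome, success or failure.
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            self.records.append(
                CallRecord(fid, digest, exception_payload=pickle.dumps(exc))
            )
            raise
        self.records.append(CallRecord(fid, digest, result_payload=pickle.dumps(result)))
        return result
```

Creating a second `ReplayingExecutor` over the same `records` list simulates recovery: a repeated call with the same function and arguments returns the stored value without invoking the function, while a call with different arguments truncates the list and executes normally.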
### Failure Semantics
* Decoding failures
  * When decoding `resultPayload`/`exceptionPayload` fails, fail fast
    immediately, outputting information including `(key, sequenceNumber,
    actionName)` and the call sequence number for diagnosis.
* At-least-once semantics
  * The current design tries to avoid duplicate side effects but cannot
    guarantee it. For example, if a code block completes execution but the
    CallRecord hasn't been persisted to ActionStateStore when a failover occurs,
    that code block will be re-executed, potentially causing duplicate side effects
    (such as duplicate LLM calls, additional costs, etc.).
  * For operations with side effects, users should ensure operations are
    idempotent or can tolerate occasional duplicate execution.
  * **Framework provides unique Call ID**: Automatically generate a unique
    identifier for each `execute` / `execute_async` call (based on `key +
    sequenceNumber + actionName + callIndex`), which users can pass to external
    systems as an idempotency key. Example:
```python
async def make_payment(event: Event, ctx: RunnerContext):
    def charge(call_id: str, amount: int) -> dict:
        # External system implements idempotency via call_id
        return payment_api.charge(idempotency_key=call_id, amount=amount)

    # Framework automatically injects call_id as the first argument
    result = await ctx.execute_async(charge, amount=100)
```
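
The proposal states only that the Call ID is based on `key + sequenceNumber + actionName + callIndex`. One way such an ID might be derived is sketched below; the helper name `make_call_id`, the separator character, and the use of SHA-256 are assumptions made for illustration.

```python
import hashlib


def make_call_id(key: str, sequence_number: int, action_name: str, call_index: int) -> str:
    """Derive a deterministic ID from the call's position.

    The same (key, sequenceNumber, actionName, callIndex) always yields the
    same ID, so a re-executed call reuses its original idempotency key.
    The unit separator keeps ("a", 12) and ("a1", 2) from colliding.
    """
    raw = "\x1f".join([key, str(sequence_number), action_name, str(call_index)])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```

The important property is determinism rather than the hash itself: because the ID depends only on values that are identical before and after a failover, the external system can recognize a duplicate submission.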
## Future Work
### Framework-Level Retry Support
The current design doesn't provide code block level automatic retry; users need
to implement retry logic in the function body themselves. Future considerations
include supporting user-specified retry policies at the framework level:
```python
# Possible API form
result = await ctx.execute_async(
    func,
    arg,
    retry=RetryPolicy(max_retries=3, backoff="exponential")
)
```
Implementation needs to consider the following issues:
* **Diversity of retry policies**: Different scenarios require different retry
policies (retry count, backoff strategy, retry conditions, etc.), requiring a
flexible `RetryPolicy` interface design.
* **Compatibility with existing semantics**: After a successful retry, only the
final successful result is stored; after all retries fail, the last exception
is stored. During recovery, the final result is returned directly or the
exception is thrown, maintaining consistency with current behavior.
### Exactly-once Semantics Support
The current design is based on at-least-once semantics, relying on users to
implement idempotency to avoid side effects from duplicate execution. Future
work may consider extending the capabilities of `execute_async` to support true
exactly-once semantics.
The core idea is: `execute_async` would no longer be limited to receiving a
simple `Callable`, but could accept objects implementing specific protocols,
choosing the appropriate commit strategy based on the external system's
transactional capabilities:
```python
# Possible API form
class TwoPhaseCommitCallable(Protocol):
    def prepare(self) -> PrepareResult:
        """Prepare phase: execute the operation but don't commit yet"""
        ...

    def commit(self, prepare_result: PrepareResult) -> None:
        """Commit phase: confirm the prepared operation"""
        ...

    def abort(self, prepare_result: PrepareResult) -> None:
        """Abort phase: cancel the prepared operation"""
        ...


# Usage example
result = await ctx.execute_async(MyTwoPhaseCommitOperation(...))
```
GitHub link: https://github.com/apache/flink-agents/discussions/404