GitHub user Sxnan created a discussion: Support Fine-Grain Durable Execution

## Motivation

### Async Execution

Flink Agents currently supports async execution for Python Actions. With 
`execute_async`, Actions can submit functions to a thread pool when handling 
high-latency I/O, allowing the operator to yield execution. This enables 
non-blocking progress within the same key sequence and parallel execution 
across multiple Actions. The current Python async execution API is as follows:

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Tuple


class RunnerContext(ABC):
    @abstractmethod
    def execute_async(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Asynchronously execute the provided function. Access to memory is
        prohibited within the function.

        Parameters
        ----------
        func : Callable
            The function to be executed asynchronously.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns
        -------
        Any
            The result of the function.
        """
```

Through this API, an Action is divided into multiple code blocks (i.e., a 
single function call wrapped by `execute` or `execute_async`); the code block 
becomes the minimum granularity for Action execution and scheduling.

### Per-Action State Consistency

Per-Action State Consistency works by writing an Action's Memory changes to 
the ActionStateStore after the Action completes. When a job fails over and 
recovers from a checkpoint, it replays the recorded data from the 
ActionStateStore instead of re-executing Actions that already completed.

### The Problem

The minimum granularity of these two mechanisms is inconsistent: recovery and 
deduplication operate at the "Action" level, while execution and scheduling 
operate at the "code block" level. When an async code block has completed but 
the Action hasn't finished and a failover occurs, that code block will be 
re-executed upon recovery, causing:

* Duplicate external calls (potential side effects)

* Unnecessary waiting and costs

To address this, we want to reduce the granularity of consistency and recovery 
to the "code block" level. This introduces fine-grained durable execution: 
record and persist the return result of each call to `execute_async` (and 
synchronous `execute`); during job recovery, if the same call is encountered 
(same function and arguments, same call order), directly return the stored 
result to avoid re-execution. This mechanism requires calls to be deterministic 
(same inputs and order) and that the function body doesn't access Memory, 
ensuring results can be cached and replayed.

## Public Interface

### Update RunnerContext Interface

This section enhances the existing `execute_async` and introduces the `execute` 
interface for `RunnerContext`: when a job recovers from a checkpoint and 
encounters the "same call" as before the failure (same function and arguments, 
same call order, same key and sequence number), it directly returns the 
historical result to avoid re-execution.

The core difference between the two interfaces is the **execution mode**:

* `execute_async` is an **asynchronous method** that submits the function to a 
thread pool and yields execution, allowing the operator to process other events 
while waiting.

* `execute` is a **synchronous method** that runs the function in the current 
thread, blocking the operator until the function finishes. The return value is 
available as soon as the call returns.

```python
class FlinkRunnerContext(RunnerContext):
    @override
    def execute_async(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Asynchronously execute the provided function. Access to memory
        is prohibited within the function. The result of the function will be
        stored and returned when the same execute_async call is made again
        during job recovery. The arguments and the result must be serializable.

        The action that calls this API should be deterministic, meaning that
        it will always make the execute_async call with the same arguments and
        in the same order during job recovery. Otherwise, the behavior is
        undefined.

        Parameters
        ----------
        func : Callable
            The function to be executed asynchronously.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns
        -------
        Any
            The result of the function.
        """

    @override
    def execute(
        self,
        func: Callable[[Any], Any],
        *args: Tuple[Any, ...],
        **kwargs: Dict[str, Any],
    ) -> Any:
        """Synchronously execute the provided function. Access to memory is
        prohibited within the function. The result of the function will be
        stored and returned when the same execute call is made again during
        job recovery. The arguments and the result must be serializable.

        The function is executed synchronously in the current thread, blocking
        the operator until completion.

        The action that calls this API should be deterministic, meaning that
        it will always make the execute call with the same arguments and in the
        same order during job recovery. Otherwise, the behavior is undefined.

        Parameters
        ----------
        func : Callable
            The function to be executed.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns
        -------
        Any
            The result of the function.
        """
```

#### Key Points

* `execute` and `execute_async` have different execution modes, but **identical 
persistence semantics**: results from both calls are cached, and during 
recovery, they are matched by call order and the cached result is returned 
directly.

* Calls must be **deterministic**: given the same input, an Action must issue 
its `execute` / `execute_async` calls in the same order and with the same 
arguments. If a mismatch is detected during recovery, the framework discards 
the mismatching record and all later ones, logs a warning, and re-executes.

* Upon first completion, the return value (or exception) is persisted; when hit 
during recovery, it's returned directly (or the same exception is re-raised), 
without re-executing the function body.

* **Accessing Memory is prohibited** within the function body; Memory changes 
are persisted by the framework after the Action completes.

* Currently, code block level automatic retry is not provided; users need to 
handle it in their defined function body. Future plans include providing 
framework-level retry mechanisms (see [Future Work](#future-work)).
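
Until framework-level retry exists, one way to keep retries inside the code block is to wrap the function before handing it to `execute` / `execute_async`. The sketch below is only an illustration: `with_retries`, `max_attempts`, and `base_delay_s` are hypothetical names that are not part of Flink Agents, and `flaky_lookup` simulates a transient failure instead of calling a real service.

```python
import functools
import time
from typing import Any, Callable, List


def with_retries(
    func: Callable[..., Any], max_attempts: int = 3, base_delay_s: float = 0.0
) -> Callable[..., Any]:
    """Hypothetical helper: retry func inside the code block itself."""

    @functools.wraps(func)  # keep the wrapped function's name and qualname
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        for attempt in range(1, max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == max_attempts:
                    # Retries exhausted: the last exception leaves the block.
                    raise
                # Exponential backoff between attempts.
                time.sleep(base_delay_s * 2 ** (attempt - 1))
        raise ValueError("max_attempts must be at least 1")

    return wrapper


attempts: List[str] = []


def flaky_lookup(uid: str) -> str:
    """Simulated call that fails twice, then succeeds."""
    attempts.append(uid)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return f"profile-{uid}"


result = with_retries(flaky_lookup)("u1")
```

Under these assumptions, an Action could call `await ctx.execute_async(with_retries(fetch_user_profile), user_id)`. Because the retries run inside the function body, only the final result or the final exception would reach the framework for persistence.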

#### Example Explanation

The following example demonstrates code block partitioning and recovery 
behavior:

```python
import requests


@action(InputEvent)
@staticmethod
async def process(event: Event, ctx: RunnerContext):
    user_id = event.input

    # ════════════════════════════════════════════════════════════════════
    # Code Block 1: functionId="fetch_user_profile", argsDigest=hash(user_id)
    # - First execution: call fetch_user_profile → persist result to
    #   callRecords[0]
    # - On recovery: validate functionId/argsDigest → if hit, skip execution,
    #   return cached result
    # ════════════════════════════════════════════════════════════════════
    def fetch_user_profile(uid: str) -> dict:
        # Will not execute if recovery hits cache
        print(f"Fetching profile for {uid}")
        return requests.get(f"https://api.example.com/users/{uid}").json()

    profile = await ctx.execute_async(fetch_user_profile, user_id)

    # The following code is outside the code block, will be re-executed on
    # recovery
    print(f"Got profile: {profile['name']}")  # Will execute again on recovery

    # ════════════════════════════════════════════════════════════════════
    # Code Block 2: functionId="compute_score", argsDigest=hash(profile)
    # - First execution: call compute_score → persist result to callRecords[1]
    # - On recovery: validate functionId/argsDigest → if hit, skip execution,
    #   return cached result
    # ════════════════════════════════════════════════════════════════════
    def compute_score(p: dict) -> int:
        # Will not execute if recovery hits cache
        print(f"Computing score for {p['name']}")
        return p['activity_count'] * 10 + p['follower_count']

    score = ctx.execute(compute_score, profile)

    # The following code is outside the code block, will be re-executed on
    # recovery
    print(f"Computed score: {score}")  # Will execute again on recovery

    # ════════════════════════════════════════════════════════════════════
    # Memory changes: not part of code block, will be re-executed on recovery
    # - First execution: write to Memory → persisted by framework after Action
    #   completes
    # - On recovery: framework first replays Memory to the state at Action
    #   start, then re-executes the following code
    # ════════════════════════════════════════════════════════════════════
    # Both writes below will be re-executed on recovery
    ctx.short_term_memory.set("last_user", user_id)
    ctx.short_term_memory.set("score", score)

    ctx.send_event(OutputEvent(output=score))
```

**Summary**:
- Function calls wrapped by `execute` / `execute_async` are code blocks; 
results are persisted, and on recovery, if hit, execution is skipped
- Code outside code blocks (including Memory changes) will be re-executed on 
recovery

## Implementation Details

### State Model

This section defines the storage structure for function calls in ActionState.

* Introduce `CallRecord` to record the execution result of function calls.

```java
// NEW: embedded record for fine-grained calls
public static final class CallRecord {
    private final String functionId;    // module + qualname, or Java signature
    private final String argsDigest;    // stable digest of serialized args
    private byte[] resultPayload;       // serialized return value (nullable)
    private byte[] exceptionPayload;    // serialized exception info (nullable)
}
```

During recovery, success/failure is determined by whether `exceptionPayload` is 
null: if `exceptionPayload != null`, throw the exception; otherwise, return 
`resultPayload`.

* Add `callRecords` and `completed` fields to `ActionState`.

```java
public class ActionState {
    ...

    // NEW: fine-grained durable execution call records
    private final List<CallRecord> callRecords;
    
    // NEW: indicates whether the Action is completed
    private boolean completed = false;
}
```

### Persistence Timing

To persist results after each code block completes, the timing of ActionState 
writes needs to be adjusted:

| Timing | Content Written | `completed` | `callRecords` |
|--------|----------------|-------------|---------------|
| After each code block completes | `callRecords` update | `false` | Retained |
| After Action completes | `memoryUpdates` + `outputEvents` | `true` | Cleared |

* **When a code block completes**: Write an ActionState containing the new 
`CallRecord` to the ActionStateStore (e.g., Kafka).

* **When Action completes**: Set `completed = true` and write the final Memory 
changes and Output Events together. Also **clear `callRecords`**, since the 
Action is complete and recovery will skip the entire Action, so code block 
level recovery information is no longer needed. This reduces storage space and 
serialization overhead.
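
The two write points can be sketched as follows. This is a minimal in-memory illustration, not the proposed implementation: `ActionStateSketch`, `InMemoryStore`, `on_code_block_complete`, and `on_action_complete` are hypothetical names, and a list of snapshots stands in for the real ActionStateStore.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ActionStateSketch:
    """Hypothetical Python mirror of the ActionState fields discussed here."""

    call_records: List[Any] = field(default_factory=list)
    memory_updates: Dict[str, Any] = field(default_factory=dict)
    output_events: List[Any] = field(default_factory=list)
    completed: bool = False


class InMemoryStore:
    """Stand-in for the ActionStateStore; keeps every written snapshot."""

    def __init__(self) -> None:
        self.writes: List[ActionStateSketch] = []

    def put(self, state: ActionStateSketch) -> None:
        self.writes.append(copy.deepcopy(state))


def on_code_block_complete(
    store: InMemoryStore, state: ActionStateSketch, record: Any
) -> None:
    # Write point 1: append the new call record; the Action is still running.
    state.call_records.append(record)
    store.put(state)


def on_action_complete(
    store: InMemoryStore,
    state: ActionStateSketch,
    memory_updates: Dict[str, Any],
    output_events: List[Any],
) -> None:
    # Write point 2: final results go in, call records are dropped.
    state.memory_updates = dict(memory_updates)
    state.output_events = list(output_events)
    state.call_records.clear()
    state.completed = True
    store.put(state)


store = InMemoryStore()
state = ActionStateSketch()
on_code_block_complete(store, state, "record-0")
on_code_block_complete(store, state, "record-1")
on_action_complete(store, state, {"score": 42}, ["OutputEvent(42)"])
```

In this sketch the store receives three writes for an Action with two code blocks: two intermediate snapshots with `completed = False`, and one final snapshot with `completed = True` and an empty `call_records` list.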

### Recovery Logic

The current implementation decides whether an Action needs to run by checking 
whether an ActionState exists. Because the new design also writes ActionState 
after each code block completes, existence alone no longer implies completion, 
so the check needs to be modified:

```plaintext
ActionState actionState = actionStateStore.get(key, seqNum, action, event);

if (actionState != null && actionState.isCompleted()) {
    // Action completed, skip execution, directly replay Memory and OutputEvents
} else {
    // Action not completed or ActionState doesn't exist
    // Re-execute Action, use callRecords to skip completed code blocks
}
```

* **ActionState exists and `completed = true`**: Action completed, skip 
execution, directly replay Memory and OutputEvents.

* **ActionState exists but `completed = false`**: Action execution was 
interrupted (failover), re-execute Action. During execution, if an existing 
`CallRecord` is encountered, validate and return the cached result, skipping 
actual execution.

* **ActionState doesn't exist**: First execution, normal flow.
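
The three cases can also be written as a small decision function. This is an illustrative sketch only: `RecoveryPath` and `choose_recovery_path` are hypothetical names, and a plain dict with a `completed` key stands in for the real `ActionState`.

```python
from enum import Enum
from typing import Optional


class RecoveryPath(Enum):
    REPLAY_ACTION = "replay Memory and OutputEvents, skip execution"
    RESUME_WITH_CALL_RECORDS = "re-execute, reuse cached code block results"
    FIRST_EXECUTION = "normal flow"


def choose_recovery_path(action_state: Optional[dict]) -> RecoveryPath:
    """Map the stored state (or its absence) to one of the three cases."""
    if action_state is None:
        return RecoveryPath.FIRST_EXECUTION
    if action_state.get("completed", False):
        return RecoveryPath.REPLAY_ACTION
    return RecoveryPath.RESUME_WITH_CALL_RECORDS


paths = [
    choose_recovery_path(None),
    choose_recovery_path({"completed": True}),
    choose_recovery_path({"completed": False, "call_records": ["record-0"]}),
]
```

The last two cases differ only in the `completed` flag, which is why the flag has to be persisted together with the call records.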

### Write and Recovery Matching

* Write: When a call completes (success or failure), record the corresponding 
`CallRecord` and persist `resultPayload` or `exceptionPayload`.

* Hit: During recovery, query the corresponding `ActionState` by `(key, 
sequenceNumber, actionName)`, and get the `CallRecord` by the current call's 
order position (the Nth submission corresponds to the Nth record); after a hit, 
first validate that `functionId` and `argsDigest` match:

    * Validation fails: Treat this as a non-deterministic call sequence, clear 
this `CallRecord` and all subsequent records, log a WARN message, then 
re-execute

    * Validation passes: Check if `exceptionPayload` is null

        * `exceptionPayload == null`: Directly return the decoded 
`resultPayload`

        * `exceptionPayload != null`: Decode and throw the equivalent exception

* Miss: Execute normally and add a new record upon completion.

* Error handling: Deserialization failures or content inconsistencies should 
fail fast, outputting diagnostic information including the call sequence number 
and validation digest.
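
The write, hit, and miss paths above can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the proposed implementation: `DurableCalls` and `args_digest` are hypothetical names, records are plain tuples held in memory, the digest is a SHA-256 over a JSON rendering of the arguments, and `pickle` stands in for whatever serializer the framework would use.

```python
import hashlib
import json
import logging
import pickle
from typing import Any, Callable, List, Optional, Tuple

logger = logging.getLogger("durable_calls_sketch")

# (function_id, args_digest, result_payload, exception_payload)
Record = Tuple[str, str, Optional[bytes], Optional[bytes]]


def args_digest(args: tuple, kwargs: dict) -> str:
    """Assumed digest scheme: SHA-256 over key-sorted JSON of the arguments."""
    text = json.dumps(
        {"args": list(args), "kwargs": kwargs}, sort_keys=True, default=repr
    )
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


class DurableCalls:
    def __init__(self, records: Optional[List[Record]] = None) -> None:
        self.records: List[Record] = list(records or [])
        self.index = 0  # the Nth call maps to the Nth record

    def execute(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        function_id = f"{func.__module__}.{func.__qualname__}"
        digest = args_digest(args, kwargs)
        if self.index < len(self.records):
            rec_id, rec_digest, result, error = self.records[self.index]
            if (rec_id, rec_digest) == (function_id, digest):
                self.index += 1
                if error is not None:
                    raise pickle.loads(error)  # hit: replay the failure
                return pickle.loads(result)  # hit: replay the result
            # Validation failed: drop this record and all later ones.
            logger.warning("call %d does not match its record", self.index)
            del self.records[self.index:]
        try:
            value = func(*args, **kwargs)  # miss: execute normally
        except Exception as exc:
            self.records.append((function_id, digest, None, pickle.dumps(exc)))
            self.index += 1
            raise
        self.records.append((function_id, digest, pickle.dumps(value), None))
        self.index += 1
        return value


calls: List[int] = []


def double(x: int) -> int:
    calls.append(x)
    return x * 2


def fail(x: int) -> int:
    calls.append(x)
    raise ValueError("boom")


first = DurableCalls()
first_value = first.execute(double, 2)
try:
    first.execute(fail, 1)
except ValueError:
    pass

# Recovery with the same call sequence: nothing is re-executed.
recovered = DurableCalls(first.records)
replayed_value = recovered.execute(double, 2)
try:
    recovered.execute(fail, 1)
    replayed_error = None
except ValueError as exc:
    replayed_error = str(exc)

# Recovery with a different argument: records are cleared and the call runs.
diverged = DurableCalls(first.records)
diverged_value = diverged.execute(double, 3)
```

In this sketch a decoding failure surfaces as whatever exception `pickle.loads` raises; the fail-fast diagnostics described above would wrap that with the call sequence number and digest.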

### Failure Semantics

* Decoding failures

    * When decoding `resultPayload`/`exceptionPayload` fails, fail fast 
immediately, outputting information including `(key, sequenceNumber, 
actionName)` and the call sequence number for diagnosis.

* At-least-once semantics

    * The current design tries to avoid duplicate side effects but cannot 
guarantee it. For example, if a code block completes execution but the 
CallRecord hasn't been persisted to ActionStateStore when a failover occurs, 
that code block will be re-executed, potentially causing duplicate side effects 
(such as duplicate LLM calls, additional costs, etc.).

    * For operations with side effects, users should ensure operations are 
idempotent or can tolerate occasional duplicate execution.

    * **Framework provides unique Call ID**: Automatically generate a unique 
identifier for each `execute` / `execute_async` call (based on `key + 
sequenceNumber + actionName + callIndex`), which users can pass to external 
systems as an idempotency key. Example:

```python
async def make_payment(event: Event, ctx: RunnerContext):
    def charge(call_id: str, amount: int) -> dict:
        # External system implements idempotency via call_id
        return payment_api.charge(idempotency_key=call_id, amount=amount)
    
    # Framework automatically injects call_id as the first argument
    result = await ctx.execute_async(charge, amount=100)
```
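
One possible way to derive such an identifier is to hash the four components with a separator between them. The sketch below is an assumption about the derivation, not the framework's actual scheme: `make_call_id`, the unit-separator delimiter, and the use of SHA-256 are all illustrative choices.

```python
import hashlib


def make_call_id(
    key: str, sequence_number: int, action_name: str, call_index: int
) -> str:
    """Hypothetical derivation of a stable, unique ID for one code block call."""
    # A separator keeps ("ab", 1) and ("a", "b1")-style inputs from colliding.
    raw = "\x1f".join([key, str(sequence_number), action_name, str(call_index)])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


original = make_call_id("user-42", 7, "make_payment", 0)
after_recovery = make_call_id("user-42", 7, "make_payment", 0)
next_call = make_call_id("user-42", 7, "make_payment", 1)
```

Because the ID depends only on values that are identical before and after a failover, a re-executed call would present the same idempotency key to the external system, while a different call within the same Action would get a different one.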

## Future Work

### Framework-Level Retry Support

The current design doesn't provide code block level automatic retry; users need 
to implement retry logic in the function body themselves. Future considerations 
include supporting user-specified retry policies at the framework level:

```python
# Possible API form
result = await ctx.execute_async(
    func, 
    arg,
    retry=RetryPolicy(max_retries=3, backoff="exponential")
)
```

Implementation needs to consider the following issues:

* **Diversity of retry policies**: Different scenarios require different retry 
policies (retry count, backoff strategy, retry conditions, etc.), requiring a 
flexible `RetryPolicy` interface design.

* **Compatibility with existing semantics**: After a successful retry, only the 
final successful result is stored; after all retries fail, the last exception 
is stored. During recovery, the final result is returned directly or the 
exception is thrown, maintaining consistency with current behavior.

### Exactly-once Semantics Support

The current design is based on at-least-once semantics, relying on users to 
implement idempotency to avoid side effects from duplicate execution. Future 
work may consider extending the capabilities of `execute_async` to support true 
exactly-once semantics.

The core idea is: `execute_async` would no longer be limited to receiving a 
simple `Callable`, but could accept objects implementing specific protocols, 
choosing the appropriate commit strategy based on the external system's 
transactional capabilities:

```python
# Possible API form
class TwoPhaseCommitCallable(Protocol):
    def prepare(self) -> PrepareResult:
        """Prepare phase: execute the operation but don't commit yet"""
        ...
    
    def commit(self, prepare_result: PrepareResult) -> None:
        """Commit phase: confirm the prepared operation"""
        ...
    
    def abort(self, prepare_result: PrepareResult) -> None:
        """Abort phase: cancel the prepared operation"""
        ...

# Usage example
result = await ctx.execute_async(MyTwoPhaseCommitOperation(...))
```



GitHub link: https://github.com/apache/flink-agents/discussions/404
