yanand0909 commented on code in PR #422:
URL: https://github.com/apache/flink-agents/pull/422#discussion_r2683796877


##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java:
##########
@@ -48,4 +78,120 @@ public void sendEvent(String type, byte[] event, String eventString) {
         // this method will be invoked by PythonActionExecutor's python interpreter.
         sendEvent(new PythonEvent(event, type, eventString));
     }
+
+    /**
+     * Initializes the recovery state for the current action.
+     *
+     * @param actionState the ActionState for the current action (used for adding CallRecords)
+     * @param actionStatePersister callback to persist ActionState after each code block completion
+     */
+    public void initRecoveryState(ActionState actionState, Runnable actionStatePersister) {
+        this.currentCallIndex = 0;
+        this.currentActionState = actionState;
+        this.actionStatePersister = actionStatePersister;
+        this.recoveryCallRecords =
+                actionState != null && actionState.getCallRecords() != null
+                        ? new ArrayList<>(actionState.getCallRecords())
+                        : new ArrayList<>();
+    }
+
+    /**
+     * Gets the current call index.
+     *
+     * @return the current call index
+     */
+    public int getCurrentCallIndex() {
+        return currentCallIndex;
+    }
+
+    /**
+     * Tries to get a cached CallRecord for recovery.
+     *
+     * <p>This method is called by Python's execute/execute_async to check if a previous result
+     * exists. If found and validated, the cached result is returned.
+     *
+     * @param functionId the function identifier
+     * @param argsDigest the digest of serialized arguments
+     * @return array containing [isHit (boolean), resultPayload (byte[]), exceptionPayload
+     *     (byte[])], or null if miss
+     */
+    public Object[] tryGetCachedCallRecord(String functionId, String argsDigest) {
+        mailboxThreadChecker.run();
+
+        if (currentCallIndex < recoveryCallRecords.size()) {
+            CallRecord record = recoveryCallRecords.get(currentCallIndex);
+
+            if (record.matches(functionId, argsDigest)) {
+                LOG.debug(
+                        "CallRecord hit at index {}: functionId={}, argsDigest={}",
+                        currentCallIndex,
+                        functionId,
+                        argsDigest);
+                currentCallIndex++;
+                return new Object[] {true, record.getResultPayload(), record.getExceptionPayload()};
+            } else {
+                // Non-deterministic call detected, clear subsequent records
+                LOG.warn(

Review Comment:
   Should we offer an option to fail fast here, e.g. a config flag such as `strict_mode`?
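
To make the suggestion concrete, here is a minimal sketch of what a fail-fast switch could look like. It is written in Python as a simplified model of the replay logic, not as the proposed Java change. `RecoveryState`, `NonDeterministicCallError` and the `strict_mode` flag are hypothetical names, and the lenient branch assumes that the mismatching record and everything after it are dropped, which may differ from what the PR actually does.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


class NonDeterministicCallError(RuntimeError):
    """Hypothetical error raised in strict mode on a replay mismatch."""


@dataclass(frozen=True)
class CallRecord:
    function_id: str
    args_digest: str
    result_payload: Optional[bytes] = None
    exception_payload: Optional[bytes] = None

    def matches(self, function_id: str, args_digest: str) -> bool:
        return self.function_id == function_id and self.args_digest == args_digest


class RecoveryState:
    """Simplified model of the call-record replay, with an optional strict mode."""

    def __init__(self, records: List[CallRecord], strict_mode: bool = False) -> None:
        self.records = list(records)
        self.strict_mode = strict_mode  # hypothetical config flag
        self.current_call_index = 0

    def try_get_cached(
        self, function_id: str, args_digest: str
    ) -> Optional[Tuple[Optional[bytes], Optional[bytes]]]:
        """Return (result_payload, exception_payload) on a hit, None on a miss."""
        if self.current_call_index >= len(self.records):
            return None  # nothing left to replay

        record = self.records[self.current_call_index]
        if record.matches(function_id, args_digest):
            self.current_call_index += 1
            return record.result_payload, record.exception_payload

        if self.strict_mode:
            # Fail fast instead of silently re-executing from this point on.
            raise NonDeterministicCallError(
                f"call {self.current_call_index} expected {record.function_id}, "
                f"got {function_id}"
            )

        # Lenient mode (assumed behaviour): drop the stale records and treat as a miss.
        del self.records[self.current_call_index:]
        return None
```

With `strict_mode=False` the mismatch degrades to a cache miss as it does today; with `strict_mode=True` the action would fail at the first diverging call, which makes non-deterministic user code visible early.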



##########
python/flink_agents/runtime/flink_runner_context.py:
##########
@@ -194,8 +436,37 @@ def execute_async(
     ) -> AsyncExecutionResult:
         """Asynchronously execute the provided function. Access to memory
         is prohibited within the function.
+
+        The result of the function will be stored and returned when the same
+        execute_async call is made again during job recovery. The arguments
+        and the result must be serializable.
         """
-        return AsyncExecutionResult(self.executor, func, args, kwargs)
+        # Try to get cached result for recovery
+        is_hit, cached_result = self._try_get_cached_result(func, args, kwargs)
+        if is_hit:
+            # Return a pre-completed AsyncExecutionResult
+            return _CachedAsyncExecutionResult(cached_result)
+
+        # Create a wrapper function that records completion
+        def wrapped_func(*a: Any, **kw: Any) -> Any:
+            exception = None
+            result = None
+            try:
+                result = func(*a, **kw)
+            except BaseException as e:
+                exception = e
+
+            # Note: This runs in a thread pool, so we need to be careful

Review Comment:
   We should document that the result is recorded only when it is awaited, so 
recovery will not work for fire-and-forget function calls.
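
The behaviour I have in mind can be illustrated with a small stand-alone sketch. It is not the PR's implementation: `RecordingResult` is a hypothetical stand-in for the awaitable returned by `execute_async`, and it assumes the function is submitted to the pool on creation while the result is recorded only inside `__await__`.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Tuple


class RecordingResult:
    """Hypothetical awaitable: work starts on creation, recording happens on await."""

    def __init__(
        self,
        executor: ThreadPoolExecutor,
        func: Callable[..., Any],
        args: Tuple[Any, ...],
        call_log: List[Any],
    ) -> None:
        self._future = executor.submit(func, *args)  # runs in the thread pool
        self._call_log = call_log

    async def _wait_and_record(self) -> Any:
        result = await asyncio.wrap_future(self._future)
        # Recording happens here, back on the awaiting side, not in the pool thread.
        self._call_log.append(result)
        return result

    def __await__(self):
        return self._wait_and_record().__await__()


def run_demo() -> Tuple[Any, List[Any], List[int]]:
    call_log: List[Any] = []   # stands in for the persisted call records
    executed: List[int] = []   # which inputs the function actually ran with

    def work(x: int) -> int:
        executed.append(x)
        return x * x

    async def action() -> int:
        with ThreadPoolExecutor(max_workers=2) as pool:
            RecordingResult(pool, work, (4,), call_log)  # fire-and-forget, never awaited
            value = await RecordingResult(pool, work, (3,), call_log)
        return value

    value = asyncio.run(action())
    return value, call_log, sorted(executed)


demo_value, demo_log, demo_executed = run_demo()
```

In this sketch both calls execute, but only the awaited one ends up in `call_log`, so on recovery the fire-and-forget call would run again rather than being replayed.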



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
