xintongsong commented on code in PR #80: URL: https://github.com/apache/flink-agents/pull/80#discussion_r2250795038
########## runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java: ########## @@ -112,75 +150,167 @@ public void open() throws Exception { metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup()); builtInMetrics = new BuiltInMetrics(metricGroup, agentPlan); - runnerContext = new RunnerContextImpl(shortTermMemState, metricGroup); + // init agent processing related state + actionTasksKState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "actionTasks", TypeInformation.of(ActionTask.class))); + pendingInputEventsKState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "pendingInputEvents", TypeInformation.of(Event.class))); + // We use UnionList here to ensure that the task can access all keys after parallelism + // modifications. + // Subsequent steps {@link #tryResumeProcessActionTasks} will then filter out keys that do + // not belong to the key range of current task. + currentProcessingKeysOpState = + getOperatorStateBackend() + .getUnionListState( + new ListStateDescriptor<>( + "currentProcessingKeys", TypeInformation.of(Object.class))); // init PythonActionExecutor initPythonActionExecutor(); + + mailboxProcessor = getMailboxProcessor(); + + // Since an operator restart may change the key range it manages due to changes in + // parallelism, + // and {@link tryProcessActionTaskForKey} mails might be lost, + // it is necessary to reprocess all keys to ensure correctness. + tryResumeProcessActionTasks(); } @Override public void processElement(StreamRecord<IN> record) throws Exception { IN input = record.getValue(); LOG.debug("Receive an element {}", input); - // 1. wrap to InputEvent first + // wrap to InputEvent first Event inputEvent = wrapToInputEvent(input); - // 2. 
execute action - LinkedList<Event> events = new LinkedList<>(); - events.push(inputEvent); - while (!events.isEmpty()) { - Event event = events.pop(); - builtInMetrics.markEventProcessed(); - List<Action> actions = getActionsTriggeredBy(event); - if (actions != null && !actions.isEmpty()) { - for (Action action : actions) { - // TODO: Support multi-action execution for a single event. Example: A Java - // event - // should be processable by both Java and Python actions. - // TODO: Implement asynchronous action execution. - - // execute action and collect output events - String actionName = action.getName(); - LOG.debug("Try execute action {} for event {}.", actionName, event); - List<Event> actionOutputEvents; - if (action.getExec() instanceof JavaFunction) { - runnerContext.setActionName(actionName); - action.getExec().call(event, runnerContext); - actionOutputEvents = runnerContext.drainEvents(); - } else if (action.getExec() instanceof PythonFunction) { - checkState(event instanceof PythonEvent); - actionOutputEvents = - pythonActionExecutor.executePythonFunction( - (PythonFunction) action.getExec(), - (PythonEvent) event, - actionName); - } else { - throw new RuntimeException("Unsupported action type: " + action.getClass()); - } - builtInMetrics.markActionExecuted(actionName); - - for (Event actionOutputEvent : actionOutputEvents) { - if (EventUtil.isOutputEvent(actionOutputEvent)) { - builtInMetrics.markEventProcessed(); - OUT outputData = getOutputFromOutputEvent(actionOutputEvent); - LOG.debug( - "Collect output data {} for input {} in action {}.", - outputData, - input, - action.getName()); - output.collect(reusedStreamRecord.replace(outputData)); - } else { - LOG.debug( - "Collect event {} for event {} in action {}.", - actionOutputEvent, - event, - action.getName()); - events.add(actionOutputEvent); - } - } + if (currentKeyHasMoreActionTask()) { + // If there are already actions being processed for the current key, the newly incoming + // event should be 
queued and processed later. Therefore, we add it to + // pendingInputEventsState. + pendingInputEventsKState.add(inputEvent); + } else { + // Otherwise, the new event is processed immediately. + Object key = getCurrentKey(); + boolean shouldSubmitMail = processEvent(key, inputEvent); + if (shouldSubmitMail) { + mailboxExecutor.submit( + () -> tryProcessActionTaskForKey(key), "process action task"); + } + } + } + + /** + * Processes an incoming event for the given key and determines whether a new mail + * `tryProcessActionTaskForKey` should be submitted to continue processing. + * + * <p>The return value `shouldSubmitMail` is set to true only if the action tasks list + * (actionTasksKState) was previously empty and is now non-empty. + * + * @param key The key associated with the event. + * @param event The event to process. + * @return True if a new mail should be submitted, false otherwise. + * @throws Exception If any error occurs during processing. + */ + private boolean processEvent(Object key, Event event) throws Exception { + boolean shouldSubmitMail = false; + builtInMetrics.markEventProcessed(); + if (EventUtil.isOutputEvent(event)) { + // If the event is an OutputEvent, we send it downstream. + OUT outputData = getOutputFromOutputEvent(event); + output.collect(reusedStreamRecord.replace(outputData)); + } else { + if (EventUtil.isInputEvent(event)) { + // If the event is an InputEvent, we mark that the key is currently being processed. + currentProcessingKeysOpState.add(key); + } + // We then obtain the triggered action and add ActionTasks to the waiting processing + // queue. + List<Action> triggerActions = getActionsTriggeredBy(event); + if (triggerActions != null && !triggerActions.isEmpty()) { + shouldSubmitMail = !listStateNotEmpty(actionTasksKState); + for (Action triggerAction : triggerActions) { + actionTasksKState.add(createActionTask(key, triggerAction, event)); + } + } + } + + return shouldSubmitMail; Review Comment: Why not submit the mail here? 
########## runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.runtime.operator; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.plan.Action; +import org.apache.flink.agents.runtime.context.RunnerContextImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +/** + * This class represents a task related to the execution of an action in {@link + * ActionExecutionOperator}. + * + * <p>An action is split into multiple code blocks, and each code block is represented by an {@code + * ActionTask}. You can call {@link #invoke()} to execute a code block and obtain invoke result + * {@link ActionTaskResult}. If the action contains additional code blocks, you can obtain the next + * {@code ActionTask} via {@link ActionTaskResult#getGeneratedActionTask()} and continue executing + * it. 
+ */ +public abstract class ActionTask { + + protected static final Logger LOG = LoggerFactory.getLogger(ActionTask.class); + + protected final Object key; + protected final Event event; + protected final Action action; + /** + * Since RunnerContextImpl contains references to the Operator and state, it should not be + * serialized and included in the state with ActionTask. Instead, we should check if a valid + * RunnerContext exists before each ActionTask invocation and create a new one if necessary. + */ + protected transient RunnerContextImpl runnerContext; + + public ActionTask(Object key, Event event, Action action) { + this.key = key; + this.event = event; + this.action = action; + } + + public RunnerContextImpl getRunnerContext() { + return runnerContext; + } + + public void setRunnerContext(RunnerContextImpl runnerContext) { + this.runnerContext = runnerContext; + } + + public Object getKey() { + return key; + } + + /** Invokes the action task. */ + public abstract ActionTaskResult invoke() throws Exception; + + public class ActionTaskResult { + private final boolean finished; + private final List<Event> outputEvents; + private final Optional<ActionTask> generatedActionTaskOpt; + + public ActionTaskResult( + boolean finished, + List<Event> outputEvents, + Optional<ActionTask> generatedActionTaskOpt) { + this.finished = finished; + this.outputEvents = outputEvents; + this.generatedActionTaskOpt = generatedActionTaskOpt; + } Review Comment: ```suggestion public ActionTaskResult( boolean finished, List<Event> outputEvents, @Nullable ActionTask generatedActionTaskOpt) { this.finished = finished; this.outputEvents = outputEvents; this.generatedActionTaskOpt = Optional.ofNullable(generatedActionTaskOpt); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org