This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch xiaozhen-sync-region-kill
in repository https://gitbox.apache.org/repos/asf/texera.git

commit e0a28667ce03d789f3d4e2a671bdc579a4be42e6
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Fri Feb 13 15:44:16 2026 -0800

    fix(amber): improve region kill error handling and add synchronous kill logic.
---
 .../controller/WorkflowScheduler.scala             |  2 +
 .../WorkerExecutionCompletedHandler.scala          |  6 ++-
 .../scheduling/RegionExecutionCoordinator.scala    | 55 ++++++++++++++++++----
 .../scheduling/WorkflowExecutionCoordinator.scala  | 45 ++++++++++++------
 .../worker/promisehandlers/EndHandler.scala        |  2 +-
 5 files changed, 83 insertions(+), 27 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..b1acb3c065 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -54,4 +54,6 @@ class WorkflowScheduler(
 
   def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
 
+  def hasPendingRegions: Boolean = schedule != null && schedule.hasNext
+
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index 594673caa5..9f6871dcb8 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -60,7 +60,11 @@ trait WorkerExecutionCompletedHandler {
       .collect(Seq(statsRequest))
       .flatMap(_ => {
         // if entire workflow is completed, clean up
-        if (cp.workflowExecution.isCompleted) {
+        val isWorkflowTerminal =
+          cp.workflowExecution.isCompleted &&
+            !cp.workflowScheduler.hasPendingRegions &&
+            !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
+        if (isWorkflowTerminal) {
           // after query result come back: send completed event, cleanup ,and kill workflow
           sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
           cp.controllerTimerService.disableStatusUpdate()
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 7e5b228801..9fee0226b6 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -20,7 +20,7 @@
 package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
-import com.twitter.util.{Future, Return, Throw}
+import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, Return, Throw, Timer}
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -60,7 +60,7 @@ import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions
 
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration => ScalaDuration}
 /**
  * The executor of a region.
  */
@@ -108,10 +108,14 @@ class RegionExecutionCoordinator(
   private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference(
     Unexecuted
   )
+  private val terminationFutureRef: AtomicReference[Future[Unit]] = new AtomicReference(null)
+  private val killRetryTimer: Timer = new JavaTimer(true)
+  private val killRetryDelay: TwitterDuration = TwitterDuration.fromMilliseconds(200)
 
   /**
    * Sync the status of `RegionExecution` and transition this coordinator's phase to `Completed` only when the
-   * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the ports of this region are completed.
+   * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and
+   * all workers in this region are terminated.
    *
    * Additionally, this method will also terminate all the workers of this region:
    *
@@ -134,12 +138,22 @@ class RegionExecutionCoordinator(
       return Future.Unit
     }
 
-    // Set this coordinator's status to be completed so that subsequent regions can be started by
-    // WorkflowExecutionCoordinator.
-    setPhase(Completed)
-
-    // Terminate all the workers in this region.
-    terminateWorkers(regionExecution)
+    val existingTerminationFuture = terminationFutureRef.get
+    if (existingTerminationFuture != null) {
+      existingTerminationFuture
+    } else {
+      val terminationFuture = terminateWorkersWithRetry(regionExecution).flatMap { _ =>
+        // Set this coordinator's status to be completed so that subsequent regions can be started by
+        // WorkflowExecutionCoordinator.
+        setPhase(Completed)
+        Future.Unit
+      }
+      if (terminationFutureRef.compareAndSet(null, terminationFuture)) {
+        terminationFuture
+      } else {
+        terminationFutureRef.get
+      }
+    }
   }
 
   private def terminateWorkers(regionExecution: RegionExecution) = {
@@ -166,7 +180,7 @@
         val actorRef = actorRefService.getActorRef(workerId)
         // Remove the actorRef so that no other actors can find the worker and send messages.
         actorRefService.removeActorRef(workerId)
-        gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter()
+        gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter()
       }
     }.toSeq
 
@@ -190,8 +204,29 @@
     }
   }
 
+  private def terminateWorkersWithRetry(
+      regionExecution: RegionExecution,
+      attempt: Int = 1
+  ): Future[Unit] = {
+    terminateWorkers(regionExecution).rescue { case err =>
+      logger.warn(
+        s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.",
+        err
+      )
+      Future
+        .sleep(killRetryDelay)(killRetryTimer)
+        .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))
+    }
+  }
+
   def isCompleted: Boolean = currentPhaseRef.get == Completed
 
+  /**
+   * Returns the region termination future if termination has been initiated.
+   * This is only set by `tryCompleteRegionExecution()`.
+   */
+  def getTerminationFutureOpt: Option[Future[Unit]] = Option(terminationFutureRef.get)
+
   /**
    * This will sync and transition the region execution phase from one to another depending on its current phase:
    *
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 05585f88d8..1c3ae89471 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -27,9 +27,11 @@ import org.apache.texera.amber.engine.architecture.common.{
   AkkaActorService
 }
 import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
+import org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate
 import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
 
 class WorkflowExecutionCoordinator(
@@ -44,6 +46,7 @@
 
   private val regionExecutionCoordinators
       : mutable.HashMap[RegionIdentity, RegionExecutionCoordinator] = mutable.HashMap()
+  private val completionNotified: AtomicBoolean = new AtomicBoolean(false)
 
   @transient var actorRefService: AkkaActorRefMappingService = _
 
@@ -59,18 +62,19 @@
    * After the syncs, if there are no running region(s), it will start new regions (if available).
    */
   def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] = {
-    if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
-      // As this method is invoked by the completion of each port in a region, and regionExecutionCoordinator only
-      // lanuches each phase asynchronously, we need to let each current unfinished regionExecutionCoordinator
-      // sync its status and proceed with next phases if needed.
-      Future
-        .collect({
-          regionExecutionCoordinators.values
-            .filter(!_.isCompleted)
-            .map(_.syncStatusAndTransitionRegionExecutionPhase())
-            .toSeq
-        })
+    val unfinishedRegionCoordinators =
+      regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
+
+    // Trigger sync for each unfinished region.
+    unfinishedRegionCoordinators.foreach(_.syncStatusAndTransitionRegionExecutionPhase())
+
+    // Wait only for region termination futures (kill path), then re-run coordination.
+    val terminationFutures = unfinishedRegionCoordinators.flatMap(_.getTerminationFutureOpt)
+    if (terminationFutures.nonEmpty) {
+      return Future
+        .collect(terminationFutures)
         .unit
+        .flatMap(_ => coordinateRegionExecutors(actorService))
     }
 
     if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
@@ -79,10 +83,17 @@
     }
 
     // All existing regions are completed. Start the next region (if any).
+    val nextRegions = getNextRegions()
+    if (nextRegions.isEmpty) {
+      if (workflowExecution.isCompleted && completionNotified.compareAndSet(false, true)) {
+        asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState))
+      }
+      return Future.Unit
+    }
+
+    executedRegions.append(nextRegions)
     Future
-      .collect({
-        val nextRegions = getNextRegions()
-        executedRegions.append(nextRegions)
+      .collect(
         nextRegions
           .map(region => {
             workflowExecution.initRegionExecution(region)
@@ -98,7 +109,7 @@
           })
           .map(_.syncStatusAndTransitionRegionExecutionPhase())
           .toSeq
-      })
+      )
       .unit
   }
 
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
index 2a6a20b3d3..0504e66f52 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
@@ -48,8 +48,8 @@
           s"Received EndHandler before all messages are processed. Unprocessed messages: " +
             s"${dp.inputManager.inputMessageQueue.peek()}"
       )
+      return Future.exception(new IllegalStateException("worker still has unprocessed messages"))
     }
-    assert(dp.inputManager.inputMessageQueue.isEmpty)
     // Now we can safely acknowledge that this worker can be terminated.
     EmptyReturn()
   }
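
For readers following the new kill path in RegionExecutionCoordinator: it combines two ideas, an idempotent termination future installed with compareAndSet (so concurrent completion events share one in-flight kill), and timer-based retry via Future.sleep (so failed kills are rescheduled on a timer rather than on the call stack). The sketch below is a minimal, self-contained illustration of that pattern with com.twitter.util; the names RegionKillSketch and ensureTerminated, and the fail-twice stub terminateWorkersOnce, are hypothetical and not part of this commit.

    import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
    import com.twitter.util.{Await, Duration, Future, JavaTimer, Timer}

    object RegionKillSketch {
      private implicit val timer: Timer = new JavaTimer(true)
      private val retryDelay = Duration.fromMilliseconds(200)
      private val terminationRef = new AtomicReference[Future[Unit]](null)
      private val attempts = new AtomicInteger(0)

      // Hypothetical stand-in for terminateWorkers(regionExecution):
      // fails twice, then succeeds, to exercise the retry path.
      private def terminateWorkersOnce(): Future[Unit] =
        if (attempts.incrementAndGet() < 3)
          Future.exception(new RuntimeException("worker not yet terminable"))
        else Future.Unit

      // Timer-based retry: each attempt is rescheduled via Future.sleep,
      // so the recursion does not grow the call stack.
      private def terminateWithRetry(attempt: Int = 1): Future[Unit] =
        terminateWorkersOnce().rescue { case err =>
          println(s"attempt $attempt failed: ${err.getMessage}")
          Future.sleep(retryDelay).flatMap(_ => terminateWithRetry(attempt + 1))
        }

      // Idempotent entry point: the first caller installs the in-flight
      // future via compare-and-set; later callers observe and reuse it.
      def ensureTerminated(): Future[Unit] = {
        val existing = terminationRef.get
        if (existing != null) existing
        else {
          val fresh = terminateWithRetry()
          if (terminationRef.compareAndSet(null, fresh)) fresh
          else terminationRef.get
        }
      }

      def main(args: Array[String]): Unit = {
        // Two concurrent callers share the same termination future.
        Await.result(Future.join(Seq(ensureTerminated(), ensureTerminated())))
        println(s"terminated after ${attempts.get()} attempts")
      }
    }

As in the commit's version, the losing side of the compareAndSet race may start a redundant attempt whose future is discarded; only the installed future gates the phase transition to Completed.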
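
WorkflowExecutionCoordinator guards its completion notification the same way: completionNotified.compareAndSet(false, true) ensures ExecutionStateUpdate is sent to the client at most once, even though coordinateRegionExecutors re-enters after termination futures resolve. A minimal sketch of that one-shot guard follows, with hypothetical names (OneShot, OneShotDemo):

    import java.util.concurrent.atomic.AtomicBoolean

    // One-shot guard mirroring completionNotified.compareAndSet(false, true):
    // only the first successful compare-and-set runs the side effect.
    final class OneShot {
      private val fired = new AtomicBoolean(false)
      def run(sideEffect: => Unit): Boolean =
        if (fired.compareAndSet(false, true)) { sideEffect; true }
        else false
    }

    object OneShotDemo {
      def main(args: Array[String]): Unit = {
        val notifyOnce = new OneShot
        // Simulate coordinateRegionExecutors reaching the terminal branch twice:
        // only the first call sends the (simulated) ExecutionStateUpdate.
        val first = notifyOnce.run(println("ExecutionStateUpdate sent to client"))
        val second = notifyOnce.run(println("never printed"))
        println(s"first=$first second=$second") // first=true second=false
      }
    }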
