This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch xiaozhen-sync-region-kill
in repository https://gitbox.apache.org/repos/asf/texera.git

commit e0a28667ce03d789f3d4e2a671bdc579a4be42e6
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Fri Feb 13 15:44:16 2026 -0800

    fix(amber): improve region kill behavior and error handling, and add
synchronous kill logic.
---
 .../controller/WorkflowScheduler.scala             |  2 +
 .../WorkerExecutionCompletedHandler.scala          |  6 ++-
 .../scheduling/RegionExecutionCoordinator.scala    | 55 ++++++++++++++++++----
 .../scheduling/WorkflowExecutionCoordinator.scala  | 45 ++++++++++++------
 .../worker/promisehandlers/EndHandler.scala        |  2 +-
 5 files changed, 83 insertions(+), 27 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
index 9dcf3ad4bf..b1acb3c065 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala
@@ -54,4 +54,6 @@ class WorkflowScheduler(
 
   def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else 
schedule.next()
 
+  def hasPendingRegions: Boolean = schedule != null && schedule.hasNext
+
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
index 594673caa5..9f6871dcb8 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala
@@ -60,7 +60,11 @@ trait WorkerExecutionCompletedHandler {
       .collect(Seq(statsRequest))
       .flatMap(_ => {
         // if entire workflow is completed, clean up
-        if (cp.workflowExecution.isCompleted) {
+        val isWorkflowTerminal =
+          cp.workflowExecution.isCompleted &&
+            !cp.workflowScheduler.hasPendingRegions &&
+            !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
+        if (isWorkflowTerminal) {
           // after query result come back: send completed event, cleanup ,and 
kill workflow
           sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
           cp.controllerTimerService.disableStatusUpdate()
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 7e5b228801..9fee0226b6 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -20,7 +20,7 @@
 package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
-import com.twitter.util.{Future, Return, Throw}
+import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, 
Return, Throw, Timer}
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -60,7 +60,7 @@ import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutions
 
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration => ScalaDuration}
 
 /**
   * The executor of a region.
@@ -108,10 +108,14 @@ class RegionExecutionCoordinator(
   private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new 
AtomicReference(
     Unexecuted
   )
+  private val terminationFutureRef: AtomicReference[Future[Unit]] = new 
AtomicReference(null)
+  private val killRetryTimer: Timer = new JavaTimer(true)
+  private val killRetryDelay: TwitterDuration = 
TwitterDuration.fromMilliseconds(200)
 
   /**
     * Sync the status of `RegionExecution` and transition this coordinator's 
phase to `Completed` only when the
-    * coordinator is currently in `ExecutingNonDependeePortsPhase` and all the 
ports of this region are completed.
+    * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the 
ports of this region are completed, and
+    * all workers in this region are terminated.
     *
     * Additionally, this method will also terminate all the workers of this 
region:
     *
@@ -134,12 +138,22 @@ class RegionExecutionCoordinator(
       return Future.Unit
     }
 
-    // Set this coordinator's status to be completed so that subsequent 
regions can be started by
-    // WorkflowExecutionCoordinator.
-    setPhase(Completed)
-
-    // Terminate all the workers in this region.
-    terminateWorkers(regionExecution)
+    val existingTerminationFuture = terminationFutureRef.get
+    if (existingTerminationFuture != null) {
+      existingTerminationFuture
+    } else {
+      val terminationFuture = 
terminateWorkersWithRetry(regionExecution).flatMap { _ =>
+        // Set this coordinator's status to be completed so that subsequent 
regions can be started by
+        // WorkflowExecutionCoordinator.
+        setPhase(Completed)
+        Future.Unit
+      }
+      if (terminationFutureRef.compareAndSet(null, terminationFuture)) {
+        terminationFuture
+      } else {
+        terminationFutureRef.get
+      }
+    }
   }
 
   private def terminateWorkers(regionExecution: RegionExecution) = {
@@ -166,7 +180,7 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the 
worker and send messages.
                 actorRefService.removeActorRef(workerId)
-                gracefulStop(actorRef, Duration(5, 
TimeUnit.SECONDS)).asTwitter()
+                gracefulStop(actorRef, ScalaDuration(5, 
TimeUnit.SECONDS)).asTwitter()
               }
           }.toSeq
 
@@ -190,8 +204,29 @@ class RegionExecutionCoordinator(
     }
   }
 
+  private def terminateWorkersWithRetry(
+      regionExecution: RegionExecution,
+      attempt: Int = 1
+  ): Future[Unit] = {
+    terminateWorkers(regionExecution).rescue { case err =>
+      logger.warn(
+        s"Failed to terminate region ${region.id.id} on attempt $attempt. 
Retrying in ${killRetryDelay.inMilliseconds} ms.",
+        err
+      )
+      Future
+        .sleep(killRetryDelay)(killRetryTimer)
+        .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1))
+    }
+  }
+
   def isCompleted: Boolean = currentPhaseRef.get == Completed
 
+  /**
+    * Returns the region termination future if termination has been initiated.
+    * This is only set by `tryCompleteRegionExecution()`.
+    */
+  def getTerminationFutureOpt: Option[Future[Unit]] = 
Option(terminationFutureRef.get)
+
   /**
     * This will sync and transition the region execution phase from one to 
another depending on its current phase:
     *
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 05585f88d8..1c3ae89471 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -27,9 +27,11 @@ import org.apache.texera.amber.engine.architecture.common.{
   AkkaActorService
 }
 import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
+import 
org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate
 import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
 
 class WorkflowExecutionCoordinator(
@@ -44,6 +46,7 @@ class WorkflowExecutionCoordinator(
   private val regionExecutionCoordinators
       : mutable.HashMap[RegionIdentity, RegionExecutionCoordinator] =
     mutable.HashMap()
+  private val completionNotified: AtomicBoolean = new AtomicBoolean(false)
 
   @transient var actorRefService: AkkaActorRefMappingService = _
 
@@ -59,18 +62,19 @@ class WorkflowExecutionCoordinator(
     * After the syncs, if there are no running region(s), it will start new 
regions (if available).
     */
   def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] 
= {
-    if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
-      // As this method is invoked by the completion of each port in a region, 
and regionExecutionCoordinator only
-      // lanuches each phase asynchronously, we need to let each current 
unfinished regionExecutionCoordinator
-      // sync its status and proceed with next phases if needed.
-      Future
-        .collect({
-          regionExecutionCoordinators.values
-            .filter(!_.isCompleted)
-            .map(_.syncStatusAndTransitionRegionExecutionPhase())
-            .toSeq
-        })
+    val unfinishedRegionCoordinators =
+      regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
+
+    // Trigger sync for each unfinished region.
+    
unfinishedRegionCoordinators.foreach(_.syncStatusAndTransitionRegionExecutionPhase())
+
+    // Wait only for region termination futures (kill path), then re-run 
coordination.
+    val terminationFutures = 
unfinishedRegionCoordinators.flatMap(_.getTerminationFutureOpt)
+    if (terminationFutures.nonEmpty) {
+      return Future
+        .collect(terminationFutures)
         .unit
+        .flatMap(_ => coordinateRegionExecutors(actorService))
     }
 
     if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
@@ -79,10 +83,17 @@ class WorkflowExecutionCoordinator(
     }
 
     // All existing regions are completed. Start the next region (if any).
+    val nextRegions = getNextRegions()
+    if (nextRegions.isEmpty) {
+      if (workflowExecution.isCompleted && 
completionNotified.compareAndSet(false, true)) {
+        
asyncRPCClient.sendToClient(ExecutionStateUpdate(workflowExecution.getState))
+      }
+      return Future.Unit
+    }
+
+    executedRegions.append(nextRegions)
     Future
-      .collect({
-        val nextRegions = getNextRegions()
-        executedRegions.append(nextRegions)
+      .collect(
         nextRegions
           .map(region => {
             workflowExecution.initRegionExecution(region)
@@ -98,7 +109,7 @@ class WorkflowExecutionCoordinator(
           })
           .map(_.syncStatusAndTransitionRegionExecutionPhase())
           .toSeq
-      })
+      )
       .unit
   }
 
@@ -116,4 +127,8 @@ class WorkflowExecutionCoordinator(
       .toSet
   }
 
+  def hasUnfinishedRegionCoordinators: Boolean = {
+    regionExecutionCoordinators.values.exists(!_.isCompleted)
+  }
+
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
index 2a6a20b3d3..0504e66f52 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala
@@ -48,8 +48,8 @@ trait EndHandler {
         s"Received EndHandler before all messages are processed. Unprocessed 
messages: " +
           s"${dp.inputManager.inputMessageQueue.peek()}"
       )
+      return Future.exception(new IllegalStateException("worker still has 
unprocessed messages"))
     }
-    assert(dp.inputManager.inputMessageQueue.isEmpty)
     // Now we can safely acknowledge that this worker can be terminated.
     EmptyReturn()
   }

Reply via email to