This is an automated email from the ASF dual-hosted git repository.

shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/shengquan-add-reconfigration 
by this push:
     new 97e45aeaae bug fix
97e45aeaae is described below

commit 97e45aeaae6294ab319d29553e9a4894cd1f09d0
Author: Shengquan Ni <[email protected]>
AuthorDate: Sun Feb 1 20:36:49 2026 -0800

    bug fix
---
 .../promisehandlers/ReconfigurationHandler.scala   | 40 ++++++++--------------
 .../DataProcessorRPCHandlerInitializer.scala       |  4 ++-
 .../InitializeExecutorHandler.scala                | 13 ++-----
 .../promisehandlers/UpdateExecutorHandler.scala    | 28 +++------------
 4 files changed, 23 insertions(+), 62 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
index 9470336efb..573306bc06 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -24,15 +24,19 @@ import 
org.apache.texera.amber.core.virtualidentity.{ChannelIdentity, EmbeddedCo
 import 
org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer,
 ExecutionStatsUpdate}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext,
 WorkflowReconfigureRequest}
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.{EmptyReturn, 
PropagateEmbeddedControlMessageResponse}
 import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.amber.util.VirtualIdentityUtils
+import 
org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_UPDATE_EXECUTOR
+
+import scala.collection.mutable
 
 trait ReconfigurationHandler {
   this: ControllerAsyncRPCHandlerInitializer =>
 
   override def reconfigureWorkflow(msg: WorkflowReconfigureRequest, ctx: 
AsyncRPCContext): Future[EmptyReturn] = {
+    val futures = mutable.ArrayBuffer[Future[_]]()
     
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator,
 msg).foreach{
       friesComponent =>
         if(friesComponent.scope.size == 1){
@@ -40,7 +44,7 @@ trait ReconfigurationHandler {
           val workerIds = 
cp.workflowExecution.getLatestOperatorExecution(updateExecutorRequest.targetOpId).getWorkerIds
           workerIds.foreach{
             worker =>
-              workerInterface.updateExecutor(updateExecutorRequest, 
mkContext(worker))
+              
futures.append(workerInterface.updateExecutor(updateExecutorRequest, 
mkContext(worker)))
           }
         }else{
           val channelScope = cp.workflowExecution.getRunningRegionExecutions
@@ -64,42 +68,26 @@ trait ReconfigurationHandler {
           }
           val finalScope = channelScope ++ controlChannels
           val cmdMapping =
-            friesComponent.reconfigurations
-              .flatMap { updateReq =>
-                val workers =
-                  cp.workflowExecution
-                    .getLatestOperatorExecution(updateReq.targetOpId)
-                    .getWorkerIds
-
-                workers.map { worker =>
-                  worker.name ->
-                    createInvocation(
-                      METHOD_UPDATE_EXECUTOR.getBareMethodName,
-                      updateReq.newExecInitInfo,
-                      worker
-                    )
-                }
-              }
-              .groupBy(_._1)
-              .map {
-                case (worker, entries) =>
-                  worker -> entries.head._2
-              }
+            friesComponent.reconfigurations .flatMap { updateReq =>
+              val workers = cp.workflowExecution 
.getLatestOperatorExecution(updateReq.targetOpId) .getWorkerIds
+              workers.map(worker => worker.name -> 
createInvocation(METHOD_UPDATE_EXECUTOR.getBareMethodName, updateReq, worker)) 
} .toMap
+          futures += cmdMapping.map(_._2._2)
           friesComponent.sources.foreach { source =>
             
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { 
worker =>
               sendECM(
                 EmbeddedControlMessageIdentity(msg.reconfigurationId),
                 ALL_ALIGNMENT,
                 finalScope.toSet,
-                cmdMapping,
+                cmdMapping.map(x => (x._1, x._2._1)),
                 ChannelIdentity(actorId, worker, isControl = true)
               )
             }
           }
         }
-
     }
-    EmptyReturn()
+    Future.collect(futures.toList).map { _ =>
+      EmptyReturn()
+    }
   }
 
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
index 545d0c60f1..d2837860f5 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
@@ -55,6 +55,8 @@ class DataProcessorRPCHandlerInitializer(val dp: 
DataProcessor)
     with UpdateExecutorHandler {
   val actorId: ActorVirtualIdentity = dp.actorId
 
+  var cachedTotalWorkerCount = 0
+
   override def debugCommand(
       request: DebugCommandRequest,
       ctx: AsyncRPCContext
@@ -70,7 +72,7 @@ class DataProcessorRPCHandlerInitializer(val dp: 
DataProcessor)
 
   override def noOperation(request: EmptyRequest, ctx: AsyncRPCContext): 
Future[EmptyReturn] = ???
 
-  def initializeExecutor(execInitInfo: OpExecInitInfo): Unit = {
+  def initializeExecutor(execInitInfo: OpExecInitInfo, workerIdx: Int, 
workerCount: Int): Unit = {
     dp.executor = execInitInfo match {
       case OpExecWithClassName(className, descString) =>
         ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, 
workerCount)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index 1204710181..f6e7f3d8b2 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -41,17 +41,8 @@ trait InitializeExecutorHandler {
   ): Future[EmptyReturn] = {
     dp.serializationManager.setOpInitialization(req)
     val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
-    val workerCount = req.totalWorkerCount
-    dp.executor = req.opExecInitInfo match {
-      case OpExecWithClassName(className, descString) =>
-        ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, 
workerCount)
-      case OpExecWithCode(code, _) =>
-        ExecFactory.newExecFromJavaCode(code)
-      case OpExecSource(storageUri, _) =>
-        new CacheSourceOpExec(URI.create(storageUri))
-      case OpExecInitInfo.Empty =>
-        throw new IllegalArgumentException("Empty executor initialization 
info")
-    }
+    cachedTotalWorkerCount = req.totalWorkerCount
+    initializeExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
     EmptyReturn()
   }
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index 7f9e1fa31d..8cb0e30be2 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -23,6 +23,8 @@ import com.twitter.util.Future
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext,
 UpdateExecutorRequest}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import 
org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
 import scala.reflect.runtime.universe._
 
 trait UpdateExecutorHandler {
@@ -32,32 +34,10 @@ trait UpdateExecutorHandler {
                                request: UpdateExecutorRequest,
                                ctx: AsyncRPCContext
                            ): Future[EmptyReturn] = {
-    val oldOpExecState = dp.executor
-    initializeExecutor(request.newExecInitInfo)
+    val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
+    initializeExecutor(request.newExecInitInfo, workerIdx, 
cachedTotalWorkerCount)
     dp.executor.open()
-    copyMatchingFields(oldOpExecState, dp.executor) //TBD if we really need 
this
     EmptyReturn()
   }
 
-  private[this] def copyMatchingFields[A: TypeTag, B: TypeTag](from: A, to: 
B): Unit = {
-    val mirror = runtimeMirror(from.getClass.getClassLoader)
-
-    val fromFields = typeOf[A].members.collect {
-      case m: MethodSymbol if m.isGetter => m
-    }
-
-    val toFields = typeOf[B].members.collect {
-      case m: MethodSymbol if m.isVar => m
-    }.map(m => m.name.toString -> m).toMap
-
-    fromFields.foreach { f =>
-      val name = f.name.toString
-      toFields.get(name).foreach { setter =>
-        if (f.returnType =:= setter.returnType) {
-          val value = mirror.reflect(from).reflectMethod(f).apply()
-          mirror.reflect(to).reflectField(setter).set(value)
-        }
-      }
-    }
-  }
 }

Reply via email to