This is an automated email from the ASF dual-hosted git repository.
shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/shengquan-add-reconfigration
by this push:
new 97e45aeaae bug fix
97e45aeaae is described below
commit 97e45aeaae6294ab319d29553e9a4894cd1f09d0
Author: Shengquan Ni <[email protected]>
AuthorDate: Sun Feb 1 20:36:49 2026 -0800
bug fix
---
.../promisehandlers/ReconfigurationHandler.scala | 40 ++++++++--------------
.../DataProcessorRPCHandlerInitializer.scala | 4 ++-
.../InitializeExecutorHandler.scala | 13 ++-----
.../promisehandlers/UpdateExecutorHandler.scala | 28 +++------------
4 files changed, 23 insertions(+), 62 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
index 9470336efb..573306bc06 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -24,15 +24,19 @@ import
org.apache.texera.amber.core.virtualidentity.{ChannelIdentity, EmbeddedCo
import
org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer,
ExecutionStatsUpdate}
import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext,
WorkflowReconfigureRequest}
-import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.{EmptyReturn,
PropagateEmbeddedControlMessageResponse}
import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
import org.apache.texera.amber.util.VirtualIdentityUtils
+import
org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_UPDATE_EXECUTOR
+
+import scala.collection.mutable
trait ReconfigurationHandler {
this: ControllerAsyncRPCHandlerInitializer =>
override def reconfigureWorkflow(msg: WorkflowReconfigureRequest, ctx:
AsyncRPCContext): Future[EmptyReturn] = {
+ val futures = mutable.ArrayBuffer[Future[_]]()
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator,
msg).foreach{
friesComponent =>
if(friesComponent.scope.size == 1){
@@ -40,7 +44,7 @@ trait ReconfigurationHandler {
val workerIds =
cp.workflowExecution.getLatestOperatorExecution(updateExecutorRequest.targetOpId).getWorkerIds
workerIds.foreach{
worker =>
- workerInterface.updateExecutor(updateExecutorRequest,
mkContext(worker))
+
futures.append(workerInterface.updateExecutor(updateExecutorRequest,
mkContext(worker)))
}
}else{
val channelScope = cp.workflowExecution.getRunningRegionExecutions
@@ -64,42 +68,26 @@ trait ReconfigurationHandler {
}
val finalScope = channelScope ++ controlChannels
val cmdMapping =
- friesComponent.reconfigurations
- .flatMap { updateReq =>
- val workers =
- cp.workflowExecution
- .getLatestOperatorExecution(updateReq.targetOpId)
- .getWorkerIds
-
- workers.map { worker =>
- worker.name ->
- createInvocation(
- METHOD_UPDATE_EXECUTOR.getBareMethodName,
- updateReq.newExecInitInfo,
- worker
- )
- }
- }
- .groupBy(_._1)
- .map {
- case (worker, entries) =>
- worker -> entries.head._2
- }
+ friesComponent.reconfigurations .flatMap { updateReq =>
+ val workers = cp.workflowExecution
.getLatestOperatorExecution(updateReq.targetOpId) .getWorkerIds
+ workers.map(worker => worker.name ->
createInvocation(METHOD_UPDATE_EXECUTOR.getBareMethodName, updateReq, worker))
} .toMap
+ futures += cmdMapping.map(_._2._2)
friesComponent.sources.foreach { source =>
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach {
worker =>
sendECM(
EmbeddedControlMessageIdentity(msg.reconfigurationId),
ALL_ALIGNMENT,
finalScope.toSet,
- cmdMapping,
+ cmdMapping.map(x => (x._1, x._2._1)),
ChannelIdentity(actorId, worker, isControl = true)
)
}
}
}
-
}
- EmptyReturn()
+ Future.collect(futures.toList).map { _ =>
+ EmptyReturn()
+ }
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
index 545d0c60f1..d2837860f5 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
@@ -55,6 +55,8 @@ class DataProcessorRPCHandlerInitializer(val dp:
DataProcessor)
with UpdateExecutorHandler {
val actorId: ActorVirtualIdentity = dp.actorId
+ var cachedTotalWorkerCount = 0
+
override def debugCommand(
request: DebugCommandRequest,
ctx: AsyncRPCContext
@@ -70,7 +72,7 @@ class DataProcessorRPCHandlerInitializer(val dp:
DataProcessor)
override def noOperation(request: EmptyRequest, ctx: AsyncRPCContext):
Future[EmptyReturn] = ???
- def initializeExecutor(execInitInfo: OpExecInitInfo): Unit = {
+ def initializeExecutor(execInitInfo: OpExecInitInfo, workerIdx: Int,
workerCount: Int): Unit = {
dp.executor = execInitInfo match {
case OpExecWithClassName(className, descString) =>
ExecFactory.newExecFromJavaClassName(className, descString, workerIdx,
workerCount)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index 1204710181..f6e7f3d8b2 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -41,17 +41,8 @@ trait InitializeExecutorHandler {
): Future[EmptyReturn] = {
dp.serializationManager.setOpInitialization(req)
val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
- val workerCount = req.totalWorkerCount
- dp.executor = req.opExecInitInfo match {
- case OpExecWithClassName(className, descString) =>
- ExecFactory.newExecFromJavaClassName(className, descString, workerIdx,
workerCount)
- case OpExecWithCode(code, _) =>
- ExecFactory.newExecFromJavaCode(code)
- case OpExecSource(storageUri, _) =>
- new CacheSourceOpExec(URI.create(storageUri))
- case OpExecInitInfo.Empty =>
- throw new IllegalArgumentException("Empty executor initialization
info")
- }
+ cachedTotalWorkerCount = req.totalWorkerCount
+ initializeExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
EmptyReturn()
}
}
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index 7f9e1fa31d..8cb0e30be2 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -23,6 +23,8 @@ import com.twitter.util.Future
import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext,
UpdateExecutorRequest}
import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import
org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
import scala.reflect.runtime.universe._
trait UpdateExecutorHandler {
@@ -32,32 +34,10 @@ trait UpdateExecutorHandler {
request: UpdateExecutorRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
- val oldOpExecState = dp.executor
- initializeExecutor(request.newExecInitInfo)
+ val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
+ initializeExecutor(request.newExecInitInfo, workerIdx,
cachedTotalWorkerCount)
dp.executor.open()
- copyMatchingFields(oldOpExecState, dp.executor) //TBD if we really need
this
EmptyReturn()
}
- private[this] def copyMatchingFields[A: TypeTag, B: TypeTag](from: A, to:
B): Unit = {
- val mirror = runtimeMirror(from.getClass.getClassLoader)
-
- val fromFields = typeOf[A].members.collect {
- case m: MethodSymbol if m.isGetter => m
- }
-
- val toFields = typeOf[B].members.collect {
- case m: MethodSymbol if m.isVar => m
- }.map(m => m.name.toString -> m).toMap
-
- fromFields.foreach { f =>
- val name = f.name.toString
- toFields.get(name).foreach { setter =>
- if (f.returnType =:= setter.returnType) {
- val value = mirror.reflect(from).reflectMethod(f).apply()
- mirror.reflect(to).reflectField(setter).set(value)
- }
- }
- }
- }
}