This is an automated email from the ASF dual-hosted git repository.

shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git

commit f5a0c38a84d6852e9af52bf163acc0b4756db70c
Author: Shengquan Ni <[email protected]>
AuthorDate: Sun Feb 1 19:22:17 2026 -0800

    add rough impl
---
 .../engine/architecture/rpc/controlcommands.proto  |   21 +-
 .../architecture/rpc/controllerservice.proto       |    1 +
 .../engine/architecture/rpc/workerservice.proto    |    1 +
 .../handlers/control/update_executor_handler.py    |   29 +-
 .../amber/engine/architecture/rpc/__init__.py      | 1527 ++++++++++----------
 .../apache/texera/amber/engine/common/__init__.py  |   42 +-
 .../ControllerAsyncRPCHandlerInitializer.scala     |    3 +-
 .../promisehandlers/ReconfigurationHandler.scala   |  105 ++
 .../DataProcessorRPCHandlerInitializer.scala       |   27 +-
 .../InitializeExecutorHandler.scala                |    1 -
 ...orHandler.scala => UpdateExecutorHandler.scala} |   59 +-
 .../common}/FriesReconfigurationAlgorithm.scala    |   56 +-
 common/config/src/main/resources/storage.conf      |    4 +-
 common/config/src/main/resources/udf.conf          |    2 +-
 14 files changed, 1010 insertions(+), 868 deletions(-)

diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index ed17be236d..2ce7f9be8e 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -25,7 +25,6 @@ import 
"org/apache/texera/amber/engine/architecture/worker/statistics.proto";
 import 
"org/apache/texera/amber/engine/architecture/sendsemantics/partitionings.proto";
 import "scalapb/scalapb.proto";
 import "google/protobuf/timestamp.proto";
-import "google/protobuf/any.proto";
 
 option (scalapb.options) = {
   scope: FILE,
@@ -40,12 +39,11 @@ message ControlRequest {
     TakeGlobalCheckpointRequest takeGlobalCheckpointRequest = 2;
     DebugCommandRequest debugCommandRequest = 3;
     EvaluatePythonExpressionRequest evaluatePythonExpressionRequest = 4;
-    ModifyLogicRequest modifyLogicRequest = 5;
-    RetryWorkflowRequest retryWorkflowRequest = 6;
-    ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 8;
-    PortCompletedRequest portCompletedRequest = 9;
-    WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
-    LinkWorkersRequest linkWorkersRequest = 11;
+    RetryWorkflowRequest retryWorkflowRequest = 5;
+    ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 6;
+    PortCompletedRequest portCompletedRequest = 7;
+    WorkerStateUpdatedRequest workerStateUpdatedRequest = 8;
+    LinkWorkersRequest linkWorkersRequest = 9;
 
     // request for worker
     AddInputChannelRequest addInputChannelRequest = 50;
@@ -119,7 +117,7 @@ message TakeGlobalCheckpointRequest {
 }
 
 message WorkflowReconfigureRequest{
-  ModifyLogicRequest reconfiguration = 1 [(scalapb.field).no_box = true];
+  repeated UpdateExecutorRequest reconfiguration = 1;
   string reconfigurationId = 2;
 }
 
@@ -134,10 +132,6 @@ message EvaluatePythonExpressionRequest {
   string operatorId = 2;
 }
 
-message ModifyLogicRequest {
-  repeated UpdateExecutorRequest updateRequest = 1;
-}
-
 message RetryWorkflowRequest {
   repeated core.ActorVirtualIdentity workers = 1;
 }
@@ -260,8 +254,7 @@ message InitializeExecutorRequest {
 
 message UpdateExecutorRequest {
   core.PhysicalOpIdentity targetOpId = 1 [(scalapb.field).no_box = true];
-  google.protobuf.Any newExecutor = 2 [(scalapb.field).no_box = true];
-  google.protobuf.Any stateTransferFunc = 3;
+  core.OpExecInitInfo newExecInitInfo = 2;
 }
 
 message PrepareCheckpointRequest{
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
index 70d189a341..27b4727ee9 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
@@ -45,4 +45,5 @@ service ControllerService {
   rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
   rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns 
(EmptyReturn);
   rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
+  rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn);
 }
\ No newline at end of file
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
index dbcd6d8a5e..21944ffefc 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
@@ -50,4 +50,5 @@ service WorkerService {
   rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
   rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns 
(EvaluatedValue);
   rpc NoOperation(EmptyRequest) returns (EmptyReturn);
+  rpc UpdateExecutor(UpdateExecutorRequest) returns (EmptyReturn);
 }
\ No newline at end of file
diff --git 
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
 
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
index fe33b2dec0..a5bc1a6c0f 100644
--- 
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
+++ 
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
@@ -15,14 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# from proto.org.apache.texera.amber.engine.architecture.worker import UpdateExecutorV2
-# from core.architecture.handlers.control.control_handler_base import ControlHandler
-# from core.architecture.managers.context import Context
-#
-#
-# class UpdateExecutorHandler(ControlHandler):
-#     cmd = UpdateExecutorV2
-#
-#     def __call__(self, context: Context, command: cmd, *args, **kwargs):
-#         context.executor_manager.update_executor(command.code, command.is_source)
-#         return None
+from core.architecture.handlers.control.control_handler_base import ControlHandler
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    EmptyReturn,
+    UpdateExecutorRequest,
+)
+
+
+class UpdateExecutorHandler(ControlHandler):
+    """Handles UpdateExecutorRequest by delegating to the executor manager."""
+
+    async def update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:
+        # NOTE(review): the generated UpdateExecutorRequest only declares
+        # target_op_id and new_exec_init_info; req.code / req.is_source look
+        # stale and need to be derived from new_exec_init_info -- confirm.
+        self.context.executor_manager.update_executor(req.code, req.is_source)
+        return EmptyReturn()
diff --git 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
index ea6ddc5e43..f7b89f13e6 100644
--- 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
@@ -13,7 +13,6 @@ from typing import (
 )
 
 import betterproto
-import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf
 import grpclib
 from betterproto.grpc.grpclib_server import ServiceBase
 
@@ -23,7 +22,6 @@ from .. import (
     worker as _worker__,
 )
 
-
 if TYPE_CHECKING:
     import grpclib.server
     from betterproto.grpc.grpclib_client import MetadataLike
@@ -78,23 +76,20 @@ class ControlRequest(betterproto.Message):
     evaluate_python_expression_request: "EvaluatePythonExpressionRequest" = (
         betterproto.message_field(4, group="sealed_value")
     )
-    modify_logic_request: "ModifyLogicRequest" = betterproto.message_field(
-        5, group="sealed_value"
-    )
     retry_workflow_request: "RetryWorkflowRequest" = betterproto.message_field(
-        6, group="sealed_value"
+        5, group="sealed_value"
     )
     console_message_triggered_request: "ConsoleMessageTriggeredRequest" = (
-        betterproto.message_field(8, group="sealed_value")
+        betterproto.message_field(6, group="sealed_value")
     )
     port_completed_request: "PortCompletedRequest" = betterproto.message_field(
-        9, group="sealed_value"
+        7, group="sealed_value"
     )
     worker_state_updated_request: "WorkerStateUpdatedRequest" = (
-        betterproto.message_field(10, group="sealed_value")
+        betterproto.message_field(8, group="sealed_value")
     )
     link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
-        11, group="sealed_value"
+        9, group="sealed_value"
     )
     add_input_channel_request: "AddInputChannelRequest" = 
betterproto.message_field(
         50, group="sealed_value"
@@ -192,7 +187,7 @@ class TakeGlobalCheckpointRequest(betterproto.Message):
 
 @dataclass(eq=False, repr=False)
 class WorkflowReconfigureRequest(betterproto.Message):
-    reconfiguration: "ModifyLogicRequest" = betterproto.message_field(1)
+    reconfiguration: List["UpdateExecutorRequest"] = 
betterproto.message_field(1)
     reconfiguration_id: str = betterproto.string_field(2)
 
 
@@ -208,11 +203,6 @@ class EvaluatePythonExpressionRequest(betterproto.Message):
     operator_id: str = betterproto.string_field(2)
 
 
-@dataclass(eq=False, repr=False)
-class ModifyLogicRequest(betterproto.Message):
-    update_request: List["UpdateExecutorRequest"] = 
betterproto.message_field(1)
-
-
 @dataclass(eq=False, repr=False)
 class RetryWorkflowRequest(betterproto.Message):
     workers: List["___core__.ActorVirtualIdentity"] = 
betterproto.message_field(1)
@@ -366,10 +356,7 @@ class InitializeExecutorRequest(betterproto.Message):
 @dataclass(eq=False, repr=False)
 class UpdateExecutorRequest(betterproto.Message):
     target_op_id: "___core__.PhysicalOpIdentity" = betterproto.message_field(1)
-    new_executor: "betterproto_lib_google_protobuf.Any" = 
betterproto.message_field(2)
-    state_transfer_func: "betterproto_lib_google_protobuf.Any" = (
-        betterproto.message_field(3)
-    )
+    new_exec_init_info: "___core__.OpExecInitInfo" = 
betterproto.message_field(2)
 
 
 @dataclass(eq=False, repr=False)
@@ -517,503 +504,522 @@ class WorkerMetricsResponse(betterproto.Message):
     metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1)
 
 
-class RpcTesterStub(betterproto.ServiceStub):
-    async def send_ping(
+class ControllerServiceStub(betterproto.ServiceStub):
+    async def retrieve_workflow_state(
         self,
-        ping: "Ping",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
+    ) -> "RetrieveWorkflowStateResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing",
-            ping,
-            IntResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState",
+            empty_request,
+            RetrieveWorkflowStateResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_pong(
+    async def propagate_embedded_control_message(
         self,
-        pong: "Pong",
+        propagate_embedded_control_message_request: 
"PropagateEmbeddedControlMessageRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
+    ) -> "PropagateEmbeddedControlMessageResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong",
-            pong,
-            IntResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage",
+            propagate_embedded_control_message_request,
+            PropagateEmbeddedControlMessageResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_nested(
+    async def take_global_checkpoint(
         self,
-        nested: "Nested",
+        take_global_checkpoint_request: "TakeGlobalCheckpointRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "TakeGlobalCheckpointResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested",
-            nested,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint",
+            take_global_checkpoint_request,
+            TakeGlobalCheckpointResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_pass(
+    async def debug_command(
         self,
-        pass_: "Pass",
+        debug_command_request: "DebugCommandRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass",
-            pass_,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand",
+            debug_command_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_error_command(
+    async def evaluate_python_expression(
         self,
-        error_command: "ErrorCommand",
+        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EvaluatePythonExpressionResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand",
-            error_command,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression",
+            evaluate_python_expression_request,
+            EvaluatePythonExpressionResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_recursion(
+    async def console_message_triggered(
         self,
-        recursion: "Recursion",
+        console_message_triggered_request: "ConsoleMessageTriggeredRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion",
-            recursion,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered",
+            console_message_triggered_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_collect(
+    async def port_completed(
         self,
-        collect: "Collect",
+        port_completed_request: "PortCompletedRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect",
-            collect,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted",
+            port_completed_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_generate_number(
+    async def start_workflow(
         self,
-        generate_number: "GenerateNumber",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
+    ) -> "StartWorkflowResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber",
-            generate_number,
-            IntResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow",
+            empty_request,
+            StartWorkflowResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_multi_call(
+    async def resume_workflow(
         self,
-        multi_call: "MultiCall",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall",
-            multi_call,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow",
+            empty_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_chain(
+    async def pause_workflow(
         self,
-        chain: "Chain",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain",
-            chain,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow",
+            empty_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-
-class WorkerServiceStub(betterproto.ServiceStub):
-    async def add_input_channel(
+    async def worker_state_updated(
         self,
-        add_input_channel_request: "AddInputChannelRequest",
+        worker_state_updated_request: "WorkerStateUpdatedRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel",
-            add_input_channel_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated",
+            worker_state_updated_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def add_partitioning(
+    async def worker_execution_completed(
         self,
-        add_partitioning_request: "AddPartitioningRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning",
-            add_partitioning_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted",
+            empty_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def assign_port(
+    async def link_workers(
         self,
-        assign_port_request: "AssignPortRequest",
+        link_workers_request: "LinkWorkersRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort",
-            assign_port_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers",
+            link_workers_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def finalize_checkpoint(
+    async def controller_initiate_query_statistics(
         self,
-        finalize_checkpoint_request: "FinalizeCheckpointRequest",
+        query_statistics_request: "QueryStatisticsRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "FinalizeCheckpointResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint",
-            finalize_checkpoint_request,
-            FinalizeCheckpointResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics",
+            query_statistics_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def flush_network_buffer(
+    async def retry_workflow(
         self,
-        empty_request: "EmptyRequest",
+        retry_workflow_request: "RetryWorkflowRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow",
+            retry_workflow_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def initialize_executor(
+    async def reconfigure_workflow(
         self,
-        initialize_executor_request: "InitializeExecutorRequest",
+        workflow_reconfigure_request: "WorkflowReconfigureRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor",
-            initialize_executor_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow",
+            workflow_reconfigure_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def open_executor(
+
+class RpcTesterStub(betterproto.ServiceStub):
+    async def send_ping(
         self,
-        empty_request: "EmptyRequest",
+        ping: "Ping",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "IntResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor",
-            empty_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing",
+            ping,
+            IntResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def pause_worker(
+    async def send_pong(
         self,
-        empty_request: "EmptyRequest",
+        pong: "Pong",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
+    ) -> "IntResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker",
-            empty_request,
-            WorkerStateResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong",
+            pong,
+            IntResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def prepare_checkpoint(
+    async def send_nested(
         self,
-        prepare_checkpoint_request: "PrepareCheckpointRequest",
+        nested: "Nested",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint",
-            prepare_checkpoint_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested",
+            nested,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def query_statistics(
+    async def send_pass(
         self,
-        empty_request: "EmptyRequest",
+        pass_: "Pass",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerMetricsResponse":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics",
-            empty_request,
-            WorkerMetricsResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass",
+            pass_,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def resume_worker(
+    async def send_error_command(
         self,
-        empty_request: "EmptyRequest",
+        error_command: "ErrorCommand",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker",
-            empty_request,
-            WorkerStateResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand",
+            error_command,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def retrieve_state(
+    async def send_recursion(
         self,
-        empty_request: "EmptyRequest",
+        recursion: "Recursion",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion",
+            recursion,
+            StringResponse,
+            timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def retry_current_tuple(
+    async def send_collect(
         self,
-        empty_request: "EmptyRequest",
+        collect: "Collect",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple",
-            empty_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect",
+            collect,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def start_worker(
+    async def send_generate_number(
         self,
-        empty_request: "EmptyRequest",
+        generate_number: "GenerateNumber",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
+    ) -> "IntResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker",
-            empty_request,
-            WorkerStateResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber",
+            generate_number,
+            IntResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def end_worker(
+    async def send_multi_call(
         self,
-        empty_request: "EmptyRequest",
+        multi_call: "MultiCall",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker",
-            empty_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall",
+            multi_call,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def start_channel(
+    async def send_chain(
         self,
-        empty_request: "EmptyRequest",
+        chain: "Chain",
+        *,
+        timeout: Optional[float] = None,
+        deadline: Optional["Deadline"] = None,
+        metadata: Optional["MetadataLike"] = None
+    ) -> "StringResponse":
+        return await self._unary_unary(
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain",
+            chain,
+            StringResponse,
+            timeout=timeout,
+            deadline=deadline,
+            metadata=metadata,
+        )
+
+
+class WorkerServiceStub(betterproto.ServiceStub):
+    async def add_input_channel(
+        self,
+        add_input_channel_request: "AddInputChannelRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel",
+            add_input_channel_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def end_channel(
+    async def add_partitioning(
         self,
-        empty_request: "EmptyRequest",
+        add_partitioning_request: "AddPartitioningRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning",
+            add_partitioning_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def debug_command(
+    async def assign_port(
         self,
-        debug_command_request: "DebugCommandRequest",
+        assign_port_request: "AssignPortRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand",
-            debug_command_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort",
+            assign_port_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def evaluate_python_expression(
+    async def finalize_checkpoint(
         self,
-        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
+        finalize_checkpoint_request: "FinalizeCheckpointRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EvaluatedValue":
+    ) -> "FinalizeCheckpointResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression",
-            evaluate_python_expression_request,
-            EvaluatedValue,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint",
+            finalize_checkpoint_request,
+            FinalizeCheckpointResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def no_operation(
+    async def flush_network_buffer(
         self,
         empty_request: "EmptyRequest",
         *,
@@ -1022,7 +1028,7 @@ class WorkerServiceStub(betterproto.ServiceStub):
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation",
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer",
             empty_request,
             EmptyReturn,
             timeout=timeout,
@@ -1030,145 +1036,160 @@ class WorkerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
+    async def initialize_executor(
+        self,
+        initialize_executor_request: "InitializeExecutorRequest",
+        *,
+        timeout: Optional[float] = None,
+        deadline: Optional["Deadline"] = None,
+        metadata: Optional["MetadataLike"] = None
+    ) -> "EmptyReturn":
+        return await self._unary_unary(
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor",
+            initialize_executor_request,
+            EmptyReturn,
+            timeout=timeout,
+            deadline=deadline,
+            metadata=metadata,
+        )
 
-class ControllerServiceStub(betterproto.ServiceStub):
-    async def retrieve_workflow_state(
+    async def open_executor(
         self,
         empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "RetrieveWorkflowStateResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState",
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor",
             empty_request,
-            RetrieveWorkflowStateResponse,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def propagate_embedded_control_message(
+    async def pause_worker(
         self,
-        propagate_embedded_control_message_request: 
"PropagateEmbeddedControlMessageRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "PropagateEmbeddedControlMessageResponse":
+    ) -> "WorkerStateResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage",
-            propagate_embedded_control_message_request,
-            PropagateEmbeddedControlMessageResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker",
+            empty_request,
+            WorkerStateResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def take_global_checkpoint(
+    async def prepare_checkpoint(
         self,
-        take_global_checkpoint_request: "TakeGlobalCheckpointRequest",
+        prepare_checkpoint_request: "PrepareCheckpointRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "TakeGlobalCheckpointResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint",
-            take_global_checkpoint_request,
-            TakeGlobalCheckpointResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint",
+            prepare_checkpoint_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def debug_command(
+    async def query_statistics(
         self,
-        debug_command_request: "DebugCommandRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "WorkerMetricsResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand",
-            debug_command_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics",
+            empty_request,
+            WorkerMetricsResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def evaluate_python_expression(
+    async def resume_worker(
         self,
-        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EvaluatePythonExpressionResponse":
+    ) -> "WorkerStateResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression",
-            evaluate_python_expression_request,
-            EvaluatePythonExpressionResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker",
+            empty_request,
+            WorkerStateResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def console_message_triggered(
+    async def retrieve_state(
         self,
-        console_message_triggered_request: "ConsoleMessageTriggeredRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered",
-            console_message_triggered_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState",
+            empty_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def port_completed(
+    async def retry_current_tuple(
         self,
-        port_completed_request: "PortCompletedRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted",
-            port_completed_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple",
+            empty_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def start_workflow(
+    async def start_worker(
         self,
         empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StartWorkflowResponse":
+    ) -> "WorkerStateResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow",
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker",
             empty_request,
-            StartWorkflowResponse,
+            WorkerStateResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def resume_workflow(
+    async def end_worker(
         self,
         empty_request: "EmptyRequest",
         *,
@@ -1177,7 +1198,7 @@ class ControllerServiceStub(betterproto.ServiceStub):
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow",
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker",
             empty_request,
             EmptyReturn,
             timeout=timeout,
@@ -1185,7 +1206,7 @@ class ControllerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
-    async def pause_workflow(
+    async def start_channel(
         self,
         empty_request: "EmptyRequest",
         *,
@@ -1194,7 +1215,7 @@ class ControllerServiceStub(betterproto.ServiceStub):
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow",
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel",
             empty_request,
             EmptyReturn,
             timeout=timeout,
@@ -1202,85 +1223,85 @@ class ControllerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
-    async def worker_state_updated(
+    async def end_channel(
         self,
-        worker_state_updated_request: "WorkerStateUpdatedRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated",
-            worker_state_updated_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel",
+            empty_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def worker_execution_completed(
+    async def debug_command(
         self,
-        empty_request: "EmptyRequest",
+        debug_command_request: "DebugCommandRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand",
+            debug_command_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def link_workers(
+    async def evaluate_python_expression(
         self,
-        link_workers_request: "LinkWorkersRequest",
+        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "EvaluatedValue":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers",
-            link_workers_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression",
+            evaluate_python_expression_request,
+            EvaluatedValue,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def controller_initiate_query_statistics(
+    async def no_operation(
         self,
-        query_statistics_request: "QueryStatisticsRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics",
-            query_statistics_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation",
+            empty_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def retry_workflow(
+    async def update_executor(
         self,
-        retry_workflow_request: "RetryWorkflowRequest",
+        update_executor_request: "UpdateExecutorRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow",
-            retry_workflow_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor",
+            update_executor_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
@@ -1288,806 +1309,842 @@ class ControllerServiceStub(betterproto.ServiceStub):
         )
 
 
-class RpcTesterBase(ServiceBase):
+class ControllerServiceBase(ServiceBase):
 
-    async def send_ping(self, ping: "Ping") -> "IntResponse":
+    async def retrieve_workflow_state(
+        self, empty_request: "EmptyRequest"
+    ) -> "RetrieveWorkflowStateResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_pong(self, pong: "Pong") -> "IntResponse":
+    async def propagate_embedded_control_message(
+        self,
+        propagate_embedded_control_message_request: 
"PropagateEmbeddedControlMessageRequest",
+    ) -> "PropagateEmbeddedControlMessageResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_nested(self, nested: "Nested") -> "StringResponse":
+    async def take_global_checkpoint(
+        self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest"
+    ) -> "TakeGlobalCheckpointResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_pass(self, pass_: "Pass") -> "StringResponse":
+    async def debug_command(
+        self, debug_command_request: "DebugCommandRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_error_command(
-        self, error_command: "ErrorCommand"
-    ) -> "StringResponse":
+    async def evaluate_python_expression(
+        self, evaluate_python_expression_request: 
"EvaluatePythonExpressionRequest"
+    ) -> "EvaluatePythonExpressionResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_recursion(self, recursion: "Recursion") -> "StringResponse":
+    async def console_message_triggered(
+        self, console_message_triggered_request: 
"ConsoleMessageTriggeredRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_collect(self, collect: "Collect") -> "StringResponse":
+    async def port_completed(
+        self, port_completed_request: "PortCompletedRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_generate_number(
-        self, generate_number: "GenerateNumber"
-    ) -> "IntResponse":
+    async def start_workflow(
+        self, empty_request: "EmptyRequest"
+    ) -> "StartWorkflowResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_multi_call(self, multi_call: "MultiCall") -> 
"StringResponse":
+    async def resume_workflow(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_chain(self, chain: "Chain") -> "StringResponse":
+    async def pause_workflow(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_send_ping(
-        self, stream: "grpclib.server.Stream[Ping, IntResponse]"
+    async def worker_state_updated(
+        self, worker_state_updated_request: "WorkerStateUpdatedRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def worker_execution_completed(
+        self, empty_request: "EmptyRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def link_workers(
+        self, link_workers_request: "LinkWorkersRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def controller_initiate_query_statistics(
+        self, query_statistics_request: "QueryStatisticsRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def retry_workflow(
+        self, retry_workflow_request: "RetryWorkflowRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def reconfigure_workflow(
+        self, workflow_reconfigure_request: "WorkflowReconfigureRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def __rpc_retrieve_workflow_state(
+        self,
+        stream: "grpclib.server.Stream[EmptyRequest, 
RetrieveWorkflowStateResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_ping(request)
+        response = await self.retrieve_workflow_state(request)
         await stream.send_message(response)
 
-    async def __rpc_send_pong(
-        self, stream: "grpclib.server.Stream[Pong, IntResponse]"
+    async def __rpc_propagate_embedded_control_message(
+        self,
+        stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, 
PropagateEmbeddedControlMessageResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_pong(request)
+        response = await self.propagate_embedded_control_message(request)
         await stream.send_message(response)
 
-    async def __rpc_send_nested(
-        self, stream: "grpclib.server.Stream[Nested, StringResponse]"
+    async def __rpc_take_global_checkpoint(
+        self,
+        stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, 
TakeGlobalCheckpointResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_nested(request)
+        response = await self.take_global_checkpoint(request)
         await stream.send_message(response)
 
-    async def __rpc_send_pass(
-        self, stream: "grpclib.server.Stream[Pass, StringResponse]"
+    async def __rpc_debug_command(
+        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_pass(request)
+        response = await self.debug_command(request)
         await stream.send_message(response)
 
-    async def __rpc_send_error_command(
-        self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]"
+    async def __rpc_evaluate_python_expression(
+        self,
+        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, 
EvaluatePythonExpressionResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_error_command(request)
+        response = await self.evaluate_python_expression(request)
         await stream.send_message(response)
 
-    async def __rpc_send_recursion(
-        self, stream: "grpclib.server.Stream[Recursion, StringResponse]"
+    async def __rpc_console_message_triggered(
+        self,
+        stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, 
EmptyReturn]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_recursion(request)
+        response = await self.console_message_triggered(request)
         await stream.send_message(response)
 
-    async def __rpc_send_collect(
-        self, stream: "grpclib.server.Stream[Collect, StringResponse]"
+    async def __rpc_port_completed(
+        self, stream: "grpclib.server.Stream[PortCompletedRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_collect(request)
+        response = await self.port_completed(request)
         await stream.send_message(response)
 
-    async def __rpc_send_generate_number(
-        self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]"
+    async def __rpc_start_workflow(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
StartWorkflowResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_generate_number(request)
+        response = await self.start_workflow(request)
         await stream.send_message(response)
 
-    async def __rpc_send_multi_call(
-        self, stream: "grpclib.server.Stream[MultiCall, StringResponse]"
+    async def __rpc_resume_workflow(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_multi_call(request)
+        response = await self.resume_workflow(request)
         await stream.send_message(response)
 
-    async def __rpc_send_chain(
-        self, stream: "grpclib.server.Stream[Chain, StringResponse]"
+    async def __rpc_pause_workflow(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_chain(request)
+        response = await self.pause_workflow(request)
+        await stream.send_message(response)
+
+    async def __rpc_worker_state_updated(
+        self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.worker_state_updated(request)
+        await stream.send_message(response)
+
+    async def __rpc_worker_execution_completed(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.worker_execution_completed(request)
+        await stream.send_message(response)
+
+    async def __rpc_link_workers(
+        self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.link_workers(request)
+        await stream.send_message(response)
+
+    async def __rpc_controller_initiate_query_statistics(
+        self, stream: "grpclib.server.Stream[QueryStatisticsRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.controller_initiate_query_statistics(request)
+        await stream.send_message(response)
+
+    async def __rpc_retry_workflow(
+        self, stream: "grpclib.server.Stream[RetryWorkflowRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.retry_workflow(request)
+        await stream.send_message(response)
+
+    async def __rpc_reconfigure_workflow(
+        self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.reconfigure_workflow(request)
         await stream.send_message(response)
 
     def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
         return {
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": 
grpclib.const.Handler(
-                self.__rpc_send_ping,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState":
 grpclib.const.Handler(
+                self.__rpc_retrieve_workflow_state,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Ping,
-                IntResponse,
+                EmptyRequest,
+                RetrieveWorkflowStateResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": 
grpclib.const.Handler(
-                self.__rpc_send_pong,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage":
 grpclib.const.Handler(
+                self.__rpc_propagate_embedded_control_message,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Pong,
-                IntResponse,
+                PropagateEmbeddedControlMessageRequest,
+                PropagateEmbeddedControlMessageResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": 
grpclib.const.Handler(
-                self.__rpc_send_nested,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint":
 grpclib.const.Handler(
+                self.__rpc_take_global_checkpoint,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Nested,
-                StringResponse,
+                TakeGlobalCheckpointRequest,
+                TakeGlobalCheckpointResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": 
grpclib.const.Handler(
-                self.__rpc_send_pass,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand":
 grpclib.const.Handler(
+                self.__rpc_debug_command,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Pass,
-                StringResponse,
+                DebugCommandRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": 
grpclib.const.Handler(
-                self.__rpc_send_error_command,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression":
 grpclib.const.Handler(
+                self.__rpc_evaluate_python_expression,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                ErrorCommand,
-                StringResponse,
+                EvaluatePythonExpressionRequest,
+                EvaluatePythonExpressionResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": 
grpclib.const.Handler(
-                self.__rpc_send_recursion,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered":
 grpclib.const.Handler(
+                self.__rpc_console_message_triggered,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Recursion,
-                StringResponse,
+                ConsoleMessageTriggeredRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": 
grpclib.const.Handler(
-                self.__rpc_send_collect,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted":
 grpclib.const.Handler(
+                self.__rpc_port_completed,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Collect,
-                StringResponse,
+                PortCompletedRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber":
 grpclib.const.Handler(
-                self.__rpc_send_generate_number,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow":
 grpclib.const.Handler(
+                self.__rpc_start_workflow,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                GenerateNumber,
-                IntResponse,
+                EmptyRequest,
+                StartWorkflowResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": 
grpclib.const.Handler(
-                self.__rpc_send_multi_call,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow":
 grpclib.const.Handler(
+                self.__rpc_resume_workflow,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                MultiCall,
-                StringResponse,
+                EmptyRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": 
grpclib.const.Handler(
-                self.__rpc_send_chain,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow":
 grpclib.const.Handler(
+                self.__rpc_pause_workflow,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Chain,
-                StringResponse,
+                EmptyRequest,
+                EmptyReturn,
             ),
-        }
-
-
-class WorkerServiceBase(ServiceBase):
-
-    async def add_input_channel(
-        self, add_input_channel_request: "AddInputChannelRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def add_partitioning(
-        self, add_partitioning_request: "AddPartitioningRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def assign_port(
-        self, assign_port_request: "AssignPortRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def finalize_checkpoint(
-        self, finalize_checkpoint_request: "FinalizeCheckpointRequest"
-    ) -> "FinalizeCheckpointResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def flush_network_buffer(
-        self, empty_request: "EmptyRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def initialize_executor(
-        self, initialize_executor_request: "InitializeExecutorRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def open_executor(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def pause_worker(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated":
 grpclib.const.Handler(
+                self.__rpc_worker_state_updated,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                WorkerStateUpdatedRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted":
 grpclib.const.Handler(
+                self.__rpc_worker_execution_completed,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
 grpclib.const.Handler(
+                self.__rpc_link_workers,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                LinkWorkersRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics":
 grpclib.const.Handler(
+                self.__rpc_controller_initiate_query_statistics,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                QueryStatisticsRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow":
 grpclib.const.Handler(
+                self.__rpc_retry_workflow,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                RetryWorkflowRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow":
 grpclib.const.Handler(
+                self.__rpc_reconfigure_workflow,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                WorkflowReconfigureRequest,
+                EmptyReturn,
+            ),
+        }
 
-    async def prepare_checkpoint(
-        self, prepare_checkpoint_request: "PrepareCheckpointRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def query_statistics(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerMetricsResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+class RpcTesterBase(ServiceBase):
 
-    async def resume_worker(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
+    async def send_ping(self, ping: "Ping") -> "IntResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def retrieve_state(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def send_pong(self, pong: "Pong") -> "IntResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def retry_current_tuple(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def send_nested(self, nested: "Nested") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def start_worker(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
+    async def send_pass(self, pass_: "Pass") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn":
+    async def send_error_command(
+        self, error_command: "ErrorCommand"
+    ) -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def start_channel(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def send_recursion(self, recursion: "Recursion") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def end_channel(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def send_collect(self, collect: "Collect") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def debug_command(
-        self, debug_command_request: "DebugCommandRequest"
-    ) -> "EmptyReturn":
+    async def send_generate_number(
+        self, generate_number: "GenerateNumber"
+    ) -> "IntResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def evaluate_python_expression(
-        self, evaluate_python_expression_request: 
"EvaluatePythonExpressionRequest"
-    ) -> "EvaluatedValue":
+    async def send_multi_call(self, multi_call: "MultiCall") -> 
"StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def no_operation(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def send_chain(self, chain: "Chain") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_add_input_channel(
-        self, stream: "grpclib.server.Stream[AddInputChannelRequest, 
EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.add_input_channel(request)
-        await stream.send_message(response)
-
-    async def __rpc_add_partitioning(
-        self, stream: "grpclib.server.Stream[AddPartitioningRequest, 
EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.add_partitioning(request)
-        await stream.send_message(response)
-
-    async def __rpc_assign_port(
-        self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.assign_port(request)
-        await stream.send_message(response)
-
-    async def __rpc_finalize_checkpoint(
-        self,
-        stream: "grpclib.server.Stream[FinalizeCheckpointRequest, 
FinalizeCheckpointResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.finalize_checkpoint(request)
-        await stream.send_message(response)
-
-    async def __rpc_flush_network_buffer(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.flush_network_buffer(request)
-        await stream.send_message(response)
-
-    async def __rpc_initialize_executor(
-        self, stream: "grpclib.server.Stream[InitializeExecutorRequest, 
EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.initialize_executor(request)
-        await stream.send_message(response)
-
-    async def __rpc_open_executor(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.open_executor(request)
-        await stream.send_message(response)
-
-    async def __rpc_pause_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.pause_worker(request)
-        await stream.send_message(response)
-
-    async def __rpc_prepare_checkpoint(
-        self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, 
EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.prepare_checkpoint(request)
-        await stream.send_message(response)
-
-    async def __rpc_query_statistics(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerMetricsResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.query_statistics(request)
-        await stream.send_message(response)
-
-    async def __rpc_resume_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
+    async def __rpc_send_ping(
+        self, stream: "grpclib.server.Stream[Ping, IntResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.resume_worker(request)
+        response = await self.send_ping(request)
         await stream.send_message(response)
 
-    async def __rpc_retrieve_state(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_pong(
+        self, stream: "grpclib.server.Stream[Pong, IntResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.retrieve_state(request)
+        response = await self.send_pong(request)
         await stream.send_message(response)
 
-    async def __rpc_retry_current_tuple(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_nested(
+        self, stream: "grpclib.server.Stream[Nested, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.retry_current_tuple(request)
+        response = await self.send_nested(request)
         await stream.send_message(response)
 
-    async def __rpc_start_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
+    async def __rpc_send_pass(
+        self, stream: "grpclib.server.Stream[Pass, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.start_worker(request)
+        response = await self.send_pass(request)
         await stream.send_message(response)
 
-    async def __rpc_end_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_error_command(
+        self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.end_worker(request)
+        response = await self.send_error_command(request)
         await stream.send_message(response)
 
-    async def __rpc_start_channel(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_recursion(
+        self, stream: "grpclib.server.Stream[Recursion, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.start_channel(request)
+        response = await self.send_recursion(request)
         await stream.send_message(response)
 
-    async def __rpc_end_channel(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_collect(
+        self, stream: "grpclib.server.Stream[Collect, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.end_channel(request)
+        response = await self.send_collect(request)
         await stream.send_message(response)
 
-    async def __rpc_debug_command(
-        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
+    async def __rpc_send_generate_number(
+        self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.debug_command(request)
+        response = await self.send_generate_number(request)
         await stream.send_message(response)
 
-    async def __rpc_evaluate_python_expression(
-        self,
-        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, 
EvaluatedValue]",
+    async def __rpc_send_multi_call(
+        self, stream: "grpclib.server.Stream[MultiCall, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.evaluate_python_expression(request)
+        response = await self.send_multi_call(request)
         await stream.send_message(response)
 
-    async def __rpc_no_operation(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_chain(
+        self, stream: "grpclib.server.Stream[Chain, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.no_operation(request)
+        response = await self.send_chain(request)
         await stream.send_message(response)
 
     def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
         return {
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel":
 grpclib.const.Handler(
-                self.__rpc_add_input_channel,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                AddInputChannelRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning":
 grpclib.const.Handler(
-                self.__rpc_add_partitioning,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                AddPartitioningRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": 
grpclib.const.Handler(
-                self.__rpc_assign_port,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                AssignPortRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint":
 grpclib.const.Handler(
-                self.__rpc_finalize_checkpoint,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                FinalizeCheckpointRequest,
-                FinalizeCheckpointResponse,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer":
 grpclib.const.Handler(
-                self.__rpc_flush_network_buffer,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor":
 grpclib.const.Handler(
-                self.__rpc_initialize_executor,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                InitializeExecutorRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": 
grpclib.const.Handler(
-                self.__rpc_open_executor,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": 
grpclib.const.Handler(
-                self.__rpc_pause_worker,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerStateResponse,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint":
 grpclib.const.Handler(
-                self.__rpc_prepare_checkpoint,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                PrepareCheckpointRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics":
 grpclib.const.Handler(
-                self.__rpc_query_statistics,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerMetricsResponse,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": 
grpclib.const.Handler(
-                self.__rpc_resume_worker,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": 
grpclib.const.Handler(
+                self.__rpc_send_ping,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerStateResponse,
+                Ping,
+                IntResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": 
grpclib.const.Handler(
-                self.__rpc_retrieve_state,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": 
grpclib.const.Handler(
+                self.__rpc_send_pong,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
+                Pong,
+                IntResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple":
 grpclib.const.Handler(
-                self.__rpc_retry_current_tuple,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": 
grpclib.const.Handler(
+                self.__rpc_send_nested,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
+                Nested,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": 
grpclib.const.Handler(
-                self.__rpc_start_worker,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": 
grpclib.const.Handler(
+                self.__rpc_send_pass,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerStateResponse,
+                Pass,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": 
grpclib.const.Handler(
-                self.__rpc_end_worker,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": 
grpclib.const.Handler(
+                self.__rpc_send_error_command,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
+                ErrorCommand,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": 
grpclib.const.Handler(
-                self.__rpc_start_channel,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": 
grpclib.const.Handler(
+                self.__rpc_send_recursion,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
+                Recursion,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": 
grpclib.const.Handler(
-                self.__rpc_end_channel,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": 
grpclib.const.Handler(
+                self.__rpc_send_collect,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
+                Collect,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": 
grpclib.const.Handler(
-                self.__rpc_debug_command,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber":
 grpclib.const.Handler(
+                self.__rpc_send_generate_number,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                DebugCommandRequest,
-                EmptyReturn,
+                GenerateNumber,
+                IntResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression":
 grpclib.const.Handler(
-                self.__rpc_evaluate_python_expression,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": 
grpclib.const.Handler(
+                self.__rpc_send_multi_call,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EvaluatePythonExpressionRequest,
-                EvaluatedValue,
+                MultiCall,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": 
grpclib.const.Handler(
-                self.__rpc_no_operation,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": 
grpclib.const.Handler(
+                self.__rpc_send_chain,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
+                Chain,
+                StringResponse,
             ),
         }
 
 
-class ControllerServiceBase(ServiceBase):
+class WorkerServiceBase(ServiceBase):
 
-    async def retrieve_workflow_state(
-        self, empty_request: "EmptyRequest"
-    ) -> "RetrieveWorkflowStateResponse":
+    async def add_input_channel(
+        self, add_input_channel_request: "AddInputChannelRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def propagate_embedded_control_message(
-        self,
-        propagate_embedded_control_message_request: 
"PropagateEmbeddedControlMessageRequest",
-    ) -> "PropagateEmbeddedControlMessageResponse":
+    async def add_partitioning(
+        self, add_partitioning_request: "AddPartitioningRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def take_global_checkpoint(
-        self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest"
-    ) -> "TakeGlobalCheckpointResponse":
+    async def assign_port(
+        self, assign_port_request: "AssignPortRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def debug_command(
-        self, debug_command_request: "DebugCommandRequest"
-    ) -> "EmptyReturn":
+    async def finalize_checkpoint(
+        self, finalize_checkpoint_request: "FinalizeCheckpointRequest"
+    ) -> "FinalizeCheckpointResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def evaluate_python_expression(
-        self, evaluate_python_expression_request: 
"EvaluatePythonExpressionRequest"
-    ) -> "EvaluatePythonExpressionResponse":
+    async def flush_network_buffer(
+        self, empty_request: "EmptyRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def console_message_triggered(
-        self, console_message_triggered_request: 
"ConsoleMessageTriggeredRequest"
+    async def initialize_executor(
+        self, initialize_executor_request: "InitializeExecutorRequest"
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def port_completed(
-        self, port_completed_request: "PortCompletedRequest"
+    async def open_executor(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def pause_worker(
+        self, empty_request: "EmptyRequest"
+    ) -> "WorkerStateResponse":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def prepare_checkpoint(
+        self, prepare_checkpoint_request: "PrepareCheckpointRequest"
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def start_workflow(
+    async def query_statistics(
         self, empty_request: "EmptyRequest"
-    ) -> "StartWorkflowResponse":
+    ) -> "WorkerMetricsResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def resume_workflow(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def resume_worker(
+        self, empty_request: "EmptyRequest"
+    ) -> "WorkerStateResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def pause_workflow(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def retrieve_state(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def worker_state_updated(
-        self, worker_state_updated_request: "WorkerStateUpdatedRequest"
-    ) -> "EmptyReturn":
+    async def retry_current_tuple(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def worker_execution_completed(
+    async def start_worker(
         self, empty_request: "EmptyRequest"
-    ) -> "EmptyReturn":
+    ) -> "WorkerStateResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def link_workers(
-        self, link_workers_request: "LinkWorkersRequest"
-    ) -> "EmptyReturn":
+    async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def controller_initiate_query_statistics(
-        self, query_statistics_request: "QueryStatisticsRequest"
+    async def start_channel(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def end_channel(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def debug_command(
+        self, debug_command_request: "DebugCommandRequest"
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def retry_workflow(
-        self, retry_workflow_request: "RetryWorkflowRequest"
+    async def evaluate_python_expression(
+        self, evaluate_python_expression_request: 
"EvaluatePythonExpressionRequest"
+    ) -> "EvaluatedValue":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def no_operation(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def update_executor(
+        self, update_executor_request: "UpdateExecutorRequest"
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_retrieve_workflow_state(
-        self,
-        stream: "grpclib.server.Stream[EmptyRequest, 
RetrieveWorkflowStateResponse]",
+    async def __rpc_add_input_channel(
+        self, stream: "grpclib.server.Stream[AddInputChannelRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.retrieve_workflow_state(request)
+        response = await self.add_input_channel(request)
         await stream.send_message(response)
 
-    async def __rpc_propagate_embedded_control_message(
-        self,
-        stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, 
PropagateEmbeddedControlMessageResponse]",
+    async def __rpc_add_partitioning(
+        self, stream: "grpclib.server.Stream[AddPartitioningRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.propagate_embedded_control_message(request)
+        response = await self.add_partitioning(request)
         await stream.send_message(response)
 
-    async def __rpc_take_global_checkpoint(
+    async def __rpc_assign_port(
+        self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.assign_port(request)
+        await stream.send_message(response)
+
+    async def __rpc_finalize_checkpoint(
         self,
-        stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, 
TakeGlobalCheckpointResponse]",
+        stream: "grpclib.server.Stream[FinalizeCheckpointRequest, 
FinalizeCheckpointResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.take_global_checkpoint(request)
+        response = await self.finalize_checkpoint(request)
         await stream.send_message(response)
 
-    async def __rpc_debug_command(
-        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
+    async def __rpc_flush_network_buffer(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.debug_command(request)
+        response = await self.flush_network_buffer(request)
         await stream.send_message(response)
 
-    async def __rpc_evaluate_python_expression(
-        self,
-        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, 
EvaluatePythonExpressionResponse]",
+    async def __rpc_initialize_executor(
+        self, stream: "grpclib.server.Stream[InitializeExecutorRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.evaluate_python_expression(request)
+        response = await self.initialize_executor(request)
         await stream.send_message(response)
 
-    async def __rpc_console_message_triggered(
-        self,
-        stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, 
EmptyReturn]",
+    async def __rpc_open_executor(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.console_message_triggered(request)
+        response = await self.open_executor(request)
         await stream.send_message(response)
 
-    async def __rpc_port_completed(
-        self, stream: "grpclib.server.Stream[PortCompletedRequest, 
EmptyReturn]"
+    async def __rpc_pause_worker(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.port_completed(request)
+        response = await self.pause_worker(request)
         await stream.send_message(response)
 
-    async def __rpc_start_workflow(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
StartWorkflowResponse]"
+    async def __rpc_prepare_checkpoint(
+        self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.start_workflow(request)
+        response = await self.prepare_checkpoint(request)
         await stream.send_message(response)
 
-    async def __rpc_resume_workflow(
+    async def __rpc_query_statistics(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerMetricsResponse]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.query_statistics(request)
+        await stream.send_message(response)
+
+    async def __rpc_resume_worker(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.resume_worker(request)
+        await stream.send_message(response)
+
+    async def __rpc_retrieve_state(
         self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.resume_workflow(request)
+        response = await self.retrieve_state(request)
         await stream.send_message(response)
 
-    async def __rpc_pause_workflow(
+    async def __rpc_retry_current_tuple(
         self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.pause_workflow(request)
+        response = await self.retry_current_tuple(request)
         await stream.send_message(response)
 
-    async def __rpc_worker_state_updated(
-        self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, 
EmptyReturn]"
+    async def __rpc_start_worker(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.worker_state_updated(request)
+        response = await self.start_worker(request)
         await stream.send_message(response)
 
-    async def __rpc_worker_execution_completed(
+    async def __rpc_end_worker(
         self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.worker_execution_completed(request)
+        response = await self.end_worker(request)
         await stream.send_message(response)
 
-    async def __rpc_link_workers(
-        self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
+    async def __rpc_start_channel(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.link_workers(request)
+        response = await self.start_channel(request)
         await stream.send_message(response)
 
-    async def __rpc_controller_initiate_query_statistics(
-        self, stream: "grpclib.server.Stream[QueryStatisticsRequest, 
EmptyReturn]"
+    async def __rpc_end_channel(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.controller_initiate_query_statistics(request)
+        response = await self.end_channel(request)
         await stream.send_message(response)
 
-    async def __rpc_retry_workflow(
-        self, stream: "grpclib.server.Stream[RetryWorkflowRequest, 
EmptyReturn]"
+    async def __rpc_debug_command(
+        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.retry_workflow(request)
+        response = await self.debug_command(request)
+        await stream.send_message(response)
+
+    async def __rpc_evaluate_python_expression(
+        self,
+        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, 
EvaluatedValue]",
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.evaluate_python_expression(request)
+        await stream.send_message(response)
+
+    async def __rpc_no_operation(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.no_operation(request)
+        await stream.send_message(response)
+
+    async def __rpc_update_executor(
+        self, stream: "grpclib.server.Stream[UpdateExecutorRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.update_executor(request)
         await stream.send_message(response)
 
     def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
         return {
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState":
 grpclib.const.Handler(
-                self.__rpc_retrieve_workflow_state,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel":
 grpclib.const.Handler(
+                self.__rpc_add_input_channel,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                RetrieveWorkflowStateResponse,
+                AddInputChannelRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage":
 grpclib.const.Handler(
-                self.__rpc_propagate_embedded_control_message,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning":
 grpclib.const.Handler(
+                self.__rpc_add_partitioning,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                PropagateEmbeddedControlMessageRequest,
-                PropagateEmbeddedControlMessageResponse,
+                AddPartitioningRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint":
 grpclib.const.Handler(
-                self.__rpc_take_global_checkpoint,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": 
grpclib.const.Handler(
+                self.__rpc_assign_port,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                TakeGlobalCheckpointRequest,
-                TakeGlobalCheckpointResponse,
+                AssignPortRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand":
 grpclib.const.Handler(
-                self.__rpc_debug_command,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint":
 grpclib.const.Handler(
+                self.__rpc_finalize_checkpoint,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                DebugCommandRequest,
+                FinalizeCheckpointRequest,
+                FinalizeCheckpointResponse,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer":
 grpclib.const.Handler(
+                self.__rpc_flush_network_buffer,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression":
 grpclib.const.Handler(
-                self.__rpc_evaluate_python_expression,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor":
 grpclib.const.Handler(
+                self.__rpc_initialize_executor,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EvaluatePythonExpressionRequest,
-                EvaluatePythonExpressionResponse,
+                InitializeExecutorRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered":
 grpclib.const.Handler(
-                self.__rpc_console_message_triggered,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": 
grpclib.const.Handler(
+                self.__rpc_open_executor,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                ConsoleMessageTriggeredRequest,
+                EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted":
 grpclib.const.Handler(
-                self.__rpc_port_completed,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": 
grpclib.const.Handler(
+                self.__rpc_pause_worker,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                PortCompletedRequest,
+                EmptyRequest,
+                WorkerStateResponse,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint":
 grpclib.const.Handler(
+                self.__rpc_prepare_checkpoint,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                PrepareCheckpointRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow":
 grpclib.const.Handler(
-                self.__rpc_start_workflow,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics":
 grpclib.const.Handler(
+                self.__rpc_query_statistics,
                 grpclib.const.Cardinality.UNARY_UNARY,
                 EmptyRequest,
-                StartWorkflowResponse,
+                WorkerMetricsResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow":
 grpclib.const.Handler(
-                self.__rpc_resume_workflow,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": 
grpclib.const.Handler(
+                self.__rpc_resume_worker,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                WorkerStateResponse,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": 
grpclib.const.Handler(
+                self.__rpc_retrieve_state,
                 grpclib.const.Cardinality.UNARY_UNARY,
                 EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow":
 grpclib.const.Handler(
-                self.__rpc_pause_workflow,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple":
 grpclib.const.Handler(
+                self.__rpc_retry_current_tuple,
                 grpclib.const.Cardinality.UNARY_UNARY,
                 EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated":
 grpclib.const.Handler(
-                self.__rpc_worker_state_updated,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": 
grpclib.const.Handler(
+                self.__rpc_start_worker,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                WorkerStateUpdatedRequest,
+                EmptyRequest,
+                WorkerStateResponse,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": 
grpclib.const.Handler(
+                self.__rpc_end_worker,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted":
 grpclib.const.Handler(
-                self.__rpc_worker_execution_completed,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": 
grpclib.const.Handler(
+                self.__rpc_start_channel,
                 grpclib.const.Cardinality.UNARY_UNARY,
                 EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
 grpclib.const.Handler(
-                self.__rpc_link_workers,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": 
grpclib.const.Handler(
+                self.__rpc_end_channel,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                LinkWorkersRequest,
+                EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics":
 grpclib.const.Handler(
-                self.__rpc_controller_initiate_query_statistics,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": 
grpclib.const.Handler(
+                self.__rpc_debug_command,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                QueryStatisticsRequest,
+                DebugCommandRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow":
 grpclib.const.Handler(
-                self.__rpc_retry_workflow,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression":
 grpclib.const.Handler(
+                self.__rpc_evaluate_python_expression,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                RetryWorkflowRequest,
+                EvaluatePythonExpressionRequest,
+                EvaluatedValue,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": 
grpclib.const.Handler(
+                self.__rpc_no_operation,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor":
 grpclib.const.Handler(
+                self.__rpc_update_executor,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                UpdateExecutorRequest,
                 EmptyReturn,
             ),
         }
diff --git 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
index 55c789aa39..8c1464cc76 100644
--- 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
@@ -18,6 +18,27 @@ from ..architecture import (
 )
 
 
+@dataclass(eq=False, repr=False)
+class Backpressure(betterproto.Message):
+    enable_backpressure: bool = betterproto.bool_field(1)
+
+
+@dataclass(eq=False, repr=False)
+class CreditUpdate(betterproto.Message):
+    pass
+
+
+@dataclass(eq=False, repr=False)
+class ActorCommand(betterproto.Message):
+    backpressure: "Backpressure" = betterproto.message_field(1, 
group="sealed_value")
+    credit_update: "CreditUpdate" = betterproto.message_field(2, 
group="sealed_value")
+
+
+@dataclass(eq=False, repr=False)
+class PythonActorMessage(betterproto.Message):
+    payload: "ActorCommand" = betterproto.message_field(1)
+
+
 @dataclass(eq=False, repr=False)
 class DirectControlMessagePayloadV2(betterproto.Message):
     control_invocation: "_architecture_rpc__.ControlInvocation" = (
@@ -133,24 +154,3 @@ class ExecutionMetadataStore(betterproto.Message):
     fatal_errors: List["__core__.WorkflowFatalError"] = 
betterproto.message_field(2)
     execution_id: "__core__.ExecutionIdentity" = betterproto.message_field(3)
     is_recovering: bool = betterproto.bool_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class Backpressure(betterproto.Message):
-    enable_backpressure: bool = betterproto.bool_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class CreditUpdate(betterproto.Message):
-    pass
-
-
-@dataclass(eq=False, repr=False)
-class ActorCommand(betterproto.Message):
-    backpressure: "Backpressure" = betterproto.message_field(1, 
group="sealed_value")
-    credit_update: "CreditUpdate" = betterproto.message_field(2, 
group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class PythonActorMessage(betterproto.Message):
-    payload: "ActorCommand" = betterproto.message_field(1)
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 4d9a36bab4..2902173364 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -46,6 +46,7 @@ class ControllerAsyncRPCHandlerInitializer(
     with DebugCommandHandler
     with TakeGlobalCheckpointHandler
     with EmbeddedControlMessageHandler
-    with RetrieveWorkflowStateHandler {
+    with RetrieveWorkflowStateHandler
+    with ReconfigurationHandler {
   val actorId: ActorVirtualIdentity = cp.actorId
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
new file mode 100644
index 0000000000..9470336efb
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import org.apache.texera.amber.core.virtualidentity.{ChannelIdentity, 
EmbeddedControlMessageIdentity}
+import 
org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer,
 ExecutionStatsUpdate}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
+import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext,
 WorkflowReconfigureRequest}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
+trait ReconfigurationHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
+
+  override def reconfigureWorkflow(msg: WorkflowReconfigureRequest, ctx: 
AsyncRPCContext): Future[EmptyReturn] = {
+    
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator,
 msg).foreach{
+      friesComponent =>
+        if(friesComponent.scope.size == 1){
+          val updateExecutorRequest = friesComponent.reconfigurations.head
+          val workerIds = 
cp.workflowExecution.getLatestOperatorExecution(updateExecutorRequest.targetOpId).getWorkerIds
+          workerIds.foreach{
+            worker =>
+              workerInterface.updateExecutor(updateExecutorRequest, 
mkContext(worker))
+          }
+        }else{
+          val channelScope = cp.workflowExecution.getRunningRegionExecutions
+            .flatMap(regionExecution =>
+              regionExecution.getAllLinkExecutions
+                .map(_._2)
+                .flatMap(linkExecution => 
linkExecution.getAllChannelExecutions.map(_._1))
+            ).filter(channelId => {
+              friesComponent.scope
+                
.contains(VirtualIdentityUtils.getPhysicalOpId(channelId.fromWorkerId)) &&
+                friesComponent.scope
+                  
.contains(VirtualIdentityUtils.getPhysicalOpId(channelId.toWorkerId))
+            })
+          val controlChannels = friesComponent.sources.flatMap { source =>
+            
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.flatMap { 
worker =>
+              Seq(
+                ChannelIdentity(CONTROLLER, worker, isControl = true),
+                ChannelIdentity(worker, CONTROLLER, isControl = true)
+              )
+            }
+          }
+          val finalScope = channelScope ++ controlChannels
+          val cmdMapping =
+            friesComponent.reconfigurations
+              .flatMap { updateReq =>
+                val workers =
+                  cp.workflowExecution
+                    .getLatestOperatorExecution(updateReq.targetOpId)
+                    .getWorkerIds
+
+                workers.map { worker =>
+                  worker.name ->
+                    createInvocation(
+                      METHOD_UPDATE_EXECUTOR.getBareMethodName,
+                      updateReq.newExecInitInfo,
+                      worker
+                    )
+                }
+              }
+              .groupBy(_._1)
+              .map {
+                case (worker, entries) =>
+                  worker -> entries.head._2
+              }
+          friesComponent.sources.foreach { source =>
+            
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { 
worker =>
+              sendECM(
+                EmbeddedControlMessageIdentity(msg.reconfigurationId),
+                ALL_ALIGNMENT,
+                finalScope.toSet,
+                cmdMapping,
+                ChannelIdentity(actorId, worker, isControl = true)
+              )
+            }
+          }
+        }
+
+    }
+    EmptyReturn()
+  }
+
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
index 2abcdf6697..545d0c60f1 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
@@ -20,18 +20,17 @@
 package org.apache.texera.amber.engine.architecture.worker
 
 import com.twitter.util.Future
+import org.apache.texera.amber.core.executor.{ExecFactory, OpExecInitInfo, 
OpExecSource, OpExecWithClassName, OpExecWithCode}
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  AsyncRPCContext,
-  DebugCommandRequest,
-  EmptyRequest,
-  EvaluatePythonExpressionRequest
-}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext,
 DebugCommandRequest, EmptyRequest, EvaluatePythonExpressionRequest}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.{EmptyReturn, 
EvaluatedValue}
 import 
org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceFs2Grpc
 import org.apache.texera.amber.engine.architecture.worker.promisehandlers._
 import org.apache.texera.amber.engine.common.AmberLogging
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCHandlerInitializer
+import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec
+
+import java.net.URI
 
 class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
     extends AsyncRPCHandlerInitializer(dp.asyncRPCClient, dp.asyncRPCServer)
@@ -52,7 +51,8 @@ class DataProcessorRPCHandlerInitializer(val dp: 
DataProcessor)
     with FlushNetworkBufferHandler
     with RetrieveStateHandler
     with PrepareCheckpointHandler
-    with FinalizeCheckpointHandler {
+    with FinalizeCheckpointHandler
+    with UpdateExecutorHandler {
   val actorId: ActorVirtualIdentity = dp.actorId
 
   override def debugCommand(
@@ -69,4 +69,17 @@ class DataProcessorRPCHandlerInitializer(val dp: 
DataProcessor)
     ???
 
   override def noOperation(request: EmptyRequest, ctx: AsyncRPCContext): 
Future[EmptyReturn] = ???
+
+  def initializeExecutor(execInitInfo: OpExecInitInfo): Unit = {
+    dp.executor = execInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, 
workerCount)
+      case OpExecWithCode(code, _) =>
+        ExecFactory.newExecFromJavaCode(code)
+      case OpExecSource(storageUri, _) =>
+        new CacheSourceOpExec(URI.create(storageUri))
+      case OpExecInitInfo.Empty =>
+        throw new IllegalArgumentException("Empty executor initialization 
info")
+    }
+  }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index bf45d8eff9..1204710181 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -54,5 +54,4 @@ trait InitializeExecutorHandler {
     }
     EmptyReturn()
   }
-
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
similarity index 50%
copy from 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
copy to 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index bf45d8eff9..7f9e1fa31d 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -20,39 +20,44 @@
 package org.apache.texera.amber.engine.architecture.worker.promisehandlers
 
 import com.twitter.util.Future
-import org.apache.texera.amber.core.executor._
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  AsyncRPCContext,
-  InitializeExecutorRequest
-}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, UpdateExecutorRequest}
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
-import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec
-import org.apache.texera.amber.util.VirtualIdentityUtils
-
-import java.net.URI
+import scala.reflect.runtime.universe._
 
-trait InitializeExecutorHandler {
+trait UpdateExecutorHandler {
   this: DataProcessorRPCHandlerInitializer =>
 
-  override def initializeExecutor(
-      req: InitializeExecutorRequest,
-      ctx: AsyncRPCContext
-  ): Future[EmptyReturn] = {
-    dp.serializationManager.setOpInitialization(req)
-    val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
-    val workerCount = req.totalWorkerCount
-    dp.executor = req.opExecInitInfo match {
-      case OpExecWithClassName(className, descString) =>
-        ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount)
-      case OpExecWithCode(code, _) =>
-        ExecFactory.newExecFromJavaCode(code)
-      case OpExecSource(storageUri, _) =>
-        new CacheSourceOpExec(URI.create(storageUri))
-      case OpExecInitInfo.Empty =>
-        throw new IllegalArgumentException("Empty executor initialization info")
-    }
+  override def updateExecutor(
+                               request: UpdateExecutorRequest,
+                               ctx: AsyncRPCContext
+                           ): Future[EmptyReturn] = {
+    val oldOpExecState = dp.executor
+    initializeExecutor(request.newExecInitInfo)
+    dp.executor.open()
+    copyMatchingFields(oldOpExecState, dp.executor) //TBD if we really need this
     EmptyReturn()
   }
 
+  private[this] def copyMatchingFields[A: TypeTag, B: TypeTag](from: A, to: B): Unit = {
+    val mirror = runtimeMirror(from.getClass.getClassLoader)
+
+    val fromFields = typeOf[A].members.collect {
+      case m: MethodSymbol if m.isGetter => m
+    }
+
+    val toFields = typeOf[B].members.collect {
+      case m: MethodSymbol if m.isVar => m
+    }.map(m => m.name.toString -> m).toMap
+
+    fromFields.foreach { f =>
+      val name = f.name.toString
+      toFields.get(name).foreach { setter =>
+        if (f.returnType =:= setter.returnType) {
+          val value = mirror.reflect(from).reflectMethod(f).apply()
+          mirror.reflect(to).reflectField(setter).set(value)
+        }
+      }
+    }
+  }
 }
diff --git a/amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
similarity index 71%
rename from amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala
rename to amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
index c2a15106b1..f1c82e6343 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.texera.web.service
+package org.apache.texera.amber.engine.common
 
 import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
 import org.apache.texera.amber.core.workflow.PhysicalPlan
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  ModifyLogicRequest,
-  PropagateEmbeddedControlMessageRequest
+  UpdateExecutorRequest,
+  WorkflowReconfigureRequest
 }
 import org.apache.texera.amber.engine.architecture.scheduling.{Region, WorkflowExecutionCoordinator}
 import org.jgrapht.alg.connectivity.ConnectivityInspector
@@ -34,28 +34,32 @@ import scala.jdk.CollectionConverters.SetHasAsScala
 
 object FriesReconfigurationAlgorithm {
 
+  case class FriesComponent(
+      sources: Set[PhysicalOpIdentity],
+      scope: Set[PhysicalOpIdentity],
+      reconfigurations: Set[UpdateExecutorRequest])
+
   private def getOneToManyOperators(region: Region): Set[PhysicalOpIdentity] = {
     region.getOperators.filter(op => op.isOneToManyOp).map(op => op.id)
   }
 
-  def scheduleReconfigurations(
-      workflowExecutionCoordinator: WorkflowExecutionCoordinator,
-      reconfiguration: ModifyLogicRequest,
-      epochMarkerId: String
-  ): Set[PropagateEmbeddedControlMessageRequest] = {
+  def getReconfigurations(
+                                workflowExecutionCoordinator: WorkflowExecutionCoordinator,
+                                reconfiguration: WorkflowReconfigureRequest
+  ): Set[FriesComponent] = {
     // independently schedule reconfigurations for each region:
     workflowExecutionCoordinator.getExecutingRegions
-      .flatMap(region => computeMCS(region, reconfiguration, epochMarkerId))
+      .flatMap(region => computeMCS(region, reconfiguration, reconfiguration.reconfigurationId))
   }
 
   private def computeMCS(
       region: Region,
-      reconfiguration: ModifyLogicRequest,
+      reconfiguration: WorkflowReconfigureRequest,
       epochMarkerId: String
-  ): List[PropagateEmbeddedControlMessageRequest] = {
+  ): List[FriesComponent] = {
 
     // add all reconfiguration operators to M
-    val reconfigOps = reconfiguration.updateRequest.map(req => req.targetOpId).toSet
+    val reconfigOps = reconfiguration.reconfiguration.map(req => req.targetOpId).toSet
     val M = mutable.Set.empty ++ reconfigOps
 
     // for each one-to-many operator, add it to M if its downstream has a reconfiguration operator
@@ -101,30 +105,20 @@ object FriesReconfigurationAlgorithm {
 
     // find the MCS components,
     // for each component, send an epoch marker to each of its source operators
-    val epochMarkers = new ArrayBuffer[PropagateEmbeddedControlMessageRequest]()
+    val epochMarkers = new ArrayBuffer[FriesComponent]()
 
     val connectedSets = new ConnectivityInspector(mcsPlan.dag).connectedSets()
     connectedSets.forEach(component => {
       val componentSet = component.asScala.toSet
       val componentPlan = mcsPlan.getSubPlan(componentSet)
-
-      // generate the reconfiguration command for this component
-      //      val reconfigCommands =
-      //        reconfiguration.updateRequest
-      //          .filter(req => component.contains(req.targetOpId))
-      //      val reconfigTargets = reconfigCommands.map(_.targetOpId)
-      //
-      //      // find the source operators of the component
-      //      val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds)
-      //      epochMarkers += PropagateEmbeddedControlMessageRequest(
-      //        sources.toSeq,
-      //        EmbeddedControlMessageIdentity(epochMarkerId),
-      //        ALL_ALIGNMENT,
-      //        componentPlan.operators.map(_.id).toSeq,
-      //        reconfigTargets,
-      //        ModifyLogicRequest(reconfigCommands),
-      //        METHOD_MODIFY_LOGIC.getBareMethodName
-      //      )
+      val reconfigCommands =
+              reconfiguration.reconfiguration
+                .filter(req => component.contains(req.targetOpId))
+                .toSet
+
+            // find the source operators of the component
+            val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds)
+            epochMarkers += FriesComponent(sources, componentPlan.operators.map(_.id), reconfigCommands)
     })
     epochMarkers.toList
   }
diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf
index 85a62b77a3..f8b1868941 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -37,7 +37,7 @@ storage {
                 username = "texera"
                 username = ${?STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME}
 
-                password = "password"
+                password = "123456"
                 password = ${?STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD}
             }
         }
@@ -134,7 +134,7 @@ storage {
         username = "postgres"
         username = ${?STORAGE_JDBC_USERNAME}
 
-        password = "postgres"
+        password = "123456"
         password = ${?STORAGE_JDBC_PASSWORD}
     }
 }
diff --git a/common/config/src/main/resources/udf.conf b/common/config/src/main/resources/udf.conf
index 7212d1720e..c009aed0de 100644
--- a/common/config/src/main/resources/udf.conf
+++ b/common/config/src/main/resources/udf.conf
@@ -17,7 +17,7 @@
 
 python {
     # python3 executable path
-    path = ""
+    path = "C:\\Users\\12198\\IdeaProjects\\texera\\.venv\\Scripts\\python.exe"
     path = ${?UDF_PYTHON_PATH}
 
     log {

Reply via email to