This is an automated email from the ASF dual-hosted git repository.

shengquan pushed a commit to branch shengquan-add-reconfigration
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/shengquan-add-reconfigration 
by this push:
     new 4e59c4d285 fix python proto
4e59c4d285 is described below

commit 4e59c4d285b28e0567e1041b9e55c3d594d873eb
Author: Shengquan Ni <[email protected]>
AuthorDate: Sat Feb 14 19:56:52 2026 -0800

    fix python proto
---
 .../amber/engine/architecture/rpc/__init__.py      | 1526 ++++++++++----------
 .../apache/texera/amber/engine/common/__init__.py  |   42 +-
 2 files changed, 786 insertions(+), 782 deletions(-)

diff --git 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
index f7b89f13e6..b341f70081 100644
--- 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
@@ -22,6 +22,7 @@ from .. import (
     worker as _worker__,
 )
 
+
 if TYPE_CHECKING:
     import grpclib.server
     from betterproto.grpc.grpclib_client import MetadataLike
@@ -91,6 +92,9 @@ class ControlRequest(betterproto.Message):
     link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
         9, group="sealed_value"
     )
+    workflow_reconfigure_request: "WorkflowReconfigureRequest" = (
+        betterproto.message_field(10, group="sealed_value")
+    )
     add_input_channel_request: "AddInputChannelRequest" = 
betterproto.message_field(
         50, group="sealed_value"
     )
@@ -504,692 +508,692 @@ class WorkerMetricsResponse(betterproto.Message):
     metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1)
 
 
-class ControllerServiceStub(betterproto.ServiceStub):
-    async def retrieve_workflow_state(
+class RpcTesterStub(betterproto.ServiceStub):
+    async def send_ping(
         self,
-        empty_request: "EmptyRequest",
+        ping: "Ping",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "RetrieveWorkflowStateResponse":
+    ) -> "IntResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState",
-            empty_request,
-            RetrieveWorkflowStateResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing",
+            ping,
+            IntResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def propagate_embedded_control_message(
+    async def send_pong(
         self,
-        propagate_embedded_control_message_request: 
"PropagateEmbeddedControlMessageRequest",
+        pong: "Pong",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "PropagateEmbeddedControlMessageResponse":
+    ) -> "IntResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage",
-            propagate_embedded_control_message_request,
-            PropagateEmbeddedControlMessageResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong",
+            pong,
+            IntResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def take_global_checkpoint(
+    async def send_nested(
         self,
-        take_global_checkpoint_request: "TakeGlobalCheckpointRequest",
+        nested: "Nested",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "TakeGlobalCheckpointResponse":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint",
-            take_global_checkpoint_request,
-            TakeGlobalCheckpointResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested",
+            nested,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def debug_command(
+    async def send_pass(
         self,
-        debug_command_request: "DebugCommandRequest",
+        pass_: "Pass",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand",
-            debug_command_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass",
+            pass_,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def evaluate_python_expression(
+    async def send_error_command(
         self,
-        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
+        error_command: "ErrorCommand",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EvaluatePythonExpressionResponse":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression",
-            evaluate_python_expression_request,
-            EvaluatePythonExpressionResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand",
+            error_command,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def console_message_triggered(
+    async def send_recursion(
         self,
-        console_message_triggered_request: "ConsoleMessageTriggeredRequest",
+        recursion: "Recursion",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered",
-            console_message_triggered_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion",
+            recursion,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def port_completed(
+    async def send_collect(
         self,
-        port_completed_request: "PortCompletedRequest",
+        collect: "Collect",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted",
-            port_completed_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect",
+            collect,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def start_workflow(
+    async def send_generate_number(
         self,
-        empty_request: "EmptyRequest",
+        generate_number: "GenerateNumber",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StartWorkflowResponse":
+    ) -> "IntResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow",
-            empty_request,
-            StartWorkflowResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber",
+            generate_number,
+            IntResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def resume_workflow(
+    async def send_multi_call(
         self,
-        empty_request: "EmptyRequest",
+        multi_call: "MultiCall",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow",
-            empty_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall",
+            multi_call,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def pause_workflow(
+    async def send_chain(
         self,
-        empty_request: "EmptyRequest",
+        chain: "Chain",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StringResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow",
-            empty_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain",
+            chain,
+            StringResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def worker_state_updated(
+
+class WorkerServiceStub(betterproto.ServiceStub):
+    async def add_input_channel(
         self,
-        worker_state_updated_request: "WorkerStateUpdatedRequest",
+        add_input_channel_request: "AddInputChannelRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated",
-            worker_state_updated_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel",
+            add_input_channel_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def worker_execution_completed(
+    async def add_partitioning(
         self,
-        empty_request: "EmptyRequest",
+        add_partitioning_request: "AddPartitioningRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning",
+            add_partitioning_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def link_workers(
+    async def assign_port(
         self,
-        link_workers_request: "LinkWorkersRequest",
+        assign_port_request: "AssignPortRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers",
-            link_workers_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort",
+            assign_port_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def controller_initiate_query_statistics(
+    async def finalize_checkpoint(
         self,
-        query_statistics_request: "QueryStatisticsRequest",
+        finalize_checkpoint_request: "FinalizeCheckpointRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "FinalizeCheckpointResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics",
-            query_statistics_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint",
+            finalize_checkpoint_request,
+            FinalizeCheckpointResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def retry_workflow(
+    async def flush_network_buffer(
         self,
-        retry_workflow_request: "RetryWorkflowRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow",
-            retry_workflow_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer",
+            empty_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def reconfigure_workflow(
+    async def initialize_executor(
         self,
-        workflow_reconfigure_request: "WorkflowReconfigureRequest",
+        initialize_executor_request: "InitializeExecutorRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow",
-            workflow_reconfigure_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor",
+            initialize_executor_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-
-class RpcTesterStub(betterproto.ServiceStub):
-    async def send_ping(
+    async def open_executor(
         self,
-        ping: "Ping",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing",
-            ping,
-            IntResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor",
+            empty_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_pong(
+    async def pause_worker(
         self,
-        pong: "Pong",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
+    ) -> "WorkerStateResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong",
-            pong,
-            IntResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker",
+            empty_request,
+            WorkerStateResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_nested(
+    async def prepare_checkpoint(
         self,
-        nested: "Nested",
+        prepare_checkpoint_request: "PrepareCheckpointRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested",
-            nested,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint",
+            prepare_checkpoint_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_pass(
+    async def query_statistics(
         self,
-        pass_: "Pass",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "WorkerMetricsResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass",
-            pass_,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics",
+            empty_request,
+            WorkerMetricsResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_error_command(
+    async def resume_worker(
         self,
-        error_command: "ErrorCommand",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "WorkerStateResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand",
-            error_command,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker",
+            empty_request,
+            WorkerStateResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_recursion(
+    async def retrieve_state(
         self,
-        recursion: "Recursion",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion",
-            recursion,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState",
+            empty_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_collect(
+    async def retry_current_tuple(
         self,
-        collect: "Collect",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect",
-            collect,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple",
+            empty_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_generate_number(
+    async def start_worker(
         self,
-        generate_number: "GenerateNumber",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
+    ) -> "WorkerStateResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber",
-            generate_number,
-            IntResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker",
+            empty_request,
+            WorkerStateResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_multi_call(
+    async def end_worker(
         self,
-        multi_call: "MultiCall",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall",
-            multi_call,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker",
+            empty_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def send_chain(
+    async def start_channel(
         self,
-        chain: "Chain",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain",
-            chain,
-            StringResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel",
+            empty_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-
-class WorkerServiceStub(betterproto.ServiceStub):
-    async def add_input_channel(
+    async def end_channel(
         self,
-        add_input_channel_request: "AddInputChannelRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel",
-            add_input_channel_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel",
+            empty_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def add_partitioning(
+    async def debug_command(
         self,
-        add_partitioning_request: "AddPartitioningRequest",
+        debug_command_request: "DebugCommandRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning",
-            add_partitioning_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand",
+            debug_command_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def assign_port(
+    async def evaluate_python_expression(
         self,
-        assign_port_request: "AssignPortRequest",
+        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "EvaluatedValue":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort",
-            assign_port_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression",
+            evaluate_python_expression_request,
+            EvaluatedValue,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def finalize_checkpoint(
+    async def no_operation(
         self,
-        finalize_checkpoint_request: "FinalizeCheckpointRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "FinalizeCheckpointResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint",
-            finalize_checkpoint_request,
-            FinalizeCheckpointResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation",
+            empty_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def flush_network_buffer(
+    async def update_executor(
         self,
-        empty_request: "EmptyRequest",
+        update_executor_request: "UpdateExecutorRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor",
+            update_executor_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def initialize_executor(
+
+class ControllerServiceStub(betterproto.ServiceStub):
+    async def retrieve_workflow_state(
         self,
-        initialize_executor_request: "InitializeExecutorRequest",
+        empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "RetrieveWorkflowStateResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor",
-            initialize_executor_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState",
+            empty_request,
+            RetrieveWorkflowStateResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def open_executor(
+    async def propagate_embedded_control_message(
         self,
-        empty_request: "EmptyRequest",
+        propagate_embedded_control_message_request: 
"PropagateEmbeddedControlMessageRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "PropagateEmbeddedControlMessageResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor",
-            empty_request,
-            EmptyReturn,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage",
+            propagate_embedded_control_message_request,
+            PropagateEmbeddedControlMessageResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def pause_worker(
+    async def take_global_checkpoint(
         self,
-        empty_request: "EmptyRequest",
+        take_global_checkpoint_request: "TakeGlobalCheckpointRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
+    ) -> "TakeGlobalCheckpointResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker",
-            empty_request,
-            WorkerStateResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint",
+            take_global_checkpoint_request,
+            TakeGlobalCheckpointResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def prepare_checkpoint(
+    async def debug_command(
         self,
-        prepare_checkpoint_request: "PrepareCheckpointRequest",
+        debug_command_request: "DebugCommandRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint",
-            prepare_checkpoint_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand",
+            debug_command_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def query_statistics(
+    async def evaluate_python_expression(
         self,
-        empty_request: "EmptyRequest",
+        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerMetricsResponse":
+    ) -> "EvaluatePythonExpressionResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics",
-            empty_request,
-            WorkerMetricsResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression",
+            evaluate_python_expression_request,
+            EvaluatePythonExpressionResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def resume_worker(
+    async def console_message_triggered(
         self,
-        empty_request: "EmptyRequest",
+        console_message_triggered_request: "ConsoleMessageTriggeredRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker",
-            empty_request,
-            WorkerStateResponse,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered",
+            console_message_triggered_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def retrieve_state(
+    async def port_completed(
         self,
-        empty_request: "EmptyRequest",
+        port_completed_request: "PortCompletedRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted",
+            port_completed_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def retry_current_tuple(
+    async def start_workflow(
         self,
         empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
+    ) -> "StartWorkflowResponse":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple",
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow",
             empty_request,
-            EmptyReturn,
+            StartWorkflowResponse,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def start_worker(
+    async def resume_workflow(
         self,
         empty_request: "EmptyRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker",
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow",
             empty_request,
-            WorkerStateResponse,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def end_worker(
+    async def pause_workflow(
         self,
         empty_request: "EmptyRequest",
         *,
@@ -1198,7 +1202,7 @@ class WorkerServiceStub(betterproto.ServiceStub):
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker",
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow",
             empty_request,
             EmptyReturn,
             timeout=timeout,
@@ -1206,24 +1210,24 @@ class WorkerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
-    async def start_channel(
+    async def worker_state_updated(
         self,
-        empty_request: "EmptyRequest",
+        worker_state_updated_request: "WorkerStateUpdatedRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated",
+            worker_state_updated_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def end_channel(
+    async def worker_execution_completed(
         self,
         empty_request: "EmptyRequest",
         *,
@@ -1232,7 +1236,7 @@ class WorkerServiceStub(betterproto.ServiceStub):
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel",
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted",
             empty_request,
             EmptyReturn,
             timeout=timeout,
@@ -1240,68 +1244,68 @@ class WorkerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
-    async def debug_command(
+    async def link_workers(
         self,
-        debug_command_request: "DebugCommandRequest",
+        link_workers_request: "LinkWorkersRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand",
-            debug_command_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers",
+            link_workers_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def evaluate_python_expression(
+    async def controller_initiate_query_statistics(
         self,
-        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
+        query_statistics_request: "QueryStatisticsRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
-    ) -> "EvaluatedValue":
+    ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression",
-            evaluate_python_expression_request,
-            EvaluatedValue,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics",
+            query_statistics_request,
+            EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def no_operation(
+    async def retry_workflow(
         self,
-        empty_request: "EmptyRequest",
+        retry_workflow_request: "RetryWorkflowRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation",
-            empty_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow",
+            retry_workflow_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
             metadata=metadata,
         )
 
-    async def update_executor(
+    async def reconfigure_workflow(
         self,
-        update_executor_request: "UpdateExecutorRequest",
+        workflow_reconfigure_request: "WorkflowReconfigureRequest",
         *,
         timeout: Optional[float] = None,
         deadline: Optional["Deadline"] = None,
         metadata: Optional["MetadataLike"] = None
     ) -> "EmptyReturn":
         return await self._unary_unary(
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor",
-            update_executor_request,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow",
+            workflow_reconfigure_request,
             EmptyReturn,
             timeout=timeout,
             deadline=deadline,
@@ -1309,842 +1313,842 @@ class WorkerServiceStub(betterproto.ServiceStub):
         )
 
 
-class ControllerServiceBase(ServiceBase):
+class RpcTesterBase(ServiceBase):
 
-    async def retrieve_workflow_state(
-        self, empty_request: "EmptyRequest"
-    ) -> "RetrieveWorkflowStateResponse":
+    async def send_ping(self, ping: "Ping") -> "IntResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def propagate_embedded_control_message(
-        self,
-        propagate_embedded_control_message_request: 
"PropagateEmbeddedControlMessageRequest",
-    ) -> "PropagateEmbeddedControlMessageResponse":
+    async def send_pong(self, pong: "Pong") -> "IntResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def take_global_checkpoint(
-        self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest"
-    ) -> "TakeGlobalCheckpointResponse":
+    async def send_nested(self, nested: "Nested") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def debug_command(
-        self, debug_command_request: "DebugCommandRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def evaluate_python_expression(
-        self, evaluate_python_expression_request: 
"EvaluatePythonExpressionRequest"
-    ) -> "EvaluatePythonExpressionResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def console_message_triggered(
-        self, console_message_triggered_request: 
"ConsoleMessageTriggeredRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def port_completed(
-        self, port_completed_request: "PortCompletedRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def start_workflow(
-        self, empty_request: "EmptyRequest"
-    ) -> "StartWorkflowResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def resume_workflow(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def pause_workflow(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def send_pass(self, pass_: "Pass") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def worker_state_updated(
-        self, worker_state_updated_request: "WorkerStateUpdatedRequest"
-    ) -> "EmptyReturn":
+    async def send_error_command(
+        self, error_command: "ErrorCommand"
+    ) -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def worker_execution_completed(
-        self, empty_request: "EmptyRequest"
-    ) -> "EmptyReturn":
+    async def send_recursion(self, recursion: "Recursion") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def link_workers(
-        self, link_workers_request: "LinkWorkersRequest"
-    ) -> "EmptyReturn":
+    async def send_collect(self, collect: "Collect") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def controller_initiate_query_statistics(
-        self, query_statistics_request: "QueryStatisticsRequest"
-    ) -> "EmptyReturn":
+    async def send_generate_number(
+        self, generate_number: "GenerateNumber"
+    ) -> "IntResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def retry_workflow(
-        self, retry_workflow_request: "RetryWorkflowRequest"
-    ) -> "EmptyReturn":
+    async def send_multi_call(self, multi_call: "MultiCall") -> 
"StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def reconfigure_workflow(
-        self, workflow_reconfigure_request: "WorkflowReconfigureRequest"
-    ) -> "EmptyReturn":
+    async def send_chain(self, chain: "Chain") -> "StringResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_retrieve_workflow_state(
-        self,
-        stream: "grpclib.server.Stream[EmptyRequest, 
RetrieveWorkflowStateResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.retrieve_workflow_state(request)
-        await stream.send_message(response)
-
-    async def __rpc_propagate_embedded_control_message(
-        self,
-        stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, 
PropagateEmbeddedControlMessageResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.propagate_embedded_control_message(request)
-        await stream.send_message(response)
-
-    async def __rpc_take_global_checkpoint(
-        self,
-        stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, 
TakeGlobalCheckpointResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.take_global_checkpoint(request)
-        await stream.send_message(response)
-
-    async def __rpc_debug_command(
-        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.debug_command(request)
-        await stream.send_message(response)
-
-    async def __rpc_evaluate_python_expression(
-        self,
-        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, 
EvaluatePythonExpressionResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.evaluate_python_expression(request)
-        await stream.send_message(response)
-
-    async def __rpc_console_message_triggered(
-        self,
-        stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, 
EmptyReturn]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.console_message_triggered(request)
-        await stream.send_message(response)
-
-    async def __rpc_port_completed(
-        self, stream: "grpclib.server.Stream[PortCompletedRequest, 
EmptyReturn]"
+    async def __rpc_send_ping(
+        self, stream: "grpclib.server.Stream[Ping, IntResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.port_completed(request)
+        response = await self.send_ping(request)
         await stream.send_message(response)
 
-    async def __rpc_start_workflow(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
StartWorkflowResponse]"
+    async def __rpc_send_pong(
+        self, stream: "grpclib.server.Stream[Pong, IntResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.start_workflow(request)
+        response = await self.send_pong(request)
         await stream.send_message(response)
 
-    async def __rpc_resume_workflow(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_nested(
+        self, stream: "grpclib.server.Stream[Nested, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.resume_workflow(request)
+        response = await self.send_nested(request)
         await stream.send_message(response)
 
-    async def __rpc_pause_workflow(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_pass(
+        self, stream: "grpclib.server.Stream[Pass, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.pause_workflow(request)
+        response = await self.send_pass(request)
         await stream.send_message(response)
 
-    async def __rpc_worker_state_updated(
-        self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, 
EmptyReturn]"
+    async def __rpc_send_error_command(
+        self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.worker_state_updated(request)
+        response = await self.send_error_command(request)
         await stream.send_message(response)
 
-    async def __rpc_worker_execution_completed(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_send_recursion(
+        self, stream: "grpclib.server.Stream[Recursion, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.worker_execution_completed(request)
+        response = await self.send_recursion(request)
         await stream.send_message(response)
 
-    async def __rpc_link_workers(
-        self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
+    async def __rpc_send_collect(
+        self, stream: "grpclib.server.Stream[Collect, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.link_workers(request)
+        response = await self.send_collect(request)
         await stream.send_message(response)
 
-    async def __rpc_controller_initiate_query_statistics(
-        self, stream: "grpclib.server.Stream[QueryStatisticsRequest, 
EmptyReturn]"
+    async def __rpc_send_generate_number(
+        self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.controller_initiate_query_statistics(request)
+        response = await self.send_generate_number(request)
         await stream.send_message(response)
 
-    async def __rpc_retry_workflow(
-        self, stream: "grpclib.server.Stream[RetryWorkflowRequest, 
EmptyReturn]"
+    async def __rpc_send_multi_call(
+        self, stream: "grpclib.server.Stream[MultiCall, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.retry_workflow(request)
+        response = await self.send_multi_call(request)
         await stream.send_message(response)
 
-    async def __rpc_reconfigure_workflow(
-        self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest, 
EmptyReturn]"
+    async def __rpc_send_chain(
+        self, stream: "grpclib.server.Stream[Chain, StringResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.reconfigure_workflow(request)
+        response = await self.send_chain(request)
         await stream.send_message(response)
 
     def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
         return {
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState":
 grpclib.const.Handler(
-                self.__rpc_retrieve_workflow_state,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                RetrieveWorkflowStateResponse,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage":
 grpclib.const.Handler(
-                self.__rpc_propagate_embedded_control_message,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                PropagateEmbeddedControlMessageRequest,
-                PropagateEmbeddedControlMessageResponse,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint":
 grpclib.const.Handler(
-                self.__rpc_take_global_checkpoint,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": 
grpclib.const.Handler(
+                self.__rpc_send_ping,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                TakeGlobalCheckpointRequest,
-                TakeGlobalCheckpointResponse,
+                Ping,
+                IntResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand":
 grpclib.const.Handler(
-                self.__rpc_debug_command,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": 
grpclib.const.Handler(
+                self.__rpc_send_pong,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                DebugCommandRequest,
-                EmptyReturn,
+                Pong,
+                IntResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression":
 grpclib.const.Handler(
-                self.__rpc_evaluate_python_expression,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": 
grpclib.const.Handler(
+                self.__rpc_send_nested,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EvaluatePythonExpressionRequest,
-                EvaluatePythonExpressionResponse,
+                Nested,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered":
 grpclib.const.Handler(
-                self.__rpc_console_message_triggered,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": 
grpclib.const.Handler(
+                self.__rpc_send_pass,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                ConsoleMessageTriggeredRequest,
-                EmptyReturn,
+                Pass,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted":
 grpclib.const.Handler(
-                self.__rpc_port_completed,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": 
grpclib.const.Handler(
+                self.__rpc_send_error_command,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                PortCompletedRequest,
-                EmptyReturn,
+                ErrorCommand,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow":
 grpclib.const.Handler(
-                self.__rpc_start_workflow,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": 
grpclib.const.Handler(
+                self.__rpc_send_recursion,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                StartWorkflowResponse,
+                Recursion,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow":
 grpclib.const.Handler(
-                self.__rpc_resume_workflow,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": 
grpclib.const.Handler(
+                self.__rpc_send_collect,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
+                Collect,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow":
 grpclib.const.Handler(
-                self.__rpc_pause_workflow,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber":
 grpclib.const.Handler(
+                self.__rpc_send_generate_number,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
+                GenerateNumber,
+                IntResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated":
 grpclib.const.Handler(
-                self.__rpc_worker_state_updated,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": 
grpclib.const.Handler(
+                self.__rpc_send_multi_call,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                WorkerStateUpdatedRequest,
-                EmptyReturn,
+                MultiCall,
+                StringResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted":
 grpclib.const.Handler(
-                self.__rpc_worker_execution_completed,
+            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": 
grpclib.const.Handler(
+                self.__rpc_send_chain,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
 grpclib.const.Handler(
-                self.__rpc_link_workers,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                LinkWorkersRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics":
 grpclib.const.Handler(
-                self.__rpc_controller_initiate_query_statistics,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                QueryStatisticsRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow":
 grpclib.const.Handler(
-                self.__rpc_retry_workflow,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                RetryWorkflowRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow":
 grpclib.const.Handler(
-                self.__rpc_reconfigure_workflow,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                WorkflowReconfigureRequest,
-                EmptyReturn,
+                Chain,
+                StringResponse,
             ),
         }
 
 
-class RpcTesterBase(ServiceBase):
+class WorkerServiceBase(ServiceBase):
 
-    async def send_ping(self, ping: "Ping") -> "IntResponse":
+    async def add_input_channel(
+        self, add_input_channel_request: "AddInputChannelRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_pong(self, pong: "Pong") -> "IntResponse":
+    async def add_partitioning(
+        self, add_partitioning_request: "AddPartitioningRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_nested(self, nested: "Nested") -> "StringResponse":
+    async def assign_port(
+        self, assign_port_request: "AssignPortRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_pass(self, pass_: "Pass") -> "StringResponse":
+    async def finalize_checkpoint(
+        self, finalize_checkpoint_request: "FinalizeCheckpointRequest"
+    ) -> "FinalizeCheckpointResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_error_command(
-        self, error_command: "ErrorCommand"
-    ) -> "StringResponse":
+    async def flush_network_buffer(
+        self, empty_request: "EmptyRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_recursion(self, recursion: "Recursion") -> "StringResponse":
+    async def initialize_executor(
+        self, initialize_executor_request: "InitializeExecutorRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_collect(self, collect: "Collect") -> "StringResponse":
+    async def open_executor(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_generate_number(
-        self, generate_number: "GenerateNumber"
-    ) -> "IntResponse":
+    async def pause_worker(
+        self, empty_request: "EmptyRequest"
+    ) -> "WorkerStateResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_multi_call(self, multi_call: "MultiCall") -> 
"StringResponse":
+    async def prepare_checkpoint(
+        self, prepare_checkpoint_request: "PrepareCheckpointRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def send_chain(self, chain: "Chain") -> "StringResponse":
+    async def query_statistics(
+        self, empty_request: "EmptyRequest"
+    ) -> "WorkerMetricsResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_send_ping(
-        self, stream: "grpclib.server.Stream[Ping, IntResponse]"
+    async def resume_worker(
+        self, empty_request: "EmptyRequest"
+    ) -> "WorkerStateResponse":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def retrieve_state(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def retry_current_tuple(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def start_worker(
+        self, empty_request: "EmptyRequest"
+    ) -> "WorkerStateResponse":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def start_channel(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def end_channel(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def debug_command(
+        self, debug_command_request: "DebugCommandRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def evaluate_python_expression(
+        self, evaluate_python_expression_request: 
"EvaluatePythonExpressionRequest"
+    ) -> "EvaluatedValue":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def no_operation(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def update_executor(
+        self, update_executor_request: "UpdateExecutorRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
+    async def __rpc_add_input_channel(
+        self, stream: "grpclib.server.Stream[AddInputChannelRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_ping(request)
+        response = await self.add_input_channel(request)
         await stream.send_message(response)
 
-    async def __rpc_send_pong(
-        self, stream: "grpclib.server.Stream[Pong, IntResponse]"
+    async def __rpc_add_partitioning(
+        self, stream: "grpclib.server.Stream[AddPartitioningRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_pong(request)
+        response = await self.add_partitioning(request)
         await stream.send_message(response)
 
-    async def __rpc_send_nested(
-        self, stream: "grpclib.server.Stream[Nested, StringResponse]"
+    async def __rpc_assign_port(
+        self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_nested(request)
+        response = await self.assign_port(request)
         await stream.send_message(response)
 
-    async def __rpc_send_pass(
-        self, stream: "grpclib.server.Stream[Pass, StringResponse]"
+    async def __rpc_finalize_checkpoint(
+        self,
+        stream: "grpclib.server.Stream[FinalizeCheckpointRequest, 
FinalizeCheckpointResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_pass(request)
+        response = await self.finalize_checkpoint(request)
         await stream.send_message(response)
 
-    async def __rpc_send_error_command(
-        self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]"
+    async def __rpc_flush_network_buffer(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_error_command(request)
+        response = await self.flush_network_buffer(request)
         await stream.send_message(response)
 
-    async def __rpc_send_recursion(
-        self, stream: "grpclib.server.Stream[Recursion, StringResponse]"
+    async def __rpc_initialize_executor(
+        self, stream: "grpclib.server.Stream[InitializeExecutorRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_recursion(request)
+        response = await self.initialize_executor(request)
         await stream.send_message(response)
 
-    async def __rpc_send_collect(
-        self, stream: "grpclib.server.Stream[Collect, StringResponse]"
+    async def __rpc_open_executor(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_collect(request)
+        response = await self.open_executor(request)
         await stream.send_message(response)
 
-    async def __rpc_send_generate_number(
-        self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]"
+    async def __rpc_pause_worker(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_generate_number(request)
+        response = await self.pause_worker(request)
         await stream.send_message(response)
 
-    async def __rpc_send_multi_call(
-        self, stream: "grpclib.server.Stream[MultiCall, StringResponse]"
+    async def __rpc_prepare_checkpoint(
+        self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_multi_call(request)
+        response = await self.prepare_checkpoint(request)
         await stream.send_message(response)
 
-    async def __rpc_send_chain(
-        self, stream: "grpclib.server.Stream[Chain, StringResponse]"
+    async def __rpc_query_statistics(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerMetricsResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.send_chain(request)
+        response = await self.query_statistics(request)
+        await stream.send_message(response)
+
+    async def __rpc_resume_worker(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.resume_worker(request)
+        await stream.send_message(response)
+
+    async def __rpc_retrieve_state(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.retrieve_state(request)
+        await stream.send_message(response)
+
+    async def __rpc_retry_current_tuple(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.retry_current_tuple(request)
+        await stream.send_message(response)
+
+    async def __rpc_start_worker(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.start_worker(request)
+        await stream.send_message(response)
+
+    async def __rpc_end_worker(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.end_worker(request)
+        await stream.send_message(response)
+
+    async def __rpc_start_channel(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.start_channel(request)
+        await stream.send_message(response)
+
+    async def __rpc_end_channel(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.end_channel(request)
+        await stream.send_message(response)
+
+    async def __rpc_debug_command(
+        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.debug_command(request)
+        await stream.send_message(response)
+
+    async def __rpc_evaluate_python_expression(
+        self,
+        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, 
EvaluatedValue]",
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.evaluate_python_expression(request)
+        await stream.send_message(response)
+
+    async def __rpc_no_operation(
+        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.no_operation(request)
+        await stream.send_message(response)
+
+    async def __rpc_update_executor(
+        self, stream: "grpclib.server.Stream[UpdateExecutorRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.update_executor(request)
         await stream.send_message(response)
 
     def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
         return {
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": 
grpclib.const.Handler(
-                self.__rpc_send_ping,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel":
 grpclib.const.Handler(
+                self.__rpc_add_input_channel,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Ping,
-                IntResponse,
+                AddInputChannelRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": 
grpclib.const.Handler(
-                self.__rpc_send_pong,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning":
 grpclib.const.Handler(
+                self.__rpc_add_partitioning,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Pong,
-                IntResponse,
+                AddPartitioningRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": 
grpclib.const.Handler(
-                self.__rpc_send_nested,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": 
grpclib.const.Handler(
+                self.__rpc_assign_port,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Nested,
-                StringResponse,
+                AssignPortRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": 
grpclib.const.Handler(
-                self.__rpc_send_pass,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint":
 grpclib.const.Handler(
+                self.__rpc_finalize_checkpoint,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Pass,
-                StringResponse,
+                FinalizeCheckpointRequest,
+                FinalizeCheckpointResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": 
grpclib.const.Handler(
-                self.__rpc_send_error_command,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer":
 grpclib.const.Handler(
+                self.__rpc_flush_network_buffer,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                ErrorCommand,
-                StringResponse,
+                EmptyRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": 
grpclib.const.Handler(
-                self.__rpc_send_recursion,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor":
 grpclib.const.Handler(
+                self.__rpc_initialize_executor,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Recursion,
-                StringResponse,
+                InitializeExecutorRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": 
grpclib.const.Handler(
-                self.__rpc_send_collect,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": 
grpclib.const.Handler(
+                self.__rpc_open_executor,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Collect,
-                StringResponse,
+                EmptyRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber":
 grpclib.const.Handler(
-                self.__rpc_send_generate_number,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": 
grpclib.const.Handler(
+                self.__rpc_pause_worker,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                GenerateNumber,
-                IntResponse,
+                EmptyRequest,
+                WorkerStateResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": 
grpclib.const.Handler(
-                self.__rpc_send_multi_call,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint":
 grpclib.const.Handler(
+                self.__rpc_prepare_checkpoint,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                MultiCall,
-                StringResponse,
+                PrepareCheckpointRequest,
+                EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": 
grpclib.const.Handler(
-                self.__rpc_send_chain,
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics":
 grpclib.const.Handler(
+                self.__rpc_query_statistics,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                Chain,
-                StringResponse,
+                EmptyRequest,
+                WorkerMetricsResponse,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": 
grpclib.const.Handler(
+                self.__rpc_resume_worker,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                WorkerStateResponse,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": 
grpclib.const.Handler(
+                self.__rpc_retrieve_state,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple":
 grpclib.const.Handler(
+                self.__rpc_retry_current_tuple,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": 
grpclib.const.Handler(
+                self.__rpc_start_worker,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                WorkerStateResponse,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": 
grpclib.const.Handler(
+                self.__rpc_end_worker,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": 
grpclib.const.Handler(
+                self.__rpc_start_channel,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": 
grpclib.const.Handler(
+                self.__rpc_end_channel,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": 
grpclib.const.Handler(
+                self.__rpc_debug_command,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                DebugCommandRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression":
 grpclib.const.Handler(
+                self.__rpc_evaluate_python_expression,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EvaluatePythonExpressionRequest,
+                EvaluatedValue,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": 
grpclib.const.Handler(
+                self.__rpc_no_operation,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                EmptyRequest,
+                EmptyReturn,
+            ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor":
 grpclib.const.Handler(
+                self.__rpc_update_executor,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                UpdateExecutorRequest,
+                EmptyReturn,
             ),
         }
 
 
-class WorkerServiceBase(ServiceBase):
-
-    async def add_input_channel(
-        self, add_input_channel_request: "AddInputChannelRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def add_partitioning(
-        self, add_partitioning_request: "AddPartitioningRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+class ControllerServiceBase(ServiceBase):
 
-    async def assign_port(
-        self, assign_port_request: "AssignPortRequest"
-    ) -> "EmptyReturn":
+    async def retrieve_workflow_state(
+        self, empty_request: "EmptyRequest"
+    ) -> "RetrieveWorkflowStateResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def finalize_checkpoint(
-        self, finalize_checkpoint_request: "FinalizeCheckpointRequest"
-    ) -> "FinalizeCheckpointResponse":
+    async def propagate_embedded_control_message(
+        self,
+        propagate_embedded_control_message_request: 
"PropagateEmbeddedControlMessageRequest",
+    ) -> "PropagateEmbeddedControlMessageResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def flush_network_buffer(
-        self, empty_request: "EmptyRequest"
-    ) -> "EmptyReturn":
+    async def take_global_checkpoint(
+        self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest"
+    ) -> "TakeGlobalCheckpointResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def initialize_executor(
-        self, initialize_executor_request: "InitializeExecutorRequest"
+    async def debug_command(
+        self, debug_command_request: "DebugCommandRequest"
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def open_executor(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def evaluate_python_expression(
+        self, evaluate_python_expression_request: 
"EvaluatePythonExpressionRequest"
+    ) -> "EvaluatePythonExpressionResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def pause_worker(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
+    async def console_message_triggered(
+        self, console_message_triggered_request: 
"ConsoleMessageTriggeredRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def prepare_checkpoint(
-        self, prepare_checkpoint_request: "PrepareCheckpointRequest"
+    async def port_completed(
+        self, port_completed_request: "PortCompletedRequest"
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def query_statistics(
+    async def start_workflow(
         self, empty_request: "EmptyRequest"
-    ) -> "WorkerMetricsResponse":
+    ) -> "StartWorkflowResponse":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def resume_worker(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
+    async def resume_workflow(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def retrieve_state(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def pause_workflow(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def retry_current_tuple(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
+    async def worker_state_updated(
+        self, worker_state_updated_request: "WorkerStateUpdatedRequest"
+    ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def start_worker(
+    async def worker_execution_completed(
         self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def start_channel(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def end_channel(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def debug_command(
-        self, debug_command_request: "DebugCommandRequest"
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def evaluate_python_expression(
-        self, evaluate_python_expression_request: 
"EvaluatePythonExpressionRequest"
-    ) -> "EvaluatedValue":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def no_operation(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def update_executor(
-        self, update_executor_request: "UpdateExecutorRequest"
+    async def link_workers(
+        self, link_workers_request: "LinkWorkersRequest"
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_add_input_channel(
-        self, stream: "grpclib.server.Stream[AddInputChannelRequest, 
EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.add_input_channel(request)
-        await stream.send_message(response)
-
-    async def __rpc_add_partitioning(
-        self, stream: "grpclib.server.Stream[AddPartitioningRequest, 
EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.add_partitioning(request)
-        await stream.send_message(response)
+    async def controller_initiate_query_statistics(
+        self, query_statistics_request: "QueryStatisticsRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_assign_port(
-        self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.assign_port(request)
-        await stream.send_message(response)
+    async def retry_workflow(
+        self, retry_workflow_request: "RetryWorkflowRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_finalize_checkpoint(
-        self,
-        stream: "grpclib.server.Stream[FinalizeCheckpointRequest, 
FinalizeCheckpointResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.finalize_checkpoint(request)
-        await stream.send_message(response)
+    async def reconfigure_workflow(
+        self, workflow_reconfigure_request: "WorkflowReconfigureRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
-    async def __rpc_flush_network_buffer(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_retrieve_workflow_state(
+        self,
+        stream: "grpclib.server.Stream[EmptyRequest, 
RetrieveWorkflowStateResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.flush_network_buffer(request)
+        response = await self.retrieve_workflow_state(request)
         await stream.send_message(response)
 
-    async def __rpc_initialize_executor(
-        self, stream: "grpclib.server.Stream[InitializeExecutorRequest, 
EmptyReturn]"
+    async def __rpc_propagate_embedded_control_message(
+        self,
+        stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, 
PropagateEmbeddedControlMessageResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.initialize_executor(request)
+        response = await self.propagate_embedded_control_message(request)
         await stream.send_message(response)
 
-    async def __rpc_open_executor(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_take_global_checkpoint(
+        self,
+        stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, 
TakeGlobalCheckpointResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.open_executor(request)
+        response = await self.take_global_checkpoint(request)
         await stream.send_message(response)
 
-    async def __rpc_pause_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
+    async def __rpc_debug_command(
+        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.pause_worker(request)
+        response = await self.debug_command(request)
         await stream.send_message(response)
 
-    async def __rpc_prepare_checkpoint(
-        self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, 
EmptyReturn]"
+    async def __rpc_evaluate_python_expression(
+        self,
+        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, 
EvaluatePythonExpressionResponse]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.prepare_checkpoint(request)
+        response = await self.evaluate_python_expression(request)
         await stream.send_message(response)
 
-    async def __rpc_query_statistics(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerMetricsResponse]"
+    async def __rpc_console_message_triggered(
+        self,
+        stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, 
EmptyReturn]",
     ) -> None:
         request = await stream.recv_message()
-        response = await self.query_statistics(request)
+        response = await self.console_message_triggered(request)
         await stream.send_message(response)
 
-    async def __rpc_resume_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
+    async def __rpc_port_completed(
+        self, stream: "grpclib.server.Stream[PortCompletedRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.resume_worker(request)
+        response = await self.port_completed(request)
         await stream.send_message(response)
 
-    async def __rpc_retrieve_state(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_start_workflow(
+        self, stream: "grpclib.server.Stream[EmptyRequest, 
StartWorkflowResponse]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.retrieve_state(request)
+        response = await self.start_workflow(request)
         await stream.send_message(response)
 
-    async def __rpc_retry_current_tuple(
+    async def __rpc_resume_workflow(
         self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.retry_current_tuple(request)
-        await stream.send_message(response)
-
-    async def __rpc_start_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, 
WorkerStateResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.start_worker(request)
+        response = await self.resume_workflow(request)
         await stream.send_message(response)
 
-    async def __rpc_end_worker(
+    async def __rpc_pause_workflow(
         self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.end_worker(request)
+        response = await self.pause_workflow(request)
         await stream.send_message(response)
 
-    async def __rpc_start_channel(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_worker_state_updated(
+        self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.start_channel(request)
+        response = await self.worker_state_updated(request)
         await stream.send_message(response)
 
-    async def __rpc_end_channel(
+    async def __rpc_worker_execution_completed(
         self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.end_channel(request)
+        response = await self.worker_execution_completed(request)
         await stream.send_message(response)
 
-    async def __rpc_debug_command(
-        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
+    async def __rpc_link_workers(
+        self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.debug_command(request)
+        response = await self.link_workers(request)
         await stream.send_message(response)
 
-    async def __rpc_evaluate_python_expression(
-        self,
-        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, 
EvaluatedValue]",
+    async def __rpc_controller_initiate_query_statistics(
+        self, stream: "grpclib.server.Stream[QueryStatisticsRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.evaluate_python_expression(request)
+        response = await self.controller_initiate_query_statistics(request)
         await stream.send_message(response)
 
-    async def __rpc_no_operation(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
+    async def __rpc_retry_workflow(
+        self, stream: "grpclib.server.Stream[RetryWorkflowRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.no_operation(request)
+        response = await self.retry_workflow(request)
         await stream.send_message(response)
 
-    async def __rpc_update_executor(
-        self, stream: "grpclib.server.Stream[UpdateExecutorRequest, 
EmptyReturn]"
+    async def __rpc_reconfigure_workflow(
+        self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest, 
EmptyReturn]"
     ) -> None:
         request = await stream.recv_message()
-        response = await self.update_executor(request)
+        response = await self.reconfigure_workflow(request)
         await stream.send_message(response)
 
     def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
         return {
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel":
 grpclib.const.Handler(
-                self.__rpc_add_input_channel,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                AddInputChannelRequest,
-                EmptyReturn,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning":
 grpclib.const.Handler(
-                self.__rpc_add_partitioning,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState":
 grpclib.const.Handler(
+                self.__rpc_retrieve_workflow_state,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                AddPartitioningRequest,
-                EmptyReturn,
+                EmptyRequest,
+                RetrieveWorkflowStateResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": 
grpclib.const.Handler(
-                self.__rpc_assign_port,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage":
 grpclib.const.Handler(
+                self.__rpc_propagate_embedded_control_message,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                AssignPortRequest,
-                EmptyReturn,
+                PropagateEmbeddedControlMessageRequest,
+                PropagateEmbeddedControlMessageResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint":
 grpclib.const.Handler(
-                self.__rpc_finalize_checkpoint,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint":
 grpclib.const.Handler(
+                self.__rpc_take_global_checkpoint,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                FinalizeCheckpointRequest,
-                FinalizeCheckpointResponse,
+                TakeGlobalCheckpointRequest,
+                TakeGlobalCheckpointResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer":
 grpclib.const.Handler(
-                self.__rpc_flush_network_buffer,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand":
 grpclib.const.Handler(
+                self.__rpc_debug_command,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
+                DebugCommandRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor":
 grpclib.const.Handler(
-                self.__rpc_initialize_executor,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression":
 grpclib.const.Handler(
+                self.__rpc_evaluate_python_expression,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                InitializeExecutorRequest,
-                EmptyReturn,
+                EvaluatePythonExpressionRequest,
+                EvaluatePythonExpressionResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": 
grpclib.const.Handler(
-                self.__rpc_open_executor,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered":
 grpclib.const.Handler(
+                self.__rpc_console_message_triggered,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
+                ConsoleMessageTriggeredRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": 
grpclib.const.Handler(
-                self.__rpc_pause_worker,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerStateResponse,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint":
 grpclib.const.Handler(
-                self.__rpc_prepare_checkpoint,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted":
 grpclib.const.Handler(
+                self.__rpc_port_completed,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                PrepareCheckpointRequest,
+                PortCompletedRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics":
 grpclib.const.Handler(
-                self.__rpc_query_statistics,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerMetricsResponse,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": 
grpclib.const.Handler(
-                self.__rpc_resume_worker,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow":
 grpclib.const.Handler(
+                self.__rpc_start_workflow,
                 grpclib.const.Cardinality.UNARY_UNARY,
                 EmptyRequest,
-                WorkerStateResponse,
+                StartWorkflowResponse,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": 
grpclib.const.Handler(
-                self.__rpc_retrieve_state,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow":
 grpclib.const.Handler(
+                self.__rpc_resume_workflow,
                 grpclib.const.Cardinality.UNARY_UNARY,
                 EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple":
 grpclib.const.Handler(
-                self.__rpc_retry_current_tuple,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow":
 grpclib.const.Handler(
+                self.__rpc_pause_workflow,
                 grpclib.const.Cardinality.UNARY_UNARY,
                 EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": 
grpclib.const.Handler(
-                self.__rpc_start_worker,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerStateResponse,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": 
grpclib.const.Handler(
-                self.__rpc_end_worker,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated":
 grpclib.const.Handler(
+                self.__rpc_worker_state_updated,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
+                WorkerStateUpdatedRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": 
grpclib.const.Handler(
-                self.__rpc_start_channel,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted":
 grpclib.const.Handler(
+                self.__rpc_worker_execution_completed,
                 grpclib.const.Cardinality.UNARY_UNARY,
                 EmptyRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": 
grpclib.const.Handler(
-                self.__rpc_end_channel,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers":
 grpclib.const.Handler(
+                self.__rpc_link_workers,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
+                LinkWorkersRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": 
grpclib.const.Handler(
-                self.__rpc_debug_command,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics":
 grpclib.const.Handler(
+                self.__rpc_controller_initiate_query_statistics,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                DebugCommandRequest,
+                QueryStatisticsRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression":
 grpclib.const.Handler(
-                self.__rpc_evaluate_python_expression,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EvaluatePythonExpressionRequest,
-                EvaluatedValue,
-            ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": 
grpclib.const.Handler(
-                self.__rpc_no_operation,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow":
 grpclib.const.Handler(
+                self.__rpc_retry_workflow,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
+                RetryWorkflowRequest,
                 EmptyReturn,
             ),
-            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor":
 grpclib.const.Handler(
-                self.__rpc_update_executor,
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow":
 grpclib.const.Handler(
+                self.__rpc_reconfigure_workflow,
                 grpclib.const.Cardinality.UNARY_UNARY,
-                UpdateExecutorRequest,
+                WorkflowReconfigureRequest,
                 EmptyReturn,
             ),
         }
diff --git 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
index 8c1464cc76..55c789aa39 100644
--- 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
@@ -18,27 +18,6 @@ from ..architecture import (
 )
 
 
-@dataclass(eq=False, repr=False)
-class Backpressure(betterproto.Message):
-    enable_backpressure: bool = betterproto.bool_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class CreditUpdate(betterproto.Message):
-    pass
-
-
-@dataclass(eq=False, repr=False)
-class ActorCommand(betterproto.Message):
-    backpressure: "Backpressure" = betterproto.message_field(1, 
group="sealed_value")
-    credit_update: "CreditUpdate" = betterproto.message_field(2, 
group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class PythonActorMessage(betterproto.Message):
-    payload: "ActorCommand" = betterproto.message_field(1)
-
-
 @dataclass(eq=False, repr=False)
 class DirectControlMessagePayloadV2(betterproto.Message):
     control_invocation: "_architecture_rpc__.ControlInvocation" = (
@@ -154,3 +133,24 @@ class ExecutionMetadataStore(betterproto.Message):
     fatal_errors: List["__core__.WorkflowFatalError"] = 
betterproto.message_field(2)
     execution_id: "__core__.ExecutionIdentity" = betterproto.message_field(3)
     is_recovering: bool = betterproto.bool_field(4)
+
+
+@dataclass(eq=False, repr=False)
+class Backpressure(betterproto.Message):
+    enable_backpressure: bool = betterproto.bool_field(1)
+
+
+@dataclass(eq=False, repr=False)
+class CreditUpdate(betterproto.Message):
+    pass
+
+
+@dataclass(eq=False, repr=False)
+class ActorCommand(betterproto.Message):
+    backpressure: "Backpressure" = betterproto.message_field(1, 
group="sealed_value")
+    credit_update: "CreditUpdate" = betterproto.message_field(2, 
group="sealed_value")
+
+
+@dataclass(eq=False, repr=False)
+class PythonActorMessage(betterproto.Message):
+    payload: "ActorCommand" = betterproto.message_field(1)

Reply via email to