This is an automated email from the ASF dual-hosted git repository. shengquan pushed a commit to branch shengquan-add-reconfigration in repository https://gitbox.apache.org/repos/asf/texera.git
commit f5a0c38a84d6852e9af52bf163acc0b4756db70c Author: Shengquan Ni <[email protected]> AuthorDate: Sun Feb 1 19:22:17 2026 -0800 add rough impl --- .../engine/architecture/rpc/controlcommands.proto | 21 +- .../architecture/rpc/controllerservice.proto | 1 + .../engine/architecture/rpc/workerservice.proto | 1 + .../handlers/control/update_executor_handler.py | 29 +- .../amber/engine/architecture/rpc/__init__.py | 1527 ++++++++++---------- .../apache/texera/amber/engine/common/__init__.py | 42 +- .../ControllerAsyncRPCHandlerInitializer.scala | 3 +- .../promisehandlers/ReconfigurationHandler.scala | 105 ++ .../DataProcessorRPCHandlerInitializer.scala | 27 +- .../InitializeExecutorHandler.scala | 1 - ...orHandler.scala => UpdateExecutorHandler.scala} | 59 +- .../common}/FriesReconfigurationAlgorithm.scala | 56 +- common/config/src/main/resources/storage.conf | 4 +- common/config/src/main/resources/udf.conf | 2 +- 14 files changed, 1010 insertions(+), 868 deletions(-) diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index ed17be236d..2ce7f9be8e 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -25,7 +25,6 @@ import "org/apache/texera/amber/engine/architecture/worker/statistics.proto"; import "org/apache/texera/amber/engine/architecture/sendsemantics/partitionings.proto"; import "scalapb/scalapb.proto"; import "google/protobuf/timestamp.proto"; -import "google/protobuf/any.proto"; option (scalapb.options) = { scope: FILE, @@ -40,12 +39,11 @@ message ControlRequest { TakeGlobalCheckpointRequest takeGlobalCheckpointRequest = 2; DebugCommandRequest debugCommandRequest = 3; EvaluatePythonExpressionRequest evaluatePythonExpressionRequest = 4; - ModifyLogicRequest 
modifyLogicRequest = 5; - RetryWorkflowRequest retryWorkflowRequest = 6; - ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 8; - PortCompletedRequest portCompletedRequest = 9; - WorkerStateUpdatedRequest workerStateUpdatedRequest = 10; - LinkWorkersRequest linkWorkersRequest = 11; + RetryWorkflowRequest retryWorkflowRequest = 5; + ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 6; + PortCompletedRequest portCompletedRequest = 7; + WorkerStateUpdatedRequest workerStateUpdatedRequest = 8; + LinkWorkersRequest linkWorkersRequest = 9; // request for worker AddInputChannelRequest addInputChannelRequest = 50; @@ -119,7 +117,7 @@ message TakeGlobalCheckpointRequest { } message WorkflowReconfigureRequest{ - ModifyLogicRequest reconfiguration = 1 [(scalapb.field).no_box = true]; + repeated UpdateExecutorRequest reconfiguration = 1; string reconfigurationId = 2; } @@ -134,10 +132,6 @@ message EvaluatePythonExpressionRequest { string operatorId = 2; } -message ModifyLogicRequest { - repeated UpdateExecutorRequest updateRequest = 1; -} - message RetryWorkflowRequest { repeated core.ActorVirtualIdentity workers = 1; } @@ -260,8 +254,7 @@ message InitializeExecutorRequest { message UpdateExecutorRequest { core.PhysicalOpIdentity targetOpId = 1 [(scalapb.field).no_box = true]; - google.protobuf.Any newExecutor = 2 [(scalapb.field).no_box = true]; - google.protobuf.Any stateTransferFunc = 3; + core.OpExecInitInfo newExecInitInfo = 2; } message PrepareCheckpointRequest{ diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto index 70d189a341..27b4727ee9 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto @@ -45,4 +45,5 @@ service ControllerService 
{ rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn); rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn); rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn); + rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn); } \ No newline at end of file diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto index dbcd6d8a5e..21944ffefc 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto @@ -50,4 +50,5 @@ service WorkerService { rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn); rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue); rpc NoOperation(EmptyRequest) returns (EmptyReturn); + rpc UpdateExecutor(UpdateExecutorRequest) returns (EmptyReturn); } \ No newline at end of file diff --git a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py index fe33b2dec0..a5bc1a6c0f 100644 --- a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py @@ -1,28 +1 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# from proto.org.apache.texera.amber.engine.architecture.worker import UpdateExecutorV2 -# from core.architecture.handlers.control.control_handler_base import ControlHandler -# from core.architecture.managers.context import Context -# -# -# class UpdateExecutorHandler(ControlHandler): -# cmd = UpdateExecutorV2 -# -# def __call__(self, context: Context, command: cmd, *args, **kwargs): -# context.executor_manager.update_executor(command.code, command.is_source) -# return None +# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
from core.architecture.handlers.control.control_handler_base import ControlHandler from proto.org.apache.texera.amber.engine.architecture.rpc import ( EmptyReturn, UpdateExecutorRequest, ) class UpdateExecutorHandler(ControlHandler): async def update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn: self.context.executor_manager.update_executor(req.code, req.is_source) return EmptyReturn() \ No newline at end of file diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py index ea6ddc5e43..f7b89f13e6 100644 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py +++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py @@ -13,7 +13,6 @@ from typing import ( ) import betterproto -import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf import grpclib from betterproto.grpc.grpclib_server import ServiceBase @@ -23,7 +22,6 @@ from .. 
import ( worker as _worker__, ) - if TYPE_CHECKING: import grpclib.server from betterproto.grpc.grpclib_client import MetadataLike @@ -78,23 +76,20 @@ class ControlRequest(betterproto.Message): evaluate_python_expression_request: "EvaluatePythonExpressionRequest" = ( betterproto.message_field(4, group="sealed_value") ) - modify_logic_request: "ModifyLogicRequest" = betterproto.message_field( - 5, group="sealed_value" - ) retry_workflow_request: "RetryWorkflowRequest" = betterproto.message_field( - 6, group="sealed_value" + 5, group="sealed_value" ) console_message_triggered_request: "ConsoleMessageTriggeredRequest" = ( - betterproto.message_field(8, group="sealed_value") + betterproto.message_field(6, group="sealed_value") ) port_completed_request: "PortCompletedRequest" = betterproto.message_field( - 9, group="sealed_value" + 7, group="sealed_value" ) worker_state_updated_request: "WorkerStateUpdatedRequest" = ( - betterproto.message_field(10, group="sealed_value") + betterproto.message_field(8, group="sealed_value") ) link_workers_request: "LinkWorkersRequest" = betterproto.message_field( - 11, group="sealed_value" + 9, group="sealed_value" ) add_input_channel_request: "AddInputChannelRequest" = betterproto.message_field( 50, group="sealed_value" @@ -192,7 +187,7 @@ class TakeGlobalCheckpointRequest(betterproto.Message): @dataclass(eq=False, repr=False) class WorkflowReconfigureRequest(betterproto.Message): - reconfiguration: "ModifyLogicRequest" = betterproto.message_field(1) + reconfiguration: List["UpdateExecutorRequest"] = betterproto.message_field(1) reconfiguration_id: str = betterproto.string_field(2) @@ -208,11 +203,6 @@ class EvaluatePythonExpressionRequest(betterproto.Message): operator_id: str = betterproto.string_field(2) -@dataclass(eq=False, repr=False) -class ModifyLogicRequest(betterproto.Message): - update_request: List["UpdateExecutorRequest"] = betterproto.message_field(1) - - @dataclass(eq=False, repr=False) class 
RetryWorkflowRequest(betterproto.Message): workers: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1) @@ -366,10 +356,7 @@ class InitializeExecutorRequest(betterproto.Message): @dataclass(eq=False, repr=False) class UpdateExecutorRequest(betterproto.Message): target_op_id: "___core__.PhysicalOpIdentity" = betterproto.message_field(1) - new_executor: "betterproto_lib_google_protobuf.Any" = betterproto.message_field(2) - state_transfer_func: "betterproto_lib_google_protobuf.Any" = ( - betterproto.message_field(3) - ) + new_exec_init_info: "___core__.OpExecInitInfo" = betterproto.message_field(2) @dataclass(eq=False, repr=False) @@ -517,503 +504,522 @@ class WorkerMetricsResponse(betterproto.Message): metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1) -class RpcTesterStub(betterproto.ServiceStub): - async def send_ping( +class ControllerServiceStub(betterproto.ServiceStub): + async def retrieve_workflow_state( self, - ping: "Ping", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": + ) -> "RetrieveWorkflowStateResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing", - ping, - IntResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState", + empty_request, + RetrieveWorkflowStateResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_pong( + async def propagate_embedded_control_message( self, - pong: "Pong", + propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": + ) -> "PropagateEmbeddedControlMessageResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong", - 
pong, - IntResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage", + propagate_embedded_control_message_request, + PropagateEmbeddedControlMessageResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_nested( + async def take_global_checkpoint( self, - nested: "Nested", + take_global_checkpoint_request: "TakeGlobalCheckpointRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "TakeGlobalCheckpointResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested", - nested, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint", + take_global_checkpoint_request, + TakeGlobalCheckpointResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_pass( + async def debug_command( self, - pass_: "Pass", + debug_command_request: "DebugCommandRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass", - pass_, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand", + debug_command_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_error_command( + async def evaluate_python_expression( self, - error_command: "ErrorCommand", + evaluate_python_expression_request: "EvaluatePythonExpressionRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EvaluatePythonExpressionResponse": return await self._unary_unary( - 
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand", - error_command, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression", + evaluate_python_expression_request, + EvaluatePythonExpressionResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_recursion( + async def console_message_triggered( self, - recursion: "Recursion", + console_message_triggered_request: "ConsoleMessageTriggeredRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion", - recursion, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered", + console_message_triggered_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_collect( + async def port_completed( self, - collect: "Collect", + port_completed_request: "PortCompletedRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect", - collect, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted", + port_completed_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_generate_number( + async def start_workflow( self, - generate_number: "GenerateNumber", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "IntResponse": + ) -> "StartWorkflowResponse": return await self._unary_unary( - 
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber", - generate_number, - IntResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow", + empty_request, + StartWorkflowResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_multi_call( + async def resume_workflow( self, - multi_call: "MultiCall", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall", - multi_call, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow", + empty_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def send_chain( + async def pause_workflow( self, - chain: "Chain", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StringResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain", - chain, - StringResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow", + empty_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - -class WorkerServiceStub(betterproto.ServiceStub): - async def add_input_channel( + async def worker_state_updated( self, - add_input_channel_request: "AddInputChannelRequest", + worker_state_updated_request: "WorkerStateUpdatedRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel", - 
add_input_channel_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated", + worker_state_updated_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def add_partitioning( + async def worker_execution_completed( self, - add_partitioning_request: "AddPartitioningRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning", - add_partitioning_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted", + empty_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def assign_port( + async def link_workers( self, - assign_port_request: "AssignPortRequest", + link_workers_request: "LinkWorkersRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort", - assign_port_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers", + link_workers_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def finalize_checkpoint( + async def controller_initiate_query_statistics( self, - finalize_checkpoint_request: "FinalizeCheckpointRequest", + query_statistics_request: "QueryStatisticsRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "FinalizeCheckpointResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint", - finalize_checkpoint_request, - 
FinalizeCheckpointResponse, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics", + query_statistics_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def flush_network_buffer( + async def retry_workflow( self, - empty_request: "EmptyRequest", + retry_workflow_request: "RetryWorkflowRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer", - empty_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow", + retry_workflow_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def initialize_executor( + async def reconfigure_workflow( self, - initialize_executor_request: "InitializeExecutorRequest", + workflow_reconfigure_request: "WorkflowReconfigureRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor", - initialize_executor_request, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow", + workflow_reconfigure_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def open_executor( + +class RpcTesterStub(betterproto.ServiceStub): + async def send_ping( self, - empty_request: "EmptyRequest", + ping: "Ping", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "IntResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor", - empty_request, - EmptyReturn, + 
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing", + ping, + IntResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def pause_worker( + async def send_pong( self, - empty_request: "EmptyRequest", + pong: "Pong", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": + ) -> "IntResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker", - empty_request, - WorkerStateResponse, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong", + pong, + IntResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def prepare_checkpoint( + async def send_nested( self, - prepare_checkpoint_request: "PrepareCheckpointRequest", + nested: "Nested", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint", - prepare_checkpoint_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested", + nested, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def query_statistics( + async def send_pass( self, - empty_request: "EmptyRequest", + pass_: "Pass", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "WorkerMetricsResponse": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics", - empty_request, - WorkerMetricsResponse, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass", + pass_, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def resume_worker( + async def 
send_error_command( self, - empty_request: "EmptyRequest", + error_command: "ErrorCommand", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker", - empty_request, - WorkerStateResponse, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand", + error_command, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def retrieve_state( + async def send_recursion( self, - empty_request: "EmptyRequest", + recursion: "Recursion", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState", - empty_request, - EmptyReturn, - timeout=timeout, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion", + recursion, + StringResponse, + timeout=timeout, deadline=deadline, metadata=metadata, ) - async def retry_current_tuple( + async def send_collect( self, - empty_request: "EmptyRequest", + collect: "Collect", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple", - empty_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect", + collect, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def start_worker( + async def send_generate_number( self, - empty_request: "EmptyRequest", + generate_number: "GenerateNumber", *, timeout: Optional[float] = None, deadline: 
Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "WorkerStateResponse": + ) -> "IntResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker", - empty_request, - WorkerStateResponse, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber", + generate_number, + IntResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def end_worker( + async def send_multi_call( self, - empty_request: "EmptyRequest", + multi_call: "MultiCall", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "StringResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker", - empty_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall", + multi_call, + StringResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def start_channel( + async def send_chain( self, - empty_request: "EmptyRequest", + chain: "Chain", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "StringResponse": + return await self._unary_unary( + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain", + chain, + StringResponse, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + + +class WorkerServiceStub(betterproto.ServiceStub): + async def add_input_channel( + self, + add_input_channel_request: "AddInputChannelRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel", - empty_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel", 
+ add_input_channel_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def end_channel( + async def add_partitioning( self, - empty_request: "EmptyRequest", + add_partitioning_request: "AddPartitioningRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel", - empty_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning", + add_partitioning_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def debug_command( + async def assign_port( self, - debug_command_request: "DebugCommandRequest", + assign_port_request: "AssignPortRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand", - debug_command_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort", + assign_port_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def evaluate_python_expression( + async def finalize_checkpoint( self, - evaluate_python_expression_request: "EvaluatePythonExpressionRequest", + finalize_checkpoint_request: "FinalizeCheckpointRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EvaluatedValue": + ) -> "FinalizeCheckpointResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression", - evaluate_python_expression_request, - EvaluatedValue, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint", + finalize_checkpoint_request, + 
FinalizeCheckpointResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def no_operation( + async def flush_network_buffer( self, empty_request: "EmptyRequest", *, @@ -1022,7 +1028,7 @@ class WorkerServiceStub(betterproto.ServiceStub): metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer", empty_request, EmptyReturn, timeout=timeout, @@ -1030,145 +1036,160 @@ class WorkerServiceStub(betterproto.ServiceStub): metadata=metadata, ) + async def initialize_executor( + self, + initialize_executor_request: "InitializeExecutorRequest", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "EmptyReturn": + return await self._unary_unary( + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor", + initialize_executor_request, + EmptyReturn, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) -class ControllerServiceStub(betterproto.ServiceStub): - async def retrieve_workflow_state( + async def open_executor( self, empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "RetrieveWorkflowStateResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor", empty_request, - RetrieveWorkflowStateResponse, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def propagate_embedded_control_message( + async def pause_worker( self, - propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", + empty_request: 
"EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "PropagateEmbeddedControlMessageResponse": + ) -> "WorkerStateResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage", - propagate_embedded_control_message_request, - PropagateEmbeddedControlMessageResponse, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker", + empty_request, + WorkerStateResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def take_global_checkpoint( + async def prepare_checkpoint( self, - take_global_checkpoint_request: "TakeGlobalCheckpointRequest", + prepare_checkpoint_request: "PrepareCheckpointRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "TakeGlobalCheckpointResponse": + ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint", - take_global_checkpoint_request, - TakeGlobalCheckpointResponse, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint", + prepare_checkpoint_request, + EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def debug_command( + async def query_statistics( self, - debug_command_request: "DebugCommandRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "WorkerMetricsResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand", - debug_command_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics", + empty_request, + WorkerMetricsResponse, timeout=timeout, 
deadline=deadline, metadata=metadata, ) - async def evaluate_python_expression( + async def resume_worker( self, - evaluate_python_expression_request: "EvaluatePythonExpressionRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EvaluatePythonExpressionResponse": + ) -> "WorkerStateResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression", - evaluate_python_expression_request, - EvaluatePythonExpressionResponse, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker", + empty_request, + WorkerStateResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def console_message_triggered( + async def retrieve_state( self, - console_message_triggered_request: "ConsoleMessageTriggeredRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered", - console_message_triggered_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState", + empty_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def port_completed( + async def retry_current_tuple( self, - port_completed_request: "PortCompletedRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted", - port_completed_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple", + empty_request, EmptyReturn, 
timeout=timeout, deadline=deadline, metadata=metadata, ) - async def start_workflow( + async def start_worker( self, empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "StartWorkflowResponse": + ) -> "WorkerStateResponse": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker", empty_request, - StartWorkflowResponse, + WorkerStateResponse, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def resume_workflow( + async def end_worker( self, empty_request: "EmptyRequest", *, @@ -1177,7 +1198,7 @@ class ControllerServiceStub(betterproto.ServiceStub): metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker", empty_request, EmptyReturn, timeout=timeout, @@ -1185,7 +1206,7 @@ class ControllerServiceStub(betterproto.ServiceStub): metadata=metadata, ) - async def pause_workflow( + async def start_channel( self, empty_request: "EmptyRequest", *, @@ -1194,7 +1215,7 @@ class ControllerServiceStub(betterproto.ServiceStub): metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow", + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel", empty_request, EmptyReturn, timeout=timeout, @@ -1202,85 +1223,85 @@ class ControllerServiceStub(betterproto.ServiceStub): metadata=metadata, ) - async def worker_state_updated( + async def end_channel( self, - worker_state_updated_request: "WorkerStateUpdatedRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: 
Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated", - worker_state_updated_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel", + empty_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def worker_execution_completed( + async def debug_command( self, - empty_request: "EmptyRequest", + debug_command_request: "DebugCommandRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted", - empty_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand", + debug_command_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def link_workers( + async def evaluate_python_expression( self, - link_workers_request: "LinkWorkersRequest", + evaluate_python_expression_request: "EvaluatePythonExpressionRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "EvaluatedValue": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers", - link_workers_request, - EmptyReturn, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression", + evaluate_python_expression_request, + EvaluatedValue, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def controller_initiate_query_statistics( + async def no_operation( self, - query_statistics_request: "QueryStatisticsRequest", + empty_request: "EmptyRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: 
Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics", - query_statistics_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation", + empty_request, EmptyReturn, timeout=timeout, deadline=deadline, metadata=metadata, ) - async def retry_workflow( + async def update_executor( self, - retry_workflow_request: "RetryWorkflowRequest", + update_executor_request: "UpdateExecutorRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None ) -> "EmptyReturn": return await self._unary_unary( - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow", - retry_workflow_request, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor", + update_executor_request, EmptyReturn, timeout=timeout, deadline=deadline, @@ -1288,806 +1309,842 @@ class ControllerServiceStub(betterproto.ServiceStub): ) -class RpcTesterBase(ServiceBase): +class ControllerServiceBase(ServiceBase): - async def send_ping(self, ping: "Ping") -> "IntResponse": + async def retrieve_workflow_state( + self, empty_request: "EmptyRequest" + ) -> "RetrieveWorkflowStateResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_pong(self, pong: "Pong") -> "IntResponse": + async def propagate_embedded_control_message( + self, + propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", + ) -> "PropagateEmbeddedControlMessageResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_nested(self, nested: "Nested") -> "StringResponse": + async def take_global_checkpoint( + self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest" + ) -> "TakeGlobalCheckpointResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def 
send_pass(self, pass_: "Pass") -> "StringResponse": + async def debug_command( + self, debug_command_request: "DebugCommandRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_error_command( - self, error_command: "ErrorCommand" - ) -> "StringResponse": + async def evaluate_python_expression( + self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" + ) -> "EvaluatePythonExpressionResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_recursion(self, recursion: "Recursion") -> "StringResponse": + async def console_message_triggered( + self, console_message_triggered_request: "ConsoleMessageTriggeredRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_collect(self, collect: "Collect") -> "StringResponse": + async def port_completed( + self, port_completed_request: "PortCompletedRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_generate_number( - self, generate_number: "GenerateNumber" - ) -> "IntResponse": + async def start_workflow( + self, empty_request: "EmptyRequest" + ) -> "StartWorkflowResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_multi_call(self, multi_call: "MultiCall") -> "StringResponse": + async def resume_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def send_chain(self, chain: "Chain") -> "StringResponse": + async def pause_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def __rpc_send_ping( - self, stream: "grpclib.server.Stream[Ping, IntResponse]" + async def worker_state_updated( + self, worker_state_updated_request: "WorkerStateUpdatedRequest" + ) -> "EmptyReturn": + raise 
grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def worker_execution_completed( + self, empty_request: "EmptyRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def link_workers( + self, link_workers_request: "LinkWorkersRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def controller_initiate_query_statistics( + self, query_statistics_request: "QueryStatisticsRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def retry_workflow( + self, retry_workflow_request: "RetryWorkflowRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def reconfigure_workflow( + self, workflow_reconfigure_request: "WorkflowReconfigureRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def __rpc_retrieve_workflow_state( + self, + stream: "grpclib.server.Stream[EmptyRequest, RetrieveWorkflowStateResponse]", ) -> None: request = await stream.recv_message() - response = await self.send_ping(request) + response = await self.retrieve_workflow_state(request) await stream.send_message(response) - async def __rpc_send_pong( - self, stream: "grpclib.server.Stream[Pong, IntResponse]" + async def __rpc_propagate_embedded_control_message( + self, + stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, PropagateEmbeddedControlMessageResponse]", ) -> None: request = await stream.recv_message() - response = await self.send_pong(request) + response = await self.propagate_embedded_control_message(request) await stream.send_message(response) - async def __rpc_send_nested( - self, stream: "grpclib.server.Stream[Nested, StringResponse]" + async def __rpc_take_global_checkpoint( + self, + stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, TakeGlobalCheckpointResponse]", ) -> None: request = await stream.recv_message() 
- response = await self.send_nested(request) + response = await self.take_global_checkpoint(request) await stream.send_message(response) - async def __rpc_send_pass( - self, stream: "grpclib.server.Stream[Pass, StringResponse]" + async def __rpc_debug_command( + self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.send_pass(request) + response = await self.debug_command(request) await stream.send_message(response) - async def __rpc_send_error_command( - self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]" + async def __rpc_evaluate_python_expression( + self, + stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatePythonExpressionResponse]", ) -> None: request = await stream.recv_message() - response = await self.send_error_command(request) + response = await self.evaluate_python_expression(request) await stream.send_message(response) - async def __rpc_send_recursion( - self, stream: "grpclib.server.Stream[Recursion, StringResponse]" + async def __rpc_console_message_triggered( + self, + stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, EmptyReturn]", ) -> None: request = await stream.recv_message() - response = await self.send_recursion(request) + response = await self.console_message_triggered(request) await stream.send_message(response) - async def __rpc_send_collect( - self, stream: "grpclib.server.Stream[Collect, StringResponse]" + async def __rpc_port_completed( + self, stream: "grpclib.server.Stream[PortCompletedRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.send_collect(request) + response = await self.port_completed(request) await stream.send_message(response) - async def __rpc_send_generate_number( - self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]" + async def __rpc_start_workflow( + self, stream: "grpclib.server.Stream[EmptyRequest, 
StartWorkflowResponse]" ) -> None: request = await stream.recv_message() - response = await self.send_generate_number(request) + response = await self.start_workflow(request) await stream.send_message(response) - async def __rpc_send_multi_call( - self, stream: "grpclib.server.Stream[MultiCall, StringResponse]" + async def __rpc_resume_workflow( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.send_multi_call(request) + response = await self.resume_workflow(request) await stream.send_message(response) - async def __rpc_send_chain( - self, stream: "grpclib.server.Stream[Chain, StringResponse]" + async def __rpc_pause_workflow( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.send_chain(request) + response = await self.pause_workflow(request) + await stream.send_message(response) + + async def __rpc_worker_state_updated( + self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.worker_state_updated(request) + await stream.send_message(response) + + async def __rpc_worker_execution_completed( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.worker_execution_completed(request) + await stream.send_message(response) + + async def __rpc_link_workers( + self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.link_workers(request) + await stream.send_message(response) + + async def __rpc_controller_initiate_query_statistics( + self, stream: "grpclib.server.Stream[QueryStatisticsRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await 
self.controller_initiate_query_statistics(request) + await stream.send_message(response) + + async def __rpc_retry_workflow( + self, stream: "grpclib.server.Stream[RetryWorkflowRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.retry_workflow(request) + await stream.send_message(response) + + async def __rpc_reconfigure_workflow( + self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.reconfigure_workflow(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": grpclib.const.Handler( - self.__rpc_send_ping, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState": grpclib.const.Handler( + self.__rpc_retrieve_workflow_state, grpclib.const.Cardinality.UNARY_UNARY, - Ping, - IntResponse, + EmptyRequest, + RetrieveWorkflowStateResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": grpclib.const.Handler( - self.__rpc_send_pong, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage": grpclib.const.Handler( + self.__rpc_propagate_embedded_control_message, grpclib.const.Cardinality.UNARY_UNARY, - Pong, - IntResponse, + PropagateEmbeddedControlMessageRequest, + PropagateEmbeddedControlMessageResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": grpclib.const.Handler( - self.__rpc_send_nested, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint": grpclib.const.Handler( + self.__rpc_take_global_checkpoint, grpclib.const.Cardinality.UNARY_UNARY, - Nested, - StringResponse, + TakeGlobalCheckpointRequest, + TakeGlobalCheckpointResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": 
grpclib.const.Handler( - self.__rpc_send_pass, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand": grpclib.const.Handler( + self.__rpc_debug_command, grpclib.const.Cardinality.UNARY_UNARY, - Pass, - StringResponse, + DebugCommandRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": grpclib.const.Handler( - self.__rpc_send_error_command, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression": grpclib.const.Handler( + self.__rpc_evaluate_python_expression, grpclib.const.Cardinality.UNARY_UNARY, - ErrorCommand, - StringResponse, + EvaluatePythonExpressionRequest, + EvaluatePythonExpressionResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": grpclib.const.Handler( - self.__rpc_send_recursion, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered": grpclib.const.Handler( + self.__rpc_console_message_triggered, grpclib.const.Cardinality.UNARY_UNARY, - Recursion, - StringResponse, + ConsoleMessageTriggeredRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": grpclib.const.Handler( - self.__rpc_send_collect, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted": grpclib.const.Handler( + self.__rpc_port_completed, grpclib.const.Cardinality.UNARY_UNARY, - Collect, - StringResponse, + PortCompletedRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber": grpclib.const.Handler( - self.__rpc_send_generate_number, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow": grpclib.const.Handler( + self.__rpc_start_workflow, grpclib.const.Cardinality.UNARY_UNARY, - GenerateNumber, - IntResponse, + EmptyRequest, + StartWorkflowResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": 
grpclib.const.Handler( - self.__rpc_send_multi_call, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow": grpclib.const.Handler( + self.__rpc_resume_workflow, grpclib.const.Cardinality.UNARY_UNARY, - MultiCall, - StringResponse, + EmptyRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": grpclib.const.Handler( - self.__rpc_send_chain, + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow": grpclib.const.Handler( + self.__rpc_pause_workflow, grpclib.const.Cardinality.UNARY_UNARY, - Chain, - StringResponse, + EmptyRequest, + EmptyReturn, ), - } - - -class WorkerServiceBase(ServiceBase): - - async def add_input_channel( - self, add_input_channel_request: "AddInputChannelRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def add_partitioning( - self, add_partitioning_request: "AddPartitioningRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def assign_port( - self, assign_port_request: "AssignPortRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def finalize_checkpoint( - self, finalize_checkpoint_request: "FinalizeCheckpointRequest" - ) -> "FinalizeCheckpointResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def flush_network_buffer( - self, empty_request: "EmptyRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def initialize_executor( - self, initialize_executor_request: "InitializeExecutorRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def open_executor(self, empty_request: "EmptyRequest") -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - - async def pause_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": - 
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated": grpclib.const.Handler( + self.__rpc_worker_state_updated, + grpclib.const.Cardinality.UNARY_UNARY, + WorkerStateUpdatedRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted": grpclib.const.Handler( + self.__rpc_worker_execution_completed, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers": grpclib.const.Handler( + self.__rpc_link_workers, + grpclib.const.Cardinality.UNARY_UNARY, + LinkWorkersRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics": grpclib.const.Handler( + self.__rpc_controller_initiate_query_statistics, + grpclib.const.Cardinality.UNARY_UNARY, + QueryStatisticsRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow": grpclib.const.Handler( + self.__rpc_retry_workflow, + grpclib.const.Cardinality.UNARY_UNARY, + RetryWorkflowRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow": grpclib.const.Handler( + self.__rpc_reconfigure_workflow, + grpclib.const.Cardinality.UNARY_UNARY, + WorkflowReconfigureRequest, + EmptyReturn, + ), + } - async def prepare_checkpoint( - self, prepare_checkpoint_request: "PrepareCheckpointRequest" - ) -> "EmptyReturn": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def query_statistics( - self, empty_request: "EmptyRequest" - ) -> "WorkerMetricsResponse": - raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) +class RpcTesterBase(ServiceBase): - async def resume_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": + async def send_ping(self, ping: 
"Ping") -> "IntResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def retrieve_state(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_pong(self, pong: "Pong") -> "IntResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def retry_current_tuple(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_nested(self, nested: "Nested") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def start_worker( - self, empty_request: "EmptyRequest" - ) -> "WorkerStateResponse": + async def send_pass(self, pass_: "Pass") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_error_command( + self, error_command: "ErrorCommand" + ) -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def start_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_recursion(self, recursion: "Recursion") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def end_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def send_collect(self, collect: "Collect") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def debug_command( - self, debug_command_request: "DebugCommandRequest" - ) -> "EmptyReturn": + async def send_generate_number( + self, generate_number: "GenerateNumber" + ) -> "IntResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def evaluate_python_expression( - self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" - ) -> "EvaluatedValue": + async def send_multi_call(self, multi_call: "MultiCall") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def no_operation(self, empty_request: "EmptyRequest") -> 
"EmptyReturn": + async def send_chain(self, chain: "Chain") -> "StringResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def __rpc_add_input_channel( - self, stream: "grpclib.server.Stream[AddInputChannelRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.add_input_channel(request) - await stream.send_message(response) - - async def __rpc_add_partitioning( - self, stream: "grpclib.server.Stream[AddPartitioningRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.add_partitioning(request) - await stream.send_message(response) - - async def __rpc_assign_port( - self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.assign_port(request) - await stream.send_message(response) - - async def __rpc_finalize_checkpoint( - self, - stream: "grpclib.server.Stream[FinalizeCheckpointRequest, FinalizeCheckpointResponse]", - ) -> None: - request = await stream.recv_message() - response = await self.finalize_checkpoint(request) - await stream.send_message(response) - - async def __rpc_flush_network_buffer( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.flush_network_buffer(request) - await stream.send_message(response) - - async def __rpc_initialize_executor( - self, stream: "grpclib.server.Stream[InitializeExecutorRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.initialize_executor(request) - await stream.send_message(response) - - async def __rpc_open_executor( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.open_executor(request) - await stream.send_message(response) - - async def __rpc_pause_worker( - self, stream: 
"grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.pause_worker(request) - await stream.send_message(response) - - async def __rpc_prepare_checkpoint( - self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, EmptyReturn]" - ) -> None: - request = await stream.recv_message() - response = await self.prepare_checkpoint(request) - await stream.send_message(response) - - async def __rpc_query_statistics( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerMetricsResponse]" - ) -> None: - request = await stream.recv_message() - response = await self.query_statistics(request) - await stream.send_message(response) - - async def __rpc_resume_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" + async def __rpc_send_ping( + self, stream: "grpclib.server.Stream[Ping, IntResponse]" ) -> None: request = await stream.recv_message() - response = await self.resume_worker(request) + response = await self.send_ping(request) await stream.send_message(response) - async def __rpc_retrieve_state( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_pong( + self, stream: "grpclib.server.Stream[Pong, IntResponse]" ) -> None: request = await stream.recv_message() - response = await self.retrieve_state(request) + response = await self.send_pong(request) await stream.send_message(response) - async def __rpc_retry_current_tuple( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_nested( + self, stream: "grpclib.server.Stream[Nested, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.retry_current_tuple(request) + response = await self.send_nested(request) await stream.send_message(response) - async def __rpc_start_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" + async def __rpc_send_pass( + self, stream: 
"grpclib.server.Stream[Pass, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.start_worker(request) + response = await self.send_pass(request) await stream.send_message(response) - async def __rpc_end_worker( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_error_command( + self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.end_worker(request) + response = await self.send_error_command(request) await stream.send_message(response) - async def __rpc_start_channel( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_recursion( + self, stream: "grpclib.server.Stream[Recursion, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.start_channel(request) + response = await self.send_recursion(request) await stream.send_message(response) - async def __rpc_end_channel( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_collect( + self, stream: "grpclib.server.Stream[Collect, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.end_channel(request) + response = await self.send_collect(request) await stream.send_message(response) - async def __rpc_debug_command( - self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" + async def __rpc_send_generate_number( + self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]" ) -> None: request = await stream.recv_message() - response = await self.debug_command(request) + response = await self.send_generate_number(request) await stream.send_message(response) - async def __rpc_evaluate_python_expression( - self, - stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatedValue]", + async def __rpc_send_multi_call( + self, stream: "grpclib.server.Stream[MultiCall, StringResponse]" 
) -> None: request = await stream.recv_message() - response = await self.evaluate_python_expression(request) + response = await self.send_multi_call(request) await stream.send_message(response) - async def __rpc_no_operation( - self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + async def __rpc_send_chain( + self, stream: "grpclib.server.Stream[Chain, StringResponse]" ) -> None: request = await stream.recv_message() - response = await self.no_operation(request) + response = await self.send_chain(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel": grpclib.const.Handler( - self.__rpc_add_input_channel, - grpclib.const.Cardinality.UNARY_UNARY, - AddInputChannelRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning": grpclib.const.Handler( - self.__rpc_add_partitioning, - grpclib.const.Cardinality.UNARY_UNARY, - AddPartitioningRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": grpclib.const.Handler( - self.__rpc_assign_port, - grpclib.const.Cardinality.UNARY_UNARY, - AssignPortRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint": grpclib.const.Handler( - self.__rpc_finalize_checkpoint, - grpclib.const.Cardinality.UNARY_UNARY, - FinalizeCheckpointRequest, - FinalizeCheckpointResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer": grpclib.const.Handler( - self.__rpc_flush_network_buffer, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor": grpclib.const.Handler( - self.__rpc_initialize_executor, - grpclib.const.Cardinality.UNARY_UNARY, - InitializeExecutorRequest, - EmptyReturn, - ), - 
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": grpclib.const.Handler( - self.__rpc_open_executor, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": grpclib.const.Handler( - self.__rpc_pause_worker, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint": grpclib.const.Handler( - self.__rpc_prepare_checkpoint, - grpclib.const.Cardinality.UNARY_UNARY, - PrepareCheckpointRequest, - EmptyReturn, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics": grpclib.const.Handler( - self.__rpc_query_statistics, - grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerMetricsResponse, - ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": grpclib.const.Handler( - self.__rpc_resume_worker, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": grpclib.const.Handler( + self.__rpc_send_ping, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, + Ping, + IntResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": grpclib.const.Handler( - self.__rpc_retrieve_state, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": grpclib.const.Handler( + self.__rpc_send_pong, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Pong, + IntResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple": grpclib.const.Handler( - self.__rpc_retry_current_tuple, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": grpclib.const.Handler( + self.__rpc_send_nested, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Nested, + StringResponse, ), - 
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": grpclib.const.Handler( - self.__rpc_start_worker, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": grpclib.const.Handler( + self.__rpc_send_pass, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - WorkerStateResponse, + Pass, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": grpclib.const.Handler( - self.__rpc_end_worker, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": grpclib.const.Handler( + self.__rpc_send_error_command, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + ErrorCommand, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": grpclib.const.Handler( - self.__rpc_start_channel, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": grpclib.const.Handler( + self.__rpc_send_recursion, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Recursion, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": grpclib.const.Handler( - self.__rpc_end_channel, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": grpclib.const.Handler( + self.__rpc_send_collect, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Collect, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": grpclib.const.Handler( - self.__rpc_debug_command, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber": grpclib.const.Handler( + self.__rpc_send_generate_number, grpclib.const.Cardinality.UNARY_UNARY, - DebugCommandRequest, - EmptyReturn, + GenerateNumber, + IntResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression": grpclib.const.Handler( - self.__rpc_evaluate_python_expression, + 
"/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": grpclib.const.Handler( + self.__rpc_send_multi_call, grpclib.const.Cardinality.UNARY_UNARY, - EvaluatePythonExpressionRequest, - EvaluatedValue, + MultiCall, + StringResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": grpclib.const.Handler( - self.__rpc_no_operation, + "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": grpclib.const.Handler( + self.__rpc_send_chain, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - EmptyReturn, + Chain, + StringResponse, ), } -class ControllerServiceBase(ServiceBase): +class WorkerServiceBase(ServiceBase): - async def retrieve_workflow_state( - self, empty_request: "EmptyRequest" - ) -> "RetrieveWorkflowStateResponse": + async def add_input_channel( + self, add_input_channel_request: "AddInputChannelRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def propagate_embedded_control_message( - self, - propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest", - ) -> "PropagateEmbeddedControlMessageResponse": + async def add_partitioning( + self, add_partitioning_request: "AddPartitioningRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def take_global_checkpoint( - self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest" - ) -> "TakeGlobalCheckpointResponse": + async def assign_port( + self, assign_port_request: "AssignPortRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def debug_command( - self, debug_command_request: "DebugCommandRequest" - ) -> "EmptyReturn": + async def finalize_checkpoint( + self, finalize_checkpoint_request: "FinalizeCheckpointRequest" + ) -> "FinalizeCheckpointResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def evaluate_python_expression( - self, 
evaluate_python_expression_request: "EvaluatePythonExpressionRequest" - ) -> "EvaluatePythonExpressionResponse": + async def flush_network_buffer( + self, empty_request: "EmptyRequest" + ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def console_message_triggered( - self, console_message_triggered_request: "ConsoleMessageTriggeredRequest" + async def initialize_executor( + self, initialize_executor_request: "InitializeExecutorRequest" ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def port_completed( - self, port_completed_request: "PortCompletedRequest" + async def open_executor(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def pause_worker( + self, empty_request: "EmptyRequest" + ) -> "WorkerStateResponse": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def prepare_checkpoint( + self, prepare_checkpoint_request: "PrepareCheckpointRequest" ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def start_workflow( + async def query_statistics( self, empty_request: "EmptyRequest" - ) -> "StartWorkflowResponse": + ) -> "WorkerMetricsResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def resume_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def resume_worker( + self, empty_request: "EmptyRequest" + ) -> "WorkerStateResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def pause_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn": + async def retrieve_state(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def worker_state_updated( - self, worker_state_updated_request: "WorkerStateUpdatedRequest" - ) -> "EmptyReturn": + async def retry_current_tuple(self, empty_request: "EmptyRequest") -> 
"EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def worker_execution_completed( + async def start_worker( self, empty_request: "EmptyRequest" - ) -> "EmptyReturn": + ) -> "WorkerStateResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def link_workers( - self, link_workers_request: "LinkWorkersRequest" - ) -> "EmptyReturn": + async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def controller_initiate_query_statistics( - self, query_statistics_request: "QueryStatisticsRequest" + async def start_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def end_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def debug_command( + self, debug_command_request: "DebugCommandRequest" ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def retry_workflow( - self, retry_workflow_request: "RetryWorkflowRequest" + async def evaluate_python_expression( + self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest" + ) -> "EvaluatedValue": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def no_operation(self, empty_request: "EmptyRequest") -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + + async def update_executor( + self, update_executor_request: "UpdateExecutorRequest" ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) - async def __rpc_retrieve_workflow_state( - self, - stream: "grpclib.server.Stream[EmptyRequest, RetrieveWorkflowStateResponse]", + async def __rpc_add_input_channel( + self, stream: "grpclib.server.Stream[AddInputChannelRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await 
self.retrieve_workflow_state(request) + response = await self.add_input_channel(request) await stream.send_message(response) - async def __rpc_propagate_embedded_control_message( - self, - stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, PropagateEmbeddedControlMessageResponse]", + async def __rpc_add_partitioning( + self, stream: "grpclib.server.Stream[AddPartitioningRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.propagate_embedded_control_message(request) + response = await self.add_partitioning(request) await stream.send_message(response) - async def __rpc_take_global_checkpoint( + async def __rpc_assign_port( + self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.assign_port(request) + await stream.send_message(response) + + async def __rpc_finalize_checkpoint( self, - stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, TakeGlobalCheckpointResponse]", + stream: "grpclib.server.Stream[FinalizeCheckpointRequest, FinalizeCheckpointResponse]", ) -> None: request = await stream.recv_message() - response = await self.take_global_checkpoint(request) + response = await self.finalize_checkpoint(request) await stream.send_message(response) - async def __rpc_debug_command( - self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" + async def __rpc_flush_network_buffer( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.debug_command(request) + response = await self.flush_network_buffer(request) await stream.send_message(response) - async def __rpc_evaluate_python_expression( - self, - stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatePythonExpressionResponse]", + async def __rpc_initialize_executor( + self, stream: "grpclib.server.Stream[InitializeExecutorRequest, EmptyReturn]" 
) -> None: request = await stream.recv_message() - response = await self.evaluate_python_expression(request) + response = await self.initialize_executor(request) await stream.send_message(response) - async def __rpc_console_message_triggered( - self, - stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, EmptyReturn]", + async def __rpc_open_executor( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.console_message_triggered(request) + response = await self.open_executor(request) await stream.send_message(response) - async def __rpc_port_completed( - self, stream: "grpclib.server.Stream[PortCompletedRequest, EmptyReturn]" + async def __rpc_pause_worker( + self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" ) -> None: request = await stream.recv_message() - response = await self.port_completed(request) + response = await self.pause_worker(request) await stream.send_message(response) - async def __rpc_start_workflow( - self, stream: "grpclib.server.Stream[EmptyRequest, StartWorkflowResponse]" + async def __rpc_prepare_checkpoint( + self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.start_workflow(request) + response = await self.prepare_checkpoint(request) await stream.send_message(response) - async def __rpc_resume_workflow( + async def __rpc_query_statistics( + self, stream: "grpclib.server.Stream[EmptyRequest, WorkerMetricsResponse]" + ) -> None: + request = await stream.recv_message() + response = await self.query_statistics(request) + await stream.send_message(response) + + async def __rpc_resume_worker( + self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" + ) -> None: + request = await stream.recv_message() + response = await self.resume_worker(request) + await stream.send_message(response) + + async def 
__rpc_retrieve_state( self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.resume_workflow(request) + response = await self.retrieve_state(request) await stream.send_message(response) - async def __rpc_pause_workflow( + async def __rpc_retry_current_tuple( self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.pause_workflow(request) + response = await self.retry_current_tuple(request) await stream.send_message(response) - async def __rpc_worker_state_updated( - self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, EmptyReturn]" + async def __rpc_start_worker( + self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]" ) -> None: request = await stream.recv_message() - response = await self.worker_state_updated(request) + response = await self.start_worker(request) await stream.send_message(response) - async def __rpc_worker_execution_completed( + async def __rpc_end_worker( self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.worker_execution_completed(request) + response = await self.end_worker(request) await stream.send_message(response) - async def __rpc_link_workers( - self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]" + async def __rpc_start_channel( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.link_workers(request) + response = await self.start_channel(request) await stream.send_message(response) - async def __rpc_controller_initiate_query_statistics( - self, stream: "grpclib.server.Stream[QueryStatisticsRequest, EmptyReturn]" + async def __rpc_end_channel( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - 
response = await self.controller_initiate_query_statistics(request) + response = await self.end_channel(request) await stream.send_message(response) - async def __rpc_retry_workflow( - self, stream: "grpclib.server.Stream[RetryWorkflowRequest, EmptyReturn]" + async def __rpc_debug_command( + self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]" ) -> None: request = await stream.recv_message() - response = await self.retry_workflow(request) + response = await self.debug_command(request) + await stream.send_message(response) + + async def __rpc_evaluate_python_expression( + self, + stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatedValue]", + ) -> None: + request = await stream.recv_message() + response = await self.evaluate_python_expression(request) + await stream.send_message(response) + + async def __rpc_no_operation( + self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.no_operation(request) + await stream.send_message(response) + + async def __rpc_update_executor( + self, stream: "grpclib.server.Stream[UpdateExecutorRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.update_executor(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState": grpclib.const.Handler( - self.__rpc_retrieve_workflow_state, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel": grpclib.const.Handler( + self.__rpc_add_input_channel, grpclib.const.Cardinality.UNARY_UNARY, - EmptyRequest, - RetrieveWorkflowStateResponse, + AddInputChannelRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage": grpclib.const.Handler( - self.__rpc_propagate_embedded_control_message, + 
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning": grpclib.const.Handler( + self.__rpc_add_partitioning, grpclib.const.Cardinality.UNARY_UNARY, - PropagateEmbeddedControlMessageRequest, - PropagateEmbeddedControlMessageResponse, + AddPartitioningRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint": grpclib.const.Handler( - self.__rpc_take_global_checkpoint, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": grpclib.const.Handler( + self.__rpc_assign_port, grpclib.const.Cardinality.UNARY_UNARY, - TakeGlobalCheckpointRequest, - TakeGlobalCheckpointResponse, + AssignPortRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand": grpclib.const.Handler( - self.__rpc_debug_command, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint": grpclib.const.Handler( + self.__rpc_finalize_checkpoint, grpclib.const.Cardinality.UNARY_UNARY, - DebugCommandRequest, + FinalizeCheckpointRequest, + FinalizeCheckpointResponse, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer": grpclib.const.Handler( + self.__rpc_flush_network_buffer, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression": grpclib.const.Handler( - self.__rpc_evaluate_python_expression, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor": grpclib.const.Handler( + self.__rpc_initialize_executor, grpclib.const.Cardinality.UNARY_UNARY, - EvaluatePythonExpressionRequest, - EvaluatePythonExpressionResponse, + InitializeExecutorRequest, + EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered": grpclib.const.Handler( - self.__rpc_console_message_triggered, + 
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": grpclib.const.Handler( + self.__rpc_open_executor, grpclib.const.Cardinality.UNARY_UNARY, - ConsoleMessageTriggeredRequest, + EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted": grpclib.const.Handler( - self.__rpc_port_completed, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": grpclib.const.Handler( + self.__rpc_pause_worker, grpclib.const.Cardinality.UNARY_UNARY, - PortCompletedRequest, + EmptyRequest, + WorkerStateResponse, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint": grpclib.const.Handler( + self.__rpc_prepare_checkpoint, + grpclib.const.Cardinality.UNARY_UNARY, + PrepareCheckpointRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow": grpclib.const.Handler( - self.__rpc_start_workflow, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics": grpclib.const.Handler( + self.__rpc_query_statistics, grpclib.const.Cardinality.UNARY_UNARY, EmptyRequest, - StartWorkflowResponse, + WorkerMetricsResponse, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow": grpclib.const.Handler( - self.__rpc_resume_workflow, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": grpclib.const.Handler( + self.__rpc_resume_worker, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, + WorkerStateResponse, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": grpclib.const.Handler( + self.__rpc_retrieve_state, grpclib.const.Cardinality.UNARY_UNARY, EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow": grpclib.const.Handler( - self.__rpc_pause_workflow, + 
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple": grpclib.const.Handler( + self.__rpc_retry_current_tuple, grpclib.const.Cardinality.UNARY_UNARY, EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated": grpclib.const.Handler( - self.__rpc_worker_state_updated, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": grpclib.const.Handler( + self.__rpc_start_worker, grpclib.const.Cardinality.UNARY_UNARY, - WorkerStateUpdatedRequest, + EmptyRequest, + WorkerStateResponse, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": grpclib.const.Handler( + self.__rpc_end_worker, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted": grpclib.const.Handler( - self.__rpc_worker_execution_completed, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": grpclib.const.Handler( + self.__rpc_start_channel, grpclib.const.Cardinality.UNARY_UNARY, EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers": grpclib.const.Handler( - self.__rpc_link_workers, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": grpclib.const.Handler( + self.__rpc_end_channel, grpclib.const.Cardinality.UNARY_UNARY, - LinkWorkersRequest, + EmptyRequest, EmptyReturn, ), - "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics": grpclib.const.Handler( - self.__rpc_controller_initiate_query_statistics, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": grpclib.const.Handler( + self.__rpc_debug_command, grpclib.const.Cardinality.UNARY_UNARY, - QueryStatisticsRequest, + DebugCommandRequest, EmptyReturn, ), - 
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow": grpclib.const.Handler( - self.__rpc_retry_workflow, + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression": grpclib.const.Handler( + self.__rpc_evaluate_python_expression, grpclib.const.Cardinality.UNARY_UNARY, - RetryWorkflowRequest, + EvaluatePythonExpressionRequest, + EvaluatedValue, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": grpclib.const.Handler( + self.__rpc_no_operation, + grpclib.const.Cardinality.UNARY_UNARY, + EmptyRequest, + EmptyReturn, + ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor": grpclib.const.Handler( + self.__rpc_update_executor, + grpclib.const.Cardinality.UNARY_UNARY, + UpdateExecutorRequest, EmptyReturn, ), } diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py index 55c789aa39..8c1464cc76 100644 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py +++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py @@ -18,6 +18,27 @@ from ..architecture import ( ) +@dataclass(eq=False, repr=False) +class Backpressure(betterproto.Message): + enable_backpressure: bool = betterproto.bool_field(1) + + +@dataclass(eq=False, repr=False) +class CreditUpdate(betterproto.Message): + pass + + +@dataclass(eq=False, repr=False) +class ActorCommand(betterproto.Message): + backpressure: "Backpressure" = betterproto.message_field(1, group="sealed_value") + credit_update: "CreditUpdate" = betterproto.message_field(2, group="sealed_value") + + +@dataclass(eq=False, repr=False) +class PythonActorMessage(betterproto.Message): + payload: "ActorCommand" = betterproto.message_field(1) + + @dataclass(eq=False, repr=False) class DirectControlMessagePayloadV2(betterproto.Message): control_invocation: 
"_architecture_rpc__.ControlInvocation" = ( @@ -133,24 +154,3 @@ class ExecutionMetadataStore(betterproto.Message): fatal_errors: List["__core__.WorkflowFatalError"] = betterproto.message_field(2) execution_id: "__core__.ExecutionIdentity" = betterproto.message_field(3) is_recovering: bool = betterproto.bool_field(4) - - -@dataclass(eq=False, repr=False) -class Backpressure(betterproto.Message): - enable_backpressure: bool = betterproto.bool_field(1) - - -@dataclass(eq=False, repr=False) -class CreditUpdate(betterproto.Message): - pass - - -@dataclass(eq=False, repr=False) -class ActorCommand(betterproto.Message): - backpressure: "Backpressure" = betterproto.message_field(1, group="sealed_value") - credit_update: "CreditUpdate" = betterproto.message_field(2, group="sealed_value") - - -@dataclass(eq=False, repr=False) -class PythonActorMessage(betterproto.Message): - payload: "ActorCommand" = betterproto.message_field(1) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala index 4d9a36bab4..2902173364 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala @@ -46,6 +46,7 @@ class ControllerAsyncRPCHandlerInitializer( with DebugCommandHandler with TakeGlobalCheckpointHandler with EmbeddedControlMessageHandler - with RetrieveWorkflowStateHandler { + with RetrieveWorkflowStateHandler + with ReconfigurationHandler { val actorId: ActorVirtualIdentity = cp.actorId } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala new file mode 100644 index 0000000000..9470336efb --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import org.apache.texera.amber.core.virtualidentity.{ChannelIdentity, EmbeddedControlMessageIdentity}
+import org.apache.texera.amber.engine.architecture.controller.{ControllerAsyncRPCHandlerInitializer, ExecutionStatsUpdate}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, WorkflowReconfigureRequest}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
+trait ReconfigurationHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
+
+  /**
+    * Applies a workflow reconfiguration, one Fries component at a time.
+    *
+    * A component whose scope is a single operator is updated with a direct
+    * `updateExecutor` RPC to each of that operator's workers. Any other component
+    * is updated by sending an `ALL_ALIGNMENT` embedded control message to every
+    * worker of the component's source operators; the message carries one
+    * update-executor invocation per target worker and is scoped to the channels
+    * inside the component plus the control channels between the controller and
+    * the source workers.
+    *
+    * The returned future completes once every direct `updateExecutor` RPC has
+    * replied, so a failing worker update fails this call instead of being dropped.
+    * Embedded control messages are only sent here; their processing is not awaited.
+    *
+    * NOTE(review): `METHOD_UPDATE_EXECUTOR` does not appear in this file's import
+    * list -- confirm it is in scope through the self-type.
+    *
+    * @param msg the reconfiguration request (update requests plus reconfiguration id)
+    * @param ctx the RPC context of this invocation (not used by the handler)
+    */
+  override def reconfigureWorkflow(
+      msg: WorkflowReconfigureRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    val directUpdates = FriesReconfigurationAlgorithm
+      .getReconfigurations(cp.workflowExecutionCoordinator, msg)
+      .toSeq
+      .flatMap { friesComponent =>
+        if (friesComponent.scope.size == 1) {
+          val updateExecutorRequest = friesComponent.reconfigurations.head
+          // keep the reply futures so the caller can observe completion and failures
+          cp.workflowExecution
+            .getLatestOperatorExecution(updateExecutorRequest.targetOpId)
+            .getWorkerIds
+            .toSeq
+            .map(worker => workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker)))
+        } else {
+          // data channels whose two endpoints both belong to this component
+          val channelScope = cp.workflowExecution.getRunningRegionExecutions
+            .flatMap(regionExecution =>
+              regionExecution.getAllLinkExecutions
+                .map(_._2)
+                .flatMap(linkExecution => linkExecution.getAllChannelExecutions.map(_._1))
+            )
+            .filter(channelId => {
+              friesComponent.scope
+                .contains(VirtualIdentityUtils.getPhysicalOpId(channelId.fromWorkerId)) &&
+              friesComponent.scope
+                .contains(VirtualIdentityUtils.getPhysicalOpId(channelId.toWorkerId))
+            })
+          // control channels in both directions between the controller and each source worker
+          val controlChannels = friesComponent.sources.flatMap { source =>
+            cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.flatMap { worker =>
+              Seq(
+                ChannelIdentity(CONTROLLER, worker, isControl = true),
+                ChannelIdentity(worker, CONTROLLER, isControl = true)
+              )
+            }
+          }
+          val finalScope = channelScope ++ controlChannels
+          // worker name -> update invocation; one entry is kept per worker
+          val cmdMapping =
+            friesComponent.reconfigurations
+              .flatMap { updateReq =>
+                val workers =
+                  cp.workflowExecution
+                    .getLatestOperatorExecution(updateReq.targetOpId)
+                    .getWorkerIds
+
+                workers.map { worker =>
+                  worker.name ->
+                    createInvocation(
+                      METHOD_UPDATE_EXECUTOR.getBareMethodName,
+                      updateReq.newExecInitInfo,
+                      worker
+                    )
+                }
+              }
+              .groupBy(_._1)
+              .map {
+                case (worker, entries) =>
+                  worker -> entries.head._2
+              }
+          friesComponent.sources.foreach { source =>
+            cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { worker =>
+              sendECM(
+                EmbeddedControlMessageIdentity(msg.reconfigurationId),
+                ALL_ALIGNMENT,
+                finalScope.toSet,
+                cmdMapping,
+                ChannelIdentity(actorId, worker, isControl = true)
+              )
+            }
+          }
+          Seq.empty[Future[EmptyReturn]]
+        }
+      }
+    Future.collect(directUpdates).map(_ => EmptyReturn())
+  }
+
+}
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
index 2abcdf6697..545d0c60f1 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
@@ -20,18 +20,17 @@
 package org.apache.texera.amber.engine.architecture.worker
 
 import com.twitter.util.Future
+import org.apache.texera.amber.core.executor.{ExecFactory, OpExecInitInfo, OpExecSource, OpExecWithClassName, OpExecWithCode}
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
-import
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  AsyncRPCContext,
-  DebugCommandRequest,
-  EmptyRequest,
-  EvaluatePythonExpressionRequest
-}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, DebugCommandRequest, EmptyRequest, EvaluatePythonExpressionRequest}
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{EmptyReturn, EvaluatedValue}
 import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceFs2Grpc
 import org.apache.texera.amber.engine.architecture.worker.promisehandlers._
 import org.apache.texera.amber.engine.common.AmberLogging
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCHandlerInitializer
+import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec
+
+import java.net.URI
 
 class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
     extends AsyncRPCHandlerInitializer(dp.asyncRPCClient, dp.asyncRPCServer)
@@ -52,7 +51,8 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
     with FlushNetworkBufferHandler
     with RetrieveStateHandler
     with PrepareCheckpointHandler
-    with FinalizeCheckpointHandler {
+    with FinalizeCheckpointHandler
+    with UpdateExecutorHandler {
   val actorId: ActorVirtualIdentity = dp.actorId
 
   override def debugCommand(
@@ -69,4 +69,17 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
     ???
 
   override def noOperation(request: EmptyRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = ???
+
+  /**
+    * Replaces `dp.executor` with a new executor built from `execInitInfo`.
+    *
+    * The init info is dispatched on its kind: a Java class name goes through
+    * `ExecFactory.newExecFromJavaClassName`, inline code through
+    * `ExecFactory.newExecFromJavaCode`, and a storage URI yields a
+    * `CacheSourceOpExec`. The new executor is only assigned, not opened.
+    *
+    * NOTE(review): `workerIdx` and `workerCount` are not defined in any visible
+    * part of this class. In `InitializeExecutorHandler` they are local vals
+    * computed from `actorId` and `req.totalWorkerCount` -- confirm they resolve here.
+    *
+    * @param execInitInfo describes which executor to build
+    * @throws IllegalArgumentException if `execInitInfo` is `OpExecInitInfo.Empty`
+    */
+  def initializeExecutor(execInitInfo: OpExecInitInfo): Unit = {
+    dp.executor = execInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount)
+      case OpExecWithCode(code, _) =>
+        ExecFactory.newExecFromJavaCode(code)
+      case OpExecSource(storageUri, _) =>
+        new CacheSourceOpExec(URI.create(storageUri))
+      case OpExecInitInfo.Empty =>
+        throw new IllegalArgumentException("Empty executor initialization info")
+    }
+  }
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index bf45d8eff9..1204710181 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -54,5 +54,4 @@ trait InitializeExecutorHandler {
     }
     EmptyReturn()
   }
-
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
similarity index 50%
copy from amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
copy to amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index bf45d8eff9..7f9e1fa31d 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -20,39 +20,44 @@
 package
org.apache.texera.amber.engine.architecture.worker.promisehandlers
 
 import com.twitter.util.Future
-import org.apache.texera.amber.core.executor._
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  AsyncRPCContext,
-  InitializeExecutorRequest
-}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, UpdateExecutorRequest}
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
-import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec
-import org.apache.texera.amber.util.VirtualIdentityUtils
-
-import java.net.URI
+import scala.reflect.runtime.universe._
 
-trait InitializeExecutorHandler {
+trait UpdateExecutorHandler {
   this: DataProcessorRPCHandlerInitializer =>
 
-  override def initializeExecutor(
-    req: InitializeExecutorRequest,
-    ctx: AsyncRPCContext
-  ): Future[EmptyReturn] = {
-    dp.serializationManager.setOpInitialization(req)
-    val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
-    val workerCount = req.totalWorkerCount
-    dp.executor = req.opExecInitInfo match {
-      case OpExecWithClassName(className, descString) =>
-        ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount)
-      case OpExecWithCode(code, _) =>
-        ExecFactory.newExecFromJavaCode(code)
-      case OpExecSource(storageUri, _) =>
-        new CacheSourceOpExec(URI.create(storageUri))
-      case OpExecInitInfo.Empty =>
-        throw new IllegalArgumentException("Empty executor initialization info")
-    }
+  /**
+    * Swaps this worker's executor for one built from `request.newExecInitInfo`.
+    *
+    * The new executor is created and opened first; afterwards the mutable state
+    * that both executors expose under the same name and type is copied from the
+    * previous executor onto the new one.
+    *
+    * NOTE(review): the state copy runs after `open()`, so copied values overwrite
+    * whatever `open()` and the new init info set on same-named vars -- confirm
+    * that is the intended precedence.
+    * NOTE(review): the previous executor is not closed here -- confirm whether
+    * it owns resources that need releasing.
+    *
+    * @param request carries the init info of the replacement executor
+    * @param ctx     the RPC context of this invocation (not used by the handler)
+    */
+  override def updateExecutor(
+      request: UpdateExecutorRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    val oldOpExecState = dp.executor
+    initializeExecutor(request.newExecInitInfo)
+    dp.executor.open()
+    copyMatchingFields(oldOpExecState, dp.executor) //TBD if we really need this
     EmptyReturn()
   }
 
+  /**
+    * Copies each `var` that `from` and `to` both expose under the same name and
+    * type from `from` onto `to`.
+    *
+    * Members are looked up on the runtime classes of the two objects. The static
+    * types `A` and `B` are not consulted, because at the call site they are only
+    * the declared executor type and would hide the state of the concrete classes.
+    * A `var x` is visible to reflection as a getter `x` and a setter `x_=`, so
+    * getters on `from` are paired with setters on `to`. `val`s have no setter and
+    * are therefore skipped. Does nothing if either argument is null.
+    *
+    * @param from object to read state from
+    * @param to   object to write state to
+    */
+  private[this] def copyMatchingFields[A: TypeTag, B: TypeTag](from: A, to: B): Unit = {
+    if (from == null || to == null) return
+    val mirror = runtimeMirror(from.getClass.getClassLoader)
+    // reflect as Any: only a TypeTag is available for A and B, but reflect needs a ClassTag
+    val source = mirror.reflect[Any](from)
+    val target = mirror.reflect[Any](to)
+
+    val getters = source.symbol.toType.members.collect {
+      case m: MethodSymbol if m.isGetter => m
+    }
+
+    // index setters by plain field name, i.e. the setter name without its "_=" suffix
+    val setters = target.symbol.toType.members.collect {
+      case m: MethodSymbol if m.isSetter =>
+        m.name.decodedName.toString.stripSuffix("_=") -> m
+    }.toMap
+
+    getters.foreach { getter =>
+      setters.get(getter.name.decodedName.toString).foreach { setter =>
+        val acceptedType = setter.paramLists.head.head.typeSignature
+        if (getter.returnType =:= acceptedType) {
+          val value = source.reflectMethod(getter).apply()
+          target.reflectMethod(setter).apply(value)
+        }
+      }
+    }
+  }
 }
diff --git a/amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
similarity index 71%
rename from amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala
rename to amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
index c2a15106b1..f1c82e6343 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.texera.web.service
+package org.apache.texera.amber.engine.common
 
 import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
 import org.apache.texera.amber.core.workflow.PhysicalPlan
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  ModifyLogicRequest,
-  PropagateEmbeddedControlMessageRequest
+  UpdateExecutorRequest,
+  WorkflowReconfigureRequest
 }
 import org.apache.texera.amber.engine.architecture.scheduling.{Region, WorkflowExecutionCoordinator}
 import org.jgrapht.alg.connectivity.ConnectivityInspector
@@ -34,28 +34,32 @@ import scala.jdk.CollectionConverters.SetHasAsScala
 
 object FriesReconfigurationAlgorithm {
 
+  /**
+    * One connected component of the MCS plan computed for a region.
+    *
+    * @param sources          operators of the component that are also source operators of the MCS plan
+    * @param scope            ids of all operators in the component's sub-plan
+    * @param reconfigurations update requests whose target operator belongs to the component
+    */
+  case class FriesComponent(
+    sources: Set[PhysicalOpIdentity],
+    scope: Set[PhysicalOpIdentity],
+    reconfigurations: Set[UpdateExecutorRequest])
+
   private def getOneToManyOperators(region: Region): Set[PhysicalOpIdentity] = {
     region.getOperators.filter(op => op.isOneToManyOp).map(op => op.id)
   }
 
-  def scheduleReconfigurations(
-      workflowExecutionCoordinator: WorkflowExecutionCoordinator,
-      reconfiguration: ModifyLogicRequest,
-      epochMarkerId: String
-  ): Set[PropagateEmbeddedControlMessageRequest] = {
+  /**
+    * Computes the Fries components for every currently executing region by
+    * running `computeMCS` on each region and flattening the results.
+    *
+    * NOTE(review): the reconfiguration id is still passed to `computeMCS` as
+    * `epochMarkerId`, but no use of that parameter is visible in the shown parts
+    * of `computeMCS` -- confirm whether the parameter is still needed.
+    *
+    * @param workflowExecutionCoordinator supplies the executing regions
+    * @param reconfiguration              the requested executor updates
+    */
+  def getReconfigurations(
+                           workflowExecutionCoordinator: WorkflowExecutionCoordinator,
+                           reconfiguration: WorkflowReconfigureRequest
+                         ): Set[FriesComponent] = {
     // independently schedule reconfigurations for each region:
     workflowExecutionCoordinator.getExecutingRegions
-      .flatMap(region => computeMCS(region, reconfiguration, epochMarkerId))
+      .flatMap(region => computeMCS(region, reconfiguration, reconfiguration.reconfigurationId))
   }
 
   private def computeMCS(
       region: Region,
-      reconfiguration: ModifyLogicRequest,
+      reconfiguration: WorkflowReconfigureRequest,
       epochMarkerId: String
-  ): List[PropagateEmbeddedControlMessageRequest] = {
+  ): List[FriesComponent] = {
 
     // add all reconfiguration operators to M
-    val reconfigOps = reconfiguration.updateRequest.map(req => req.targetOpId).toSet
+    val reconfigOps = reconfiguration.reconfiguration.map(req => req.targetOpId).toSet
     val M = mutable.Set.empty ++ reconfigOps
 
     // for each one-to-many operator, add it to M if its downstream has a reconfiguration operator
@@ -101,30 +105,20 @@ object FriesReconfigurationAlgorithm {
 
     // find the MCS components,
     // for each component, send an epoch marker to each of its source operators
-    val epochMarkers = new ArrayBuffer[PropagateEmbeddedControlMessageRequest]()
+    val epochMarkers = new ArrayBuffer[FriesComponent]()
     val connectedSets = new ConnectivityInspector(mcsPlan.dag).connectedSets()
     connectedSets.forEach(component => {
       val componentSet = component.asScala.toSet
       val componentPlan = mcsPlan.getSubPlan(componentSet)
-
-      // generate the reconfiguration command for this component
-      // val reconfigCommands =
-      // reconfiguration.updateRequest
-      // .filter(req => component.contains(req.targetOpId))
-      // val reconfigTargets = reconfigCommands.map(_.targetOpId)
-      //
-      // // find the source operators of the component
-      // val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds)
-      // epochMarkers += PropagateEmbeddedControlMessageRequest(
-      // sources.toSeq,
-      // EmbeddedControlMessageIdentity(epochMarkerId),
-      // ALL_ALIGNMENT,
-      // componentPlan.operators.map(_.id).toSeq,
-      // reconfigTargets,
-      // ModifyLogicRequest(reconfigCommands),
-      // METHOD_MODIFY_LOGIC.getBareMethodName
-      // )
+      // keep only the update requests that target an operator of this component
+      val reconfigCommands =
+        reconfiguration.reconfiguration
+          .filter(req => component.contains(req.targetOpId))
+          .toSet
+
+      // find the source operators of the component
+      val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds)
+      epochMarkers += FriesComponent(sources, componentPlan.operators.map(_.id), reconfigCommands)
     })
     epochMarkers.toList
   }
diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf
index 85a62b77a3..f8b1868941 100644
--- a/common/config/src/main/resources/storage.conf
+++
b/common/config/src/main/resources/storage.conf
@@ -37,7 +37,7 @@ storage {
                 username = "texera"
                 username = ${?STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME}
 
                 password = "password"
                 password = ${?STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD}
             }
         }
@@ -134,7 +134,7 @@ storage {
         username = "postgres"
         username = ${?STORAGE_JDBC_USERNAME}
 
         password = "postgres"
         password = ${?STORAGE_JDBC_PASSWORD}
     }
 }
diff --git a/common/config/src/main/resources/udf.conf b/common/config/src/main/resources/udf.conf
index 7212d1720e..c009aed0de 100644
--- a/common/config/src/main/resources/udf.conf
+++ b/common/config/src/main/resources/udf.conf
@@ -17,7 +17,7 @@
 
 python {
   # python3 executable path
   path = ""
   path = ${?UDF_PYTHON_PATH}
 
   log {
