This is an automated email from the ASF dual-hosted git repository. kunwp1 pushed a commit to branch chris-big-object-version-0 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 38b8d6485ebb811e35cdd1a7ad4f2bba8f2a42a2 Author: Kunwoo Park <[email protected]> AuthorDate: Mon Oct 27 16:19:48 2025 -0700 Add websocket event BigObjectUpdateEvent --- .../engine/architecture/rpc/controlcommands.proto | 18 +++++++ .../architecture/rpc/controllerservice.proto | 1 + .../python/core/architecture/managers/context.py | 1 + .../architecture/managers/execution_context.py | 17 ++++++- .../main/python/core/runnables/data_processor.py | 2 + amber/src/main/python/core/runnables/main_loop.py | 3 ++ .../amber/engine/architecture/rpc/__init__.py | 57 ++++++++++++++++++++++ .../python/pytexera/storage/big_object_manager.py | 41 ++++++++++++++++ .../ControllerAsyncRPCHandlerInitializer.scala | 1 + .../promisehandlers/BigObjectEventHandler.scala | 40 +++++++++++++++ .../InitializeExecutorHandler.scala | 21 +++++++- .../websocket/event/BigObjectUpdateEvent.scala | 24 +++++++++ .../websocket/event/TexeraWebSocketEvent.scala | 3 +- .../web/service/ExecutionBigObjectService.scala | 48 ++++++++++++++++++ .../web/service/WorkflowExecutionService.scala | 2 + .../texera/web/storage/ExecutionStateStore.scala | 13 ++++- .../amber/core/executor/ExecutionContext.scala | 18 +++++++ .../texera/service/util/BigObjectManager.scala | 31 ++++++++++++ 18 files changed, 337 insertions(+), 4 deletions(-) diff --git a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto index d596f8b044..61a38d1415 100644 --- a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto @@ -46,6 +46,7 @@ message ControlRequest { PortCompletedRequest portCompletedRequest = 9; WorkerStateUpdatedRequest workerStateUpdatedRequest = 10; LinkWorkersRequest linkWorkersRequest = 11; + BigObjectEventTriggeredRequest bigObjectEventTriggeredRequest = 12; // request for worker AddInputChannelRequest addInputChannelRequest = 50; @@ -163,6 +164,23 @@ message ConsoleMessageTriggeredRequest { ConsoleMessage consoleMessage = 1 [(scalapb.field).no_box = true]; } +enum BigObjectEventType { + CREATE = 0; + READ = 1; +} + +message BigObjectEvent { + option (scalapb.message).extends = "org.apache.amber.engine.architecture.controller.ClientEvent"; + string operator_id = 1; + string uri = 2; + BigObjectEventType event_type = 3; + google.protobuf.Timestamp timestamp = 4 [(scalapb.field).no_box = true]; +} + +message BigObjectEventTriggeredRequest { + BigObjectEvent event = 1 [(scalapb.field).no_box = true]; +} + message PortCompletedRequest { core.PortIdentity portId = 1 [(scalapb.field).no_box = true]; bool input = 2; diff --git a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto index b03cd0f51f..402ee4c5aa 100644 --- a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto +++ b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto @@ -36,6 +36,7 @@ service ControllerService { rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn); rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatePythonExpressionResponse); rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns (EmptyReturn); + rpc BigObjectEventTriggered(BigObjectEventTriggeredRequest) returns (EmptyReturn); rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn); rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse); rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/python/core/architecture/managers/context.py b/amber/src/main/python/core/architecture/managers/context.py index fa92a6db26..c258345efe 100644 --- a/amber/src/main/python/core/architecture/managers/context.py +++ b/amber/src/main/python/core/architecture/managers/context.py @@ -53,6 +53,7 @@ class Context: # operator_id: parsed from worker_id (format: Worker:WF{wid}-{opid}-{layer}-{idx}) self.execution_id: Optional[int] = None self.operator_id: Optional[str] = None + self.async_rpc_client = None # Set by MainLoop after creation # Extract operator_id from worker_id (e.g., "Worker:WF78-op-scan-main-0" → "op-scan") match = re.match(r"Worker:WF\d+-(.+)-\w+-\d+", worker_id) diff --git a/amber/src/main/python/core/architecture/managers/execution_context.py b/amber/src/main/python/core/architecture/managers/execution_context.py index 31e02436a6..508d5c6327 100644 --- a/amber/src/main/python/core/architecture/managers/execution_context.py +++ b/amber/src/main/python/core/architecture/managers/execution_context.py @@ -22,7 +22,10 @@ execution_id and operator_id without requiring explicit parameters. """ import threading -from typing import Optional +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from core.architecture.rpc.async_rpc_client import AsyncRPCClient class ExecutionContext: @@ -53,6 +56,16 @@ class ExecutionContext: """Get the operator ID for the current thread, or None if not set.""" return getattr(cls._thread_local, "operator_id", None) + @classmethod + def set_async_rpc_client(cls, client: "AsyncRPCClient") -> None: + """Set the AsyncRPCClient for the current thread.""" + cls._thread_local.async_rpc_client = client + + @classmethod + def get_async_rpc_client(cls) -> Optional["AsyncRPCClient"]: + """Get the AsyncRPCClient for the current thread, or None if not set.""" + return getattr(cls._thread_local, "async_rpc_client", None) + @classmethod def clear(cls) -> None: """Clear all execution context information for the current thread.""" @@ -60,3 +73,5 @@ class ExecutionContext: delattr(cls._thread_local, "execution_id") if hasattr(cls._thread_local, "operator_id"): delattr(cls._thread_local, "operator_id") + if hasattr(cls._thread_local, "async_rpc_client"): + delattr(cls._thread_local, "async_rpc_client") diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py index 0c9747a406..dc0f902710 100644 --- a/amber/src/main/python/core/runnables/data_processor.py +++ b/amber/src/main/python/core/runnables/data_processor.py @@ -55,6 +55,8 @@ class DataProcessor(Runnable, Stoppable): # Sync ExecutionContext from context (for thread-local storage) ExecutionContext.set_execution_id(self._context.execution_id) ExecutionContext.set_operator_id(self._context.operator_id) + if self._context.async_rpc_client: + ExecutionContext.set_async_rpc_client(self._context.async_rpc_client) marker = self._context.tuple_processing_manager.get_internal_marker() state = self._context.state_processing_manager.get_input_state() diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index ec662ec721..3a984db9df 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -81,6 +81,9 @@ class MainLoop(StoppableQueueBlockingRunnable): self.context = Context(worker_id, input_queue) self._async_rpc_server = AsyncRPCServer(output_queue, context=self.context) self._async_rpc_client = AsyncRPCClient(output_queue, context=self.context) + self.context.async_rpc_client = ( + self._async_rpc_client + ) # Store reference for BigObjectManager self.data_processor = DataProcessor(self.context) threading.Thread( diff --git a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py index 258fc928f7..12619a516b 100644 --- a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py +++ b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py @@ -60,6 +60,11 @@ class ConsoleMessageType(betterproto.Enum): DEBUGGER = 3 +class BigObjectEventType(betterproto.Enum): + CREATE = 0 + READ = 1 + + class ErrorLanguage(betterproto.Enum): PYTHON = 0 SCALA = 1 @@ -113,6 +118,9 @@ class ControlRequest(betterproto.Message): link_workers_request: "LinkWorkersRequest" = betterproto.message_field( 11, group="sealed_value" ) + big_object_event_triggered_request: "BigObjectEventTriggeredRequest" = ( + betterproto.message_field(12, group="sealed_value") + ) add_input_channel_request: "AddInputChannelRequest" = betterproto.message_field( 50, group="sealed_value" ) @@ -250,6 +258,19 @@ class ConsoleMessageTriggeredRequest(betterproto.Message): console_message: "ConsoleMessage" = betterproto.message_field(1) +@dataclass(eq=False, repr=False) +class BigObjectEvent(betterproto.Message): + operator_id: str = betterproto.string_field(1) + uri: str = betterproto.string_field(2) + event_type: "BigObjectEventType" = betterproto.enum_field(3) + timestamp: datetime = betterproto.message_field(4) + + +@dataclass(eq=False, repr=False) +class BigObjectEventTriggeredRequest(betterproto.Message): + event: "BigObjectEvent" = betterproto.message_field(1) + + @dataclass(eq=False, repr=False) class PortCompletedRequest(betterproto.Message): port_id: "___core__.PortIdentity" = betterproto.message_field(1) @@ -1152,6 +1173,23 @@ class ControllerServiceStub(betterproto.ServiceStub): metadata=metadata, ) + async def big_object_event_triggered( + self, + big_object_event_triggered_request: "BigObjectEventTriggeredRequest", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "EmptyReturn": + return await self._unary_unary( + "/org.apache.amber.engine.architecture.rpc.ControllerService/BigObjectEventTriggered", + big_object_event_triggered_request, + EmptyReturn, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + async def port_completed( self, port_completed_request: "PortCompletedRequest", @@ -1865,6 +1903,11 @@ class ControllerServiceBase(ServiceBase): ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def big_object_event_triggered( + self, big_object_event_triggered_request: "BigObjectEventTriggeredRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def port_completed( self, port_completed_request: "PortCompletedRequest" ) -> "EmptyReturn": @@ -1953,6 +1996,14 @@ class ControllerServiceBase(ServiceBase): response = await self.console_message_triggered(request) await stream.send_message(response) + async def __rpc_big_object_event_triggered( + self, + stream: "grpclib.server.Stream[BigObjectEventTriggeredRequest, EmptyReturn]", + ) -> None: + request = await stream.recv_message() + response = await self.big_object_event_triggered(request) + await stream.send_message(response) + async def __rpc_port_completed( self, stream: "grpclib.server.Stream[PortCompletedRequest, EmptyReturn]" ) -> None: @@ -2054,6 +2105,12 @@ class ControllerServiceBase(ServiceBase): ConsoleMessageTriggeredRequest, EmptyReturn, ), + "/org.apache.amber.engine.architecture.rpc.ControllerService/BigObjectEventTriggered": grpclib.const.Handler( + self.__rpc_big_object_event_triggered, + grpclib.const.Cardinality.UNARY_UNARY, + BigObjectEventTriggeredRequest, + EmptyReturn, + ), "/org.apache.amber.engine.architecture.rpc.ControllerService/PortCompleted": grpclib.const.Handler( self.__rpc_port_completed, grpclib.const.Cardinality.UNARY_UNARY, diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py b/amber/src/main/python/pytexera/storage/big_object_manager.py index 2fa2c7398c..eb640eb57b 100644 --- a/amber/src/main/python/pytexera/storage/big_object_manager.py +++ b/amber/src/main/python/pytexera/storage/big_object_manager.py @@ -21,6 +21,7 @@ import time import uuid from typing import BinaryIO, Union, Optional from io import BytesIO +from datetime import datetime from core.models.schema.big_object_pointer import BigObjectPointer from core.storage.storage_config import StorageConfig from core.architecture.managers.execution_context import ExecutionContext @@ -216,6 +217,9 @@ class BigObjectManager: pass raise RuntimeError(f"Failed to create big object: {e}") + # Send event to frontend + cls._send_big_object_event(operator_id, uri, "CREATE") + return BigObjectPointer(uri) @classmethod @@ -240,6 +244,43 @@ class BigObjectManager: try: body = cls._get_s3_client().get_object(Bucket=bucket, Key=key)["Body"] + + # Send event to frontend + operator_id = ExecutionContext.get_operator_id() + if operator_id: + cls._send_big_object_event(operator_id, pointer.uri, "READ") + return BigObjectStream(body, pointer) except Exception as e: raise RuntimeError(f"Failed to open {pointer.uri}: {e}") + + @classmethod + def _send_big_object_event(cls, operator_id: str, uri: str, event_type: str): + """Sends BigObjectEvent to controller for forwarding to frontend.""" + try: + from proto.org.apache.amber.engine.architecture.rpc import ( + BigObjectEvent, + BigObjectEventTriggeredRequest, + BigObjectEventType, + ) + from datetime import datetime, timezone + + client = ExecutionContext.get_async_rpc_client() + if not client: + return + + event = BigObjectEvent( + operator_id=operator_id, + uri=uri, + event_type=( + BigObjectEventType.CREATE + if event_type == "CREATE" + else BigObjectEventType.READ + ), + timestamp=datetime.now(timezone.utc), + ) + client.controller_stub().big_object_event_triggered( + BigObjectEventTriggeredRequest(event=event) + ) + except Exception: + pass diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala index 61d5e3869b..226cfceda0 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala @@ -41,6 +41,7 @@ class ControllerAsyncRPCHandlerInitializer( with StartWorkflowHandler with PortCompletedHandler with ConsoleMessageHandler + with BigObjectEventHandler with RetryWorkflowHandler with EvaluatePythonExpressionHandler with DebugCommandHandler diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala new file mode 100644 index 0000000000..1bc799c55b --- /dev/null +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.engine.architecture.controller.promisehandlers + +import com.twitter.util.Future +import org.apache.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer +import org.apache.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + BigObjectEventTriggeredRequest +} +import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn + +trait BigObjectEventHandler { + this: ControllerAsyncRPCHandlerInitializer => + + override def bigObjectEventTriggered( + msg: BigObjectEventTriggeredRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + sendToClient(msg.event) + EmptyReturn() + } +} diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index 871d1a2efb..d3272cf95a 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -19,18 +19,24 @@ package org.apache.amber.engine.architecture.worker.promisehandlers +import com.google.protobuf.timestamp.Timestamp import com.twitter.util.Future import org.apache.amber.core.executor._ import org.apache.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, + BigObjectEvent, + BigObjectEventTriggeredRequest, + BigObjectEventType, InitializeExecutorRequest } import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.amber.operator.source.cache.CacheSourceOpExec import org.apache.amber.util.VirtualIdentityUtils import java.net.URI +import java.time.Instant trait InitializeExecutorHandler { this: DataProcessorRPCHandlerInitializer => @@ -43,9 +49,22 @@ trait InitializeExecutorHandler { val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) val workerCount = req.totalWorkerCount - // Set execution context for this thread ExecutionContext.setExecutionId(req.executionId) ExecutionContext.setOperatorId(VirtualIdentityUtils.getPhysicalOpId(actorId).logicalOpId.id) + ExecutionContext.setBigObjectEventCallback((operatorId, uri, eventType) => + try { + val event = BigObjectEvent( + operatorId, + uri, + if (eventType == "CREATE") BigObjectEventType.CREATE else BigObjectEventType.READ, + Timestamp(Instant.now) + ) + dp.asyncRPCClient.controllerInterface.bigObjectEventTriggered( + BigObjectEventTriggeredRequest(event), + dp.asyncRPCClient.mkContext(CONTROLLER) + ) + } catch { case _: Exception => } + ) dp.executor = req.opExecInitInfo match { case OpExecWithClassName(className, descString) => diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala new file mode 100644 index 0000000000..1f06a9a256 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.model.websocket.event + +import org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent + +case class BigObjectUpdateEvent(events: Seq[BigObjectEvent]) extends TexeraWebSocketEvent diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala index da072c80ea..d14184ca6f 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala @@ -38,7 +38,8 @@ import org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify new Type(value = classOf[PaginatedResultEvent]), new Type(value = classOf[PythonExpressionEvaluateResponse]), new Type(value = classOf[WorkerAssignmentUpdateEvent]), - new Type(value = classOf[ModifyLogicResponse]) + new Type(value = classOf[ModifyLogicResponse]), + new Type(value = classOf[BigObjectUpdateEvent]) ) ) trait TexeraWebSocketEvent {} diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala new file mode 100644 index 0000000000..abcd7baac1 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.service + +import org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent +import org.apache.amber.engine.common.client.AmberClient +import org.apache.texera.web.SubscriptionManager +import org.apache.texera.web.model.websocket.event.BigObjectUpdateEvent +import org.apache.texera.web.storage.ExecutionStateStore + +/** + * Forwards BigObjectEvent messages from workers to frontend via websocket. + */ +class ExecutionBigObjectService( + client: AmberClient, + stateStore: ExecutionStateStore +) extends SubscriptionManager { + + addSubscription( + client.registerCallback[BigObjectEvent](evt => stateStore.bigObjectStore.updateState(_ :+ evt)) + ) + + addSubscription( + stateStore.bigObjectStore.registerDiffHandler((oldEvents, newEvents) => + newEvents.diff(oldEvents) match { + case diff if diff.nonEmpty => Seq(BigObjectUpdateEvent(diff)) + case _ => Seq.empty + } + ) + ) +} diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index bfbff230e1..0788e06b9a 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -102,6 +102,7 @@ class WorkflowExecutionService( var executionStatsService: ExecutionStatsService = _ var executionRuntimeService: ExecutionRuntimeService = _ var executionConsoleService: ExecutionConsoleService = _ + var executionBigObjectService: ExecutionBigObjectService = _ def executeWorkflow(): Unit = { try { @@ -134,6 +135,7 @@ class WorkflowExecutionService( ) executionConsoleService = new ExecutionConsoleService(client, executionStateStore, wsInput, workflow.context) + executionBigObjectService = new ExecutionBigObjectService(client, executionStateStore) logger.info("Starting the workflow execution.") resultService.attachToExecution( diff --git a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala index 3700b44560..d97c789990 100644 --- a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala +++ b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala @@ -55,8 +55,19 @@ class ExecutionStateStore { val consoleStore = new StateStore(ExecutionConsoleStore()) val breakpointStore = new StateStore(ExecutionBreakpointStore()) val reconfigurationStore = new StateStore(ExecutionReconfigurationStore()) + val bigObjectStore = + new StateStore[Seq[org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent]]( + Seq.empty + ) def getAllStores: Iterable[StateStore[_]] = { - Iterable(statsStore, consoleStore, breakpointStore, metadataStore, reconfigurationStore) + Iterable( + statsStore, + consoleStore, + breakpointStore, + metadataStore, + reconfigurationStore, + bigObjectStore + ) } } diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala index f448f3eb59..eecf554110 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala @@ -27,6 +27,8 @@ object ExecutionContext { private val executionIdThreadLocal: ThreadLocal[Option[Int]] = ThreadLocal.withInitial(() => None) private val operatorIdThreadLocal: ThreadLocal[Option[String]] = ThreadLocal.withInitial(() => None) + private val eventCallbackThreadLocal: ThreadLocal[Option[(String, String, String) => Unit]] = + ThreadLocal.withInitial(() => None) /** * Sets the execution ID for the current thread. @@ -60,6 +62,21 @@ object ExecutionContext { operatorIdThreadLocal.get() } + /** + * Sets a callback for sending big object events. + * The callback takes (operatorId, uri, eventType) as parameters. + */ + def setBigObjectEventCallback(callback: (String, String, String) => Unit): Unit = { + eventCallbackThreadLocal.set(Some(callback)) + } + + /** + * Gets the big object event callback for the current thread. + */ + def getBigObjectEventCallback: Option[(String, String, String) => Unit] = { + eventCallbackThreadLocal.get() + } + /** * Clears the execution context for the current thread. * Should be called when cleaning up. @@ -67,5 +84,6 @@ object ExecutionContext { def clear(): Unit = { executionIdThreadLocal.remove() operatorIdThreadLocal.remove() + eventCallbackThreadLocal.remove() } } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala index a7a3d1b2b3..c7b8edef6b 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala @@ -116,6 +116,9 @@ object BigObjectManager extends LazyLogging { throw new RuntimeException(s"Failed to create big object: ${e.getMessage}", e) } + // Send event to frontend + sendBigObjectEvent(operatorId, uri, "CREATE") + new BigObjectPointer(uri) } @@ -132,9 +135,37 @@ object BigObjectManager extends LazyLogging { ) val inputStream = S3StorageClient.downloadObject(ptr.getBucketName, ptr.getObjectKey) + + // Send event to frontend + ExecutionContext.getOperatorId.foreach { operatorId => + sendBigObjectEvent(operatorId, ptr.getUri, "READ") + } + new BigObjectStream(inputStream) } + /** + * Sends a BigObjectEvent using the registered callback. + * + * @param operatorId The operator ID + * @param uri The big object URI + * @param eventType The event type ("CREATE" or "READ") + */ + private def sendBigObjectEvent( + operatorId: String, + uri: String, + eventType: String + ): Unit = { + ExecutionContext.getBigObjectEventCallback.foreach { callback => + try { + callback(operatorId, uri, eventType) + } catch { + case e: Exception => + logger.warn(s"Failed to send BigObjectEvent: ${e.getMessage}") + } + } + } + /** * Deletes all big objects associated with a specific execution ID. * This is called during workflow execution cleanup.
