This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/chris-big-object-version-0 by this push:
     new 898ccd988a Add websocket event BigObjectUpdateEvent
898ccd988a is described below

commit 898ccd988ad1ec87648c5549394860e946b0ec8e
Author: Kunwoo Park <[email protected]>
AuthorDate: Mon Oct 27 16:19:48 2025 -0700

    Add websocket event BigObjectUpdateEvent
---
 .../engine/architecture/rpc/controlcommands.proto  | 18 +++++++
 .../architecture/rpc/controllerservice.proto       |  1 +
 .../python/core/architecture/managers/context.py   |  1 +
 .../architecture/managers/execution_context.py     | 17 ++++++-
 .../main/python/core/runnables/data_processor.py   |  2 +
 amber/src/main/python/core/runnables/main_loop.py  |  3 ++
 .../amber/engine/architecture/rpc/__init__.py      | 57 ++++++++++++++++++++++
 .../python/pytexera/storage/big_object_manager.py  | 41 ++++++++++++++++
 .../ControllerAsyncRPCHandlerInitializer.scala     |  1 +
 .../promisehandlers/BigObjectEventHandler.scala    | 40 +++++++++++++++
 .../InitializeExecutorHandler.scala                | 21 +++++++-
 .../websocket/event/BigObjectUpdateEvent.scala     | 24 +++++++++
 .../websocket/event/TexeraWebSocketEvent.scala     |  3 +-
 .../web/service/ExecutionBigObjectService.scala    | 48 ++++++++++++++++++
 .../web/service/WorkflowExecutionService.scala     |  2 +
 .../texera/web/storage/ExecutionStateStore.scala   | 13 ++++-
 .../amber/core/executor/ExecutionContext.scala     | 18 +++++++
 .../texera/service/util/BigObjectManager.scala     | 31 ++++++++++++
 18 files changed, 337 insertions(+), 4 deletions(-)

diff --git 
a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
 
b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
index d596f8b044..61a38d1415 100644
--- 
a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
+++ 
b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
@@ -46,6 +46,7 @@ message ControlRequest {
     PortCompletedRequest portCompletedRequest = 9;
     WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
     LinkWorkersRequest linkWorkersRequest = 11;
+    BigObjectEventTriggeredRequest bigObjectEventTriggeredRequest = 12;
 
     // request for worker
     AddInputChannelRequest addInputChannelRequest = 50;
@@ -163,6 +164,23 @@ message ConsoleMessageTriggeredRequest {
   ConsoleMessage consoleMessage = 1 [(scalapb.field).no_box = true];
 }
 
+enum BigObjectEventType {
+  CREATE = 0;
+  READ = 1;
+}
+
+message BigObjectEvent {
+  option (scalapb.message).extends = 
"org.apache.amber.engine.architecture.controller.ClientEvent";
+  string operator_id = 1;
+  string uri = 2;
+  BigObjectEventType event_type = 3;
+  google.protobuf.Timestamp timestamp = 4 [(scalapb.field).no_box = true];
+}
+
+message BigObjectEventTriggeredRequest {
+  BigObjectEvent event = 1 [(scalapb.field).no_box = true];
+}
+
 message PortCompletedRequest {
   core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
   bool input = 2;
diff --git 
a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto
 
b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto
index b03cd0f51f..402ee4c5aa 100644
--- 
a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto
+++ 
b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto
@@ -36,6 +36,7 @@ service ControllerService {
   rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
   rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns 
(EvaluatePythonExpressionResponse);
   rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns 
(EmptyReturn);
+  rpc BigObjectEventTriggered(BigObjectEventTriggeredRequest) returns 
(EmptyReturn);
   rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn);
   rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse);
   rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn);
diff --git a/amber/src/main/python/core/architecture/managers/context.py 
b/amber/src/main/python/core/architecture/managers/context.py
index fa92a6db26..c258345efe 100644
--- a/amber/src/main/python/core/architecture/managers/context.py
+++ b/amber/src/main/python/core/architecture/managers/context.py
@@ -53,6 +53,7 @@ class Context:
         # operator_id: parsed from worker_id (format: 
Worker:WF{wid}-{opid}-{layer}-{idx})
         self.execution_id: Optional[int] = None
         self.operator_id: Optional[str] = None
+        self.async_rpc_client = None  # Set by MainLoop after creation
 
         # Extract operator_id from worker_id (e.g., 
"Worker:WF78-op-scan-main-0" → "op-scan")
         match = re.match(r"Worker:WF\d+-(.+)-\w+-\d+", worker_id)
diff --git 
a/amber/src/main/python/core/architecture/managers/execution_context.py 
b/amber/src/main/python/core/architecture/managers/execution_context.py
index 31e02436a6..508d5c6327 100644
--- a/amber/src/main/python/core/architecture/managers/execution_context.py
+++ b/amber/src/main/python/core/architecture/managers/execution_context.py
@@ -22,7 +22,10 @@ execution_id and operator_id without requiring explicit 
parameters.
 """
 
 import threading
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from core.architecture.rpc.async_rpc_client import AsyncRPCClient
 
 
 class ExecutionContext:
@@ -53,6 +56,16 @@ class ExecutionContext:
         """Get the operator ID for the current thread, or None if not set."""
         return getattr(cls._thread_local, "operator_id", None)
 
+    @classmethod
+    def set_async_rpc_client(cls, client: "AsyncRPCClient") -> None:
+        """Set the AsyncRPCClient for the current thread."""
+        cls._thread_local.async_rpc_client = client
+
+    @classmethod
+    def get_async_rpc_client(cls) -> Optional["AsyncRPCClient"]:
+        """Get the AsyncRPCClient for the current thread, or None if not 
set."""
+        return getattr(cls._thread_local, "async_rpc_client", None)
+
     @classmethod
     def clear(cls) -> None:
         """Clear all execution context information for the current thread."""
@@ -60,3 +73,5 @@ class ExecutionContext:
             delattr(cls._thread_local, "execution_id")
         if hasattr(cls._thread_local, "operator_id"):
             delattr(cls._thread_local, "operator_id")
+        if hasattr(cls._thread_local, "async_rpc_client"):
+            delattr(cls._thread_local, "async_rpc_client")
diff --git a/amber/src/main/python/core/runnables/data_processor.py 
b/amber/src/main/python/core/runnables/data_processor.py
index 0c9747a406..dc0f902710 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -55,6 +55,8 @@ class DataProcessor(Runnable, Stoppable):
             # Sync ExecutionContext from context (for thread-local storage)
             ExecutionContext.set_execution_id(self._context.execution_id)
             ExecutionContext.set_operator_id(self._context.operator_id)
+            if self._context.async_rpc_client:
+                
ExecutionContext.set_async_rpc_client(self._context.async_rpc_client)
 
             marker = 
self._context.tuple_processing_manager.get_internal_marker()
             state = self._context.state_processing_manager.get_input_state()
diff --git a/amber/src/main/python/core/runnables/main_loop.py 
b/amber/src/main/python/core/runnables/main_loop.py
index ec662ec721..3a984db9df 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -81,6 +81,9 @@ class MainLoop(StoppableQueueBlockingRunnable):
         self.context = Context(worker_id, input_queue)
         self._async_rpc_server = AsyncRPCServer(output_queue, 
context=self.context)
         self._async_rpc_client = AsyncRPCClient(output_queue, 
context=self.context)
+        self.context.async_rpc_client = (
+            self._async_rpc_client
+        )  # Store reference for BigObjectManager
 
         self.data_processor = DataProcessor(self.context)
         threading.Thread(
diff --git 
a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
 
b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
index 258fc928f7..12619a516b 100644
--- 
a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
@@ -60,6 +60,11 @@ class ConsoleMessageType(betterproto.Enum):
     DEBUGGER = 3
 
 
+class BigObjectEventType(betterproto.Enum):
+    CREATE = 0
+    READ = 1
+
+
 class ErrorLanguage(betterproto.Enum):
     PYTHON = 0
     SCALA = 1
@@ -113,6 +118,9 @@ class ControlRequest(betterproto.Message):
     link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
         11, group="sealed_value"
     )
+    big_object_event_triggered_request: "BigObjectEventTriggeredRequest" = (
+        betterproto.message_field(12, group="sealed_value")
+    )
     add_input_channel_request: "AddInputChannelRequest" = 
betterproto.message_field(
         50, group="sealed_value"
     )
@@ -250,6 +258,19 @@ class ConsoleMessageTriggeredRequest(betterproto.Message):
     console_message: "ConsoleMessage" = betterproto.message_field(1)
 
 
+@dataclass(eq=False, repr=False)
+class BigObjectEvent(betterproto.Message):
+    operator_id: str = betterproto.string_field(1)
+    uri: str = betterproto.string_field(2)
+    event_type: "BigObjectEventType" = betterproto.enum_field(3)
+    timestamp: datetime = betterproto.message_field(4)
+
+
+@dataclass(eq=False, repr=False)
+class BigObjectEventTriggeredRequest(betterproto.Message):
+    event: "BigObjectEvent" = betterproto.message_field(1)
+
+
 @dataclass(eq=False, repr=False)
 class PortCompletedRequest(betterproto.Message):
     port_id: "___core__.PortIdentity" = betterproto.message_field(1)
@@ -1152,6 +1173,23 @@ class ControllerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
+    async def big_object_event_triggered(
+        self,
+        big_object_event_triggered_request: "BigObjectEventTriggeredRequest",
+        *,
+        timeout: Optional[float] = None,
+        deadline: Optional["Deadline"] = None,
+        metadata: Optional["MetadataLike"] = None
+    ) -> "EmptyReturn":
+        return await self._unary_unary(
+            
"/org.apache.amber.engine.architecture.rpc.ControllerService/BigObjectEventTriggered",
+            big_object_event_triggered_request,
+            EmptyReturn,
+            timeout=timeout,
+            deadline=deadline,
+            metadata=metadata,
+        )
+
     async def port_completed(
         self,
         port_completed_request: "PortCompletedRequest",
@@ -1865,6 +1903,11 @@ class ControllerServiceBase(ServiceBase):
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
+    async def big_object_event_triggered(
+        self, big_object_event_triggered_request: 
"BigObjectEventTriggeredRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
     async def port_completed(
         self, port_completed_request: "PortCompletedRequest"
     ) -> "EmptyReturn":
@@ -1953,6 +1996,14 @@ class ControllerServiceBase(ServiceBase):
         response = await self.console_message_triggered(request)
         await stream.send_message(response)
 
+    async def __rpc_big_object_event_triggered(
+        self,
+        stream: "grpclib.server.Stream[BigObjectEventTriggeredRequest, 
EmptyReturn]",
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.big_object_event_triggered(request)
+        await stream.send_message(response)
+
     async def __rpc_port_completed(
         self, stream: "grpclib.server.Stream[PortCompletedRequest, 
EmptyReturn]"
     ) -> None:
@@ -2054,6 +2105,12 @@ class ControllerServiceBase(ServiceBase):
                 ConsoleMessageTriggeredRequest,
                 EmptyReturn,
             ),
+            
"/org.apache.amber.engine.architecture.rpc.ControllerService/BigObjectEventTriggered":
 grpclib.const.Handler(
+                self.__rpc_big_object_event_triggered,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                BigObjectEventTriggeredRequest,
+                EmptyReturn,
+            ),
             
"/org.apache.amber.engine.architecture.rpc.ControllerService/PortCompleted": 
grpclib.const.Handler(
                 self.__rpc_port_completed,
                 grpclib.const.Cardinality.UNARY_UNARY,
diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py 
b/amber/src/main/python/pytexera/storage/big_object_manager.py
index 2fa2c7398c..eb640eb57b 100644
--- a/amber/src/main/python/pytexera/storage/big_object_manager.py
+++ b/amber/src/main/python/pytexera/storage/big_object_manager.py
@@ -21,6 +21,7 @@ import time
 import uuid
 from typing import BinaryIO, Union, Optional
 from io import BytesIO
+from datetime import datetime
 from core.models.schema.big_object_pointer import BigObjectPointer
 from core.storage.storage_config import StorageConfig
 from core.architecture.managers.execution_context import ExecutionContext
@@ -216,6 +217,9 @@ class BigObjectManager:
                 pass
             raise RuntimeError(f"Failed to create big object: {e}")
 
+        # Send event to frontend
+        cls._send_big_object_event(operator_id, uri, "CREATE")
+
         return BigObjectPointer(uri)
 
     @classmethod
@@ -240,6 +244,43 @@ class BigObjectManager:
 
         try:
             body = cls._get_s3_client().get_object(Bucket=bucket, 
Key=key)["Body"]
+
+            # Send event to frontend
+            operator_id = ExecutionContext.get_operator_id()
+            if operator_id:
+                cls._send_big_object_event(operator_id, pointer.uri, "READ")
+
             return BigObjectStream(body, pointer)
         except Exception as e:
             raise RuntimeError(f"Failed to open {pointer.uri}: {e}")
+
+    @classmethod
+    def _send_big_object_event(cls, operator_id: str, uri: str, event_type: 
str):
+        """Sends BigObjectEvent to controller for forwarding to frontend."""
+        try:
+            from proto.org.apache.amber.engine.architecture.rpc import (
+                BigObjectEvent,
+                BigObjectEventTriggeredRequest,
+                BigObjectEventType,
+            )
+            from datetime import datetime, timezone
+
+            client = ExecutionContext.get_async_rpc_client()
+            if not client:
+                return
+
+            event = BigObjectEvent(
+                operator_id=operator_id,
+                uri=uri,
+                event_type=(
+                    BigObjectEventType.CREATE
+                    if event_type == "CREATE"
+                    else BigObjectEventType.READ
+                ),
+                timestamp=datetime.now(timezone.utc),
+            )
+            client.controller_stub().big_object_event_triggered(
+                BigObjectEventTriggeredRequest(event=event)
+            )
+        except Exception:
+            pass
diff --git 
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 61d5e3869b..226cfceda0 100644
--- 
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -41,6 +41,7 @@ class ControllerAsyncRPCHandlerInitializer(
     with StartWorkflowHandler
     with PortCompletedHandler
     with ConsoleMessageHandler
+    with BigObjectEventHandler
     with RetryWorkflowHandler
     with EvaluatePythonExpressionHandler
     with DebugCommandHandler
diff --git 
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala
 
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala
new file mode 100644
index 0000000000..1bc799c55b
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import 
org.apache.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
+import org.apache.amber.engine.architecture.rpc.controlcommands.{
+  AsyncRPCContext,
+  BigObjectEventTriggeredRequest
+}
+import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+
+trait BigObjectEventHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
+
+  override def bigObjectEventTriggered(
+      msg: BigObjectEventTriggeredRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    sendToClient(msg.event)
+    EmptyReturn()
+  }
+}
diff --git 
a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index 871d1a2efb..d3272cf95a 100644
--- 
a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -19,18 +19,24 @@
 
 package org.apache.amber.engine.architecture.worker.promisehandlers
 
+import com.google.protobuf.timestamp.Timestamp
 import com.twitter.util.Future
 import org.apache.amber.core.executor._
 import org.apache.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
+  BigObjectEvent,
+  BigObjectEventTriggeredRequest,
+  BigObjectEventType,
   InitializeExecutorRequest
 }
 import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import 
org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
+import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.amber.operator.source.cache.CacheSourceOpExec
 import org.apache.amber.util.VirtualIdentityUtils
 
 import java.net.URI
+import java.time.Instant
 
 trait InitializeExecutorHandler {
   this: DataProcessorRPCHandlerInitializer =>
@@ -43,9 +49,22 @@ trait InitializeExecutorHandler {
     val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
     val workerCount = req.totalWorkerCount
 
-    // Set execution context for this thread
     ExecutionContext.setExecutionId(req.executionId)
     
ExecutionContext.setOperatorId(VirtualIdentityUtils.getPhysicalOpId(actorId).logicalOpId.id)
+    ExecutionContext.setBigObjectEventCallback((operatorId, uri, eventType) =>
+      try {
+        val event = BigObjectEvent(
+          operatorId,
+          uri,
+          if (eventType == "CREATE") BigObjectEventType.CREATE else 
BigObjectEventType.READ,
+          Timestamp(Instant.now)
+        )
+        dp.asyncRPCClient.controllerInterface.bigObjectEventTriggered(
+          BigObjectEventTriggeredRequest(event),
+          dp.asyncRPCClient.mkContext(CONTROLLER)
+        )
+      } catch { case _: Exception => }
+    )
 
     dp.executor = req.opExecInitInfo match {
       case OpExecWithClassName(className, descString) =>
diff --git 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala
 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala
new file mode 100644
index 0000000000..1f06a9a256
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.model.websocket.event
+
+import org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent
+
+case class BigObjectUpdateEvent(events: Seq[BigObjectEvent]) extends 
TexeraWebSocketEvent
diff --git 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
index da072c80ea..d14184ca6f 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
@@ -38,7 +38,8 @@ import 
org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify
     new Type(value = classOf[PaginatedResultEvent]),
     new Type(value = classOf[PythonExpressionEvaluateResponse]),
     new Type(value = classOf[WorkerAssignmentUpdateEvent]),
-    new Type(value = classOf[ModifyLogicResponse])
+    new Type(value = classOf[ModifyLogicResponse]),
+    new Type(value = classOf[BigObjectUpdateEvent])
   )
 )
 trait TexeraWebSocketEvent {}
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala
new file mode 100644
index 0000000000..abcd7baac1
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent
+import org.apache.amber.engine.common.client.AmberClient
+import org.apache.texera.web.SubscriptionManager
+import org.apache.texera.web.model.websocket.event.BigObjectUpdateEvent
+import org.apache.texera.web.storage.ExecutionStateStore
+
+/**
+  * Forwards BigObjectEvent messages from workers to frontend via websocket.
+  */
+class ExecutionBigObjectService(
+    client: AmberClient,
+    stateStore: ExecutionStateStore
+) extends SubscriptionManager {
+
+  addSubscription(
+    client.registerCallback[BigObjectEvent](evt => 
stateStore.bigObjectStore.updateState(_ :+ evt))
+  )
+
+  addSubscription(
+    stateStore.bigObjectStore.registerDiffHandler((oldEvents, newEvents) =>
+      newEvents.diff(oldEvents) match {
+        case diff if diff.nonEmpty => Seq(BigObjectUpdateEvent(diff))
+        case _                     => Seq.empty
+      }
+    )
+  )
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index bfbff230e1..0788e06b9a 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -102,6 +102,7 @@ class WorkflowExecutionService(
   var executionStatsService: ExecutionStatsService = _
   var executionRuntimeService: ExecutionRuntimeService = _
   var executionConsoleService: ExecutionConsoleService = _
+  var executionBigObjectService: ExecutionBigObjectService = _
 
   def executeWorkflow(): Unit = {
     try {
@@ -134,6 +135,7 @@ class WorkflowExecutionService(
     )
     executionConsoleService =
       new ExecutionConsoleService(client, executionStateStore, wsInput, 
workflow.context)
+    executionBigObjectService = new ExecutionBigObjectService(client, 
executionStateStore)
 
     logger.info("Starting the workflow execution.")
     resultService.attachToExecution(
diff --git 
a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala 
b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
index 3700b44560..d97c789990 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
@@ -55,8 +55,19 @@ class ExecutionStateStore {
   val consoleStore = new StateStore(ExecutionConsoleStore())
   val breakpointStore = new StateStore(ExecutionBreakpointStore())
   val reconfigurationStore = new StateStore(ExecutionReconfigurationStore())
+  val bigObjectStore =
+    new 
StateStore[Seq[org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent]](
+      Seq.empty
+    )
 
   def getAllStores: Iterable[StateStore[_]] = {
-    Iterable(statsStore, consoleStore, breakpointStore, metadataStore, 
reconfigurationStore)
+    Iterable(
+      statsStore,
+      consoleStore,
+      breakpointStore,
+      metadataStore,
+      reconfigurationStore,
+      bigObjectStore
+    )
   }
 }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
 
b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
index f448f3eb59..eecf554110 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
@@ -27,6 +27,8 @@ object ExecutionContext {
   private val executionIdThreadLocal: ThreadLocal[Option[Int]] = 
ThreadLocal.withInitial(() => None)
   private val operatorIdThreadLocal: ThreadLocal[Option[String]] =
     ThreadLocal.withInitial(() => None)
+  private val eventCallbackThreadLocal: ThreadLocal[Option[(String, String, 
String) => Unit]] =
+    ThreadLocal.withInitial(() => None)
 
   /**
     * Sets the execution ID for the current thread.
@@ -60,6 +62,21 @@ object ExecutionContext {
     operatorIdThreadLocal.get()
   }
 
+  /**
+    * Sets a callback for sending big object events.
+    * The callback takes (operatorId, uri, eventType) as parameters.
+    */
+  def setBigObjectEventCallback(callback: (String, String, String) => Unit): 
Unit = {
+    eventCallbackThreadLocal.set(Some(callback))
+  }
+
+  /**
+    * Gets the big object event callback for the current thread.
+    */
+  def getBigObjectEventCallback: Option[(String, String, String) => Unit] = {
+    eventCallbackThreadLocal.get()
+  }
+
   /**
     * Clears the execution context for the current thread.
     * Should be called when cleaning up.
@@ -67,5 +84,6 @@ object ExecutionContext {
   def clear(): Unit = {
     executionIdThreadLocal.remove()
     operatorIdThreadLocal.remove()
+    eventCallbackThreadLocal.remove()
   }
 }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
index a7a3d1b2b3..c7b8edef6b 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -116,6 +116,9 @@ object BigObjectManager extends LazyLogging {
         throw new RuntimeException(s"Failed to create big object: 
${e.getMessage}", e)
     }
 
+    // Send event to frontend
+    sendBigObjectEvent(operatorId, uri, "CREATE")
+
     new BigObjectPointer(uri)
   }
 
@@ -132,9 +135,37 @@ object BigObjectManager extends LazyLogging {
     )
 
     val inputStream = S3StorageClient.downloadObject(ptr.getBucketName, 
ptr.getObjectKey)
+
+    // Send event to frontend
+    ExecutionContext.getOperatorId.foreach { operatorId =>
+      sendBigObjectEvent(operatorId, ptr.getUri, "READ")
+    }
+
     new BigObjectStream(inputStream)
   }
 
+  /**
+    * Sends a BigObjectEvent using the registered callback.
+    *
+    * @param operatorId The operator ID
+    * @param uri The big object URI
+    * @param eventType The event type ("CREATE" or "READ")
+    */
+  private def sendBigObjectEvent(
+      operatorId: String,
+      uri: String,
+      eventType: String
+  ): Unit = {
+    ExecutionContext.getBigObjectEventCallback.foreach { callback =>
+      try {
+        callback(operatorId, uri, eventType)
+      } catch {
+        case e: Exception =>
+          logger.warn(s"Failed to send BigObjectEvent: ${e.getMessage}")
+      }
+    }
+  }
+
   /**
     * Deletes all big objects associated with a specific execution ID.
     * This is called during workflow execution cleanup.

Reply via email to