This is an automated email from the ASF dual-hosted git repository.
kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/chris-big-object-version-0 by
this push:
new 898ccd988a Add websocket event BigObjectUpdateEvent
898ccd988a is described below
commit 898ccd988ad1ec87648c5549394860e946b0ec8e
Author: Kunwoo Park <[email protected]>
AuthorDate: Mon Oct 27 16:19:48 2025 -0700
Add websocket event BigObjectUpdateEvent
---
.../engine/architecture/rpc/controlcommands.proto | 18 +++++++
.../architecture/rpc/controllerservice.proto | 1 +
.../python/core/architecture/managers/context.py | 1 +
.../architecture/managers/execution_context.py | 17 ++++++-
.../main/python/core/runnables/data_processor.py | 2 +
amber/src/main/python/core/runnables/main_loop.py | 3 ++
.../amber/engine/architecture/rpc/__init__.py | 57 ++++++++++++++++++++++
.../python/pytexera/storage/big_object_manager.py | 41 ++++++++++++++++
.../ControllerAsyncRPCHandlerInitializer.scala | 1 +
.../promisehandlers/BigObjectEventHandler.scala | 40 +++++++++++++++
.../InitializeExecutorHandler.scala | 21 +++++++-
.../websocket/event/BigObjectUpdateEvent.scala | 24 +++++++++
.../websocket/event/TexeraWebSocketEvent.scala | 3 +-
.../web/service/ExecutionBigObjectService.scala | 48 ++++++++++++++++++
.../web/service/WorkflowExecutionService.scala | 2 +
.../texera/web/storage/ExecutionStateStore.scala | 13 ++++-
.../amber/core/executor/ExecutionContext.scala | 18 +++++++
.../texera/service/util/BigObjectManager.scala | 31 ++++++++++++
18 files changed, 337 insertions(+), 4 deletions(-)
diff --git
a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
index d596f8b044..61a38d1415 100644
---
a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
+++
b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
@@ -46,6 +46,7 @@ message ControlRequest {
PortCompletedRequest portCompletedRequest = 9;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
LinkWorkersRequest linkWorkersRequest = 11;
+ BigObjectEventTriggeredRequest bigObjectEventTriggeredRequest = 12;
// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
@@ -163,6 +164,23 @@ message ConsoleMessageTriggeredRequest {
ConsoleMessage consoleMessage = 1 [(scalapb.field).no_box = true];
}
+// Kind of big-object access that a worker reports to the controller.
+enum BigObjectEventType {
+ // Reported after a worker has created a big object.
+ CREATE = 0;
+ // Reported after a worker has opened an existing big object for reading.
+ READ = 1;
+}
+
+// Describes a single big-object access reported by a worker.
+message BigObjectEvent {
+ // Makes the generated Scala class extend ClientEvent, so the controller
+ // handler can pass the event to sendToClient.
+ option (scalapb.message).extends =
"org.apache.amber.engine.architecture.controller.ClientEvent";
+ // ID of the operator associated with the access.
+ string operator_id = 1;
+ // URI of the big object that was created or read.
+ string uri = 2;
+ // Whether the object was created or read.
+ BigObjectEventType event_type = 3;
+ // Time at which the worker built the event.
+ google.protobuf.Timestamp timestamp = 4 [(scalapb.field).no_box = true];
+}
+
+// Request payload of ControllerService.BigObjectEventTriggered; wraps one event.
+message BigObjectEventTriggeredRequest {
+ BigObjectEvent event = 1 [(scalapb.field).no_box = true];
+}
+
message PortCompletedRequest {
core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
bool input = 2;
diff --git
a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto
b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto
index b03cd0f51f..402ee4c5aa 100644
---
a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto
+++
b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controllerservice.proto
@@ -36,6 +36,7 @@ service ControllerService {
rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns
(EvaluatePythonExpressionResponse);
rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns
(EmptyReturn);
+ rpc BigObjectEventTriggered(BigObjectEventTriggeredRequest) returns
(EmptyReturn);
rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn);
rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse);
rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn);
diff --git a/amber/src/main/python/core/architecture/managers/context.py
b/amber/src/main/python/core/architecture/managers/context.py
index fa92a6db26..c258345efe 100644
--- a/amber/src/main/python/core/architecture/managers/context.py
+++ b/amber/src/main/python/core/architecture/managers/context.py
@@ -53,6 +53,7 @@ class Context:
# operator_id: parsed from worker_id (format:
Worker:WF{wid}-{opid}-{layer}-{idx})
self.execution_id: Optional[int] = None
self.operator_id: Optional[str] = None
+ self.async_rpc_client = None # Set by MainLoop after creation
# Extract operator_id from worker_id (e.g.,
"Worker:WF78-op-scan-main-0" → "op-scan")
match = re.match(r"Worker:WF\d+-(.+)-\w+-\d+", worker_id)
diff --git
a/amber/src/main/python/core/architecture/managers/execution_context.py
b/amber/src/main/python/core/architecture/managers/execution_context.py
index 31e02436a6..508d5c6327 100644
--- a/amber/src/main/python/core/architecture/managers/execution_context.py
+++ b/amber/src/main/python/core/architecture/managers/execution_context.py
@@ -22,7 +22,10 @@ execution_id and operator_id without requiring explicit
parameters.
"""
import threading
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from core.architecture.rpc.async_rpc_client import AsyncRPCClient
class ExecutionContext:
@@ -53,6 +56,16 @@ class ExecutionContext:
"""Get the operator ID for the current thread, or None if not set."""
return getattr(cls._thread_local, "operator_id", None)
+ @classmethod
+ def set_async_rpc_client(cls, client: "AsyncRPCClient") -> None:
+ """Set the AsyncRPCClient for the current thread."""
+ cls._thread_local.async_rpc_client = client
+
+ @classmethod
+ def get_async_rpc_client(cls) -> Optional["AsyncRPCClient"]:
+ """Get the AsyncRPCClient for the current thread, or None if not
set."""
+ return getattr(cls._thread_local, "async_rpc_client", None)
+
@classmethod
def clear(cls) -> None:
"""Clear all execution context information for the current thread."""
@@ -60,3 +73,5 @@ class ExecutionContext:
delattr(cls._thread_local, "execution_id")
if hasattr(cls._thread_local, "operator_id"):
delattr(cls._thread_local, "operator_id")
+ if hasattr(cls._thread_local, "async_rpc_client"):
+ delattr(cls._thread_local, "async_rpc_client")
diff --git a/amber/src/main/python/core/runnables/data_processor.py
b/amber/src/main/python/core/runnables/data_processor.py
index 0c9747a406..dc0f902710 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -55,6 +55,8 @@ class DataProcessor(Runnable, Stoppable):
# Sync ExecutionContext from context (for thread-local storage)
ExecutionContext.set_execution_id(self._context.execution_id)
ExecutionContext.set_operator_id(self._context.operator_id)
+ if self._context.async_rpc_client:
+
ExecutionContext.set_async_rpc_client(self._context.async_rpc_client)
marker =
self._context.tuple_processing_manager.get_internal_marker()
state = self._context.state_processing_manager.get_input_state()
diff --git a/amber/src/main/python/core/runnables/main_loop.py
b/amber/src/main/python/core/runnables/main_loop.py
index ec662ec721..3a984db9df 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -81,6 +81,9 @@ class MainLoop(StoppableQueueBlockingRunnable):
self.context = Context(worker_id, input_queue)
self._async_rpc_server = AsyncRPCServer(output_queue,
context=self.context)
self._async_rpc_client = AsyncRPCClient(output_queue,
context=self.context)
+ self.context.async_rpc_client = (
+ self._async_rpc_client
+ ) # Store reference for BigObjectManager
self.data_processor = DataProcessor(self.context)
threading.Thread(
diff --git
a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
index 258fc928f7..12619a516b 100644
---
a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
+++
b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
@@ -60,6 +60,11 @@ class ConsoleMessageType(betterproto.Enum):
DEBUGGER = 3
+class BigObjectEventType(betterproto.Enum):
+ CREATE = 0
+ READ = 1
+
+
class ErrorLanguage(betterproto.Enum):
PYTHON = 0
SCALA = 1
@@ -113,6 +118,9 @@ class ControlRequest(betterproto.Message):
link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
11, group="sealed_value"
)
+ big_object_event_triggered_request: "BigObjectEventTriggeredRequest" = (
+ betterproto.message_field(12, group="sealed_value")
+ )
add_input_channel_request: "AddInputChannelRequest" =
betterproto.message_field(
50, group="sealed_value"
)
@@ -250,6 +258,19 @@ class ConsoleMessageTriggeredRequest(betterproto.Message):
console_message: "ConsoleMessage" = betterproto.message_field(1)
+@dataclass(eq=False, repr=False)
+class BigObjectEvent(betterproto.Message):
+ operator_id: str = betterproto.string_field(1)
+ uri: str = betterproto.string_field(2)
+ event_type: "BigObjectEventType" = betterproto.enum_field(3)
+ timestamp: datetime = betterproto.message_field(4)
+
+
+@dataclass(eq=False, repr=False)
+class BigObjectEventTriggeredRequest(betterproto.Message):
+ event: "BigObjectEvent" = betterproto.message_field(1)
+
+
@dataclass(eq=False, repr=False)
class PortCompletedRequest(betterproto.Message):
port_id: "___core__.PortIdentity" = betterproto.message_field(1)
@@ -1152,6 +1173,23 @@ class ControllerServiceStub(betterproto.ServiceStub):
metadata=metadata,
)
+ async def big_object_event_triggered(
+ self,
+ big_object_event_triggered_request: "BigObjectEventTriggeredRequest",
+ *,
+ timeout: Optional[float] = None,
+ deadline: Optional["Deadline"] = None,
+ metadata: Optional["MetadataLike"] = None
+ ) -> "EmptyReturn":
+ return await self._unary_unary(
+
"/org.apache.amber.engine.architecture.rpc.ControllerService/BigObjectEventTriggered",
+ big_object_event_triggered_request,
+ EmptyReturn,
+ timeout=timeout,
+ deadline=deadline,
+ metadata=metadata,
+ )
+
async def port_completed(
self,
port_completed_request: "PortCompletedRequest",
@@ -1865,6 +1903,11 @@ class ControllerServiceBase(ServiceBase):
) -> "EmptyReturn":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+ async def big_object_event_triggered(
+ self, big_object_event_triggered_request:
"BigObjectEventTriggeredRequest"
+ ) -> "EmptyReturn":
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
async def port_completed(
self, port_completed_request: "PortCompletedRequest"
) -> "EmptyReturn":
@@ -1953,6 +1996,14 @@ class ControllerServiceBase(ServiceBase):
response = await self.console_message_triggered(request)
await stream.send_message(response)
+ async def __rpc_big_object_event_triggered(
+ self,
+ stream: "grpclib.server.Stream[BigObjectEventTriggeredRequest,
EmptyReturn]",
+ ) -> None:
+ request = await stream.recv_message()
+ response = await self.big_object_event_triggered(request)
+ await stream.send_message(response)
+
async def __rpc_port_completed(
self, stream: "grpclib.server.Stream[PortCompletedRequest,
EmptyReturn]"
) -> None:
@@ -2054,6 +2105,12 @@ class ControllerServiceBase(ServiceBase):
ConsoleMessageTriggeredRequest,
EmptyReturn,
),
+
"/org.apache.amber.engine.architecture.rpc.ControllerService/BigObjectEventTriggered":
grpclib.const.Handler(
+ self.__rpc_big_object_event_triggered,
+ grpclib.const.Cardinality.UNARY_UNARY,
+ BigObjectEventTriggeredRequest,
+ EmptyReturn,
+ ),
"/org.apache.amber.engine.architecture.rpc.ControllerService/PortCompleted":
grpclib.const.Handler(
self.__rpc_port_completed,
grpclib.const.Cardinality.UNARY_UNARY,
diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py
b/amber/src/main/python/pytexera/storage/big_object_manager.py
index 2fa2c7398c..eb640eb57b 100644
--- a/amber/src/main/python/pytexera/storage/big_object_manager.py
+++ b/amber/src/main/python/pytexera/storage/big_object_manager.py
@@ -21,6 +21,7 @@ import time
import uuid
from typing import BinaryIO, Union, Optional
from io import BytesIO
+from datetime import datetime
from core.models.schema.big_object_pointer import BigObjectPointer
from core.storage.storage_config import StorageConfig
from core.architecture.managers.execution_context import ExecutionContext
@@ -216,6 +217,9 @@ class BigObjectManager:
pass
raise RuntimeError(f"Failed to create big object: {e}")
+ # Send event to frontend
+ cls._send_big_object_event(operator_id, uri, "CREATE")
+
return BigObjectPointer(uri)
@classmethod
@@ -240,6 +244,43 @@ class BigObjectManager:
try:
body = cls._get_s3_client().get_object(Bucket=bucket,
Key=key)["Body"]
+
+ # Send event to frontend
+ operator_id = ExecutionContext.get_operator_id()
+ if operator_id:
+ cls._send_big_object_event(operator_id, pointer.uri, "READ")
+
return BigObjectStream(body, pointer)
except Exception as e:
raise RuntimeError(f"Failed to open {pointer.uri}: {e}")
+
+ @classmethod
+ def _send_big_object_event(cls, operator_id: str, uri: str, event_type:
str):
+ """Sends BigObjectEvent to controller for forwarding to frontend."""
+ try:
+ from proto.org.apache.amber.engine.architecture.rpc import (
+ BigObjectEvent,
+ BigObjectEventTriggeredRequest,
+ BigObjectEventType,
+ )
+ from datetime import datetime, timezone
+
+ client = ExecutionContext.get_async_rpc_client()
+ if not client:
+ return
+
+ event = BigObjectEvent(
+ operator_id=operator_id,
+ uri=uri,
+ event_type=(
+ BigObjectEventType.CREATE
+ if event_type == "CREATE"
+ else BigObjectEventType.READ
+ ),
+ timestamp=datetime.now(timezone.utc),
+ )
+ client.controller_stub().big_object_event_triggered(
+ BigObjectEventTriggeredRequest(event=event)
+ )
+ except Exception:
+ pass
diff --git
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 61d5e3869b..226cfceda0 100644
---
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -41,6 +41,7 @@ class ControllerAsyncRPCHandlerInitializer(
with StartWorkflowHandler
with PortCompletedHandler
with ConsoleMessageHandler
+ with BigObjectEventHandler
with RetryWorkflowHandler
with EvaluatePythonExpressionHandler
with DebugCommandHandler
diff --git
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala
new file mode 100644
index 0000000000..1bc799c55b
--- /dev/null
+++
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/promisehandlers/BigObjectEventHandler.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import
org.apache.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
+import org.apache.amber.engine.architecture.rpc.controlcommands.{
+ AsyncRPCContext,
+ BigObjectEventTriggeredRequest
+}
+import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+
+/**
+  * Controller-side handler for the BigObjectEventTriggered RPC.
+  */
+trait BigObjectEventHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
+
+  /**
+    * Passes the event carried by the request to `sendToClient` and acknowledges the call.
+    *
+    * @param msg request wrapping the big object event
+    * @param ctx RPC context of the call; not used by this handler
+    * @return an empty acknowledgement
+    */
+  override def bigObjectEventTriggered(
+      msg: BigObjectEventTriggeredRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    val event = msg.event
+    sendToClient(event)
+    EmptyReturn()
+  }
+}
diff --git
a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index 871d1a2efb..d3272cf95a 100644
---
a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++
b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -19,18 +19,24 @@
package org.apache.amber.engine.architecture.worker.promisehandlers
+import com.google.protobuf.timestamp.Timestamp
import com.twitter.util.Future
import org.apache.amber.core.executor._
import org.apache.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
+ BigObjectEvent,
+ BigObjectEventTriggeredRequest,
+ BigObjectEventType,
InitializeExecutorRequest
}
import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import
org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
+import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER
import org.apache.amber.operator.source.cache.CacheSourceOpExec
import org.apache.amber.util.VirtualIdentityUtils
import java.net.URI
+import java.time.Instant
trait InitializeExecutorHandler {
this: DataProcessorRPCHandlerInitializer =>
@@ -43,9 +49,22 @@ trait InitializeExecutorHandler {
val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
val workerCount = req.totalWorkerCount
- // Set execution context for this thread
ExecutionContext.setExecutionId(req.executionId)
ExecutionContext.setOperatorId(VirtualIdentityUtils.getPhysicalOpId(actorId).logicalOpId.id)
+ ExecutionContext.setBigObjectEventCallback((operatorId, uri, eventType) =>
+ try {
+ val event = BigObjectEvent(
+ operatorId,
+ uri,
+ if (eventType == "CREATE") BigObjectEventType.CREATE else
BigObjectEventType.READ,
+ Timestamp(Instant.now)
+ )
+ dp.asyncRPCClient.controllerInterface.bigObjectEventTriggered(
+ BigObjectEventTriggeredRequest(event),
+ dp.asyncRPCClient.mkContext(CONTROLLER)
+ )
+ } catch { case _: Exception => }
+ )
dp.executor = req.opExecInitInfo match {
case OpExecWithClassName(className, descString) =>
diff --git
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala
new file mode 100644
index 0000000000..1f06a9a256
--- /dev/null
+++
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/BigObjectUpdateEvent.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.model.websocket.event
+
+import org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent
+
+/**
+  * Websocket event carrying a batch of big object events.
+  *
+  * @param events the BigObjectEvent messages included in this update
+  */
+case class BigObjectUpdateEvent(events: Seq[BigObjectEvent]) extends
TexeraWebSocketEvent
diff --git
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
index da072c80ea..d14184ca6f 100644
---
a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
+++
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala
@@ -38,7 +38,8 @@ import
org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify
new Type(value = classOf[PaginatedResultEvent]),
new Type(value = classOf[PythonExpressionEvaluateResponse]),
new Type(value = classOf[WorkerAssignmentUpdateEvent]),
- new Type(value = classOf[ModifyLogicResponse])
+ new Type(value = classOf[ModifyLogicResponse]),
+ new Type(value = classOf[BigObjectUpdateEvent])
)
)
trait TexeraWebSocketEvent {}
diff --git
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala
new file mode 100644
index 0000000000..abcd7baac1
--- /dev/null
+++
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionBigObjectService.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.web.service
+
+import org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent
+import org.apache.amber.engine.common.client.AmberClient
+import org.apache.texera.web.SubscriptionManager
+import org.apache.texera.web.model.websocket.event.BigObjectUpdateEvent
+import org.apache.texera.web.storage.ExecutionStateStore
+
+/**
+  * Collects every BigObjectEvent delivered by the Amber client into the execution's
+  * bigObjectStore and turns newly stored events into BigObjectUpdateEvent websocket events.
+  *
+  * @param client     Amber client on which the BigObjectEvent callback is registered
+  * @param stateStore execution state store that owns bigObjectStore
+  */
+class ExecutionBigObjectService(
+    client: AmberClient,
+    stateStore: ExecutionStateStore
+) extends SubscriptionManager {
+
+  // Append each incoming event to the stored sequence.
+  addSubscription(
+    client.registerCallback[BigObjectEvent](incoming =>
+      stateStore.bigObjectStore.updateState(stored => stored :+ incoming)
+    )
+  )
+
+  // Emit one update holding the events present in the new state but not in the old one.
+  addSubscription(
+    stateStore.bigObjectStore.registerDiffHandler((previous, current) => {
+      val added = current.diff(previous)
+      if (added.nonEmpty) Seq(BigObjectUpdateEvent(added)) else Seq.empty
+    })
+  )
+}
diff --git
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
index bfbff230e1..0788e06b9a 100644
---
a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
+++
b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala
@@ -102,6 +102,7 @@ class WorkflowExecutionService(
var executionStatsService: ExecutionStatsService = _
var executionRuntimeService: ExecutionRuntimeService = _
var executionConsoleService: ExecutionConsoleService = _
+ var executionBigObjectService: ExecutionBigObjectService = _
def executeWorkflow(): Unit = {
try {
@@ -134,6 +135,7 @@ class WorkflowExecutionService(
)
executionConsoleService =
new ExecutionConsoleService(client, executionStateStore, wsInput,
workflow.context)
+ executionBigObjectService = new ExecutionBigObjectService(client,
executionStateStore)
logger.info("Starting the workflow execution.")
resultService.attachToExecution(
diff --git
a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
index 3700b44560..d97c789990 100644
---
a/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
+++
b/amber/src/main/scala/org/apache/texera/web/storage/ExecutionStateStore.scala
@@ -55,8 +55,19 @@ class ExecutionStateStore {
val consoleStore = new StateStore(ExecutionConsoleStore())
val breakpointStore = new StateStore(ExecutionBreakpointStore())
val reconfigurationStore = new StateStore(ExecutionReconfigurationStore())
+ val bigObjectStore =
+ new
StateStore[Seq[org.apache.amber.engine.architecture.rpc.controlcommands.BigObjectEvent]](
+ Seq.empty
+ )
def getAllStores: Iterable[StateStore[_]] = {
- Iterable(statsStore, consoleStore, breakpointStore, metadataStore,
reconfigurationStore)
+ Iterable(
+ statsStore,
+ consoleStore,
+ breakpointStore,
+ metadataStore,
+ reconfigurationStore,
+ bigObjectStore
+ )
}
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
index f448f3eb59..eecf554110 100644
---
a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
+++
b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
@@ -27,6 +27,8 @@ object ExecutionContext {
private val executionIdThreadLocal: ThreadLocal[Option[Int]] =
ThreadLocal.withInitial(() => None)
private val operatorIdThreadLocal: ThreadLocal[Option[String]] =
ThreadLocal.withInitial(() => None)
+ private val eventCallbackThreadLocal: ThreadLocal[Option[(String, String,
String) => Unit]] =
+ ThreadLocal.withInitial(() => None)
/**
* Sets the execution ID for the current thread.
@@ -60,6 +62,21 @@ object ExecutionContext {
operatorIdThreadLocal.get()
}
+  /**
+    * Registers the big object event callback for the current thread.
+    *
+    * @param callback function invoked with (operatorId, uri, eventType)
+    */
+  def setBigObjectEventCallback(callback: (String, String, String) => Unit): Unit =
+    eventCallbackThreadLocal.set(Some(callback))
+
+  /**
+    * Returns the callback registered on the current thread, or None if none was set.
+    */
+  def getBigObjectEventCallback: Option[(String, String, String) => Unit] =
+    eventCallbackThreadLocal.get()
+
/**
* Clears the execution context for the current thread.
* Should be called when cleaning up.
@@ -67,5 +84,6 @@ object ExecutionContext {
def clear(): Unit = {
executionIdThreadLocal.remove()
operatorIdThreadLocal.remove()
+ eventCallbackThreadLocal.remove()
}
}
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
index a7a3d1b2b3..c7b8edef6b 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -116,6 +116,9 @@ object BigObjectManager extends LazyLogging {
throw new RuntimeException(s"Failed to create big object:
${e.getMessage}", e)
}
+ // Send event to frontend
+ sendBigObjectEvent(operatorId, uri, "CREATE")
+
new BigObjectPointer(uri)
}
@@ -132,9 +135,37 @@ object BigObjectManager extends LazyLogging {
)
val inputStream = S3StorageClient.downloadObject(ptr.getBucketName,
ptr.getObjectKey)
+
+ // Send event to frontend
+ ExecutionContext.getOperatorId.foreach { operatorId =>
+ sendBigObjectEvent(operatorId, ptr.getUri, "READ")
+ }
+
new BigObjectStream(inputStream)
}
+  /**
+    * Sends a BigObjectEvent through the callback registered in ExecutionContext.
+    *
+    * Does nothing when no callback is registered on the current thread. Notification is
+    * best-effort: a non-fatal failure in the callback is logged with its stack trace and
+    * is not rethrown.
+    *
+    * @param operatorId The operator ID
+    * @param uri The big object URI
+    * @param eventType The event type ("CREATE" or "READ")
+    */
+  private def sendBigObjectEvent(
+      operatorId: String,
+      uri: String,
+      eventType: String
+  ): Unit = {
+    ExecutionContext.getBigObjectEventCallback.foreach { callback =>
+      try {
+        callback(operatorId, uri, eventType)
+      } catch {
+        // NonFatal lets InterruptedException and fatal JVM errors propagate.
+        case scala.util.control.NonFatal(e) =>
+          // Pass the throwable so the stack trace is kept in the log.
+          logger.warn(s"Failed to send BigObjectEvent: ${e.getMessage}", e)
+      }
+    }
+  }
+
/**
* Deletes all big objects associated with a specific execution ID.
* This is called during workflow execution cleanup.