This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git
commit cea73acbb607f0d6fc6712372b009111e2f24d0f
Author: Kunwoo Park <[email protected]>
AuthorDate: Mon Oct 27 13:08:37 2025 -0700

    create() API for python + cleanup the parameters
---
 .../control/initialize_executor_handler.py         |  5 ++
 .../python/core/architecture/managers/context.py   | 13 ++++
 .../architecture/managers/execution_context.py     | 62 +++++++++++++++
 .../main/python/core/runnables/data_processor.py   |  5 ++
 .../amber/engine/architecture/rpc/__init__.py      |  1 +
 .../python/pytexera/storage/big_object_manager.py  | 91 +++++++++++-----------
 .../texera/service/util/BigObjectManager.scala     | 25 +++---
 .../source/scan/FileScanSourceOpExec.scala         | 13 +---
 8 files changed, 148 insertions(+), 67 deletions(-)

diff --git a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py
index 0d86dbc212..45e992358b 100644
--- a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py
+++ b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py
@@ -28,6 +28,11 @@ class InitializeExecutorHandler(ControlHandler):
 
     async def initialize_executor(self, req: InitializeExecutorRequest) -> EmptyReturn:
         op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info)
+
+        # Store execution_id in context (from InitializeExecutorRequest)
+        # This is the real execution ID, not the workflow ID
+        self.context.execution_id = req.execution_id
+
         self.context.executor_manager.initialize_executor(
             op_exec_with_code.code, req.is_source, op_exec_with_code.language
         )
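Note: the execution ID rides along on the executor-initialization RPC itself; the
matching proto-stub change is in rpc/__init__.py further down. A minimal sketch of
the request the handler now receives (field values here are hypothetical, and
op_exec_init_info is omitted for brevity):

    from proto.org.apache.amber.engine.architecture.rpc import InitializeExecutorRequest

    req = InitializeExecutorRequest(
        total_worker_count=1,
        is_source=True,
        execution_id=42,  # new int64 field (tag 4): the real execution ID
    )
    # The handler then stores it: self.context.execution_id = req.execution_id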
diff --git a/amber/src/main/python/core/architecture/managers/context.py b/amber/src/main/python/core/architecture/managers/context.py
index 826006ccb6..fa92a6db26 100644
--- a/amber/src/main/python/core/architecture/managers/context.py
+++ b/amber/src/main/python/core/architecture/managers/context.py
@@ -16,6 +16,7 @@
 # under the License.
 
 from typing import Optional
+import re
 
 from proto.org.apache.amber.core import ActorVirtualIdentity, ChannelIdentity
 from proto.org.apache.amber.engine.architecture.worker import WorkerState
@@ -23,6 +24,7 @@ from .console_message_manager import ConsoleMessageManager
 from .debug_manager import DebugManager
 from .embedded_control_message_manager import EmbeddedControlMessageManager
 from .exception_manager import ExceptionManager
+from .execution_context import ExecutionContext
 from .executor_manager import ExecutorManager
 from .pause_manager import PauseManager
 from .state_manager import StateManager
@@ -46,6 +48,17 @@ class Context:
     def __init__(self, worker_id, input_queue):
         self.worker_id = worker_id
         self.input_queue: InternalQueue = input_queue
+
+        # execution_id: set by InitializeExecutorHandler from InitializeExecutorRequest
+        # operator_id: parsed from worker_id (format: Worker:WF{wid}-{opid}-{layer}-{idx})
+        self.execution_id: Optional[int] = None
+        self.operator_id: Optional[str] = None
+
+        # Extract operator_id from worker_id (e.g., "Worker:WF78-op-scan-main-0" → "op-scan")
+        match = re.match(r"Worker:WF\d+-(.+)-\w+-\d+", worker_id)
+        if match:
+            self.operator_id = match.group(1)
+
         self.executor_manager = ExecutorManager()
         self.current_input_channel_id: Optional[ChannelIdentity] = None
         self.tuple_processing_manager = TupleProcessingManager()
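A quick sanity check of the worker_id parsing added above, using the example from
the diff's own comment. The greedy "(.+)" backtracks past the layer and index
suffixes, so hyphenated operator IDs survive:

    import re

    worker_id = "Worker:WF78-op-scan-main-0"  # Worker:WF{wid}-{opid}-{layer}-{idx}
    match = re.match(r"Worker:WF\d+-(.+)-\w+-\d+", worker_id)
    assert match and match.group(1) == "op-scan"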
diff --git a/amber/src/main/python/core/architecture/managers/execution_context.py b/amber/src/main/python/core/architecture/managers/execution_context.py
new file mode 100644
index 0000000000..31e02436a6
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/execution_context.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+ExecutionContext provides thread-local storage for execution-related information.
+This allows BigObjectManager and other utilities to automatically access
+execution_id and operator_id without requiring explicit parameters.
+"""
+
+import threading
+from typing import Optional
+
+
+class ExecutionContext:
+    """
+    Thread-local storage for execution context information.
+    Similar to the Scala-side org.apache.amber.core.executor.ExecutionContext.
+    """
+
+    _thread_local = threading.local()
+
+    @classmethod
+    def set_execution_id(cls, execution_id: int) -> None:
+        """Set the execution ID for the current thread."""
+        cls._thread_local.execution_id = execution_id
+
+    @classmethod
+    def get_execution_id(cls) -> Optional[int]:
+        """Get the execution ID for the current thread, or None if not set."""
+        return getattr(cls._thread_local, "execution_id", None)
+
+    @classmethod
+    def set_operator_id(cls, operator_id: str) -> None:
+        """Set the operator ID for the current thread."""
+        cls._thread_local.operator_id = operator_id
+
+    @classmethod
+    def get_operator_id(cls) -> Optional[str]:
+        """Get the operator ID for the current thread, or None if not set."""
+        return getattr(cls._thread_local, "operator_id", None)
+
+    @classmethod
+    def clear(cls) -> None:
+        """Clear all execution context information for the current thread."""
+        if hasattr(cls._thread_local, "execution_id"):
+            delattr(cls._thread_local, "execution_id")
+        if hasattr(cls._thread_local, "operator_id"):
+            delattr(cls._thread_local, "operator_id")

diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py
index ce8f937142..0c9747a406 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -23,6 +23,7 @@ from threading import Event
 from typing import Iterator, Optional
 
 from core.architecture.managers import Context
+from core.architecture.managers.execution_context import ExecutionContext
 from core.models import ExceptionInfo, State, TupleLike, InternalMarker
 from core.models.internal_marker import StartChannel, EndChannel
 from core.models.table import all_output_to_tuple
@@ -51,6 +52,10 @@ class DataProcessor(Runnable, Stoppable):
         self._running.set()
         self._switch_context()
         while self._running.is_set():
+            # Sync ExecutionContext from context (for thread-local storage)
+            ExecutionContext.set_execution_id(self._context.execution_id)
+            ExecutionContext.set_operator_id(self._context.operator_id)
+
             marker = self._context.tuple_processing_manager.get_internal_marker()
             state = self._context.state_processing_manager.get_input_state()
             tuple_ = self._context.tuple_processing_manager.current_input_tuple

diff --git a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
index 0a913e8b88..258fc928f7 100644
--- a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
+++ b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
@@ -378,6 +378,7 @@ class InitializeExecutorRequest(betterproto.Message):
     total_worker_count: int = betterproto.int32_field(1)
     op_exec_init_info: "___core__.OpExecInitInfo" = betterproto.message_field(2)
     is_source: bool = betterproto.bool_field(3)
+    execution_id: int = betterproto.int64_field(4)
 
 
 @dataclass(eq=False, repr=False)
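Taken together: DataProcessor publishes the IDs to the thread-local ExecutionContext
before each loop iteration, so anything running on that thread can read them back
without plumbing parameters through. A minimal usage sketch of the pattern (the ID
values are made up):

    from core.architecture.managers.execution_context import ExecutionContext

    ExecutionContext.set_execution_id(42)        # hypothetical execution ID
    ExecutionContext.set_operator_id("op-scan")  # hypothetical operator ID
    try:
        # ... user code runs here; BigObjectManager.create() reads these ...
        assert ExecutionContext.get_execution_id() == 42
    finally:
        ExecutionContext.clear()  # avoid leaking IDs across executions on a reused thread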
diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py b/amber/src/main/python/pytexera/storage/big_object_manager.py
index c116c1e392..2fa2c7398c 100644
--- a/amber/src/main/python/pytexera/storage/big_object_manager.py
+++ b/amber/src/main/python/pytexera/storage/big_object_manager.py
@@ -19,10 +19,11 @@
 
 import time
 import uuid
-from typing import BinaryIO, Union
+from typing import BinaryIO, Union, Optional
 from io import BytesIO
 
 from core.models.schema.big_object_pointer import BigObjectPointer
 from core.storage.storage_config import StorageConfig
+from core.architecture.managers.execution_context import ExecutionContext
 
 
 class BigObjectStream:
@@ -95,18 +96,31 @@ class BigObjectManager:
         if cls._db_connection is None:
             try:
                 import psycopg2
-                # Parse JDBC URL to extract host, port, database
-                # Format: jdbc:postgresql://host:port/database?params
+
+                # Parse JDBC URL to extract host, port, database, and schema
+                # Format: jdbc:postgresql://host:port/database?currentSchema=texera_db,public
                 jdbc_url = StorageConfig.JDBC_URL
                 if jdbc_url.startswith("jdbc:postgresql://"):
                     url_part = jdbc_url.replace("jdbc:postgresql://", "")
                     # Split by '?' to separate params
-                    url_without_params = url_part.split("?")[0]
+                    url_parts = url_part.split("?")
+                    url_without_params = url_parts[0]
+
+                    # Parse schema from params (e.g., currentSchema=texera_db,public)
+                    schema = "texera_db"  # default
+                    if len(url_parts) > 1:
+                        params = url_parts[1]
+                        for param in params.split("&"):
+                            if param.startswith("currentSchema="):
+                                schema_value = param.split("=")[1]
+                                # Take first schema from comma-separated list
+                                schema = schema_value.split(",")[0]
+
                     # Split by '/' to get host:port and database
                     parts = url_without_params.split("/")
                     host_port = parts[0]
                     database = parts[1] if len(parts) > 1 else "texera_db"
-
+
                     # Split host and port
                     if ":" in host_port:
                         host, port = host_port.split(":")
@@ -114,18 +128,22 @@ class BigObjectManager:
                     else:
                         host = host_port
                         port = 5432
-
+
+                    # Connect and set search_path
                     cls._db_connection = psycopg2.connect(
                         host=host,
                         port=port,
                         user=StorageConfig.JDBC_USERNAME,
                         password=StorageConfig.JDBC_PASSWORD,
-                        database=database
+                        database=database,
+                        options=f"-c search_path={schema},public",
                     )
                 else:
                     raise ValueError(f"Invalid JDBC URL format: {jdbc_url}")
             except ImportError:
-                raise RuntimeError("psycopg2 required. Install with: pip install psycopg2-binary")
+                raise RuntimeError(
+                    "psycopg2 required. Install with: pip install psycopg2-binary"
+                )
             except Exception as e:
                 raise RuntimeError(f"Failed to connect to database: {e}")
         return cls._db_connection
@@ -140,57 +158,43 @@ class BigObjectManager:
             # Bucket doesn't exist, create it
             try:
                 s3.create_bucket(Bucket=bucket)
-                print(f"Created S3 bucket: {bucket}")
             except Exception as e:
                 raise RuntimeError(f"Failed to create bucket {bucket}: {e}")
 
     @classmethod
-    def create(cls, execution_id: int, operator_id: str, data: Union[bytes, BinaryIO]) -> BigObjectPointer:
+    def create(cls, data: Union[bytes, BinaryIO]) -> BigObjectPointer:
         """
-        Creates a big object from bytes or a stream and uploads to S3.
-        Registers the big object in the database for cleanup tracking.
+        Creates a big object from bytes or stream, uploads to S3, and registers in database.
+        Automatically retrieves execution_id and operator_id from ExecutionContext.
 
         Args:
-            execution_id: The execution ID this big object belongs to.
-            operator_id: The operator ID that created this big object.
-            data: Either bytes or a file-like object (stream) containing the data.
+            data: Bytes or file-like object containing the data.
 
         Returns:
-            A BigObjectPointer that can be used in tuples.
-
-        Usage:
-            # From bytes
-            pointer = BigObjectManager.create(execution_id, operator_id, b"large data...")
+            BigObjectPointer for use in tuples.
+        """
+        # Get IDs from ExecutionContext
+        execution_id = ExecutionContext.get_execution_id()
+        operator_id = ExecutionContext.get_operator_id()
 
-            # From file
-            with open("large_file.bin", "rb") as f:
-                pointer = BigObjectManager.create(execution_id, operator_id, f)
+        if execution_id is None or operator_id is None:
+            raise RuntimeError(
+                "ExecutionContext not initialized. This should not happen in normal workflow execution."
+            )
 
-            # From BytesIO
-            buffer = BytesIO(b"data...")
-            pointer = BigObjectManager.create(execution_id, operator_id, buffer)
-        """
         cls._create_bucket_if_not_exist(cls.DEFAULT_BUCKET)
 
-        # Generate unique object key
-        timestamp = int(time.time() * 1000)  # milliseconds
-        unique_id = str(uuid.uuid4())
-        object_key = f"{timestamp}/{unique_id}"
+        # Generate unique S3 key and URI
+        object_key = f"{int(time.time() * 1000)}/{uuid.uuid4()}"
        uri = f"s3://{cls.DEFAULT_BUCKET}/{object_key}"
-
         s3 = cls._get_s3_client()
+        # Upload to S3
         try:
-            # Upload to S3
             if isinstance(data, bytes):
-                # Upload bytes directly
                 s3.put_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key, Body=data)
             else:
-                # Upload from stream (file-like object)
                 s3.upload_fileobj(data, cls.DEFAULT_BUCKET, object_key)
-
-            print(f"Uploaded big object to S3: {uri}")
-
         except Exception as e:
             raise RuntimeError(f"Failed to upload big object to S3: {e}")
@@ -200,19 +204,16 @@
             cursor = conn.cursor()
             cursor.execute(
                 "INSERT INTO big_object (execution_id, operator_id, uri) VALUES (%s, %s, %s)",
-                (execution_id, operator_id, uri)
+                (execution_id, operator_id, uri),
             )
             conn.commit()
             cursor.close()
-            print(f"Registered big object in database: eid={execution_id}, opid={operator_id}, uri={uri}")
-
         except Exception as e:
-            # Database registration failed - clean up S3 object
-            print(f"Failed to register big object in database, cleaning up: {uri}")
+            # Clean up S3 object if database registration fails
             try:
                 s3.delete_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key)
-            except Exception as cleanup_error:
-                print(f"Failed to cleanup orphaned S3 object {uri}: {cleanup_error}")
+            except Exception:
+                pass
             raise RuntimeError(f"Failed to create big object: {e}")
 
         return BigObjectPointer(uri)
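The usage examples dropped from the docstring still apply, minus the explicit IDs.
A sketch of the new call shape from inside an operator (this assumes the worker has
already populated ExecutionContext, as data_processor.py now does; create() raises
RuntimeError otherwise):

    from io import BytesIO
    from pytexera.storage.big_object_manager import BigObjectManager

    # From bytes
    pointer = BigObjectManager.create(b"large data...")

    # From a file
    with open("large_file.bin", "rb") as f:
        pointer = BigObjectManager.create(f)

    # From a BytesIO stream
    pointer = BigObjectManager.create(BytesIO(b"data..."))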
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
index 6fcdb7b0e5..a7a3d1b2b3 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -20,6 +20,7 @@ package org.apache.texera.service.util
 
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.amber.core.executor.ExecutionContext
 import org.apache.amber.core.tuple.BigObjectPointer
 import org.apache.texera.dao.SqlServer
 import org.apache.texera.dao.jooq.generated.Tables.BIG_OBJECT
@@ -74,16 +75,20 @@ object BigObjectManager extends LazyLogging {
   private lazy val context = SqlServer.getInstance().createDSLContext()
 
   /**
-    * Creates a big object from an InputStream using multipart upload.
-    * Handles streams of any size without loading into memory.
-    * Registers the big object in the database for cleanup tracking.
+    * Creates a big object from InputStream, uploads to S3, and registers in database.
+    * Automatically retrieves execution ID and operator ID from ExecutionContext.
     *
-    * @param executionId The execution ID this big object belongs to.
-    * @param operatorId The operator ID that created this big object.
-    * @param stream The input stream containing the big object data.
-    * @return A BigObjectPointer that can be used in tuples.
+    * @param stream Input stream containing the data.
+    * @return BigObjectPointer for use in tuples.
     */
-  def create(executionId: Int, operatorId: String, stream: InputStream): BigObjectPointer = {
+  def create(stream: InputStream): BigObjectPointer = {
+    val executionId = ExecutionContext.getExecutionId.getOrElse(
+      throw new IllegalStateException("Execution ID not set in ExecutionContext")
+    )
+    val operatorId = ExecutionContext.getOperatorId.getOrElse(
+      throw new IllegalStateException("Operator ID not set in ExecutionContext")
+    )
+
     S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET)
 
     val objectKey = s"${System.currentTimeMillis()}/${UUID.randomUUID()}"
@@ -102,13 +107,11 @@ object BigObjectManager extends LazyLogging {
       logger.debug(s"Registered big object: eid=$executionId, opid=$operatorId, uri=$uri")
     } catch {
       case e: Exception =>
-        // Database failed - clean up S3 object
         logger.error(s"Failed to register big object, cleaning up: $uri", e)
         try {
           S3StorageClient.deleteObject(DEFAULT_BUCKET, objectKey)
         } catch {
-          case cleanupError: Exception =>
-            logger.error(s"Failed to cleanup orphaned S3 object: $uri", cleanupError)
+          case _: Exception => // Best effort cleanup
         }
         throw new RuntimeException(s"Failed to create big object: ${e.getMessage}", e)
     }

diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
index 5013289c44..a9635ed668 100644
--- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -19,7 +19,7 @@
 
 package org.apache.amber.operator.source.scan
 
-import org.apache.amber.core.executor.{ExecutionContext, SourceOperatorExecutor}
+import org.apache.amber.core.executor.SourceOperatorExecutor
 import org.apache.amber.core.storage.DocumentFactory
 import org.apache.amber.core.tuple.AttributeTypeUtils.parseField
 import org.apache.amber.core.tuple.TupleLike
@@ -87,16 +87,7 @@ class FileScanSourceOpExec private[scan] (
             new String(toByteArray(entry), desc.fileEncoding.getCharset)
           case FileAttributeType.BIG_OBJECT =>
             // For big objects, create a big object pointer from the input stream
-            // Get execution ID and operator ID from thread-local context
-            val executionId = ExecutionContext.getExecutionId
-              .getOrElse(
-                throw new IllegalStateException("Execution ID not set in ExecutionContext")
-              )
-            val operatorId = ExecutionContext.getOperatorId
-              .getOrElse(
-                throw new IllegalStateException("Operator ID not set in ExecutionContext")
-              )
-            BigObjectManager.create(executionId, operatorId, entry)
+            BigObjectManager.create(entry)
           case _ => parseField(toByteArray(entry), desc.attributeType.getType)
         })
         TupleLike(fields.toSeq: _*)
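End to end, the Python side now mirrors the Scala call site above: operators call
create(data) with no IDs at all. A sketch of a user-defined operator doing so (this
assumes the standard pytexera UDF shape; the "big" column name is made up):

    from pytexera import *
    from pytexera.storage.big_object_manager import BigObjectManager

    class ProcessTupleOperator(UDFOperatorV2):
        def process_tuple(self, tuple_, port):
            # execution_id/operator_id are no longer passed explicitly:
            # DataProcessor published them to ExecutionContext on this
            # thread before invoking user code.
            tuple_["big"] = BigObjectManager.create(b"large data...")
            yield tuple_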
