This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 85134ffab125eaa20e5dfa93f903125fec14afe2
Author: Kunwoo Park <[email protected]>
AuthorDate: Mon Oct 27 13:08:37 2025 -0700

    create() API for python + cleanup the parameters
---
 .../control/initialize_executor_handler.py         |  5 ++
 .../python/core/architecture/managers/context.py   | 13 ++++
 .../architecture/managers/execution_context.py     | 62 +++++++++++++++
 .../main/python/core/runnables/data_processor.py   |  5 ++
 .../amber/engine/architecture/rpc/__init__.py      |  1 +
 .../python/pytexera/storage/big_object_manager.py  | 91 +++++++++++-----------
 .../texera/service/util/BigObjectManager.scala     | 25 +++---
 .../source/scan/FileScanSourceOpExec.scala         | 13 +---
 8 files changed, 148 insertions(+), 67 deletions(-)

diff --git 
a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py
 
b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py
index 0d86dbc212..45e992358b 100644
--- 
a/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py
+++ 
b/amber/src/main/python/core/architecture/handlers/control/initialize_executor_handler.py
@@ -28,6 +28,11 @@ class InitializeExecutorHandler(ControlHandler):
 
     async def initialize_executor(self, req: InitializeExecutorRequest) -> 
EmptyReturn:
         op_exec_with_code: OpExecWithCode = get_one_of(req.op_exec_init_info)
+
+        # Store execution_id in context (from InitializeExecutorRequest)
+        # This is the real execution ID, not the workflow ID
+        self.context.execution_id = req.execution_id
+
         self.context.executor_manager.initialize_executor(
             op_exec_with_code.code, req.is_source, op_exec_with_code.language
         )
diff --git a/amber/src/main/python/core/architecture/managers/context.py 
b/amber/src/main/python/core/architecture/managers/context.py
index 826006ccb6..fa92a6db26 100644
--- a/amber/src/main/python/core/architecture/managers/context.py
+++ b/amber/src/main/python/core/architecture/managers/context.py
@@ -16,6 +16,7 @@
 # under the License.
 
 from typing import Optional
+import re
 
 from proto.org.apache.amber.core import ActorVirtualIdentity, ChannelIdentity
 from proto.org.apache.amber.engine.architecture.worker import WorkerState
@@ -23,6 +24,7 @@ from .console_message_manager import ConsoleMessageManager
 from .debug_manager import DebugManager
 from .embedded_control_message_manager import EmbeddedControlMessageManager
 from .exception_manager import ExceptionManager
+from .execution_context import ExecutionContext
 from .executor_manager import ExecutorManager
 from .pause_manager import PauseManager
 from .state_manager import StateManager
@@ -46,6 +48,17 @@ class Context:
     def __init__(self, worker_id, input_queue):
         self.worker_id = worker_id
         self.input_queue: InternalQueue = input_queue
+
+        # execution_id: set by InitializeExecutorHandler from 
InitializeExecutorRequest
+        # operator_id: parsed from worker_id (format: 
Worker:WF{wid}-{opid}-{layer}-{idx})
+        self.execution_id: Optional[int] = None
+        self.operator_id: Optional[str] = None
+
+        # Extract operator_id from worker_id (e.g., 
"Worker:WF78-op-scan-main-0" → "op-scan")
+        match = re.match(r"Worker:WF\d+-(.+)-\w+-\d+", worker_id)
+        if match:
+            self.operator_id = match.group(1)
+
         self.executor_manager = ExecutorManager()
         self.current_input_channel_id: Optional[ChannelIdentity] = None
         self.tuple_processing_manager = TupleProcessingManager()
diff --git 
a/amber/src/main/python/core/architecture/managers/execution_context.py 
b/amber/src/main/python/core/architecture/managers/execution_context.py
new file mode 100644
index 0000000000..31e02436a6
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/execution_context.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+ExecutionContext provides thread-local storage for execution-related 
information.
+This allows BigObjectManager and other utilities to automatically access
+execution_id and operator_id without requiring explicit parameters.
+"""
+
+import threading
+from typing import Optional
+
+
+class ExecutionContext:
+    """
+    Thread-local storage for execution context information.
+    Similar to Scala's ExecutionContext.
+    """
+
+    _thread_local = threading.local()
+
+    @classmethod
+    def set_execution_id(cls, execution_id: int) -> None:
+        """Set the execution ID for the current thread."""
+        cls._thread_local.execution_id = execution_id
+
+    @classmethod
+    def get_execution_id(cls) -> Optional[int]:
+        """Get the execution ID for the current thread, or None if not set."""
+        return getattr(cls._thread_local, "execution_id", None)
+
+    @classmethod
+    def set_operator_id(cls, operator_id: str) -> None:
+        """Set the operator ID for the current thread."""
+        cls._thread_local.operator_id = operator_id
+
+    @classmethod
+    def get_operator_id(cls) -> Optional[str]:
+        """Get the operator ID for the current thread, or None if not set."""
+        return getattr(cls._thread_local, "operator_id", None)
+
+    @classmethod
+    def clear(cls) -> None:
+        """Clear all execution context information for the current thread."""
+        if hasattr(cls._thread_local, "execution_id"):
+            delattr(cls._thread_local, "execution_id")
+        if hasattr(cls._thread_local, "operator_id"):
+            delattr(cls._thread_local, "operator_id")
diff --git a/amber/src/main/python/core/runnables/data_processor.py 
b/amber/src/main/python/core/runnables/data_processor.py
index ce8f937142..0c9747a406 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -23,6 +23,7 @@ from threading import Event
 from typing import Iterator, Optional
 
 from core.architecture.managers import Context
+from core.architecture.managers.execution_context import ExecutionContext
 from core.models import ExceptionInfo, State, TupleLike, InternalMarker
 from core.models.internal_marker import StartChannel, EndChannel
 from core.models.table import all_output_to_tuple
@@ -51,6 +52,10 @@ class DataProcessor(Runnable, Stoppable):
         self._running.set()
         self._switch_context()
         while self._running.is_set():
+            # Sync ExecutionContext from context (for thread-local storage)
+            ExecutionContext.set_execution_id(self._context.execution_id)
+            ExecutionContext.set_operator_id(self._context.operator_id)
+
             marker = 
self._context.tuple_processing_manager.get_internal_marker()
             state = self._context.state_processing_manager.get_input_state()
             tuple_ = self._context.tuple_processing_manager.current_input_tuple
diff --git 
a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
 
b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
index 0a913e8b88..258fc928f7 100644
--- 
a/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/amber/engine/architecture/rpc/__init__.py
@@ -378,6 +378,7 @@ class InitializeExecutorRequest(betterproto.Message):
     total_worker_count: int = betterproto.int32_field(1)
     op_exec_init_info: "___core__.OpExecInitInfo" = 
betterproto.message_field(2)
     is_source: bool = betterproto.bool_field(3)
+    execution_id: int = betterproto.int64_field(4)
 
 
 @dataclass(eq=False, repr=False)
diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py 
b/amber/src/main/python/pytexera/storage/big_object_manager.py
index c116c1e392..2fa2c7398c 100644
--- a/amber/src/main/python/pytexera/storage/big_object_manager.py
+++ b/amber/src/main/python/pytexera/storage/big_object_manager.py
@@ -19,10 +19,11 @@
 
 import time
 import uuid
-from typing import BinaryIO, Union
+from typing import BinaryIO, Union, Optional
 from io import BytesIO
 from core.models.schema.big_object_pointer import BigObjectPointer
 from core.storage.storage_config import StorageConfig
+from core.architecture.managers.execution_context import ExecutionContext
 
 
 class BigObjectStream:
@@ -95,18 +96,31 @@ class BigObjectManager:
         if cls._db_connection is None:
             try:
                 import psycopg2
-                # Parse JDBC URL to extract host, port, database
-                # Format: jdbc:postgresql://host:port/database?params
+
+                # Parse JDBC URL to extract host, port, database, and schema
+                # Format: 
jdbc:postgresql://host:port/database?currentSchema=texera_db,public
                 jdbc_url = StorageConfig.JDBC_URL
                 if jdbc_url.startswith("jdbc:postgresql://"):
                     url_part = jdbc_url.replace("jdbc:postgresql://", "")
                     # Split by '?' to separate params
-                    url_without_params = url_part.split("?")[0]
+                    url_parts = url_part.split("?")
+                    url_without_params = url_parts[0]
+
+                    # Parse schema from params (e.g., 
currentSchema=texera_db,public)
+                    schema = "texera_db"  # default
+                    if len(url_parts) > 1:
+                        params = url_parts[1]
+                        for param in params.split("&"):
+                            if param.startswith("currentSchema="):
+                                schema_value = param.split("=")[1]
+                                # Take first schema from comma-separated list
+                                schema = schema_value.split(",")[0]
+
                     # Split by '/' to get host:port and database
                     parts = url_without_params.split("/")
                     host_port = parts[0]
                     database = parts[1] if len(parts) > 1 else "texera_db"
-                    
+
                     # Split host and port
                     if ":" in host_port:
                         host, port = host_port.split(":")
@@ -114,18 +128,22 @@ class BigObjectManager:
                     else:
                         host = host_port
                         port = 5432
-                    
+
+                    # Connect and set search_path
                     cls._db_connection = psycopg2.connect(
                         host=host,
                         port=port,
                         user=StorageConfig.JDBC_USERNAME,
                         password=StorageConfig.JDBC_PASSWORD,
-                        database=database
+                        database=database,
+                        options=f"-c search_path={schema},public",
                     )
                 else:
                     raise ValueError(f"Invalid JDBC URL format: {jdbc_url}")
             except ImportError:
-                raise RuntimeError("psycopg2 required. Install with: pip 
install psycopg2-binary")
+                raise RuntimeError(
+                    "psycopg2 required. Install with: pip install 
psycopg2-binary"
+                )
             except Exception as e:
                 raise RuntimeError(f"Failed to connect to database: {e}")
         return cls._db_connection
@@ -140,57 +158,43 @@ class BigObjectManager:
             # Bucket doesn't exist, create it
             try:
                 s3.create_bucket(Bucket=bucket)
-                print(f"Created S3 bucket: {bucket}")
             except Exception as e:
                 raise RuntimeError(f"Failed to create bucket {bucket}: {e}")
 
     @classmethod
-    def create(cls, execution_id: int, operator_id: str, data: Union[bytes, 
BinaryIO]) -> BigObjectPointer:
+    def create(cls, data: Union[bytes, BinaryIO]) -> BigObjectPointer:
         """
-        Creates a big object from bytes or a stream and uploads to S3.
-        Registers the big object in the database for cleanup tracking.
+        Creates a big object from bytes or stream, uploads to S3, and 
registers in database.
+        Automatically retrieves execution_id and operator_id from 
ExecutionContext.
 
         Args:
-            execution_id: The execution ID this big object belongs to.
-            operator_id: The operator ID that created this big object.
-            data: Either bytes or a file-like object (stream) containing the 
data.
+            data: Bytes or file-like object containing the data.
 
         Returns:
-            A BigObjectPointer that can be used in tuples.
-
-        Usage:
-            # From bytes
-            pointer = BigObjectManager.create(execution_id, operator_id, 
b"large data...")
+            BigObjectPointer for use in tuples.
+        """
+        # Get IDs from ExecutionContext
+        execution_id = ExecutionContext.get_execution_id()
+        operator_id = ExecutionContext.get_operator_id()
 
-            # From file
-            with open("large_file.bin", "rb") as f:
-                pointer = BigObjectManager.create(execution_id, operator_id, f)
+        if execution_id is None or operator_id is None:
+            raise RuntimeError(
+                "ExecutionContext not initialized. This should not happen in 
normal workflow execution."
+            )
 
-            # From BytesIO
-            buffer = BytesIO(b"data...")
-            pointer = BigObjectManager.create(execution_id, operator_id, 
buffer)
-        """
         cls._create_bucket_if_not_exist(cls.DEFAULT_BUCKET)
 
-        # Generate unique object key
-        timestamp = int(time.time() * 1000)  # milliseconds
-        unique_id = str(uuid.uuid4())
-        object_key = f"{timestamp}/{unique_id}"
+        # Generate unique S3 key and URI
+        object_key = f"{int(time.time() * 1000)}/{uuid.uuid4()}"
         uri = f"s3://{cls.DEFAULT_BUCKET}/{object_key}"
-
         s3 = cls._get_s3_client()
 
+        # Upload to S3
         try:
-            # Upload to S3
             if isinstance(data, bytes):
-                # Upload bytes directly
                 s3.put_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key, 
Body=data)
             else:
-                # Upload from stream (file-like object)
                 s3.upload_fileobj(data, cls.DEFAULT_BUCKET, object_key)
-
-            print(f"Uploaded big object to S3: {uri}")
-
         except Exception as e:
             raise RuntimeError(f"Failed to upload big object to S3: {e}")
 
@@ -200,19 +204,16 @@ class BigObjectManager:
             cursor = conn.cursor()
             cursor.execute(
                 "INSERT INTO big_object (execution_id, operator_id, uri) 
VALUES (%s, %s, %s)",
-                (execution_id, operator_id, uri)
+                (execution_id, operator_id, uri),
             )
             conn.commit()
             cursor.close()
-            print(f"Registered big object in database: eid={execution_id}, 
opid={operator_id}, uri={uri}")
-
         except Exception as e:
-            # Database registration failed - clean up S3 object
-            print(f"Failed to register big object in database, cleaning up: 
{uri}")
+            # Clean up S3 object if database registration fails
             try:
                 s3.delete_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key)
-            except Exception as cleanup_error:
-                print(f"Failed to cleanup orphaned S3 object {uri}: 
{cleanup_error}")
+            except:
+                pass
             raise RuntimeError(f"Failed to create big object: {e}")
 
         return BigObjectPointer(uri)
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
index 6fcdb7b0e5..a7a3d1b2b3 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -20,6 +20,7 @@
 package org.apache.texera.service.util
 
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.amber.core.executor.ExecutionContext
 import org.apache.amber.core.tuple.BigObjectPointer
 import org.apache.texera.dao.SqlServer
 import org.apache.texera.dao.jooq.generated.Tables.BIG_OBJECT
@@ -74,16 +75,20 @@ object BigObjectManager extends LazyLogging {
   private lazy val context = SqlServer.getInstance().createDSLContext()
 
   /**
-    * Creates a big object from an InputStream using multipart upload.
-    * Handles streams of any size without loading into memory.
-    * Registers the big object in the database for cleanup tracking.
+    * Creates a big object from InputStream, uploads to S3, and registers in 
database.
+    * Automatically retrieves execution ID and operator ID from 
ExecutionContext.
     *
-    * @param executionId The execution ID this big object belongs to.
-    * @param operatorId The operator ID that created this big object.
-    * @param stream The input stream containing the big object data.
-    * @return A BigObjectPointer that can be used in tuples.
+    * @param stream Input stream containing the data.
+    * @return BigObjectPointer for use in tuples.
     */
-  def create(executionId: Int, operatorId: String, stream: InputStream): 
BigObjectPointer = {
+  def create(stream: InputStream): BigObjectPointer = {
+    val executionId = ExecutionContext.getExecutionId.getOrElse(
+      throw new IllegalStateException("Execution ID not set in 
ExecutionContext")
+    )
+    val operatorId = ExecutionContext.getOperatorId.getOrElse(
+      throw new IllegalStateException("Operator ID not set in 
ExecutionContext")
+    )
+
     S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET)
 
     val objectKey = s"${System.currentTimeMillis()}/${UUID.randomUUID()}"
@@ -102,13 +107,11 @@ object BigObjectManager extends LazyLogging {
       logger.debug(s"Registered big object: eid=$executionId, 
opid=$operatorId, uri=$uri")
     } catch {
       case e: Exception =>
-        // Database failed - clean up S3 object
         logger.error(s"Failed to register big object, cleaning up: $uri", e)
         try {
           S3StorageClient.deleteObject(DEFAULT_BUCKET, objectKey)
         } catch {
-          case cleanupError: Exception =>
-            logger.error(s"Failed to cleanup orphaned S3 object: $uri", 
cleanupError)
+          case _: Exception => // Best effort cleanup
         }
         throw new RuntimeException(s"Failed to create big object: 
${e.getMessage}", e)
     }
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
index 5013289c44..a9635ed668 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -19,7 +19,7 @@
 
 package org.apache.amber.operator.source.scan
 
-import org.apache.amber.core.executor.{ExecutionContext, 
SourceOperatorExecutor}
+import org.apache.amber.core.executor.SourceOperatorExecutor
 import org.apache.amber.core.storage.DocumentFactory
 import org.apache.amber.core.tuple.AttributeTypeUtils.parseField
 import org.apache.amber.core.tuple.TupleLike
@@ -87,16 +87,7 @@ class FileScanSourceOpExec private[scan] (
                 new String(toByteArray(entry), desc.fileEncoding.getCharset)
               case FileAttributeType.BIG_OBJECT =>
                 // For big objects, create a big object pointer from the input 
stream
-                // Get execution ID and operator ID from thread-local context
-                val executionId = ExecutionContext.getExecutionId
-                  .getOrElse(
-                    throw new IllegalStateException("Execution ID not set in 
ExecutionContext")
-                  )
-                val operatorId = ExecutionContext.getOperatorId
-                  .getOrElse(
-                    throw new IllegalStateException("Operator ID not set in 
ExecutionContext")
-                  )
-                BigObjectManager.create(executionId, operatorId, entry)
+                BigObjectManager.create(entry)
               case _ => parseField(toByteArray(entry), 
desc.attributeType.getType)
             })
             TupleLike(fields.toSeq: _*)

Reply via email to