This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 4c844a427c11316c06c2bb3ed1a8bcddf3625936
Author: Kunwoo Park <[email protected]>
AuthorDate: Mon Oct 27 10:59:30 2025 -0700

    create() function for python udf
---
 .../src/main/python/core/storage/storage_config.py |  34 ++++-
 .../python/pytexera/storage/big_object_manager.py  | 151 +++++++++++++++++++--
 amber/src/main/python/texera_run_python_worker.py  |  14 ++
 .../pythonworker/PythonWorkflowWorker.scala        |   9 +-
 .../texera/service/util/BigObjectManager.scala     |   9 +-
 5 files changed, 199 insertions(+), 18 deletions(-)

diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py
index 21c918d514..33ea9654ec 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -25,6 +25,7 @@ class StorageConfig:
 
     _initialized = False
 
+    # Iceberg configs
     ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None
     ICEBERG_POSTGRES_CATALOG_USERNAME = None
     ICEBERG_POSTGRES_CATALOG_PASSWORD = None
@@ -32,6 +33,17 @@ class StorageConfig:
     ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
     ICEBERG_TABLE_COMMIT_BATCH_SIZE = None
 
+    # S3 configs (for BigObjectManager)
+    S3_ENDPOINT = None
+    S3_REGION = None
+    S3_AUTH_USERNAME = None
+    S3_AUTH_PASSWORD = None
+
+    # JDBC configs (for BigObjectManager database registration)
+    JDBC_URL = None
+    JDBC_USERNAME = None
+    JDBC_PASSWORD = None
+
     @classmethod
     def initialize(
         cls,
@@ -41,18 +53,38 @@ class StorageConfig:
         table_result_namespace,
         directory_path,
         commit_batch_size,
+        s3_endpoint,
+        s3_region,
+        s3_auth_username,
+        s3_auth_password,
+        jdbc_url,
+        jdbc_username,
+        jdbc_password,
     ):
         if cls._initialized:
             raise RuntimeError(
-                "Storage config has already been initialized" "and cannot be modified."
+                "Storage config has already been initialized and cannot be modified."
             )
 
+        # Iceberg configs
         cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme
         cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username
         cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
         cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
         cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
         cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
+
+        # S3 configs
+        cls.S3_ENDPOINT = s3_endpoint
+        cls.S3_REGION = s3_region
+        cls.S3_AUTH_USERNAME = s3_auth_username
+        cls.S3_AUTH_PASSWORD = s3_auth_password
+
+        # JDBC configs
+        cls.JDBC_URL = jdbc_url
+        cls.JDBC_USERNAME = jdbc_username
+        cls.JDBC_PASSWORD = jdbc_password
+
         cls._initialized = True
 
     def __new__(cls, *args, **kwargs):
diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py b/amber/src/main/python/pytexera/storage/big_object_manager.py
index a59129c2c9..c116c1e392 100644
--- a/amber/src/main/python/pytexera/storage/big_object_manager.py
+++ b/amber/src/main/python/pytexera/storage/big_object_manager.py
@@ -17,9 +17,12 @@
 
 """BigObjectManager for reading large objects from S3."""
 
-import os
-from typing import BinaryIO
+import time
+import uuid
+from typing import BinaryIO, Union
+from io import BytesIO
 from core.models.schema.big_object_pointer import BigObjectPointer
+from core.storage.storage_config import StorageConfig
 
 
 class BigObjectStream:
@@ -58,6 +61,8 @@ class BigObjectManager:
     """Manager for reading big objects from S3."""
 
     _s3_client = None
+    _db_connection = None
+    DEFAULT_BUCKET = "texera-big-objects"
 
     @classmethod
     def _get_s3_client(cls):
@@ -69,16 +74,10 @@ class BigObjectManager:
 
                 cls._s3_client = boto3.client(
                     "s3",
-                    endpoint_url=os.environ.get(
-                        "STORAGE_S3_ENDPOINT", "http://localhost:9000"
-                    ),
-                    aws_access_key_id=os.environ.get(
-                        "STORAGE_S3_AUTH_USERNAME", "texera_minio"
-                    ),
-                    aws_secret_access_key=os.environ.get(
-                        "STORAGE_S3_AUTH_PASSWORD", "password"
-                    ),
-                    region_name=os.environ.get("STORAGE_S3_REGION", "us-west-2"),
+                    endpoint_url=StorageConfig.S3_ENDPOINT,
+                    aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+                    aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+                    region_name=StorageConfig.S3_REGION,
                     config=Config(
                         signature_version="s3v4", s3={"addressing_style": "path"}
                     ),
@@ -90,6 +89,134 @@ class BigObjectManager:
 
         return cls._s3_client
 
+    @classmethod
+    def _get_db_connection(cls):
+        """Get database connection for registering big objects."""
+        if cls._db_connection is None:
+            try:
+                import psycopg2
+                # Parse JDBC URL to extract host, port, database
+                # Format: jdbc:postgresql://host:port/database?params
+                jdbc_url = StorageConfig.JDBC_URL
+                if jdbc_url.startswith("jdbc:postgresql://"):
+                    url_part = jdbc_url.replace("jdbc:postgresql://", "")
+                    # Split by '?' to separate params
+                    url_without_params = url_part.split("?")[0]
+                    # Split by '/' to get host:port and database
+                    parts = url_without_params.split("/")
+                    host_port = parts[0]
+                    database = parts[1] if len(parts) > 1 else "texera_db"
+                    
+                    # Split host and port
+                    if ":" in host_port:
+                        host, port = host_port.split(":")
+                        port = int(port)
+                    else:
+                        host = host_port
+                        port = 5432
+                    
+                    cls._db_connection = psycopg2.connect(
+                        host=host,
+                        port=port,
+                        user=StorageConfig.JDBC_USERNAME,
+                        password=StorageConfig.JDBC_PASSWORD,
+                        database=database
+                    )
+                else:
+                    raise ValueError(f"Invalid JDBC URL format: {jdbc_url}")
+            except ImportError:
+                raise RuntimeError("psycopg2 required. Install with: pip install psycopg2-binary")
+            except Exception as e:
+                raise RuntimeError(f"Failed to connect to database: {e}")
+        return cls._db_connection
+
+    @classmethod
+    def _create_bucket_if_not_exist(cls, bucket: str):
+        """Create S3 bucket if it doesn't exist."""
+        s3 = cls._get_s3_client()
+        try:
+            s3.head_bucket(Bucket=bucket)
+        except:
+            # Bucket doesn't exist, create it
+            try:
+                s3.create_bucket(Bucket=bucket)
+                print(f"Created S3 bucket: {bucket}")
+            except Exception as e:
+                raise RuntimeError(f"Failed to create bucket {bucket}: {e}")
+
+    @classmethod
+    def create(cls, execution_id: int, operator_id: str, data: Union[bytes, BinaryIO]) -> BigObjectPointer:
+        """
+        Creates a big object from bytes or a stream and uploads to S3.
+        Registers the big object in the database for cleanup tracking.
+
+        Args:
+            execution_id: The execution ID this big object belongs to.
+            operator_id: The operator ID that created this big object.
+            data: Either bytes or a file-like object (stream) containing the data.
+
+        Returns:
+            A BigObjectPointer that can be used in tuples.
+
+        Usage:
+            # From bytes
+            pointer = BigObjectManager.create(execution_id, operator_id, b"large data...")
+
+            # From file
+            with open("large_file.bin", "rb") as f:
+                pointer = BigObjectManager.create(execution_id, operator_id, f)
+
+            # From BytesIO
+            buffer = BytesIO(b"data...")
+            pointer = BigObjectManager.create(execution_id, operator_id, buffer)
+        """
+        cls._create_bucket_if_not_exist(cls.DEFAULT_BUCKET)
+
+        # Generate unique object key
+        timestamp = int(time.time() * 1000)  # milliseconds
+        unique_id = str(uuid.uuid4())
+        object_key = f"{timestamp}/{unique_id}"
+        uri = f"s3://{cls.DEFAULT_BUCKET}/{object_key}"
+
+        s3 = cls._get_s3_client()
+
+        try:
+            # Upload to S3
+            if isinstance(data, bytes):
+                # Upload bytes directly
+                s3.put_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key, Body=data)
+            else:
+                # Upload from stream (file-like object)
+                s3.upload_fileobj(data, cls.DEFAULT_BUCKET, object_key)
+
+            print(f"Uploaded big object to S3: {uri}")
+
+        except Exception as e:
+            raise RuntimeError(f"Failed to upload big object to S3: {e}")
+
+        # Register in database
+        try:
+            conn = cls._get_db_connection()
+            cursor = conn.cursor()
+            cursor.execute(
+                "INSERT INTO big_object (execution_id, operator_id, uri) VALUES (%s, %s, %s)",
+                (execution_id, operator_id, uri)
+            )
+            conn.commit()
+            cursor.close()
+            print(f"Registered big object in database: eid={execution_id}, opid={operator_id}, uri={uri}")
+
+        except Exception as e:
+            # Database registration failed - clean up S3 object
+            print(f"Failed to register big object in database, cleaning up: {uri}")
+            try:
+                s3.delete_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key)
+            except Exception as cleanup_error:
+                print(f"Failed to cleanup orphaned S3 object {uri}: {cleanup_error}")
+            raise RuntimeError(f"Failed to create big object: {e}")
+
+        return BigObjectPointer(uri)
+
     @classmethod
     def open(cls, pointer: BigObjectPointer) -> BigObjectStream:
         """
diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py
index c9594cc218..b3a8f8e341 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -51,6 +51,13 @@ if __name__ == "__main__":
         iceberg_table_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
+        s3_endpoint,
+        s3_region,
+        s3_auth_username,
+        s3_auth_password,
+        jdbc_url,
+        jdbc_username,
+        jdbc_password,
     ) = sys.argv
     init_loguru_logger(logger_level)
     StorageConfig.initialize(
@@ -60,6 +67,13 @@ if __name__ == "__main__":
         iceberg_table_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
+        s3_endpoint,
+        s3_region,
+        s3_auth_username,
+        s3_auth_password,
+        jdbc_url,
+        jdbc_username,
+        jdbc_password,
     )
 
     # Setting R_HOME environment variable for R-UDF usage
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index fa765aee1f..d9469bf6ba 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -183,7 +183,14 @@ class PythonWorkflowWorker(
         StorageConfig.icebergPostgresCatalogPassword,
         StorageConfig.icebergTableResultNamespace,
         StorageConfig.fileStorageDirectoryPath.toString,
-        StorageConfig.icebergTableCommitBatchSize.toString
+        StorageConfig.icebergTableCommitBatchSize.toString,
+        StorageConfig.s3Endpoint,
+        StorageConfig.s3Region,
+        StorageConfig.s3Username,
+        StorageConfig.s3Password,
+        StorageConfig.jdbcUrl,
+        StorageConfig.jdbcUsername,
+        StorageConfig.jdbcPassword
       )
     ).run(BasicIO.standard(false))
   }
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
index 86688fd6b7..6fcdb7b0e5 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -79,10 +79,11 @@ object BigObjectManager extends LazyLogging {
     * Registers the big object in the database for cleanup tracking.
     *
     * @param executionId The execution ID this big object belongs to.
+    * @param operatorId The operator ID that created this big object.
     * @param stream The input stream containing the big object data.
     * @return A BigObjectPointer that can be used in tuples.
     */
-  def create(executionId: Int, stream: InputStream): BigObjectPointer = {
+  def create(executionId: Int, operatorId: String, stream: InputStream): BigObjectPointer = {
     S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET)
 
     val objectKey = s"${System.currentTimeMillis()}/${UUID.randomUUID()}"
@@ -95,10 +96,10 @@ object BigObjectManager extends LazyLogging {
     try {
       context
         .insertInto(BIG_OBJECT)
-        .columns(BIG_OBJECT.EXECUTION_ID, BIG_OBJECT.URI)
-        .values(Int.box(executionId), uri)
+        .columns(BIG_OBJECT.EXECUTION_ID, BIG_OBJECT.OPERATOR_ID, BIG_OBJECT.URI)
+        .values(Int.box(executionId), operatorId, uri)
         .execute()
-      logger.debug(s"Registered big object: eid=$executionId, uri=$uri")
+      logger.debug(s"Registered big object: eid=$executionId, opid=$operatorId, uri=$uri")
     } catch {
       case e: Exception =>
         // Database failed - clean up S3 object
