This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git
commit baa5504e1bd324def2b8d97a87b55d7c7ac7600c
Author: Kunwoo Park <[email protected]>
AuthorDate: Mon Oct 27 10:59:30 2025 -0700

    create() function for python udf
---
 .../src/main/python/core/storage/storage_config.py |  34 ++++-
 .../python/pytexera/storage/big_object_manager.py  | 151 +++++++++++++++++++--
 amber/src/main/python/texera_run_python_worker.py  |  14 ++
 .../pythonworker/PythonWorkflowWorker.scala        |   9 +-
 .../texera/service/util/BigObjectManager.scala     |   9 +-
 5 files changed, 199 insertions(+), 18 deletions(-)

diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py
index 21c918d514..33ea9654ec 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -25,6 +25,7 @@ class StorageConfig:
     _initialized = False
 
+    # Iceberg configs
     ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None
     ICEBERG_POSTGRES_CATALOG_USERNAME = None
     ICEBERG_POSTGRES_CATALOG_PASSWORD = None
@@ -32,6 +33,17 @@ class StorageConfig:
     ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
     ICEBERG_TABLE_COMMIT_BATCH_SIZE = None
 
+    # S3 configs (for BigObjectManager)
+    S3_ENDPOINT = None
+    S3_REGION = None
+    S3_AUTH_USERNAME = None
+    S3_AUTH_PASSWORD = None
+
+    # JDBC configs (for BigObjectManager database registration)
+    JDBC_URL = None
+    JDBC_USERNAME = None
+    JDBC_PASSWORD = None
+
     @classmethod
     def initialize(
         cls,
@@ -41,18 +53,38 @@ class StorageConfig:
         table_result_namespace,
         directory_path,
         commit_batch_size,
+        s3_endpoint,
+        s3_region,
+        s3_auth_username,
+        s3_auth_password,
+        jdbc_url,
+        jdbc_username,
+        jdbc_password,
     ):
         if cls._initialized:
             raise RuntimeError(
-                "Storage config has already been initialized" "and cannot be modified."
+                "Storage config has already been initialized and cannot be modified."
             )
+        # Iceberg configs
         cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme
         cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username
         cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password
         cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
         cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
         cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
+
+        # S3 configs
+        cls.S3_ENDPOINT = s3_endpoint
+        cls.S3_REGION = s3_region
+        cls.S3_AUTH_USERNAME = s3_auth_username
+        cls.S3_AUTH_PASSWORD = s3_auth_password
+
+        # JDBC configs
+        cls.JDBC_URL = jdbc_url
+        cls.JDBC_USERNAME = jdbc_username
+        cls.JDBC_PASSWORD = jdbc_password
+
         cls._initialized = True
 
     def __new__(cls, *args, **kwargs):
diff --git a/amber/src/main/python/pytexera/storage/big_object_manager.py b/amber/src/main/python/pytexera/storage/big_object_manager.py
index a59129c2c9..c116c1e392 100644
--- a/amber/src/main/python/pytexera/storage/big_object_manager.py
+++ b/amber/src/main/python/pytexera/storage/big_object_manager.py
@@ -17,9 +17,12 @@
 
 """BigObjectManager for reading large objects from S3."""
 
-import os
-from typing import BinaryIO
+import time
+import uuid
+from typing import BinaryIO, Union
+from io import BytesIO
 from core.models.schema.big_object_pointer import BigObjectPointer
+from core.storage.storage_config import StorageConfig
 
 
 class BigObjectStream:
@@ -58,6 +61,8 @@ class BigObjectManager:
     """Manager for reading big objects from S3."""
 
     _s3_client = None
+    _db_connection = None
+    DEFAULT_BUCKET = "texera-big-objects"
 
     @classmethod
     def _get_s3_client(cls):
@@ -69,16 +74,10 @@ class BigObjectManager:
 
             cls._s3_client = boto3.client(
                 "s3",
-                endpoint_url=os.environ.get(
-                    "STORAGE_S3_ENDPOINT", "http://localhost:9000"
-                ),
-                aws_access_key_id=os.environ.get(
-                    "STORAGE_S3_AUTH_USERNAME", "texera_minio"
-                ),
-                aws_secret_access_key=os.environ.get(
-                    "STORAGE_S3_AUTH_PASSWORD", "password"
-                ),
-                region_name=os.environ.get("STORAGE_S3_REGION", "us-west-2"),
+                endpoint_url=StorageConfig.S3_ENDPOINT,
+                aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+                aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+                region_name=StorageConfig.S3_REGION,
                 config=Config(
                     signature_version="s3v4", s3={"addressing_style": "path"}
                 ),
@@ -90,6 +89,134 @@ class BigObjectManager:
 
         return cls._s3_client
 
+    @classmethod
+    def _get_db_connection(cls):
+        """Get database connection for registering big objects."""
+        if cls._db_connection is None:
+            try:
+                import psycopg2
+                # Parse JDBC URL to extract host, port, database
+                # Format: jdbc:postgresql://host:port/database?params
+                jdbc_url = StorageConfig.JDBC_URL
+                if jdbc_url.startswith("jdbc:postgresql://"):
+                    url_part = jdbc_url.replace("jdbc:postgresql://", "")
+                    # Split by '?' to separate params
+                    url_without_params = url_part.split("?")[0]
+                    # Split by '/' to get host:port and database
+                    parts = url_without_params.split("/")
+                    host_port = parts[0]
+                    database = parts[1] if len(parts) > 1 else "texera_db"
+
+                    # Split host and port
+                    if ":" in host_port:
+                        host, port = host_port.split(":")
+                        port = int(port)
+                    else:
+                        host = host_port
+                        port = 5432
+
+                    cls._db_connection = psycopg2.connect(
+                        host=host,
+                        port=port,
+                        user=StorageConfig.JDBC_USERNAME,
+                        password=StorageConfig.JDBC_PASSWORD,
+                        database=database
+                    )
+                else:
+                    raise ValueError(f"Invalid JDBC URL format: {jdbc_url}")
+            except ImportError:
+                raise RuntimeError(
+                    "psycopg2 required. Install with: pip install psycopg2-binary"
+                )
+            except Exception as e:
+                raise RuntimeError(f"Failed to connect to database: {e}")
+        return cls._db_connection
+
+    @classmethod
+    def _create_bucket_if_not_exist(cls, bucket: str):
+        """Create S3 bucket if it doesn't exist."""
+        s3 = cls._get_s3_client()
+        try:
+            s3.head_bucket(Bucket=bucket)
+        except:
+            # Bucket doesn't exist, create it
+            try:
+                s3.create_bucket(Bucket=bucket)
+                print(f"Created S3 bucket: {bucket}")
+            except Exception as e:
+                raise RuntimeError(f"Failed to create bucket {bucket}: {e}")
+
+    @classmethod
+    def create(cls, execution_id: int, operator_id: str, data: Union[bytes, BinaryIO]) -> BigObjectPointer:
+        """
+        Creates a big object from bytes or a stream and uploads to S3.
+        Registers the big object in the database for cleanup tracking.
+
+        Args:
+            execution_id: The execution ID this big object belongs to.
+            operator_id: The operator ID that created this big object.
+            data: Either bytes or a file-like object (stream) containing the data.
+
+        Returns:
+            A BigObjectPointer that can be used in tuples.
+
+        Usage:
+            # From bytes
+            pointer = BigObjectManager.create(execution_id, operator_id, b"large data...")
+
+            # From file
+            with open("large_file.bin", "rb") as f:
+                pointer = BigObjectManager.create(execution_id, operator_id, f)
+
+            # From BytesIO
+            buffer = BytesIO(b"data...")
+            pointer = BigObjectManager.create(execution_id, operator_id, buffer)
+        """
+        cls._create_bucket_if_not_exist(cls.DEFAULT_BUCKET)
+
+        # Generate unique object key
+        timestamp = int(time.time() * 1000)  # milliseconds
+        unique_id = str(uuid.uuid4())
+        object_key = f"{timestamp}/{unique_id}"
+        uri = f"s3://{cls.DEFAULT_BUCKET}/{object_key}"
+
+        s3 = cls._get_s3_client()
+
+        try:
+            # Upload to S3
+            if isinstance(data, bytes):
+                # Upload bytes directly
+                s3.put_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key, Body=data)
+            else:
+                # Upload from stream (file-like object)
+                s3.upload_fileobj(data, cls.DEFAULT_BUCKET, object_key)
+
+            print(f"Uploaded big object to S3: {uri}")
+
+        except Exception as e:
+            raise RuntimeError(f"Failed to upload big object to S3: {e}")
+
+        # Register in database
+        try:
+            conn = cls._get_db_connection()
+            cursor = conn.cursor()
+            cursor.execute(
+                "INSERT INTO big_object (execution_id, operator_id, uri) VALUES (%s, %s, %s)",
+                (execution_id, operator_id, uri)
+            )
+            conn.commit()
+            cursor.close()
+            print(f"Registered big object in database: eid={execution_id}, opid={operator_id}, uri={uri}")
+
+        except Exception as e:
+            # Database registration failed - clean up S3 object
+            print(f"Failed to register big object in database, cleaning up: {uri}")
+            try:
+                s3.delete_object(Bucket=cls.DEFAULT_BUCKET, Key=object_key)
+            except Exception as cleanup_error:
+                print(f"Failed to cleanup orphaned S3 object {uri}: {cleanup_error}")
+            raise RuntimeError(f"Failed to create big object: {e}")
+
+        return BigObjectPointer(uri)
+
     @classmethod
     def open(cls, pointer: BigObjectPointer) -> BigObjectStream:
         """
diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py
index c9594cc218..b3a8f8e341 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -51,6 +51,13 @@ if __name__ == "__main__":
         iceberg_table_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
+        s3_endpoint,
+        s3_region,
+        s3_auth_username,
+        s3_auth_password,
+        jdbc_url,
+        jdbc_username,
+        jdbc_password,
     ) = sys.argv
     init_loguru_logger(logger_level)
     StorageConfig.initialize(
@@ -60,6 +67,13 @@ if __name__ == "__main__":
         iceberg_table_namespace,
         iceberg_file_storage_directory_path,
         iceberg_table_commit_batch_size,
+        s3_endpoint,
+        s3_region,
+        s3_auth_username,
+        s3_auth_password,
+        jdbc_url,
+        jdbc_username,
+        jdbc_password,
     )
 
     # Setting R_HOME environment variable for R-UDF usage
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index fa765aee1f..d9469bf6ba 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -183,7 +183,14 @@ class PythonWorkflowWorker(
           StorageConfig.icebergPostgresCatalogPassword,
           StorageConfig.icebergTableResultNamespace,
           StorageConfig.fileStorageDirectoryPath.toString,
-          StorageConfig.icebergTableCommitBatchSize.toString
+          StorageConfig.icebergTableCommitBatchSize.toString,
+          StorageConfig.s3Endpoint,
+          StorageConfig.s3Region,
+          StorageConfig.s3Username,
+          StorageConfig.s3Password,
+          StorageConfig.jdbcUrl,
+          StorageConfig.jdbcUsername,
+          StorageConfig.jdbcPassword
         )
       ).run(BasicIO.standard(false))
     }
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
index 86688fd6b7..6fcdb7b0e5 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -79,10 +79,11 @@ object BigObjectManager extends LazyLogging {
    * Registers the big object in the database for cleanup tracking.
    *
    * @param executionId The execution ID this big object belongs to.
+   * @param operatorId The operator ID that created this big object.
    * @param stream The input stream containing the big object data.
    * @return A BigObjectPointer that can be used in tuples.
    */
-  def create(executionId: Int, stream: InputStream): BigObjectPointer = {
+  def create(executionId: Int, operatorId: String, stream: InputStream): BigObjectPointer = {
     S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET)
 
     val objectKey = s"${System.currentTimeMillis()}/${UUID.randomUUID()}"
@@ -95,10 +96,10 @@ object BigObjectManager extends LazyLogging {
     try {
       context
         .insertInto(BIG_OBJECT)
-        .columns(BIG_OBJECT.EXECUTION_ID, BIG_OBJECT.URI)
-        .values(Int.box(executionId), uri)
+        .columns(BIG_OBJECT.EXECUTION_ID, BIG_OBJECT.OPERATOR_ID, BIG_OBJECT.URI)
+        .values(Int.box(executionId), operatorId, uri)
         .execute()
-      logger.debug(s"Registered big object: eid=$executionId, uri=$uri")
+      logger.debug(s"Registered big object: eid=$executionId, opid=$operatorId, uri=$uri")
     } catch {
       case e: Exception =>
         // Database failed - clean up S3 object
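
For context, a minimal sketch of how the new Python API could be called once StorageConfig has been initialized by texera_run_python_worker.py; the function and variable names below are illustrative and not part of this commit.

# Sketch only: assumes the caller knows its execution_id and operator_id.
from pytexera.storage.big_object_manager import BigObjectManager

def store_payload(execution_id: int, operator_id: str, payload: bytes):
    # Uploads the payload to the default S3 bucket and registers the object
    # in the big_object table for cleanup tracking.
    pointer = BigObjectManager.create(execution_id, operator_id, payload)
    # The returned pointer can be placed in an output tuple and later
    # re-opened with BigObjectManager.open(pointer).
    return pointer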

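As a quick check of the JDBC URL parsing performed by _get_db_connection, here is a traced example on a hypothetical URL; the host, port, and database values are illustrative, and the real URL comes from StorageConfig.JDBC_URL.

# Sketch only: mirrors the parsing rule in _get_db_connection on a hypothetical URL.
jdbc_url = "jdbc:postgresql://localhost:5432/texera_db?currentSchema=texera_db"

url_part = jdbc_url.replace("jdbc:postgresql://", "")  # "localhost:5432/texera_db?currentSchema=texera_db"
url_without_params = url_part.split("?")[0]            # "localhost:5432/texera_db"
host_port, database = url_without_params.split("/")    # "localhost:5432", "texera_db"
host, port = host_port.split(":")                      # "localhost", "5432"

assert (host, int(port), database) == ("localhost", 5432, "texera_db")

When the port or database segment is missing, the method in the diff falls back to 5432 and "texera_db" respectively, which the simplified trace above does not exercise.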