This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git
commit c2c59409e3a332a6eb9c5c9112b92601ec953e92 Author: Kunwoo Park <[email protected]> AuthorDate: Wed Oct 15 22:01:16 2025 -0700 Big Object Read for Python UDF Operators --- amber/operator-requirements.txt | 1 + .../core/architecture/packaging/output_manager.py | 12 +- .../python/core/models/schema/attribute_type.py | 6 + amber/src/main/python/core/models/schema/schema.py | 36 +++-- amber/src/main/python/core/models/tuple.py | 18 ++- .../main/python/core/storage/big_object_pointer.py | 65 +++++++++ .../main/python/core/storage/document_factory.py | 4 +- .../python/core/storage/iceberg/iceberg_utils.py | 153 +++++++++++++++++++-- amber/src/main/python/pytexera/__init__.py | 2 + .../src/main/python/pytexera/big_object_manager.py | 114 +++++++++++++++ common/workflow-core/build.sbt | 4 +- .../amber/core/tuple/AttributeTypeUtils.scala | 2 +- .../scala/org/apache/amber/util/ArrowUtils.scala | 62 ++++++--- .../texera/service/util/BigObjectManager.scala | 31 +---- 14 files changed, 441 insertions(+), 69 deletions(-) diff --git a/amber/operator-requirements.txt b/amber/operator-requirements.txt index fd33ef7470..b9014ba78a 100644 --- a/amber/operator-requirements.txt +++ b/amber/operator-requirements.txt @@ -22,3 +22,4 @@ pillow==10.2.0 pybase64==1.3.2 torch==2.8.0 scikit-learn==1.5.0 +boto3==1.40.53 diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index 76be4116b4..eb027920ee 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -259,10 +259,20 @@ class OutputManager: ) def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame: + from core.storage.big_object_pointer import BigObjectPointer + return DataFrame( frame=Table.from_pydict( { - name: [t[name] for t in tuples] + name: [ + ( + # Convert BigObjectPointer objects to URI strings for Arrow serialization + t[name].uri + if isinstance(t[name], BigObjectPointer) + else t[name] + ) + for t in tuples + ] for name in self.get_port().get_schema().get_attr_names() }, schema=self.get_port().get_schema().as_arrow_schema(), diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py index 72799b015c..63c952d1f8 100644 --- a/amber/src/main/python/core/models/schema/attribute_type.py +++ b/amber/src/main/python/core/models/schema/attribute_type.py @@ -20,6 +20,7 @@ import pyarrow as pa from bidict import bidict from enum import Enum from pyarrow import lib +from core.storage.big_object_pointer import BigObjectPointer class AttributeType(Enum): @@ -37,6 +38,7 @@ class AttributeType(Enum): DOUBLE = 5 TIMESTAMP = 6 BINARY = 7 + BIG_OBJECT = 8 RAW_TYPE_MAPPING = bidict( @@ -48,6 +50,7 @@ RAW_TYPE_MAPPING = bidict( "BOOLEAN": AttributeType.BOOL, "TIMESTAMP": AttributeType.TIMESTAMP, "BINARY": AttributeType.BINARY, + "BIG_OBJECT": AttributeType.BIG_OBJECT, } ) @@ -59,6 +62,7 @@ TO_ARROW_MAPPING = { AttributeType.BOOL: pa.bool_(), AttributeType.BINARY: pa.binary(), AttributeType.TIMESTAMP: pa.timestamp("us"), + AttributeType.BIG_OBJECT: pa.string(), # Serialized as URI string } FROM_ARROW_MAPPING = { @@ -83,6 +87,7 @@ TO_PYOBJECT_MAPPING = { AttributeType.BOOL: bool, AttributeType.BINARY: bytes, AttributeType.TIMESTAMP: datetime.datetime, + AttributeType.BIG_OBJECT: BigObjectPointer, } FROM_PYOBJECT_MAPPING = { @@ -92,4 +97,5 @@ FROM_PYOBJECT_MAPPING = { bool: 
AttributeType.BOOL, bytes: AttributeType.BINARY, datetime.datetime: AttributeType.TIMESTAMP, + BigObjectPointer: AttributeType.BIG_OBJECT, } diff --git a/amber/src/main/python/core/models/schema/schema.py b/amber/src/main/python/core/models/schema/schema.py index 132ca23884..bcb323f523 100644 --- a/amber/src/main/python/core/models/schema/schema.py +++ b/amber/src/main/python/core/models/schema/schema.py @@ -81,26 +81,46 @@ class Schema: def _from_arrow_schema(self, arrow_schema: pa.Schema) -> None: """ Resets the Schema by converting a pyarrow.Schema. + Checks field metadata to detect BIG_OBJECT types. :param arrow_schema: a pyarrow.Schema. :return: """ self._name_type_mapping = OrderedDict() for attr_name in arrow_schema.names: - arrow_type = arrow_schema.field(attr_name).type # type: ignore - attr_type = FROM_ARROW_MAPPING[arrow_type.id] + field = arrow_schema.field(attr_name) + + # Check metadata for BIG_OBJECT type (stored by Scala ArrowUtils) + is_big_object = ( + field.metadata and field.metadata.get(b"texera_type") == b"BIG_OBJECT" + ) + + attr_type = ( + AttributeType.BIG_OBJECT + if is_big_object + else FROM_ARROW_MAPPING[field.type.id] + ) + self.add(attr_name, attr_type) def as_arrow_schema(self) -> pa.Schema: """ Creates a new pyarrow.Schema according to the current Schema. + Includes metadata for BIG_OBJECT types to preserve type information. :return: pyarrow.Schema """ - return pa.schema( - [ - pa.field(attr_name, TO_ARROW_MAPPING[attr_type]) - for attr_name, attr_type in self._name_type_mapping.items() - ] - ) + fields = [ + pa.field( + attr_name, + TO_ARROW_MAPPING[attr_type], + metadata=( + {b"texera_type": b"BIG_OBJECT"} + if attr_type == AttributeType.BIG_OBJECT + else None + ), + ) + for attr_name, attr_type in self._name_type_mapping.items() + ] + return pa.schema(fields) def get_attr_names(self) -> List[str]: """ diff --git a/amber/src/main/python/core/models/tuple.py b/amber/src/main/python/core/models/tuple.py index e88f08286d..f88d75b4fe 100644 --- a/amber/src/main/python/core/models/tuple.py +++ b/amber/src/main/python/core/models/tuple.py @@ -29,6 +29,7 @@ from pympler import asizeof from typing import Any, List, Iterator, Callable from typing_extensions import Protocol, runtime_checkable +from core.storage.big_object_pointer import BigObjectPointer from .schema.attribute_type import TO_PYOBJECT_MAPPING, AttributeType from .schema.field import Field from .schema.schema import Schema @@ -86,6 +87,7 @@ class ArrowTableTupleProvider: """ value = self._table.column(field_name).chunks[chunk_idx][tuple_idx].as_py() field_type = self._table.schema.field(field_name).type + field_metadata = self._table.schema.field(field_name).metadata # for binary types, convert pickled objects back. 
if ( @@ -94,6 +96,16 @@ class ArrowTableTupleProvider: and value[:6] == b"pickle" ): value = pickle.loads(value[10:]) + + # Convert URI string to BigObjectPointer for BIG_OBJECT types + # Metadata is set by Scala ArrowUtils or Python iceberg_utils + elif ( + value is not None + and field_metadata + and field_metadata.get(b"texera_type") == b"BIG_OBJECT" + ): + value = BigObjectPointer(value) + return value self._current_idx += 1 @@ -322,8 +334,10 @@ class Tuple: for field_name, field_value in self.as_key_value_pairs(): expected = schema.get_attr_type(field_name) - if not isinstance( - field_value, (TO_PYOBJECT_MAPPING.get(expected), type(None)) + expected_type = TO_PYOBJECT_MAPPING.get(expected) + + if expected_type is not None and not isinstance( + field_value, (expected_type, type(None)) ): raise TypeError( f"Unmatched type for field '{field_name}', expected {expected}, " diff --git a/amber/src/main/python/core/storage/big_object_pointer.py b/amber/src/main/python/core/storage/big_object_pointer.py new file mode 100644 index 0000000000..4aac9d3a63 --- /dev/null +++ b/amber/src/main/python/core/storage/big_object_pointer.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +BigObjectPointer represents a reference to a large object stored externally (e.g., S3). +This is a storage reference class used throughout the system for handling big objects. +""" + + +class BigObjectPointer: + """ + Represents a pointer to a large object stored in S3. + The pointer is formatted as a URI: s3://bucket/path/to/object + + This is a lightweight value object that only stores the URI reference. + To actually read the object's content, use BigObjectManager.open(pointer). + """ + + def __init__(self, uri: str): + """ + Create a new BigObjectPointer. 
+ + Args: + uri: S3 URI in the format s3://bucket/path/to/object + + Raises: + ValueError: If the URI is invalid or doesn't start with s3:// + """ + if not uri or not uri.startswith("s3://"): + raise ValueError( + f"BigObjectPointer URI must start with 's3://' but was: {uri}" + ) + self.uri = uri + + def __str__(self) -> str: + """Return the URI as string representation.""" + return self.uri + + def __repr__(self) -> str: + """Return developer-friendly representation.""" + return f"BigObjectPointer('{self.uri}')" + + def __eq__(self, other) -> bool: + """Check equality based on URI.""" + if isinstance(other, BigObjectPointer): + return self.uri == other.uri + return False + + def __hash__(self) -> int: + """Make BigObjectPointer hashable for use in sets/dicts.""" + return hash(self.uri) diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py index ba15069817..5e680b30fc 100644 --- a/amber/src/main/python/core/storage/document_factory.py +++ b/amber/src/main/python/core/storage/document_factory.py @@ -25,6 +25,7 @@ from core.storage.iceberg.iceberg_catalog_instance import IcebergCatalogInstance from core.storage.iceberg.iceberg_document import IcebergDocument from core.storage.iceberg.iceberg_utils import ( create_table, + amber_schema_to_iceberg_schema, amber_tuples_to_arrow_table, arrow_table_to_amber_tuples, load_table_metadata, @@ -63,7 +64,8 @@ class DocumentFactory: if resource_type in {VFSResourceType.RESULT}: storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) - iceberg_schema = Schema.as_arrow_schema(schema) + # Convert Amber Schema to Iceberg Schema with BIG_OBJECT field name encoding + iceberg_schema = amber_schema_to_iceberg_schema(schema) create_table( IcebergCatalogInstance.get_instance(), diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py index 12c841ed71..3c23df73b0 100644 --- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py +++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py @@ -29,6 +29,112 @@ from typing import Optional, Iterable import core from core.models import ArrowTableTupleProvider, Tuple +# Suffix used to encode BIG_OBJECT fields in Iceberg (must match Scala IcebergUtil) +BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr" + +# Shared type mappings +_ICEBERG_TO_AMBER_TYPE_MAPPING = { + "string": "STRING", + "int": "INT", + "integer": "INT", + "long": "LONG", + "double": "DOUBLE", + "float": "DOUBLE", + "boolean": "BOOL", + "timestamp": "TIMESTAMP", + "binary": "BINARY", +} + + +def encode_big_object_field_name(field_name: str, attr_type) -> str: + """Encodes BIG_OBJECT field names with suffix for Iceberg storage.""" + from core.models.schema.attribute_type import AttributeType + + return ( + f"{field_name}{BIG_OBJECT_FIELD_SUFFIX}" + if attr_type == AttributeType.BIG_OBJECT + else field_name + ) + + +def decode_big_object_field_name(field_name: str) -> str: + """Decodes field names by removing BIG_OBJECT suffix if present.""" + return ( + field_name[: -len(BIG_OBJECT_FIELD_SUFFIX)] + if field_name.endswith(BIG_OBJECT_FIELD_SUFFIX) + else field_name + ) + + +def iceberg_schema_to_amber_schema(iceberg_schema: Schema): + """ + Converts PyIceberg Schema to Amber Schema. + Decodes BIG_OBJECT field names and adds Arrow metadata. 
+ """ + from core.models.schema.attribute_type import AttributeType, TO_ARROW_MAPPING + import core.models + + arrow_fields = [] + for field in iceberg_schema.fields: + # Decode field name and detect BIG_OBJECT by suffix + decoded_name = decode_big_object_field_name(field.name) + is_big_object = field.name.endswith(BIG_OBJECT_FIELD_SUFFIX) + + # Determine attribute type + if is_big_object: + attr_type = AttributeType.BIG_OBJECT + else: + iceberg_type_str = str(field.field_type).lower() + attr_type_name = _ICEBERG_TO_AMBER_TYPE_MAPPING.get( + iceberg_type_str, "STRING" + ) + attr_type = getattr(AttributeType, attr_type_name) + + # Create Arrow field with metadata + arrow_field = pa.field( + decoded_name, + TO_ARROW_MAPPING[attr_type], + metadata={b"texera_type": b"BIG_OBJECT"} if is_big_object else None, + ) + arrow_fields.append(arrow_field) + + return core.models.Schema(pa.schema(arrow_fields)) + + +def amber_schema_to_iceberg_schema(amber_schema) -> Schema: + """ + Converts Amber Schema to PyIceberg Schema. + Encodes BIG_OBJECT field names with suffix. + """ + from pyiceberg import types as iceberg_types + from core.models.schema.attribute_type import AttributeType + + # Mapping from Amber AttributeType to Iceberg Type + TYPE_MAPPING = { + AttributeType.STRING: iceberg_types.StringType(), + AttributeType.INT: iceberg_types.IntegerType(), + AttributeType.LONG: iceberg_types.LongType(), + AttributeType.DOUBLE: iceberg_types.DoubleType(), + AttributeType.BOOL: iceberg_types.BooleanType(), + AttributeType.TIMESTAMP: iceberg_types.TimestampType(), + AttributeType.BINARY: iceberg_types.BinaryType(), + AttributeType.BIG_OBJECT: iceberg_types.StringType(), + } + + fields = [ + iceberg_types.NestedField( + field_id=idx, + name=encode_big_object_field_name(field_name, attr_type), + field_type=TYPE_MAPPING[attr_type], + required=False, + ) + for idx, (field_name, attr_type) in enumerate( + amber_schema._name_type_mapping.items(), start=1 + ) + ] + + return Schema(*fields) + def create_postgres_catalog( catalog_name: str, @@ -135,12 +241,29 @@ def amber_tuples_to_arrow_table( ) -> pa.Table: """ Converts a list of amber tuples to a pyarrow table for serialization. + Handles BIG_OBJECT field name encoding and serialization. """ + from core.storage.big_object_pointer import BigObjectPointer + + # Build data dict using Iceberg schema field names (encoded) + data_dict = {} + for encoded_name in iceberg_schema.as_arrow().names: + # Decode to get the original field name used in tuples + decoded_name = decode_big_object_field_name(encoded_name) + + # Extract values from tuples and serialize BigObjectPointer + values = [] + for t in tuple_list: + value = t[decoded_name] + # Serialize BigObjectPointer to URI string for Iceberg storage + if isinstance(value, BigObjectPointer): + value = value.uri + values.append(value) + + data_dict[encoded_name] = values + return pa.Table.from_pydict( - { - name: [t[name] for t in tuple_list] - for name in iceberg_schema.as_arrow().names - }, + data_dict, schema=iceberg_schema.as_arrow(), ) @@ -149,13 +272,27 @@ def arrow_table_to_amber_tuples( iceberg_schema: Schema, arrow_table: pa.Table ) -> Iterable[Tuple]: """ - Converts an arrow table to a list of amber tuples for deserialization. + Converts an arrow table read from Iceberg to Amber tuples. + Properly handles BIG_OBJECT field name decoding and type detection. 
""" - tuple_provider = ArrowTableTupleProvider(arrow_table) + # Convert Iceberg schema to Amber schema with proper metadata + amber_schema = iceberg_schema_to_amber_schema(iceberg_schema) + + # Create an Arrow table with proper metadata by rebuilding it with the correct schema + # This ensures BIG_OBJECT metadata is available during tuple field access + arrow_table_with_metadata = pa.Table.from_arrays( + [arrow_table.column(col_name) for col_name in arrow_table.column_names], + schema=amber_schema.as_arrow_schema(), + ) + + tuple_provider = ArrowTableTupleProvider(arrow_table_with_metadata) return ( Tuple( - {name: field_accessor for name in arrow_table.column_names}, - schema=core.models.Schema(iceberg_schema.as_arrow()), + { + decode_big_object_field_name(name): field_accessor + for name in arrow_table.column_names + }, + schema=amber_schema, ) for field_accessor in tuple_provider ) diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index db78319f49..2b3d59be10 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -27,6 +27,7 @@ from .udf.udf_operator import ( UDFBatchOperator, UDFSourceOperator, ) +from .big_object_manager import BigObjectManager __all__ = [ "State", @@ -41,6 +42,7 @@ __all__ = [ "UDFBatchOperator", "UDFSourceOperator", "DatasetFileDocument", + "BigObjectManager", # export external tools to be used "overrides", "logger", diff --git a/amber/src/main/python/pytexera/big_object_manager.py b/amber/src/main/python/pytexera/big_object_manager.py new file mode 100644 index 0000000000..f61549fb5f --- /dev/null +++ b/amber/src/main/python/pytexera/big_object_manager.py @@ -0,0 +1,114 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""BigObjectManager for reading large objects from S3.""" + +import os +from typing import BinaryIO +from core.storage.big_object_pointer import BigObjectPointer + + +class BigObjectStream: + """Stream for reading big objects (matches Scala BigObjectStream).""" + + def __init__(self, body: BinaryIO, pointer: BigObjectPointer): + self._body = body + self._pointer = pointer + self._closed = False + + def read(self, amt=None): + if self._closed: + raise ValueError("I/O operation on closed stream") + return self._body.read(amt) + + def close(self): + if not self._closed: + self._closed = True + self._body.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + @property + def closed(self): + return self._closed + + +class BigObjectManager: + """Manager for reading big objects from S3.""" + + _s3_client = None + + @classmethod + def _get_s3_client(cls): + """Initialize S3 client (lazy, cached).""" + if cls._s3_client is None: + try: + import boto3 + from botocore.config import Config + + cls._s3_client = boto3.client( + "s3", + endpoint_url=os.environ.get( + "STORAGE_S3_ENDPOINT", "http://localhost:9000" + ), + aws_access_key_id=os.environ.get( + "STORAGE_S3_AUTH_USERNAME", "texera_minio" + ), + aws_secret_access_key=os.environ.get( + "STORAGE_S3_AUTH_PASSWORD", "password" + ), + region_name=os.environ.get("STORAGE_S3_REGION", "us-west-2"), + config=Config( + signature_version="s3v4", s3={"addressing_style": "path"} + ), + ) + except ImportError: + raise RuntimeError("boto3 required. Install with: pip install boto3") + except Exception as e: + raise RuntimeError(f"Failed to initialize S3 client: {e}") + + return cls._s3_client + + @classmethod + def open(cls, pointer: BigObjectPointer) -> BigObjectStream: + """ + Open a big object for reading. 
+ + Usage: with BigObjectManager.open(tuple_["document"]) as stream: + content = stream.read().decode('utf-8') + """ + if not isinstance(pointer, BigObjectPointer): + raise TypeError(f"Expected BigObjectPointer, got {type(pointer)}") + + # Parse s3://bucket/key + parts = pointer.uri.removeprefix("s3://").split("/", 1) + if len(parts) != 2: + raise ValueError( + f"Invalid S3 URI (expected s3://bucket/key): {pointer.uri}" + ) + + bucket, key = parts + + try: + body = cls._get_s3_client().get_object(Bucket=bucket, Key=key)["Body"] + return BigObjectStream(body, pointer) + except Exception as e: + raise RuntimeError(f"Failed to open {pointer.uri}: {e}") diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt index 122c267271..e4cd11c197 100644 --- a/common/workflow-core/build.sbt +++ b/common/workflow-core/build.sbt @@ -183,7 +183,9 @@ libraryDependencies ++= Seq( "io.lakefs" % "sdk" % "1.51.0", // for lakeFS api calls "com.typesafe" % "config" % "1.4.3", // config reader "org.apache.commons" % "commons-jcs3-core" % "3.2", // Apache Commons JCS - "software.amazon.awssdk" % "s3" % "2.29.51", + "software.amazon.awssdk" % "s3" % "2.29.51" excludeAll( + ExclusionRule(organization = "io.netty") + ), "software.amazon.awssdk" % "auth" % "2.29.51", "software.amazon.awssdk" % "regions" % "2.29.51", ) \ No newline at end of file diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala index 8184560204..0efeea960f 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala @@ -128,7 +128,7 @@ object AttributeTypeUtils extends Serializable { case AttributeType.TIMESTAMP => parseTimestamp(field) case AttributeType.STRING => field.toString case AttributeType.BINARY => field - case AttributeType.BIG_OBJECT => field // Big objects are created programmatically, not parsed + case AttributeType.BIG_OBJECT => new BigObjectPointer(field.toString) case AttributeType.ANY | _ => field } } diff --git a/common/workflow-core/src/main/scala/org/apache/amber/util/ArrowUtils.scala b/common/workflow-core/src/main/scala/org/apache/amber/util/ArrowUtils.scala index 25443d5dc7..20f8775974 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/util/ArrowUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/util/ArrowUtils.scala @@ -73,28 +73,28 @@ object ArrowUtils extends LazyLogging { Tuple .builder(schema) .addSequentially( - vectorSchemaRoot.getFieldVectors.asScala - .map((fieldVector: FieldVector) => { + vectorSchemaRoot.getFieldVectors.asScala.zipWithIndex.map { + case (fieldVector: FieldVector, index: Int) => val value: AnyRef = fieldVector.getObject(rowIndex) try { - val arrowType = fieldVector.getField.getFieldType.getType - val attributeType = toAttributeType(arrowType) + // Use the attribute type from the schema (which includes metadata) + // instead of deriving it from the Arrow type + val attributeType = schema.getAttributes(index).getType AttributeTypeUtils.parseField(value, attributeType) - } catch { case e: Exception => logger.warn("Caught error during parsing Arrow value back to Texera value", e) null } - }) - .toArray + }.toArray ) .build() } /** * Converts an Arrow Schema into Texera Schema. + * Checks field metadata to detect BIG_OBJECT types. 
* * @param arrowSchema The Arrow Schema to be converted. * @return A Texera Schema. @@ -102,7 +102,12 @@ object ArrowUtils extends LazyLogging { def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): Schema = Schema( arrowSchema.getFields.asScala.map { field => - new Attribute(field.getName, toAttributeType(field.getType)) + val isBigObject = Option(field.getMetadata) + .exists(m => m.containsKey("texera_type") && m.get("texera_type") == "BIG_OBJECT") + + val attributeType = + if (isBigObject) AttributeType.BIG_OBJECT else toAttributeType(field.getType) + new Attribute(field.getName, attributeType) }.toList ) @@ -207,11 +212,17 @@ object ArrowUtils extends LazyLogging { ) case _: ArrowType.Utf8 => - if (isNull) vector.asInstanceOf[VarCharVector].setNull(index) - else + if (isNull) { + vector.asInstanceOf[VarCharVector].setNull(index) + } else { + val stringValue = value match { + case ptr: BigObjectPointer => ptr.getUri + case _ => value.toString + } vector .asInstanceOf[VarCharVector] - .setSafe(index, value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)) + .setSafe(index, stringValue.getBytes(StandardCharsets.UTF_8)) + } case _: ArrowType.Binary | _: ArrowType.LargeBinary => if (isNull) vector.asInstanceOf[VarBinaryVector].setNull(index) else @@ -227,19 +238,32 @@ object ArrowUtils extends LazyLogging { /** * Converts an Amber schema into Arrow schema. + * Stores AttributeType in field metadata to preserve BIG_OBJECT type information. * * @param schema The Texera Schema. * @return An Arrow Schema. */ def fromTexeraSchema(schema: Schema): org.apache.arrow.vector.types.pojo.Schema = { - val arrowFields = new util.ArrayList[Field] - - for (amberAttribute <- schema.getAttributes) { - val name = amberAttribute.getName - val field = Field.nullablePrimitive(name, fromAttributeType(amberAttribute.getType)) - arrowFields.add(field) + val arrowFields = schema.getAttributes.map { attribute => + val metadata = if (attribute.getType == AttributeType.BIG_OBJECT) { + val map = new util.HashMap[String, String]() + map.put("texera_type", "BIG_OBJECT") + map + } else null + + new Field( + attribute.getName, + new org.apache.arrow.vector.types.pojo.FieldType( + true, // nullable + fromAttributeType(attribute.getType), + null, // dictionary encoding + metadata + ), + null // children + ) } - new org.apache.arrow.vector.types.pojo.Schema(arrowFields) + + new org.apache.arrow.vector.types.pojo.Schema(util.Arrays.asList(arrowFields: _*)) } /** @@ -270,7 +294,7 @@ object ArrowUtils extends LazyLogging { case AttributeType.BINARY => new ArrowType.Binary - case AttributeType.STRING | AttributeType.ANY => + case AttributeType.STRING | AttributeType.BIG_OBJECT | AttributeType.ANY => ArrowType.Utf8.INSTANCE case _ => diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala index 19586372cc..86688fd6b7 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala @@ -26,15 +26,12 @@ import org.apache.texera.dao.jooq.generated.Tables.BIG_OBJECT import java.io.{Closeable, InputStream} import java.util.UUID -import java.util.concurrent.ConcurrentHashMap import scala.jdk.CollectionConverters._ /** - * BigObjectStream wraps an InputStream and tracks its lifecycle for proper cleanup. 
+ * BigObjectStream wraps an InputStream for reading big objects. */ -class BigObjectStream(private val inputStream: InputStream, private val pointer: BigObjectPointer) - extends InputStream - with Closeable { +class BigObjectStream(private val inputStream: InputStream) extends InputStream with Closeable { @volatile private var closed = false @@ -63,12 +60,10 @@ class BigObjectStream(private val inputStream: InputStream, private val pointer: if (!closed) { closed = true inputStream.close() - BigObjectManager.closeStream(pointer) } } def isClosed: Boolean = closed - def getPointer: BigObjectPointer = pointer } /** @@ -76,7 +71,6 @@ class BigObjectStream(private val inputStream: InputStream, private val pointer: */ object BigObjectManager extends LazyLogging { private val DEFAULT_BUCKET = "texera-big-objects" - private val openStreams = new ConcurrentHashMap[BigObjectPointer, BigObjectStream]() private lazy val context = SqlServer.getInstance().createDSLContext() /** @@ -134,9 +128,7 @@ object BigObjectManager extends LazyLogging { ) val inputStream = S3StorageClient.downloadObject(ptr.getBucketName, ptr.getObjectKey) - val stream = new BigObjectStream(inputStream, ptr) - openStreams.put(ptr, stream) - stream + new BigObjectStream(inputStream) } /** @@ -166,7 +158,6 @@ object BigObjectManager extends LazyLogging { uris.foreach { uri => try { val ptr = new BigObjectPointer(uri) - Option(openStreams.get(ptr)).foreach(_.close()) S3StorageClient.deleteObject(ptr.getBucketName, ptr.getObjectKey) } catch { case e: Exception => @@ -180,20 +171,4 @@ object BigObjectManager extends LazyLogging { .where(BIG_OBJECT.EXECUTION_ID.eq(executionId)) .execute() } - - /** - * Closes a big object stream. Typically called automatically when the stream is closed. - */ - def close(ptr: BigObjectPointer): Unit = { - Option(openStreams.get(ptr)).foreach { stream => - if (!stream.isClosed) stream.close() - } - } - - /** - * Internal method to remove a stream from tracking when it's closed. - */ - private[util] def closeStream(ptr: BigObjectPointer): Unit = { - openStreams.remove(ptr) - } }

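A second sketch, illustrating the two serialization conventions this patch relies on: URI-backed pointers and the `__texera_big_obj_ptr` column-name suffix used to round-trip BIG_OBJECT columns through Iceberg. Module paths are those added by this commit; the example bucket/key is illustrative and assumes the amber Python environment is importable.

```python
# Sketch only: demonstrates the pointer value object and the Iceberg field-name
# encoding introduced in this commit (example object key is hypothetical).
from core.models.schema.attribute_type import AttributeType
from core.storage.big_object_pointer import BigObjectPointer
from core.storage.iceberg.iceberg_utils import (
    decode_big_object_field_name,
    encode_big_object_field_name,
)

# 1. Pointers are lightweight value objects wrapping a validated s3:// URI.
ptr = BigObjectPointer("s3://texera-big-objects/execution-1/object-42")
assert str(ptr) == "s3://texera-big-objects/execution-1/object-42"
assert ptr == BigObjectPointer(ptr.uri)  # equality and hashing are URI-based

# 2. In Iceberg, BIG_OBJECT columns are stored as strings; the type is preserved
#    by suffixing the column name on write and stripping it again on read.
encoded = encode_big_object_field_name("document", AttributeType.BIG_OBJECT)
assert encoded == "document__texera_big_obj_ptr"
assert decode_big_object_field_name(encoded) == "document"
```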