This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git

commit c2c59409e3a332a6eb9c5c9112b92601ec953e92
Author: Kunwoo Park <[email protected]>
AuthorDate: Wed Oct 15 22:01:16 2025 -0700

    Big Object Read for Python UDF Operators
---
 amber/operator-requirements.txt                    |   1 +
 .../core/architecture/packaging/output_manager.py  |  12 +-
 .../python/core/models/schema/attribute_type.py    |   6 +
 amber/src/main/python/core/models/schema/schema.py |  36 +++--
 amber/src/main/python/core/models/tuple.py         |  18 ++-
 .../main/python/core/storage/big_object_pointer.py |  65 +++++++++
 .../main/python/core/storage/document_factory.py   |   4 +-
 .../python/core/storage/iceberg/iceberg_utils.py   | 153 +++++++++++++++++++--
 amber/src/main/python/pytexera/__init__.py         |   2 +
 .../src/main/python/pytexera/big_object_manager.py | 114 +++++++++++++++
 common/workflow-core/build.sbt                     |   4 +-
 .../amber/core/tuple/AttributeTypeUtils.scala      |   2 +-
 .../scala/org/apache/amber/util/ArrowUtils.scala   |  62 ++++++---
 .../texera/service/util/BigObjectManager.scala     |  31 +----
 14 files changed, 441 insertions(+), 69 deletions(-)
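
Usage sketch (not part of the diff): a minimal example of reading a big object from a Python UDF with the new API, assuming the default "texera-big-objects" bucket and the STORAGE_S3_* environment variables shown below; the object key is hypothetical, and inside a UDF the pointer would normally arrive as the value of a BIG_OBJECT field such as tuple_["document"].

    from pytexera import BigObjectManager
    from core.storage.big_object_pointer import BigObjectPointer

    # Hypothetical pointer; in a real UDF this is the value of a BIG_OBJECT column.
    pointer = BigObjectPointer("s3://texera-big-objects/example/object-key")

    # BigObjectManager.open returns a BigObjectStream that works as a context
    # manager; read() returns the raw bytes of the S3 object body.
    with BigObjectManager.open(pointer) as stream:
        content = stream.read().decode("utf-8")

    print(content)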

diff --git a/amber/operator-requirements.txt b/amber/operator-requirements.txt
index fd33ef7470..b9014ba78a 100644
--- a/amber/operator-requirements.txt
+++ b/amber/operator-requirements.txt
@@ -22,3 +22,4 @@ pillow==10.2.0
 pybase64==1.3.2
 torch==2.8.0
 scikit-learn==1.5.0
+boto3==1.40.53
diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index 76be4116b4..eb027920ee 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -259,10 +259,20 @@ class OutputManager:
         )
 
     def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame:
+        from core.storage.big_object_pointer import BigObjectPointer
+
         return DataFrame(
             frame=Table.from_pydict(
                 {
-                    name: [t[name] for t in tuples]
+                    name: [
+                        (
+                            # Convert BigObjectPointer objects to URI strings for Arrow serialization
+                            t[name].uri
+                            if isinstance(t[name], BigObjectPointer)
+                            else t[name]
+                        )
+                        for t in tuples
+                    ]
                     for name in self.get_port().get_schema().get_attr_names()
                 },
                 schema=self.get_port().get_schema().as_arrow_schema(),
diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py
index 72799b015c..63c952d1f8 100644
--- a/amber/src/main/python/core/models/schema/attribute_type.py
+++ b/amber/src/main/python/core/models/schema/attribute_type.py
@@ -20,6 +20,7 @@ import pyarrow as pa
 from bidict import bidict
 from enum import Enum
 from pyarrow import lib
+from core.storage.big_object_pointer import BigObjectPointer
 
 
 class AttributeType(Enum):
@@ -37,6 +38,7 @@ class AttributeType(Enum):
     DOUBLE = 5
     TIMESTAMP = 6
     BINARY = 7
+    BIG_OBJECT = 8
 
 
 RAW_TYPE_MAPPING = bidict(
@@ -48,6 +50,7 @@ RAW_TYPE_MAPPING = bidict(
         "BOOLEAN": AttributeType.BOOL,
         "TIMESTAMP": AttributeType.TIMESTAMP,
         "BINARY": AttributeType.BINARY,
+        "BIG_OBJECT": AttributeType.BIG_OBJECT,
     }
 )
 
@@ -59,6 +62,7 @@ TO_ARROW_MAPPING = {
     AttributeType.BOOL: pa.bool_(),
     AttributeType.BINARY: pa.binary(),
     AttributeType.TIMESTAMP: pa.timestamp("us"),
+    AttributeType.BIG_OBJECT: pa.string(),  # Serialized as URI string
 }
 
 FROM_ARROW_MAPPING = {
@@ -83,6 +87,7 @@ TO_PYOBJECT_MAPPING = {
     AttributeType.BOOL: bool,
     AttributeType.BINARY: bytes,
     AttributeType.TIMESTAMP: datetime.datetime,
+    AttributeType.BIG_OBJECT: BigObjectPointer,
 }
 
 FROM_PYOBJECT_MAPPING = {
@@ -92,4 +97,5 @@ FROM_PYOBJECT_MAPPING = {
     bool: AttributeType.BOOL,
     bytes: AttributeType.BINARY,
     datetime.datetime: AttributeType.TIMESTAMP,
+    BigObjectPointer: AttributeType.BIG_OBJECT,
 }
diff --git a/amber/src/main/python/core/models/schema/schema.py b/amber/src/main/python/core/models/schema/schema.py
index 132ca23884..bcb323f523 100644
--- a/amber/src/main/python/core/models/schema/schema.py
+++ b/amber/src/main/python/core/models/schema/schema.py
@@ -81,26 +81,46 @@ class Schema:
     def _from_arrow_schema(self, arrow_schema: pa.Schema) -> None:
         """
         Resets the Schema by converting a pyarrow.Schema.
+        Checks field metadata to detect BIG_OBJECT types.
         :param arrow_schema: a pyarrow.Schema.
         :return:
         """
         self._name_type_mapping = OrderedDict()
         for attr_name in arrow_schema.names:
-            arrow_type = arrow_schema.field(attr_name).type  # type: ignore
-            attr_type = FROM_ARROW_MAPPING[arrow_type.id]
+            field = arrow_schema.field(attr_name)
+
+            # Check metadata for BIG_OBJECT type (stored by Scala ArrowUtils)
+            is_big_object = (
+                field.metadata and field.metadata.get(b"texera_type") == b"BIG_OBJECT"
+            )
+
+            attr_type = (
+                AttributeType.BIG_OBJECT
+                if is_big_object
+                else FROM_ARROW_MAPPING[field.type.id]
+            )
+
             self.add(attr_name, attr_type)
 
     def as_arrow_schema(self) -> pa.Schema:
         """
         Creates a new pyarrow.Schema according to the current Schema.
+        Includes metadata for BIG_OBJECT types to preserve type information.
         :return: pyarrow.Schema
         """
-        return pa.schema(
-            [
-                pa.field(attr_name, TO_ARROW_MAPPING[attr_type])
-                for attr_name, attr_type in self._name_type_mapping.items()
-            ]
-        )
+        fields = [
+            pa.field(
+                attr_name,
+                TO_ARROW_MAPPING[attr_type],
+                metadata=(
+                    {b"texera_type": b"BIG_OBJECT"}
+                    if attr_type == AttributeType.BIG_OBJECT
+                    else None
+                ),
+            )
+            for attr_name, attr_type in self._name_type_mapping.items()
+        ]
+        return pa.schema(fields)
 
     def get_attr_names(self) -> List[str]:
         """
diff --git a/amber/src/main/python/core/models/tuple.py b/amber/src/main/python/core/models/tuple.py
index e88f08286d..f88d75b4fe 100644
--- a/amber/src/main/python/core/models/tuple.py
+++ b/amber/src/main/python/core/models/tuple.py
@@ -29,6 +29,7 @@ from pympler import asizeof
 from typing import Any, List, Iterator, Callable
 from typing_extensions import Protocol, runtime_checkable
 
+from core.storage.big_object_pointer import BigObjectPointer
 from .schema.attribute_type import TO_PYOBJECT_MAPPING, AttributeType
 from .schema.field import Field
 from .schema.schema import Schema
@@ -86,6 +87,7 @@ class ArrowTableTupleProvider:
             """
             value = self._table.column(field_name).chunks[chunk_idx][tuple_idx].as_py()
             field_type = self._table.schema.field(field_name).type
+            field_metadata = self._table.schema.field(field_name).metadata
 
             # for binary types, convert pickled objects back.
             if (
@@ -94,6 +96,16 @@ class ArrowTableTupleProvider:
                 and value[:6] == b"pickle"
             ):
                 value = pickle.loads(value[10:])
+
+            # Convert URI string to BigObjectPointer for BIG_OBJECT types
+            # Metadata is set by Scala ArrowUtils or Python iceberg_utils
+            elif (
+                value is not None
+                and field_metadata
+                and field_metadata.get(b"texera_type") == b"BIG_OBJECT"
+            ):
+                value = BigObjectPointer(value)
+
             return value
 
         self._current_idx += 1
@@ -322,8 +334,10 @@ class Tuple:
 
         for field_name, field_value in self.as_key_value_pairs():
             expected = schema.get_attr_type(field_name)
-            if not isinstance(
-                field_value, (TO_PYOBJECT_MAPPING.get(expected), type(None))
+            expected_type = TO_PYOBJECT_MAPPING.get(expected)
+
+            if expected_type is not None and not isinstance(
+                field_value, (expected_type, type(None))
             ):
                 raise TypeError(
                     f"Unmatched type for field '{field_name}', expected 
{expected}, "
diff --git a/amber/src/main/python/core/storage/big_object_pointer.py b/amber/src/main/python/core/storage/big_object_pointer.py
new file mode 100644
index 0000000000..4aac9d3a63
--- /dev/null
+++ b/amber/src/main/python/core/storage/big_object_pointer.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+BigObjectPointer represents a reference to a large object stored externally (e.g., S3).
+This is a storage reference class used throughout the system for handling big objects.
+"""
+
+
+class BigObjectPointer:
+    """
+    Represents a pointer to a large object stored in S3.
+    The pointer is formatted as a URI: s3://bucket/path/to/object
+
+    This is a lightweight value object that only stores the URI reference.
+    To actually read the object's content, use BigObjectManager.open(pointer).
+    """
+
+    def __init__(self, uri: str):
+        """
+        Create a new BigObjectPointer.
+
+        Args:
+            uri: S3 URI in the format s3://bucket/path/to/object
+
+        Raises:
+            ValueError: If the URI is invalid or doesn't start with s3://
+        """
+        if not uri or not uri.startswith("s3://"):
+            raise ValueError(
+                f"BigObjectPointer URI must start with 's3://' but was: {uri}"
+            )
+        self.uri = uri
+
+    def __str__(self) -> str:
+        """Return the URI as string representation."""
+        return self.uri
+
+    def __repr__(self) -> str:
+        """Return developer-friendly representation."""
+        return f"BigObjectPointer('{self.uri}')"
+
+    def __eq__(self, other) -> bool:
+        """Check equality based on URI."""
+        if isinstance(other, BigObjectPointer):
+            return self.uri == other.uri
+        return False
+
+    def __hash__(self) -> int:
+        """Make BigObjectPointer hashable for use in sets/dicts."""
+        return hash(self.uri)
diff --git a/amber/src/main/python/core/storage/document_factory.py b/amber/src/main/python/core/storage/document_factory.py
index ba15069817..5e680b30fc 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -25,6 +25,7 @@ from core.storage.iceberg.iceberg_catalog_instance import IcebergCatalogInstance
 from core.storage.iceberg.iceberg_document import IcebergDocument
 from core.storage.iceberg.iceberg_utils import (
     create_table,
+    amber_schema_to_iceberg_schema,
     amber_tuples_to_arrow_table,
     arrow_table_to_amber_tuples,
     load_table_metadata,
@@ -63,7 +64,8 @@ class DocumentFactory:
             if resource_type in {VFSResourceType.RESULT}:
                 storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
 
-                iceberg_schema = Schema.as_arrow_schema(schema)
+                # Convert Amber Schema to Iceberg Schema with BIG_OBJECT field name encoding
+                iceberg_schema = amber_schema_to_iceberg_schema(schema)
 
                 create_table(
                     IcebergCatalogInstance.get_instance(),
diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
index 12c841ed71..3c23df73b0 100644
--- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
+++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
@@ -29,6 +29,112 @@ from typing import Optional, Iterable
 import core
 from core.models import ArrowTableTupleProvider, Tuple
 
+# Suffix used to encode BIG_OBJECT fields in Iceberg (must match Scala IcebergUtil)
+BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr"
+
+# Shared type mappings
+_ICEBERG_TO_AMBER_TYPE_MAPPING = {
+    "string": "STRING",
+    "int": "INT",
+    "integer": "INT",
+    "long": "LONG",
+    "double": "DOUBLE",
+    "float": "DOUBLE",
+    "boolean": "BOOL",
+    "timestamp": "TIMESTAMP",
+    "binary": "BINARY",
+}
+
+
+def encode_big_object_field_name(field_name: str, attr_type) -> str:
+    """Encodes BIG_OBJECT field names with suffix for Iceberg storage."""
+    from core.models.schema.attribute_type import AttributeType
+
+    return (
+        f"{field_name}{BIG_OBJECT_FIELD_SUFFIX}"
+        if attr_type == AttributeType.BIG_OBJECT
+        else field_name
+    )
+
+
+def decode_big_object_field_name(field_name: str) -> str:
+    """Decodes field names by removing BIG_OBJECT suffix if present."""
+    return (
+        field_name[: -len(BIG_OBJECT_FIELD_SUFFIX)]
+        if field_name.endswith(BIG_OBJECT_FIELD_SUFFIX)
+        else field_name
+    )
+
+
+def iceberg_schema_to_amber_schema(iceberg_schema: Schema):
+    """
+    Converts PyIceberg Schema to Amber Schema.
+    Decodes BIG_OBJECT field names and adds Arrow metadata.
+    """
+    from core.models.schema.attribute_type import AttributeType, TO_ARROW_MAPPING
+    import core.models
+
+    arrow_fields = []
+    for field in iceberg_schema.fields:
+        # Decode field name and detect BIG_OBJECT by suffix
+        decoded_name = decode_big_object_field_name(field.name)
+        is_big_object = field.name.endswith(BIG_OBJECT_FIELD_SUFFIX)
+
+        # Determine attribute type
+        if is_big_object:
+            attr_type = AttributeType.BIG_OBJECT
+        else:
+            iceberg_type_str = str(field.field_type).lower()
+            attr_type_name = _ICEBERG_TO_AMBER_TYPE_MAPPING.get(
+                iceberg_type_str, "STRING"
+            )
+            attr_type = getattr(AttributeType, attr_type_name)
+
+        # Create Arrow field with metadata
+        arrow_field = pa.field(
+            decoded_name,
+            TO_ARROW_MAPPING[attr_type],
+            metadata={b"texera_type": b"BIG_OBJECT"} if is_big_object else None,
+        )
+        arrow_fields.append(arrow_field)
+
+    return core.models.Schema(pa.schema(arrow_fields))
+
+
+def amber_schema_to_iceberg_schema(amber_schema) -> Schema:
+    """
+    Converts Amber Schema to PyIceberg Schema.
+    Encodes BIG_OBJECT field names with suffix.
+    """
+    from pyiceberg import types as iceberg_types
+    from core.models.schema.attribute_type import AttributeType
+
+    # Mapping from Amber AttributeType to Iceberg Type
+    TYPE_MAPPING = {
+        AttributeType.STRING: iceberg_types.StringType(),
+        AttributeType.INT: iceberg_types.IntegerType(),
+        AttributeType.LONG: iceberg_types.LongType(),
+        AttributeType.DOUBLE: iceberg_types.DoubleType(),
+        AttributeType.BOOL: iceberg_types.BooleanType(),
+        AttributeType.TIMESTAMP: iceberg_types.TimestampType(),
+        AttributeType.BINARY: iceberg_types.BinaryType(),
+        AttributeType.BIG_OBJECT: iceberg_types.StringType(),
+    }
+
+    fields = [
+        iceberg_types.NestedField(
+            field_id=idx,
+            name=encode_big_object_field_name(field_name, attr_type),
+            field_type=TYPE_MAPPING[attr_type],
+            required=False,
+        )
+        for idx, (field_name, attr_type) in enumerate(
+            amber_schema._name_type_mapping.items(), start=1
+        )
+    ]
+
+    return Schema(*fields)
+
 
 def create_postgres_catalog(
     catalog_name: str,
@@ -135,12 +241,29 @@ def amber_tuples_to_arrow_table(
 ) -> pa.Table:
     """
     Converts a list of amber tuples to a pyarrow table for serialization.
+    Handles BIG_OBJECT field name encoding and serialization.
     """
+    from core.storage.big_object_pointer import BigObjectPointer
+
+    # Build data dict using Iceberg schema field names (encoded)
+    data_dict = {}
+    for encoded_name in iceberg_schema.as_arrow().names:
+        # Decode to get the original field name used in tuples
+        decoded_name = decode_big_object_field_name(encoded_name)
+
+        # Extract values from tuples and serialize BigObjectPointer
+        values = []
+        for t in tuple_list:
+            value = t[decoded_name]
+            # Serialize BigObjectPointer to URI string for Iceberg storage
+            if isinstance(value, BigObjectPointer):
+                value = value.uri
+            values.append(value)
+
+        data_dict[encoded_name] = values
+
     return pa.Table.from_pydict(
-        {
-            name: [t[name] for t in tuple_list]
-            for name in iceberg_schema.as_arrow().names
-        },
+        data_dict,
         schema=iceberg_schema.as_arrow(),
     )
 
@@ -149,13 +272,27 @@ def arrow_table_to_amber_tuples(
     iceberg_schema: Schema, arrow_table: pa.Table
 ) -> Iterable[Tuple]:
     """
-    Converts an arrow table to a list of amber tuples for deserialization.
+    Converts an arrow table read from Iceberg to Amber tuples.
+    Properly handles BIG_OBJECT field name decoding and type detection.
     """
-    tuple_provider = ArrowTableTupleProvider(arrow_table)
+    # Convert Iceberg schema to Amber schema with proper metadata
+    amber_schema = iceberg_schema_to_amber_schema(iceberg_schema)
+
+    # Create an Arrow table with proper metadata by rebuilding it with the correct schema
+    # This ensures BIG_OBJECT metadata is available during tuple field access
+    arrow_table_with_metadata = pa.Table.from_arrays(
+        [arrow_table.column(col_name) for col_name in arrow_table.column_names],
+        schema=amber_schema.as_arrow_schema(),
+    )
+
+    tuple_provider = ArrowTableTupleProvider(arrow_table_with_metadata)
     return (
         Tuple(
-            {name: field_accessor for name in arrow_table.column_names},
-            schema=core.models.Schema(iceberg_schema.as_arrow()),
+            {
+                decode_big_object_field_name(name): field_accessor
+                for name in arrow_table.column_names
+            },
+            schema=amber_schema,
         )
         for field_accessor in tuple_provider
     )
diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py
index db78319f49..2b3d59be10 100644
--- a/amber/src/main/python/pytexera/__init__.py
+++ b/amber/src/main/python/pytexera/__init__.py
@@ -27,6 +27,7 @@ from .udf.udf_operator import (
     UDFBatchOperator,
     UDFSourceOperator,
 )
+from .big_object_manager import BigObjectManager
 
 __all__ = [
     "State",
@@ -41,6 +42,7 @@ __all__ = [
     "UDFBatchOperator",
     "UDFSourceOperator",
     "DatasetFileDocument",
+    "BigObjectManager",
     # export external tools to be used
     "overrides",
     "logger",
diff --git a/amber/src/main/python/pytexera/big_object_manager.py b/amber/src/main/python/pytexera/big_object_manager.py
new file mode 100644
index 0000000000..f61549fb5f
--- /dev/null
+++ b/amber/src/main/python/pytexera/big_object_manager.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""BigObjectManager for reading large objects from S3."""
+
+import os
+from typing import BinaryIO
+from core.storage.big_object_pointer import BigObjectPointer
+
+
+class BigObjectStream:
+    """Stream for reading big objects (matches Scala BigObjectStream)."""
+
+    def __init__(self, body: BinaryIO, pointer: BigObjectPointer):
+        self._body = body
+        self._pointer = pointer
+        self._closed = False
+
+    def read(self, amt=None):
+        if self._closed:
+            raise ValueError("I/O operation on closed stream")
+        return self._body.read(amt)
+
+    def close(self):
+        if not self._closed:
+            self._closed = True
+            self._body.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    @property
+    def closed(self):
+        return self._closed
+
+
+class BigObjectManager:
+    """Manager for reading big objects from S3."""
+
+    _s3_client = None
+
+    @classmethod
+    def _get_s3_client(cls):
+        """Initialize S3 client (lazy, cached)."""
+        if cls._s3_client is None:
+            try:
+                import boto3
+                from botocore.config import Config
+
+                cls._s3_client = boto3.client(
+                    "s3",
+                    endpoint_url=os.environ.get(
+                        "STORAGE_S3_ENDPOINT", "http://localhost:9000"
+                    ),
+                    aws_access_key_id=os.environ.get(
+                        "STORAGE_S3_AUTH_USERNAME", "texera_minio"
+                    ),
+                    aws_secret_access_key=os.environ.get(
+                        "STORAGE_S3_AUTH_PASSWORD", "password"
+                    ),
+                    region_name=os.environ.get("STORAGE_S3_REGION", "us-west-2"),
+                    config=Config(
+                        signature_version="s3v4", s3={"addressing_style": "path"}
+                    ),
+                )
+            except ImportError:
+                raise RuntimeError("boto3 required. Install with: pip install boto3")
+            except Exception as e:
+                raise RuntimeError(f"Failed to initialize S3 client: {e}")
+
+        return cls._s3_client
+
+    @classmethod
+    def open(cls, pointer: BigObjectPointer) -> BigObjectStream:
+        """
+        Open a big object for reading.
+
+        Usage: with BigObjectManager.open(tuple_["document"]) as stream:
+                   content = stream.read().decode('utf-8')
+        """
+        if not isinstance(pointer, BigObjectPointer):
+            raise TypeError(f"Expected BigObjectPointer, got {type(pointer)}")
+
+        # Parse s3://bucket/key
+        parts = pointer.uri.removeprefix("s3://").split("/", 1)
+        if len(parts) != 2:
+            raise ValueError(
+                f"Invalid S3 URI (expected s3://bucket/key): {pointer.uri}"
+            )
+
+        bucket, key = parts
+
+        try:
+            body = cls._get_s3_client().get_object(Bucket=bucket, Key=key)["Body"]
+            return BigObjectStream(body, pointer)
+        except Exception as e:
+            raise RuntimeError(f"Failed to open {pointer.uri}: {e}")
diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt
index 122c267271..e4cd11c197 100644
--- a/common/workflow-core/build.sbt
+++ b/common/workflow-core/build.sbt
@@ -183,7 +183,9 @@ libraryDependencies ++= Seq(
   "io.lakefs" % "sdk" % "1.51.0",                                     // for 
lakeFS api calls
   "com.typesafe" % "config" % "1.4.3",                                 // 
config reader
   "org.apache.commons" % "commons-jcs3-core" % "3.2",                 // 
Apache Commons JCS
-  "software.amazon.awssdk" % "s3" % "2.29.51",
+  "software.amazon.awssdk" % "s3" % "2.29.51" excludeAll(
+    ExclusionRule(organization = "io.netty")
+  ),
   "software.amazon.awssdk" % "auth" % "2.29.51",
   "software.amazon.awssdk" % "regions" % "2.29.51",
 )
\ No newline at end of file
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
index 8184560204..0efeea960f 100644
--- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
@@ -128,7 +128,7 @@ object AttributeTypeUtils extends Serializable {
       case AttributeType.TIMESTAMP  => parseTimestamp(field)
       case AttributeType.STRING     => field.toString
       case AttributeType.BINARY     => field
-      case AttributeType.BIG_OBJECT => field // Big objects are created programmatically, not parsed
+      case AttributeType.BIG_OBJECT => new BigObjectPointer(field.toString)
       case AttributeType.ANY | _    => field
     }
   }
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/util/ArrowUtils.scala b/common/workflow-core/src/main/scala/org/apache/amber/util/ArrowUtils.scala
index 25443d5dc7..20f8775974 100644
--- a/common/workflow-core/src/main/scala/org/apache/amber/util/ArrowUtils.scala
+++ b/common/workflow-core/src/main/scala/org/apache/amber/util/ArrowUtils.scala
@@ -73,28 +73,28 @@ object ArrowUtils extends LazyLogging {
     Tuple
       .builder(schema)
       .addSequentially(
-        vectorSchemaRoot.getFieldVectors.asScala
-          .map((fieldVector: FieldVector) => {
+        vectorSchemaRoot.getFieldVectors.asScala.zipWithIndex.map {
+          case (fieldVector: FieldVector, index: Int) =>
             val value: AnyRef = fieldVector.getObject(rowIndex)
             try {
-              val arrowType = fieldVector.getField.getFieldType.getType
-              val attributeType = toAttributeType(arrowType)
+              // Use the attribute type from the schema (which includes metadata)
+              // instead of deriving it from the Arrow type
+              val attributeType = schema.getAttributes(index).getType
               AttributeTypeUtils.parseField(value, attributeType)
-
             } catch {
               case e: Exception =>
                 logger.warn("Caught error during parsing Arrow value back to Texera value", e)
                 null
             }
 
-          })
-          .toArray
+        }.toArray
       )
       .build()
   }
 
   /**
     * Converts an Arrow Schema into Texera Schema.
+    * Checks field metadata to detect BIG_OBJECT types.
     *
     * @param arrowSchema The Arrow Schema to be converted.
     * @return A Texera Schema.
@@ -102,7 +102,12 @@ object ArrowUtils extends LazyLogging {
   def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): Schema =
     Schema(
       arrowSchema.getFields.asScala.map { field =>
-        new Attribute(field.getName, toAttributeType(field.getType))
+        val isBigObject = Option(field.getMetadata)
+          .exists(m => m.containsKey("texera_type") && m.get("texera_type") == "BIG_OBJECT")
+
+        val attributeType =
+          if (isBigObject) AttributeType.BIG_OBJECT else toAttributeType(field.getType)
+        new Attribute(field.getName, attributeType)
       }.toList
     )
 
@@ -207,11 +212,17 @@ object ArrowUtils extends LazyLogging {
             )
 
         case _: ArrowType.Utf8 =>
-          if (isNull) vector.asInstanceOf[VarCharVector].setNull(index)
-          else
+          if (isNull) {
+            vector.asInstanceOf[VarCharVector].setNull(index)
+          } else {
+            val stringValue = value match {
+              case ptr: BigObjectPointer => ptr.getUri
+              case _                     => value.toString
+            }
             vector
               .asInstanceOf[VarCharVector]
-              .setSafe(index, value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+              .setSafe(index, stringValue.getBytes(StandardCharsets.UTF_8))
+          }
         case _: ArrowType.Binary | _: ArrowType.LargeBinary =>
           if (isNull) vector.asInstanceOf[VarBinaryVector].setNull(index)
           else
@@ -227,19 +238,32 @@ object ArrowUtils extends LazyLogging {
 
   /**
     * Converts an Amber schema into Arrow schema.
+    * Stores AttributeType in field metadata to preserve BIG_OBJECT type information.
     *
     * @param schema The Texera Schema.
     * @return An Arrow Schema.
     */
   def fromTexeraSchema(schema: Schema): org.apache.arrow.vector.types.pojo.Schema = {
-    val arrowFields = new util.ArrayList[Field]
-
-    for (amberAttribute <- schema.getAttributes) {
-      val name = amberAttribute.getName
-      val field = Field.nullablePrimitive(name, fromAttributeType(amberAttribute.getType))
-      arrowFields.add(field)
+    val arrowFields = schema.getAttributes.map { attribute =>
+      val metadata = if (attribute.getType == AttributeType.BIG_OBJECT) {
+        val map = new util.HashMap[String, String]()
+        map.put("texera_type", "BIG_OBJECT")
+        map
+      } else null
+
+      new Field(
+        attribute.getName,
+        new org.apache.arrow.vector.types.pojo.FieldType(
+          true, // nullable
+          fromAttributeType(attribute.getType),
+          null, // dictionary encoding
+          metadata
+        ),
+        null // children
+      )
     }
-    new org.apache.arrow.vector.types.pojo.Schema(arrowFields)
+
+    new org.apache.arrow.vector.types.pojo.Schema(util.Arrays.asList(arrowFields: _*))
   }
 
   /**
@@ -270,7 +294,7 @@ object ArrowUtils extends LazyLogging {
       case AttributeType.BINARY =>
         new ArrowType.Binary
 
-      case AttributeType.STRING | AttributeType.ANY =>
+      case AttributeType.STRING | AttributeType.BIG_OBJECT | AttributeType.ANY =>
         ArrowType.Utf8.INSTANCE
 
       case _ =>
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
index 19586372cc..86688fd6b7 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -26,15 +26,12 @@ import org.apache.texera.dao.jooq.generated.Tables.BIG_OBJECT
 
 import java.io.{Closeable, InputStream}
 import java.util.UUID
-import java.util.concurrent.ConcurrentHashMap
 import scala.jdk.CollectionConverters._
 
 /**
-  * BigObjectStream wraps an InputStream and tracks its lifecycle for proper cleanup.
+  * BigObjectStream wraps an InputStream for reading big objects.
   */
-class BigObjectStream(private val inputStream: InputStream, private val pointer: BigObjectPointer)
-    extends InputStream
-    with Closeable {
+class BigObjectStream(private val inputStream: InputStream) extends InputStream with Closeable {
 
   @volatile private var closed = false
 
@@ -63,12 +60,10 @@ class BigObjectStream(private val inputStream: InputStream, private val pointer:
     if (!closed) {
       closed = true
       inputStream.close()
-      BigObjectManager.closeStream(pointer)
     }
   }
 
   def isClosed: Boolean = closed
-  def getPointer: BigObjectPointer = pointer
 }
 
 /**
@@ -76,7 +71,6 @@ class BigObjectStream(private val inputStream: InputStream, private val pointer:
   */
 object BigObjectManager extends LazyLogging {
   private val DEFAULT_BUCKET = "texera-big-objects"
-  private val openStreams = new ConcurrentHashMap[BigObjectPointer, BigObjectStream]()
   private lazy val context = SqlServer.getInstance().createDSLContext()
 
   /**
@@ -134,9 +128,7 @@ object BigObjectManager extends LazyLogging {
     )
 
     val inputStream = S3StorageClient.downloadObject(ptr.getBucketName, ptr.getObjectKey)
-    val stream = new BigObjectStream(inputStream, ptr)
-    openStreams.put(ptr, stream)
-    stream
+    new BigObjectStream(inputStream)
   }
 
   /**
@@ -166,7 +158,6 @@ object BigObjectManager extends LazyLogging {
     uris.foreach { uri =>
       try {
         val ptr = new BigObjectPointer(uri)
-        Option(openStreams.get(ptr)).foreach(_.close())
         S3StorageClient.deleteObject(ptr.getBucketName, ptr.getObjectKey)
       } catch {
         case e: Exception =>
@@ -180,20 +171,4 @@ object BigObjectManager extends LazyLogging {
       .where(BIG_OBJECT.EXECUTION_ID.eq(executionId))
       .execute()
   }
-
-  /**
-    * Closes a big object stream. Typically called automatically when the stream is closed.
-    */
-  def close(ptr: BigObjectPointer): Unit = {
-    Option(openStreams.get(ptr)).foreach { stream =>
-      if (!stream.isClosed) stream.close()
-    }
-  }
-
-  /**
-    * Internal method to remove a stream from tracking when it's closed.
-    */
-  private[util] def closeStream(ptr: BigObjectPointer): Unit = {
-    openStreams.remove(ptr)
-  }
 }
