This is an automated email from the ASF dual-hosted git repository.
kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 4f822e376a feat: Python Support for Large Binary (#4100)
4f822e376a is described below
commit 4f822e376afb40be3c6035762d98d7851f995dee
Author: Chris <[email protected]>
AuthorDate: Mon Dec 15 03:41:52 2025 -0800
feat: Python Support for Large Binary (#4100)
<!--
Thanks for sending a pull request (PR)! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
[Contributing to
Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md)
2. Ensure you have added or run the appropriate tests for your PR
3. If the PR is work in progress, mark it a draft on GitHub.
4. Please write your PR title to summarize what this PR proposes, we
are following Conventional Commits style for PR titles as well.
5. Be sure to keep the PR description updated to reflect all changes.
-->
### What changes were proposed in this PR?
<!--
Please clarify what changes you are proposing. The purpose of this
section
is to outline the changes. Here are some tips for you:
1. If you propose a new API, clarify the use case for a new API.
2. If you fix a bug, you can clarify why it is a bug.
3. If it is a refactoring, clarify what has been changed.
4. It would be helpful to include a before-and-after comparison using
screenshots or GIFs.
5. Please consider writing useful notes for better and faster reviews.
-->
This PR introduces Python support for the `large_binary` attribute type,
enabling Python UDF operators to process data larger than 2 GB. Data is
offloaded to MinIO (S3), and the tuple retains only a pointer (URI).
This mirrors the existing Java LargeBinary implementation, ensuring
cross-language compatibility (see #4067 for the system diagram and #4111
for the renaming).
## Key Features
### 1. MinIO/S3 Integration
- Uses the shared `texera-large-binaries` bucket.
- Lazily initializes the S3 client and automatically creates the bucket
if it does not exist (see the sketch below).
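The snippet below is a condensed sketch of this pattern, based on the new
`pytexera/storage/large_binary_manager.py` module in the diff further down;
configuration values come from `StorageConfig`, and the exact implementation
may differ in detail.
```python
import boto3
from botocore.config import Config

from core.storage.storage_config import StorageConfig

_s3_client = None
DEFAULT_BUCKET = "texera-large-binaries"


def _get_s3_client():
    """Create the boto3 S3 client once and cache it at module level."""
    global _s3_client
    if _s3_client is None:
        _s3_client = boto3.client(
            "s3",
            endpoint_url=StorageConfig.S3_ENDPOINT,
            aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
            aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
            region_name=StorageConfig.S3_REGION,
            config=Config(signature_version="s3v4", s3={"addressing_style": "path"}),
        )
    return _s3_client


def _ensure_bucket_exists(bucket: str = DEFAULT_BUCKET) -> None:
    """Create the shared bucket on first use if it is missing."""
    s3 = _get_s3_client()
    try:
        s3.head_bucket(Bucket=bucket)
    except s3.exceptions.NoSuchBucket:
        s3.create_bucket(Bucket=bucket)
```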
### 2. Streaming I/O
- **`LargeBinaryOutputStream`:** Writes data to S3 using multipart
uploads (64 KB chunks) so that uploads do not block the main execution.
- **`LargeBinaryInputStream`:** Downloads data lazily, only when the
first read begins, and implements the standard Python `io.IOBase` interface.
### 3. Tuple & Iceberg Compatibility
- `largebinary` instances are automatically serialized to URI strings
for Iceberg storage and Arrow tables.
- Uses a magic suffix (`__texera_large_binary_ptr`) on field names to
distinguish pointer columns from ordinary string columns (see the sketch below).
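As an illustration, the encoding is purely name-based; the helpers below are
condensed from `iceberg_utils.py` in this PR, with the type check simplified
here to a boolean flag instead of an `AttributeType`.
```python
LARGE_BINARY_FIELD_SUFFIX = "__texera_large_binary_ptr"


def encode_large_binary_field_name(field_name: str, is_large_binary: bool) -> str:
    """Append the magic suffix only for LARGE_BINARY attributes."""
    return field_name + LARGE_BINARY_FIELD_SUFFIX if is_large_binary else field_name


def decode_large_binary_field_name(field_name: str) -> str:
    """Strip the magic suffix, recovering the original attribute name."""
    if field_name.endswith(LARGE_BINARY_FIELD_SUFFIX):
        return field_name[: -len(LARGE_BINARY_FIELD_SUFFIX)]
    return field_name


# Round trip: "image" -> "image__texera_large_binary_ptr" -> "image"
assert decode_large_binary_field_name(
    encode_large_binary_field_name("image", True)
) == "image"
```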
### 4. Serialization
- Pointers are stored as strings tagged with metadata (`texera_type:
LARGE_BINARY`). Automatic conversion ensures UDFs always see `largebinary`
instances, never raw strings (see the sketch below).
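For example, on the Arrow side a pointer column is an ordinary string field
carrying a metadata tag, and the read path wraps tagged URI strings back into
`largebinary` objects. This sketch mirrors `attribute_type_utils.py` and
`tuple.py` from the diff below; the attribute name `payload` is hypothetical.
```python
import pyarrow as pa

from pytexera import largebinary

# A LARGE_BINARY column is a plain string column plus a metadata tag.
field = pa.field(
    "payload",  # hypothetical attribute name
    pa.string(),
    metadata={b"texera_type": b"LARGE_BINARY"},
)

# On deserialization, a tagged URI string is wrapped back into largebinary,
# so UDF code never sees the raw pointer string.
uri = "s3://texera-large-binaries/objects/123/abc"
value = largebinary(uri)
assert isinstance(value, largebinary) and value.uri == uri
```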
## User API Usage
### 1. Creating & Writing (Output)
Use `LargeBinaryOutputStream` to stream large data into a new object.
```python
from pytexera import largebinary, LargeBinaryOutputStream

# Create a new handle
large_binary = largebinary()

# Stream data to S3
with LargeBinaryOutputStream(large_binary) as out:
    out.write(my_large_data_bytes)
    # Supports bytes, bytearray, etc.
```
### 2. Reading (Input)
Use `LargeBinaryInputStream` to read data back. It supports all standard
Python stream methods.
```python
from pytexera import LargeBinaryInputStream

with LargeBinaryInputStream(large_binary) as stream:
    # Option A: Read everything
    all_data = stream.read()

    # Option B: Chunked reading
    chunk = stream.read(1024)

    # Option C: Iteration
    for line in stream:
        process(line)
```
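Putting both sides together, a UDF can copy an incoming `largebinary` into a
new one without materializing the full payload in memory. The sketch below
assumes the standard `UDFOperatorV2.process_tuple(tuple_, port)` interface and
a hypothetical `payload` attribute of type `large_binary`.
```python
from pytexera import (
    UDFOperatorV2,
    largebinary,
    LargeBinaryInputStream,
    LargeBinaryOutputStream,
)


class CopyLargeBinaryOperator(UDFOperatorV2):
    def process_tuple(self, tuple_, port):
        # "payload" is a hypothetical LARGE_BINARY attribute on the input tuple.
        result = largebinary()
        with LargeBinaryInputStream(tuple_["payload"]) as src, \
                LargeBinaryOutputStream(result) as dst:
            # Stream in 64 KB chunks so the data never has to fit in memory.
            while chunk := src.read(64 * 1024):
                dst.write(chunk)
        tuple_["payload"] = result
        yield tuple_
```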
## Dependencies
- `boto3`: Required for S3 interactions.
- `StorageConfig`: Uses existing configuration for
endpoints/credentials.
## Future Direction
- Support for R UDF operators (see #4123)
### Any related issues, documentation, discussions?
<!--
Please use this section to link other resources if not mentioned
already.
1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves
#1234`
or `Closes #1234`. If it is only related, simply mention the issue
number.
2. If there is design documentation, please add the link.
3. If there is a discussion in the mailing list, please add the link.
-->
Design: #3787
### How was this PR tested?
<!--
If tests were added, say they were added here. Or simply mention that if
the PR
is tested with existing test cases. Make sure to include/update test
cases that
check the changes thoroughly including negative and positive cases if
possible.
If it was tested in a way different from regular unit tests, please
clarify how
you tested step by step, ideally copy and paste-able, so that other
reviewers can
test and check, and descendants can verify in the future. If tests were
not added,
please describe why they were not added and/or why it was difficult to
add.
-->
Tested by running the attached workflow multiple times and checking the MinIO
dashboard to confirm that six objects are created and then deleted. Set the
file scan operator's property to any file larger than 2 GB.
[Large Binary
Python.json](https://github.com/user-attachments/files/24062982/Large.Binary.Python.json)
### Was this PR authored or co-authored using generative AI tooling?
<!--
If generative AI tooling has been used in the process of authoring this
PR,
please include the phrase: 'Generated-by: ' followed by the name of the
tool
and its version. If no, write 'No'.
Please refer to the [ASF Generative Tooling
Guidance](https://www.apache.org/legal/generative-tooling.html) for
details.
-->
No.
---------
Signed-off-by: Chris <[email protected]>
---
amber/operator-requirements.txt | 1 +
.../core/architecture/packaging/output_manager.py | 2 +-
amber/src/main/python/core/models/__init__.py | 3 +-
.../src/main/python/core/models/schema/__init__.py | 2 +
.../core/models/schema/arrow_schema_utils.py | 63 ++++++
.../python/core/models/schema/attribute_type.py | 6 +
.../core/models/schema/attribute_type_utils.py | 72 ++++++
amber/src/main/python/core/models/schema/schema.py | 19 +-
.../main/python/core/models/schema/test_schema.py | 65 ++++++
amber/src/main/python/core/models/test_tuple.py | 98 +++++++++
amber/src/main/python/core/models/tuple.py | 33 +++
.../main/python/core/models/type/__init__.py} | 11 +-
.../main/python/core/models/type/large_binary.py | 98 +++++++++
.../python/core/models/type/test_large_binary.py | 88 ++++++++
.../main/python/core/storage/document_factory.py | 5 +-
.../python/core/storage/iceberg/iceberg_utils.py | 136 +++++++++++-
.../core/storage/iceberg/test_iceberg_document.py | 4 +
.../iceberg/test_iceberg_utils_large_binary.py | 230 +++++++++++++++++++
.../src/main/python/core/storage/storage_config.py | 19 +-
amber/src/main/python/pytexera/__init__.py | 6 +
.../pytexera/storage/large_binary_input_stream.py | 121 ++++++++++
.../pytexera/storage/large_binary_manager.py | 78 +++++++
.../pytexera/storage/large_binary_output_stream.py | 244 +++++++++++++++++++++
.../storage/test_large_binary_input_stream.py | 222 +++++++++++++++++++
.../pytexera/storage/test_large_binary_manager.py | 150 +++++++++++++
.../storage/test_large_binary_output_stream.py | 238 ++++++++++++++++++++
amber/src/main/python/texera_run_python_worker.py | 8 +
.../pythonworker/PythonWorkflowWorker.scala | 6 +-
.../texera/web/service/WorkflowService.scala | 2 +-
build.sbt | 14 +-
common/workflow-core/build.sbt | 17 ++
.../org/apache/texera/amber/util/ArrowUtils.scala | 49 +++--
.../apache/texera/amber/util/ArrowUtilsSpec.scala | 144 +++++++++++-
33 files changed, 2197 insertions(+), 57 deletions(-)
diff --git a/amber/operator-requirements.txt b/amber/operator-requirements.txt
index ce114ea841..3328d152f0 100644
--- a/amber/operator-requirements.txt
+++ b/amber/operator-requirements.txt
@@ -23,3 +23,4 @@ pybase64==1.3.2
torch==2.8.0
scikit-learn==1.5.0
transformers==4.57.3
+boto3==1.40.53
diff --git
a/amber/src/main/python/core/architecture/packaging/output_manager.py
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index 4003429baf..bf4afbf396 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -262,7 +262,7 @@ class OutputManager:
return DataFrame(
frame=Table.from_pydict(
{
- name: [t[name] for t in tuples]
+ name: [t.get_serialized_field(name) for t in tuples]
for name in self.get_port().get_schema().get_attr_names()
},
schema=self.get_port().get_schema().as_arrow_schema(),
diff --git a/amber/src/main/python/core/models/__init__.py
b/amber/src/main/python/core/models/__init__.py
index 9011c1db88..d24fe0a277 100644
--- a/amber/src/main/python/core/models/__init__.py
+++ b/amber/src/main/python/core/models/__init__.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+import builtins
from inspect import Traceback
from typing import NamedTuple
@@ -36,7 +37,7 @@ from .payload import DataFrame, DataPayload, StateFrame
class ExceptionInfo(NamedTuple):
- exc: type
+ exc: builtins.type
value: Exception
tb: Traceback
diff --git a/amber/src/main/python/core/models/schema/__init__.py
b/amber/src/main/python/core/models/schema/__init__.py
index 7c8b57bb31..306fe4e1d4 100644
--- a/amber/src/main/python/core/models/schema/__init__.py
+++ b/amber/src/main/python/core/models/schema/__init__.py
@@ -16,12 +16,14 @@
# under the License.
from .attribute_type import AttributeType
+from core.models.type.large_binary import largebinary
from .field import Field
from .schema import Schema
__all__ = [
"AttributeType",
+ "largebinary",
"Field",
"Schema",
]
diff --git a/amber/src/main/python/core/models/schema/arrow_schema_utils.py
b/amber/src/main/python/core/models/schema/arrow_schema_utils.py
new file mode 100644
index 0000000000..527e0095e6
--- /dev/null
+++ b/amber/src/main/python/core/models/schema/arrow_schema_utils.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Utilities for converting between Arrow schemas and Amber schemas,
+handling LARGE_BINARY metadata preservation.
+"""
+
+import pyarrow as pa
+from typing import Mapping
+
+from core.models.schema.attribute_type import AttributeType
+from core.models.schema.attribute_type_utils import (
+ detect_attribute_type_from_arrow_field,
+ create_arrow_field_with_metadata,
+)
+
+
+def arrow_schema_to_attr_types(arrow_schema: pa.Schema) -> dict[str,
AttributeType]:
+ """
+ Converts an Arrow schema to a dictionary of attribute name to
AttributeType.
+ Handles LARGE_BINARY metadata detection.
+
+ :param arrow_schema: PyArrow schema that may contain LARGE_BINARY metadata
+ :return: Dictionary mapping attribute names to AttributeTypes
+ """
+ attr_types = {}
+ for attr_name in arrow_schema.names:
+ field = arrow_schema.field(attr_name)
+ attr_types[attr_name] = detect_attribute_type_from_arrow_field(field)
+ return attr_types
+
+
+def attr_types_to_arrow_schema(
+ attr_types: Mapping[str, AttributeType],
+) -> pa.Schema:
+ """
+ Converts a mapping of attribute name to AttributeType into an Arrow schema.
+ Adds metadata for LARGE_BINARY types.
+ Preserves the order of attributes from the input mapping.
+
+ :param attr_types: Mapping of attribute names to AttributeTypes (e.g.,
OrderedDict)
+ :return: PyArrow schema with metadata for LARGE_BINARY types
+ """
+ fields = [
+ create_arrow_field_with_metadata(attr_name, attr_type)
+ for attr_name, attr_type in attr_types.items()
+ ]
+ return pa.schema(fields)
diff --git a/amber/src/main/python/core/models/schema/attribute_type.py
b/amber/src/main/python/core/models/schema/attribute_type.py
index 72799b015c..24d0745f41 100644
--- a/amber/src/main/python/core/models/schema/attribute_type.py
+++ b/amber/src/main/python/core/models/schema/attribute_type.py
@@ -20,6 +20,7 @@ import pyarrow as pa
from bidict import bidict
from enum import Enum
from pyarrow import lib
+from core.models.type.large_binary import largebinary
class AttributeType(Enum):
@@ -37,6 +38,7 @@ class AttributeType(Enum):
DOUBLE = 5
TIMESTAMP = 6
BINARY = 7
+ LARGE_BINARY = 8
RAW_TYPE_MAPPING = bidict(
@@ -48,6 +50,7 @@ RAW_TYPE_MAPPING = bidict(
"BOOLEAN": AttributeType.BOOL,
"TIMESTAMP": AttributeType.TIMESTAMP,
"BINARY": AttributeType.BINARY,
+ "LARGE_BINARY": AttributeType.LARGE_BINARY,
}
)
@@ -59,6 +62,7 @@ TO_ARROW_MAPPING = {
AttributeType.BOOL: pa.bool_(),
AttributeType.BINARY: pa.binary(),
AttributeType.TIMESTAMP: pa.timestamp("us"),
+ AttributeType.LARGE_BINARY: pa.string(), # Serialized as URI string
}
FROM_ARROW_MAPPING = {
@@ -83,6 +87,7 @@ TO_PYOBJECT_MAPPING = {
AttributeType.BOOL: bool,
AttributeType.BINARY: bytes,
AttributeType.TIMESTAMP: datetime.datetime,
+ AttributeType.LARGE_BINARY: largebinary,
}
FROM_PYOBJECT_MAPPING = {
@@ -92,4 +97,5 @@ FROM_PYOBJECT_MAPPING = {
bool: AttributeType.BOOL,
bytes: AttributeType.BINARY,
datetime.datetime: AttributeType.TIMESTAMP,
+ largebinary: AttributeType.LARGE_BINARY,
}
diff --git a/amber/src/main/python/core/models/schema/attribute_type_utils.py
b/amber/src/main/python/core/models/schema/attribute_type_utils.py
new file mode 100644
index 0000000000..3918fdfc34
--- /dev/null
+++ b/amber/src/main/python/core/models/schema/attribute_type_utils.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Utilities for converting between AttributeTypes and Arrow field types,
+handling LARGE_BINARY metadata preservation.
+"""
+
+import pyarrow as pa
+
+from core.models.schema.attribute_type import (
+ AttributeType,
+ FROM_ARROW_MAPPING,
+ TO_ARROW_MAPPING,
+)
+
+# Metadata key used to mark LARGE_BINARY fields in Arrow schemas
+TEXERA_TYPE_METADATA_KEY = b"texera_type"
+LARGE_BINARY_METADATA_VALUE = b"LARGE_BINARY"
+
+
+def detect_attribute_type_from_arrow_field(field: pa.Field) -> AttributeType:
+ """
+ Detects the AttributeType from an Arrow field, checking metadata for
LARGE_BINARY.
+
+ :param field: PyArrow field that may contain metadata
+ :return: The detected AttributeType
+ """
+ # Check metadata for LARGE_BINARY type
+ # (can be stored by either Scala ArrowUtils or Python)
+ is_large_binary = (
+ field.metadata
+ and field.metadata.get(TEXERA_TYPE_METADATA_KEY) ==
LARGE_BINARY_METADATA_VALUE
+ )
+
+ if is_large_binary:
+ return AttributeType.LARGE_BINARY
+ else:
+ return FROM_ARROW_MAPPING[field.type.id]
+
+
+def create_arrow_field_with_metadata(
+ attr_name: str, attr_type: AttributeType
+) -> pa.Field:
+ """
+ Creates a PyArrow field with appropriate metadata for the given
AttributeType.
+
+ :param attr_name: Name of the attribute
+ :param attr_type: The AttributeType
+ :return: PyArrow field with metadata if needed
+ """
+ metadata = (
+ {TEXERA_TYPE_METADATA_KEY: LARGE_BINARY_METADATA_VALUE}
+ if attr_type == AttributeType.LARGE_BINARY
+ else None
+ )
+
+ return pa.field(attr_name, TO_ARROW_MAPPING[attr_type], metadata=metadata)
diff --git a/amber/src/main/python/core/models/schema/schema.py
b/amber/src/main/python/core/models/schema/schema.py
index 132ca23884..d349807ab7 100644
--- a/amber/src/main/python/core/models/schema/schema.py
+++ b/amber/src/main/python/core/models/schema/schema.py
@@ -22,8 +22,10 @@ from typing import MutableMapping, Optional, Mapping, List,
Tuple
from core.models.schema.attribute_type import (
AttributeType,
RAW_TYPE_MAPPING,
- FROM_ARROW_MAPPING,
- TO_ARROW_MAPPING,
+)
+from core.models.schema.arrow_schema_utils import (
+ arrow_schema_to_attr_types,
+ attr_types_to_arrow_schema,
)
@@ -85,22 +87,17 @@ class Schema:
:return:
"""
self._name_type_mapping = OrderedDict()
+ attr_types = arrow_schema_to_attr_types(arrow_schema)
+ # Preserve field order from arrow_schema
for attr_name in arrow_schema.names:
- arrow_type = arrow_schema.field(attr_name).type # type: ignore
- attr_type = FROM_ARROW_MAPPING[arrow_type.id]
- self.add(attr_name, attr_type)
+ self.add(attr_name, attr_types[attr_name])
def as_arrow_schema(self) -> pa.Schema:
"""
Creates a new pyarrow.Schema according to the current Schema.
:return: pyarrow.Schema
"""
- return pa.schema(
- [
- pa.field(attr_name, TO_ARROW_MAPPING[attr_type])
- for attr_name, attr_type in self._name_type_mapping.items()
- ]
- )
+ return attr_types_to_arrow_schema(self._name_type_mapping)
def get_attr_names(self) -> List[str]:
"""
diff --git a/amber/src/main/python/core/models/schema/test_schema.py
b/amber/src/main/python/core/models/schema/test_schema.py
index 2da4e70655..60e4c848a5 100644
--- a/amber/src/main/python/core/models/schema/test_schema.py
+++ b/amber/src/main/python/core/models/schema/test_schema.py
@@ -89,3 +89,68 @@ class TestSchema:
def test_convert_from_arrow_schema(self, arrow_schema, schema):
assert schema == Schema(arrow_schema=arrow_schema)
assert schema.as_arrow_schema() == arrow_schema
+
+ def test_large_binary_in_raw_schema(self):
+ """Test creating schema with LARGE_BINARY from raw schema."""
+ raw_schema = {
+ "regular_field": "STRING",
+ "large_binary_field": "LARGE_BINARY",
+ }
+ schema = Schema(raw_schema=raw_schema)
+ assert schema.get_attr_type("regular_field") == AttributeType.STRING
+ assert schema.get_attr_type("large_binary_field") ==
AttributeType.LARGE_BINARY
+
+ def test_large_binary_in_arrow_schema_with_metadata(self):
+ """Test creating schema with LARGE_BINARY from Arrow schema with
metadata."""
+ arrow_schema = pa.schema(
+ [
+ pa.field("regular_field", pa.string()),
+ pa.field(
+ "large_binary_field",
+ pa.string(),
+ metadata={b"texera_type": b"LARGE_BINARY"},
+ ),
+ ]
+ )
+ schema = Schema(arrow_schema=arrow_schema)
+ assert schema.get_attr_type("regular_field") == AttributeType.STRING
+ assert schema.get_attr_type("large_binary_field") ==
AttributeType.LARGE_BINARY
+
+ def test_large_binary_as_arrow_schema_includes_metadata(self):
+ """Test that LARGE_BINARY fields include metadata in Arrow schema."""
+ schema = Schema()
+ schema.add("regular_field", AttributeType.STRING)
+ schema.add("large_binary_field", AttributeType.LARGE_BINARY)
+
+ arrow_schema = schema.as_arrow_schema()
+
+ # Regular field should have no metadata
+ regular_field = arrow_schema.field("regular_field")
+ assert (
+ regular_field.metadata is None
+ or b"texera_type" not in regular_field.metadata
+ )
+
+ # LARGE_BINARY field should have metadata
+ large_binary_field = arrow_schema.field("large_binary_field")
+ assert large_binary_field.metadata is not None
+ assert large_binary_field.metadata.get(b"texera_type") ==
b"LARGE_BINARY"
+ assert (
+ large_binary_field.type == pa.string()
+ ) # LARGE_BINARY is stored as string
+
+ def test_round_trip_large_binary_schema(self):
+ """Test round-trip conversion of schema with LARGE_BINARY."""
+ original_schema = Schema()
+ original_schema.add("field1", AttributeType.STRING)
+ original_schema.add("field2", AttributeType.LARGE_BINARY)
+ original_schema.add("field3", AttributeType.INT)
+
+ # Convert to Arrow and back
+ arrow_schema = original_schema.as_arrow_schema()
+ round_trip_schema = Schema(arrow_schema=arrow_schema)
+
+ assert round_trip_schema == original_schema
+ assert round_trip_schema.get_attr_type("field1") ==
AttributeType.STRING
+ assert round_trip_schema.get_attr_type("field2") ==
AttributeType.LARGE_BINARY
+ assert round_trip_schema.get_attr_type("field3") == AttributeType.INT
diff --git a/amber/src/main/python/core/models/test_tuple.py
b/amber/src/main/python/core/models/test_tuple.py
index bfce7bb94f..efb4fdf5c7 100644
--- a/amber/src/main/python/core/models/test_tuple.py
+++ b/amber/src/main/python/core/models/test_tuple.py
@@ -221,3 +221,101 @@ class TestTuple:
schema,
)
assert hash(tuple5) == -2099556631 # calculated with Java
+
+ def test_tuple_with_large_binary(self):
+ """Test tuple with largebinary field."""
+ from core.models.type.large_binary import largebinary
+
+ schema = Schema(
+ raw_schema={
+ "regular_field": "STRING",
+ "large_binary_field": "LARGE_BINARY",
+ }
+ )
+
+ large_binary = largebinary("s3://test-bucket/path/to/object")
+ tuple_ = Tuple(
+ {
+ "regular_field": "test string",
+ "large_binary_field": large_binary,
+ },
+ schema=schema,
+ )
+
+ assert tuple_["regular_field"] == "test string"
+ assert tuple_["large_binary_field"] == large_binary
+ assert isinstance(tuple_["large_binary_field"], largebinary)
+ assert tuple_["large_binary_field"].uri ==
"s3://test-bucket/path/to/object"
+
+ def test_tuple_from_arrow_with_large_binary(self):
+ """Test creating tuple from Arrow table with LARGE_BINARY metadata."""
+ import pyarrow as pa
+ from core.models.type.large_binary import largebinary
+
+ # Create Arrow schema with LARGE_BINARY metadata
+ arrow_schema = pa.schema(
+ [
+ pa.field("regular_field", pa.string()),
+ pa.field(
+ "large_binary_field",
+ pa.string(),
+ metadata={b"texera_type": b"LARGE_BINARY"},
+ ),
+ ]
+ )
+
+ # Create Arrow table with URI string for large_binary_field
+ arrow_table = pa.Table.from_pydict(
+ {
+ "regular_field": ["test"],
+ "large_binary_field": ["s3://test-bucket/path/to/object"],
+ },
+ schema=arrow_schema,
+ )
+
+ # Create tuple from Arrow table
+ tuple_provider = ArrowTableTupleProvider(arrow_table)
+ tuples = [
+ Tuple({name: field_accessor for name in arrow_table.column_names})
+ for field_accessor in tuple_provider
+ ]
+
+ assert len(tuples) == 1
+ tuple_ = tuples[0]
+ assert tuple_["regular_field"] == "test"
+ assert isinstance(tuple_["large_binary_field"], largebinary)
+ assert tuple_["large_binary_field"].uri ==
"s3://test-bucket/path/to/object"
+
+ def test_tuple_with_null_large_binary(self):
+ """Test tuple with null largebinary field."""
+ import pyarrow as pa
+
+ # Create Arrow schema with LARGE_BINARY metadata
+ arrow_schema = pa.schema(
+ [
+ pa.field(
+ "large_binary_field",
+ pa.string(),
+ metadata={b"texera_type": b"LARGE_BINARY"},
+ ),
+ ]
+ )
+
+ # Create Arrow table with null value
+ arrow_table = pa.Table.from_pydict(
+ {
+ "large_binary_field": [None],
+ },
+ schema=arrow_schema,
+ )
+
+ # Create tuple from Arrow table
+ tuple_provider = ArrowTableTupleProvider(arrow_table)
+ tuples = [
+ Tuple({name: field_accessor for name in arrow_table.column_names})
+ for field_accessor in tuple_provider
+ ]
+
+ assert len(tuples) == 1
+ tuple_ = tuples[0]
+ assert tuple_["large_binary_field"] is None
diff --git a/amber/src/main/python/core/models/tuple.py
b/amber/src/main/python/core/models/tuple.py
index d6ae12862b..916301406f 100644
--- a/amber/src/main/python/core/models/tuple.py
+++ b/amber/src/main/python/core/models/tuple.py
@@ -29,6 +29,7 @@ from pympler import asizeof
from typing import Any, List, Iterator, Callable
from typing_extensions import Protocol, runtime_checkable
+from core.models.type.large_binary import largebinary
from .schema.attribute_type import TO_PYOBJECT_MAPPING, AttributeType
from .schema.field import Field
from .schema.schema import Schema
@@ -86,6 +87,7 @@ class ArrowTableTupleProvider:
"""
value =
self._table.column(field_name).chunks[chunk_idx][tuple_idx].as_py()
field_type = self._table.schema.field(field_name).type
+ field_metadata = self._table.schema.field(field_name).metadata
# for binary types, convert pickled objects back.
if (
@@ -94,6 +96,16 @@ class ArrowTableTupleProvider:
and value[:6] == b"pickle"
):
value = pickle.loads(value[10:])
+
+ # Convert URI string to largebinary for LARGE_BINARY types
+ # Metadata is set by Scala ArrowUtils or Python iceberg_utils
+ elif (
+ value is not None
+ and field_metadata
+ and field_metadata.get(b"texera_type") == b"LARGE_BINARY"
+ ):
+ value = largebinary(value)
+
return value
self._current_idx += 1
@@ -234,6 +246,27 @@ class Tuple:
def as_key_value_pairs(self) -> List[typing.Tuple[str, Field]]:
return [(k, v) for k, v in self.as_dict().items()]
+ def get_serialized_field(self, field_name: str) -> Field:
+ """
+ Get a field value serialized for Arrow table conversion.
+ For LARGE_BINARY fields, converts largebinary instances to URI strings.
+ For other fields, returns the value as-is.
+
+ :param field_name: field name
+ :return: field value (URI string for LARGE_BINARY fields with
largebinary values)
+ """
+ value = self[field_name]
+
+ # Convert largebinary to URI string for LARGE_BINARY fields when
schema available
+ if (
+ self._schema is not None
+ and self._schema.get_attr_type(field_name) ==
AttributeType.LARGE_BINARY
+ and isinstance(value, largebinary)
+ ):
+ return value.uri
+
+ return value
+
def get_field_names(self) -> typing.Tuple[str]:
return tuple(map(str, self._field_data.keys()))
diff --git a/amber/operator-requirements.txt
b/amber/src/main/python/core/models/type/__init__.py
similarity index 85%
copy from amber/operator-requirements.txt
copy to amber/src/main/python/core/models/type/__init__.py
index ce114ea841..41344433aa 100644
--- a/amber/operator-requirements.txt
+++ b/amber/src/main/python/core/models/type/__init__.py
@@ -15,11 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-wordcloud==1.9.3
-plotly==5.24.1
-praw==7.6.1
-pillow==10.2.0
-pybase64==1.3.2
-torch==2.8.0
-scikit-learn==1.5.0
-transformers==4.57.3
+from .large_binary import largebinary
+
+__all__ = ["largebinary"]
diff --git a/amber/src/main/python/core/models/type/large_binary.py
b/amber/src/main/python/core/models/type/large_binary.py
new file mode 100644
index 0000000000..581a688912
--- /dev/null
+++ b/amber/src/main/python/core/models/type/large_binary.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+largebinary represents a reference to a large object stored externally (e.g.,
S3).
+This is a schema type class used throughout the system for handling
+LARGE_BINARY attribute types.
+"""
+
+from typing import Optional
+from urllib.parse import urlparse
+
+
+class largebinary:
+ """
+ largebinary represents a reference to a large object stored in S3.
+
+ Each largebinary is identified by an S3 URI (s3://bucket/path/to/object).
+ largebinary objects are automatically tracked and cleaned up when the
workflow
+ execution completes.
+
+ Usage:
+ from pytexera import largebinary, LargeBinaryInputStream,
LargeBinaryOutputStream
+
+ # Create a new largebinary for writing
+ large_binary = largebinary()
+ with LargeBinaryOutputStream(large_binary) as out:
+ out.write(b"data")
+ # large_binary is now ready to be added to tuples
+
+ # Read from an existing largebinary
+ with LargeBinaryInputStream(large_binary) as stream:
+ content = stream.read()
+
+ # Create from existing URI (e.g., from deserialization)
+ large_binary = largebinary("s3://bucket/path/to/object")
+ """
+
+ def __init__(self, uri: Optional[str] = None):
+ """
+ Create a largebinary.
+
+ Args:
+ uri: Optional S3 URI in the format s3://bucket/path/to/object.
+ If None, creates a new largebinary with a unique S3 URI.
+
+ Raises:
+ ValueError: If URI is provided but doesn't start with "s3://"
+ """
+ if uri is None:
+ # Lazy import to avoid circular dependencies
+ from pytexera.storage import large_binary_manager
+
+ uri = large_binary_manager.create()
+
+ if not uri.startswith("s3://"):
+ raise ValueError(f"largebinary URI must start with 's3://', got:
{uri}")
+
+ self._uri = uri
+
+ @property
+ def uri(self) -> str:
+ """Get the S3 URI of this largebinary."""
+ return self._uri
+
+ def get_bucket_name(self) -> str:
+ """Get the S3 bucket name from the URI."""
+ return urlparse(self._uri).netloc
+
+ def get_object_key(self) -> str:
+ """Get the S3 object key (path) from the URI, without leading slash."""
+ return urlparse(self._uri).path.lstrip("/")
+
+ def __str__(self) -> str:
+ return self._uri
+
+ def __repr__(self) -> str:
+ return f"largebinary('{self._uri}')"
+
+ def __eq__(self, other) -> bool:
+ return isinstance(other, largebinary) and self._uri == other._uri
+
+ def __hash__(self) -> int:
+ return hash(self._uri)
diff --git a/amber/src/main/python/core/models/type/test_large_binary.py
b/amber/src/main/python/core/models/type/test_large_binary.py
new file mode 100644
index 0000000000..36310e1dd5
--- /dev/null
+++ b/amber/src/main/python/core/models/type/test_large_binary.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from unittest.mock import patch
+from core.models.type.large_binary import largebinary
+
+
+class TestLargeBinary:
+ def test_create_with_uri(self):
+ """Test creating largebinary with a valid S3 URI."""
+ uri = "s3://test-bucket/path/to/object"
+ large_binary = largebinary(uri)
+ assert large_binary.uri == uri
+ assert str(large_binary) == uri
+ assert repr(large_binary) == f"largebinary('{uri}')"
+
+ def test_create_without_uri(self):
+ """Test creating largebinary without URI (calls
large_binary_manager.create)."""
+ with patch("pytexera.storage.large_binary_manager.create") as
mock_create:
+ mock_create.return_value = "s3://bucket/objects/123/uuid"
+ large_binary = largebinary()
+ assert large_binary.uri == "s3://bucket/objects/123/uuid"
+ mock_create.assert_called_once()
+
+ def test_invalid_uri_raises_value_error(self):
+ """Test that invalid URI (not starting with s3://) raises
ValueError."""
+ with pytest.raises(ValueError, match="largebinary URI must start with
's3://'"):
+ largebinary("http://invalid-uri")
+
+ with pytest.raises(ValueError, match="largebinary URI must start with
's3://'"):
+ largebinary("invalid-uri")
+
+ def test_get_bucket_name(self):
+ """Test extracting bucket name from URI."""
+ large_binary = largebinary("s3://my-bucket/path/to/object")
+ assert large_binary.get_bucket_name() == "my-bucket"
+
+ def test_get_object_key(self):
+ """Test extracting object key from URI."""
+ large_binary = largebinary("s3://my-bucket/path/to/object")
+ assert large_binary.get_object_key() == "path/to/object"
+
+ def test_get_object_key_with_leading_slash(self):
+ """Test extracting object key when URI has leading slash."""
+ large_binary = largebinary("s3://my-bucket/path/to/object")
+ # urlparse includes leading slash, but get_object_key removes it
+ assert large_binary.get_object_key() == "path/to/object"
+
+ def test_equality(self):
+ """Test largebinary equality comparison."""
+ uri = "s3://bucket/path"
+ obj1 = largebinary(uri)
+ obj2 = largebinary(uri)
+ obj3 = largebinary("s3://bucket/different")
+
+ assert obj1 == obj2
+ assert obj1 != obj3
+ assert obj1 != "not a largebinary"
+
+ def test_hash(self):
+ """Test largebinary hashing."""
+ uri = "s3://bucket/path"
+ obj1 = largebinary(uri)
+ obj2 = largebinary(uri)
+
+ assert hash(obj1) == hash(obj2)
+ assert hash(obj1) == hash(uri)
+
+ def test_uri_property(self):
+ """Test URI property access."""
+ uri = "s3://test-bucket/test/path"
+ large_binary = largebinary(uri)
+ assert large_binary.uri == uri
diff --git a/amber/src/main/python/core/storage/document_factory.py
b/amber/src/main/python/core/storage/document_factory.py
index ba15069817..9b686ab66b 100644
--- a/amber/src/main/python/core/storage/document_factory.py
+++ b/amber/src/main/python/core/storage/document_factory.py
@@ -25,6 +25,7 @@ from core.storage.iceberg.iceberg_catalog_instance import
IcebergCatalogInstance
from core.storage.iceberg.iceberg_document import IcebergDocument
from core.storage.iceberg.iceberg_utils import (
create_table,
+ amber_schema_to_iceberg_schema,
amber_tuples_to_arrow_table,
arrow_table_to_amber_tuples,
load_table_metadata,
@@ -63,7 +64,9 @@ class DocumentFactory:
if resource_type in {VFSResourceType.RESULT}:
storage_key = DocumentFactory.sanitize_uri_path(parsed_uri)
- iceberg_schema = Schema.as_arrow_schema(schema)
+ # Convert Amber Schema to Iceberg Schema with LARGE_BINARY
+ # field name encoding
+ iceberg_schema = amber_schema_to_iceberg_schema(schema)
create_table(
IcebergCatalogInstance.get_instance(),
diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
index 12c841ed71..9e17b2e0e8 100644
--- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
+++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
@@ -25,9 +25,103 @@ from pyiceberg.partitioning import
UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from typing import Optional, Iterable
+from pyiceberg import types as iceberg_types
import core
+import core.models
from core.models import ArrowTableTupleProvider, Tuple
+from core.models.schema.attribute_type import AttributeType, TO_ARROW_MAPPING
+
+# Suffix used to encode LARGE_BINARY fields in Iceberg (must match Scala
IcebergUtil)
+LARGE_BINARY_FIELD_SUFFIX = "__texera_large_binary_ptr"
+
+# Type mappings
+_ICEBERG_TO_AMBER_TYPE_MAPPING = {
+ "string": "STRING",
+ "int": "INT",
+ "integer": "INT",
+ "long": "LONG",
+ "double": "DOUBLE",
+ "float": "DOUBLE",
+ "boolean": "BOOL",
+ "timestamp": "TIMESTAMP",
+ "binary": "BINARY",
+}
+
+_AMBER_TO_ICEBERG_TYPE_MAPPING = {
+ AttributeType.STRING: iceberg_types.StringType(),
+ AttributeType.INT: iceberg_types.IntegerType(),
+ AttributeType.LONG: iceberg_types.LongType(),
+ AttributeType.DOUBLE: iceberg_types.DoubleType(),
+ AttributeType.BOOL: iceberg_types.BooleanType(),
+ AttributeType.TIMESTAMP: iceberg_types.TimestampType(),
+ AttributeType.BINARY: iceberg_types.BinaryType(),
+ AttributeType.LARGE_BINARY: iceberg_types.StringType(),
+}
+
+
+def encode_large_binary_field_name(field_name: str, attr_type) -> str:
+ """Encodes LARGE_BINARY field names with suffix for Iceberg storage."""
+ if attr_type == AttributeType.LARGE_BINARY:
+ return f"{field_name}{LARGE_BINARY_FIELD_SUFFIX}"
+ return field_name
+
+
+def decode_large_binary_field_name(field_name: str) -> str:
+ """Decodes field names by removing LARGE_BINARY suffix if present."""
+ if field_name.endswith(LARGE_BINARY_FIELD_SUFFIX):
+ return field_name[: -len(LARGE_BINARY_FIELD_SUFFIX)]
+ return field_name
+
+
+def iceberg_schema_to_amber_schema(iceberg_schema: Schema):
+ """
+ Converts PyIceberg Schema to Amber Schema.
+ Decodes LARGE_BINARY field names and adds Arrow metadata.
+ """
+ arrow_fields = []
+ for field in iceberg_schema.fields:
+ decoded_name = decode_large_binary_field_name(field.name)
+ is_large_binary = field.name != decoded_name
+
+ if is_large_binary:
+ attr_type = AttributeType.LARGE_BINARY
+ else:
+ iceberg_type_str = str(field.field_type).lower()
+ attr_type_name = _ICEBERG_TO_AMBER_TYPE_MAPPING.get(
+ iceberg_type_str, "STRING"
+ )
+ attr_type = getattr(AttributeType, attr_type_name)
+
+ arrow_fields.append(
+ pa.field(
+ decoded_name,
+ TO_ARROW_MAPPING[attr_type],
+ metadata={b"texera_type": b"LARGE_BINARY"} if is_large_binary
else None,
+ )
+ )
+
+ return core.models.Schema(pa.schema(arrow_fields))
+
+
+def amber_schema_to_iceberg_schema(amber_schema) -> Schema:
+ """
+ Converts Amber Schema to PyIceberg Schema.
+ Encodes LARGE_BINARY field names with suffix.
+ """
+ fields = [
+ iceberg_types.NestedField(
+ field_id=idx,
+ name=encode_large_binary_field_name(field_name, attr_type),
+ field_type=_AMBER_TO_ICEBERG_TYPE_MAPPING[attr_type],
+ required=False,
+ )
+ for idx, (field_name, attr_type) in enumerate(
+ amber_schema._name_type_mapping.items(), start=1
+ )
+ ]
+
+ return Schema(*fields)
def create_postgres_catalog(
@@ -135,27 +229,47 @@ def amber_tuples_to_arrow_table(
) -> pa.Table:
"""
Converts a list of amber tuples to a pyarrow table for serialization.
+ Handles LARGE_BINARY field name encoding and serialization.
"""
- return pa.Table.from_pydict(
- {
- name: [t[name] for t in tuple_list]
- for name in iceberg_schema.as_arrow().names
- },
- schema=iceberg_schema.as_arrow(),
- )
+ from core.models.type.large_binary import largebinary
+
+ tuple_list = list(tuple_list) # Convert to list to allow multiple
iterations
+ data_dict = {}
+ for encoded_name in iceberg_schema.as_arrow().names:
+ decoded_name = decode_large_binary_field_name(encoded_name)
+ data_dict[encoded_name] = [
+ (
+ t[decoded_name].uri
+ if isinstance(t[decoded_name], largebinary)
+ else t[decoded_name]
+ )
+ for t in tuple_list
+ ]
+
+ return pa.Table.from_pydict(data_dict, schema=iceberg_schema.as_arrow())
def arrow_table_to_amber_tuples(
iceberg_schema: Schema, arrow_table: pa.Table
) -> Iterable[Tuple]:
"""
- Converts an arrow table to a list of amber tuples for deserialization.
+ Converts an arrow table read from Iceberg to Amber tuples.
+ Properly handles LARGE_BINARY field name decoding and type detection.
"""
- tuple_provider = ArrowTableTupleProvider(arrow_table)
+ amber_schema = iceberg_schema_to_amber_schema(iceberg_schema)
+ arrow_table_with_metadata = pa.Table.from_arrays(
+ [arrow_table.column(name) for name in arrow_table.column_names],
+ schema=amber_schema.as_arrow_schema(),
+ )
+
+ tuple_provider = ArrowTableTupleProvider(arrow_table_with_metadata)
return (
Tuple(
- {name: field_accessor for name in arrow_table.column_names},
- schema=core.models.Schema(iceberg_schema.as_arrow()),
+ {
+ decode_large_binary_field_name(name): field_accessor
+ for name in arrow_table.column_names
+ },
+ schema=amber_schema,
)
for field_accessor in tuple_provider
)
diff --git
a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index ad3f49067e..34711beb65 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -43,6 +43,10 @@ StorageConfig.initialize(
table_result_namespace="operator-port-result",
directory_path="../../../../../../amber/user-resources/workflow-results",
commit_batch_size=4096,
+ s3_endpoint="http://localhost:9000",
+ s3_region="us-east-1",
+ s3_auth_username="minioadmin",
+ s3_auth_password="minioadmin",
)
diff --git
a/amber/src/main/python/core/storage/iceberg/test_iceberg_utils_large_binary.py
b/amber/src/main/python/core/storage/iceberg/test_iceberg_utils_large_binary.py
new file mode 100644
index 0000000000..c601d4f719
--- /dev/null
+++
b/amber/src/main/python/core/storage/iceberg/test_iceberg_utils_large_binary.py
@@ -0,0 +1,230 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow as pa
+from pyiceberg import types as iceberg_types
+from pyiceberg.schema import Schema as IcebergSchema
+from core.models import Schema, Tuple
+from core.models.schema.attribute_type import AttributeType
+from core.models.type.large_binary import largebinary
+from core.storage.iceberg.iceberg_utils import (
+ encode_large_binary_field_name,
+ decode_large_binary_field_name,
+ iceberg_schema_to_amber_schema,
+ amber_schema_to_iceberg_schema,
+ amber_tuples_to_arrow_table,
+ arrow_table_to_amber_tuples,
+)
+
+
+class TestIcebergUtilsLargeBinary:
+ def test_encode_large_binary_field_name(self):
+ """Test encoding LARGE_BINARY field names with suffix."""
+ assert (
+ encode_large_binary_field_name("my_field",
AttributeType.LARGE_BINARY)
+ == "my_field__texera_large_binary_ptr"
+ )
+ assert (
+ encode_large_binary_field_name("my_field", AttributeType.STRING)
+ == "my_field"
+ )
+
+ def test_decode_large_binary_field_name(self):
+ """Test decoding LARGE_BINARY field names by removing suffix."""
+ assert (
+ decode_large_binary_field_name("my_field__texera_large_binary_ptr")
+ == "my_field"
+ )
+ assert decode_large_binary_field_name("my_field") == "my_field"
+ assert decode_large_binary_field_name("regular_field") ==
"regular_field"
+
+ def test_amber_schema_to_iceberg_schema_with_large_binary(self):
+ """Test converting Amber schema with LARGE_BINARY to Iceberg schema."""
+ amber_schema = Schema()
+ amber_schema.add("regular_field", AttributeType.STRING)
+ amber_schema.add("large_binary_field", AttributeType.LARGE_BINARY)
+ amber_schema.add("int_field", AttributeType.INT)
+
+ iceberg_schema = amber_schema_to_iceberg_schema(amber_schema)
+
+ # Check field names are encoded
+ field_names = [field.name for field in iceberg_schema.fields]
+ assert "regular_field" in field_names
+ assert "large_binary_field__texera_large_binary_ptr" in field_names
+ assert "int_field" in field_names
+
+ # Check types
+ large_binary_field = next(
+ f for f in iceberg_schema.fields if "large_binary" in f.name
+ )
+ assert isinstance(large_binary_field.field_type,
iceberg_types.StringType)
+
+ def test_iceberg_schema_to_amber_schema_with_large_binary(self):
+ """Test converting Iceberg schema with LARGE_BINARY to Amber schema."""
+ iceberg_schema = IcebergSchema(
+ iceberg_types.NestedField(
+ 1, "regular_field", iceberg_types.StringType(), required=False
+ ),
+ iceberg_types.NestedField(
+ 2,
+ "large_binary_field__texera_large_binary_ptr",
+ iceberg_types.StringType(),
+ required=False,
+ ),
+ iceberg_types.NestedField(
+ 3, "int_field", iceberg_types.IntegerType(), required=False
+ ),
+ )
+
+ amber_schema = iceberg_schema_to_amber_schema(iceberg_schema)
+
+ assert amber_schema.get_attr_type("regular_field") ==
AttributeType.STRING
+ assert (
+ amber_schema.get_attr_type("large_binary_field")
+ == AttributeType.LARGE_BINARY
+ )
+ assert amber_schema.get_attr_type("int_field") == AttributeType.INT
+
+ # Check Arrow schema has metadata for LARGE_BINARY
+ arrow_schema = amber_schema.as_arrow_schema()
+ large_binary_field = arrow_schema.field("large_binary_field")
+ assert large_binary_field.metadata is not None
+ assert large_binary_field.metadata.get(b"texera_type") ==
b"LARGE_BINARY"
+
+ def test_amber_tuples_to_arrow_table_with_large_binary(self):
+ """Test converting Amber tuples with largebinary to Arrow table."""
+ amber_schema = Schema()
+ amber_schema.add("regular_field", AttributeType.STRING)
+ amber_schema.add("large_binary_field", AttributeType.LARGE_BINARY)
+
+ large_binary1 = largebinary("s3://bucket/path1")
+ large_binary2 = largebinary("s3://bucket/path2")
+
+ tuples = [
+ Tuple(
+ {"regular_field": "value1", "large_binary_field":
large_binary1},
+ schema=amber_schema,
+ ),
+ Tuple(
+ {"regular_field": "value2", "large_binary_field":
large_binary2},
+ schema=amber_schema,
+ ),
+ ]
+
+ iceberg_schema = amber_schema_to_iceberg_schema(amber_schema)
+ arrow_table = amber_tuples_to_arrow_table(iceberg_schema, tuples)
+
+ # Check that largebinary values are converted to URI strings
+ regular_values = arrow_table.column("regular_field").to_pylist()
+ large_binary_values = arrow_table.column(
+ "large_binary_field__texera_large_binary_ptr"
+ ).to_pylist()
+
+ assert regular_values == ["value1", "value2"]
+ assert large_binary_values == ["s3://bucket/path1",
"s3://bucket/path2"]
+
+ def test_arrow_table_to_amber_tuples_with_large_binary(self):
+ """Test converting Arrow table with LARGE_BINARY to Amber tuples."""
+ # Create Iceberg schema with encoded field name
+ iceberg_schema = IcebergSchema(
+ iceberg_types.NestedField(
+ 1, "regular_field", iceberg_types.StringType(), required=False
+ ),
+ iceberg_types.NestedField(
+ 2,
+ "large_binary_field__texera_large_binary_ptr",
+ iceberg_types.StringType(),
+ required=False,
+ ),
+ )
+
+ # Create Arrow table with URI strings
+ arrow_table = pa.Table.from_pydict(
+ {
+ "regular_field": ["value1", "value2"],
+ "large_binary_field__texera_large_binary_ptr": [
+ "s3://bucket/path1",
+ "s3://bucket/path2",
+ ],
+ }
+ )
+
+ tuples = list(arrow_table_to_amber_tuples(iceberg_schema, arrow_table))
+
+ assert len(tuples) == 2
+ assert tuples[0]["regular_field"] == "value1"
+ assert isinstance(tuples[0]["large_binary_field"], largebinary)
+ assert tuples[0]["large_binary_field"].uri == "s3://bucket/path1"
+
+ assert tuples[1]["regular_field"] == "value2"
+ assert isinstance(tuples[1]["large_binary_field"], largebinary)
+ assert tuples[1]["large_binary_field"].uri == "s3://bucket/path2"
+
+ def test_round_trip_large_binary_tuples(self):
+ """Test round-trip conversion of tuples with largebinary."""
+ amber_schema = Schema()
+ amber_schema.add("regular_field", AttributeType.STRING)
+ amber_schema.add("large_binary_field", AttributeType.LARGE_BINARY)
+
+ large_binary = largebinary("s3://bucket/path/to/object")
+ original_tuples = [
+ Tuple(
+ {"regular_field": "value1", "large_binary_field":
large_binary},
+ schema=amber_schema,
+ ),
+ ]
+
+ # Convert to Iceberg and Arrow
+ iceberg_schema = amber_schema_to_iceberg_schema(amber_schema)
+ arrow_table = amber_tuples_to_arrow_table(iceberg_schema,
original_tuples)
+
+ # Convert back to Amber tuples
+ retrieved_tuples = list(
+ arrow_table_to_amber_tuples(iceberg_schema, arrow_table)
+ )
+
+ assert len(retrieved_tuples) == 1
+ assert retrieved_tuples[0]["regular_field"] == "value1"
+ assert isinstance(retrieved_tuples[0]["large_binary_field"],
largebinary)
+ assert retrieved_tuples[0]["large_binary_field"].uri ==
large_binary.uri
+
+ def test_arrow_table_to_amber_tuples_with_null_large_binary(self):
+ """Test converting Arrow table with null largebinary values."""
+ iceberg_schema = IcebergSchema(
+ iceberg_types.NestedField(
+ 1, "regular_field", iceberg_types.StringType(), required=False
+ ),
+ iceberg_types.NestedField(
+ 2,
+ "large_binary_field__texera_large_binary_ptr",
+ iceberg_types.StringType(),
+ required=False,
+ ),
+ )
+
+ arrow_table = pa.Table.from_pydict(
+ {
+ "regular_field": ["value1"],
+ "large_binary_field__texera_large_binary_ptr": [None],
+ }
+ )
+
+ tuples = list(arrow_table_to_amber_tuples(iceberg_schema, arrow_table))
+
+ assert len(tuples) == 1
+ assert tuples[0]["regular_field"] == "value1"
+ assert tuples[0]["large_binary_field"] is None
diff --git a/amber/src/main/python/core/storage/storage_config.py
b/amber/src/main/python/core/storage/storage_config.py
index 9a03aa713f..c55495ea14 100644
--- a/amber/src/main/python/core/storage/storage_config.py
+++ b/amber/src/main/python/core/storage/storage_config.py
@@ -32,6 +32,12 @@ class StorageConfig:
ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None
ICEBERG_TABLE_COMMIT_BATCH_SIZE = None
+ # S3 configs (for large_binary_manager module)
+ S3_ENDPOINT = None
+ S3_REGION = None
+ S3_AUTH_USERNAME = None
+ S3_AUTH_PASSWORD = None
+
@classmethod
def initialize(
cls,
@@ -41,10 +47,14 @@ class StorageConfig:
table_result_namespace,
directory_path,
commit_batch_size,
+ s3_endpoint,
+ s3_region,
+ s3_auth_username,
+ s3_auth_password,
):
if cls._initialized:
raise RuntimeError(
- "Storage config has already been initializedand cannot be
modified."
+ "Storage config has already been initialized and cannot be
modified."
)
cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME =
postgres_uri_without_scheme
@@ -53,6 +63,13 @@ class StorageConfig:
cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace
cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path
cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size)
+
+ # S3 configs
+ cls.S3_ENDPOINT = s3_endpoint
+ cls.S3_REGION = s3_region
+ cls.S3_AUTH_USERNAME = s3_auth_username
+ cls.S3_AUTH_PASSWORD = s3_auth_password
+
cls._initialized = True
def __new__(cls, *args, **kwargs):
diff --git a/amber/src/main/python/pytexera/__init__.py
b/amber/src/main/python/pytexera/__init__.py
index db78319f49..e40d1a43fe 100644
--- a/amber/src/main/python/pytexera/__init__.py
+++ b/amber/src/main/python/pytexera/__init__.py
@@ -21,12 +21,15 @@ from typing import Iterator, Optional, Union
from pyamber import *
from .storage.dataset_file_document import DatasetFileDocument
+from .storage.large_binary_input_stream import LargeBinaryInputStream
+from .storage.large_binary_output_stream import LargeBinaryOutputStream
from .udf.udf_operator import (
UDFOperatorV2,
UDFTableOperator,
UDFBatchOperator,
UDFSourceOperator,
)
+from core.models.type.large_binary import largebinary
__all__ = [
"State",
@@ -41,6 +44,9 @@ __all__ = [
"UDFBatchOperator",
"UDFSourceOperator",
"DatasetFileDocument",
+ "largebinary",
+ "LargeBinaryInputStream",
+ "LargeBinaryOutputStream",
# export external tools to be used
"overrides",
"logger",
diff --git
a/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
new file mode 100644
index 0000000000..8e7d864040
--- /dev/null
+++ b/amber/src/main/python/pytexera/storage/large_binary_input_stream.py
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+LargeBinaryInputStream for reading largebinary data from S3.
+
+Usage:
+ with LargeBinaryInputStream(large_binary) as stream:
+ content = stream.read()
+"""
+
+from typing import BinaryIO, Optional
+from functools import wraps
+from io import IOBase
+from core.models.type.large_binary import largebinary
+
+
+def _require_open(func):
+ """Decorator to ensure stream is open before reading operations."""
+
+ @wraps(func)
+ def wrapper(self, *args, **kwargs):
+ if self._closed:
+ raise ValueError("I/O operation on closed stream")
+ if self._underlying is None:
+ self._lazy_init()
+ return func(self, *args, **kwargs)
+
+ return wrapper
+
+
+class LargeBinaryInputStream(IOBase):
+ """
+ InputStream for reading largebinary data from S3.
+
+ Lazily downloads from S3 on first read. Supports context manager and
iteration.
+ """
+
+ def __init__(self, large_binary: largebinary):
+ """Initialize stream for reading the given largebinary."""
+ super().__init__()
+ if large_binary is None:
+ raise ValueError("largebinary cannot be None")
+ self._large_binary = large_binary
+ self._underlying: Optional[BinaryIO] = None
+ self._closed = False
+
+ def _lazy_init(self):
+ """Download from S3 on first read operation."""
+ from pytexera.storage import large_binary_manager
+
+ s3 = large_binary_manager._get_s3_client()
+ response = s3.get_object(
+ Bucket=self._large_binary.get_bucket_name(),
+ Key=self._large_binary.get_object_key(),
+ )
+ self._underlying = response["Body"]
+
+ @_require_open
+ def read(self, n: int = -1) -> bytes:
+ """Read and return up to n bytes (-1 reads all)."""
+ return self._underlying.read(n)
+
+ @_require_open
+ def readline(self, size: int = -1) -> bytes:
+ """Read and return one line from the stream."""
+ return self._underlying.readline(size)
+
+ @_require_open
+ def readlines(self, hint: int = -1) -> list[bytes]:
+ """Read and return a list of lines from the stream."""
+ return self._underlying.readlines(hint)
+
+ def readable(self) -> bool:
+ """Return True if the stream can be read from."""
+ return not self._closed
+
+ def seekable(self) -> bool:
+ """Return False - this stream does not support seeking."""
+ return False
+
+ @property
+ def closed(self) -> bool:
+ """Return True if the stream is closed."""
+ return self._closed
+
+ def close(self) -> None:
+ """Close the stream and release resources."""
+ if not self._closed:
+ self._closed = True
+ if self._underlying is not None:
+ self._underlying.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self) -> bytes:
+ line = self.readline()
+ if not line:
+ raise StopIteration
+ return line
diff --git a/amber/src/main/python/pytexera/storage/large_binary_manager.py
b/amber/src/main/python/pytexera/storage/large_binary_manager.py
new file mode 100644
index 0000000000..e061eac622
--- /dev/null
+++ b/amber/src/main/python/pytexera/storage/large_binary_manager.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Internal largebinary manager for S3 operations.
+
+Users should not interact with this module directly. Use largebinary()
constructor
+and LargeBinaryInputStream/LargeBinaryOutputStream instead.
+"""
+
+import time
+import uuid
+from loguru import logger
+from core.storage.storage_config import StorageConfig
+
+# Module-level state
+_s3_client = None
+DEFAULT_BUCKET = "texera-large-binaries"
+
+
+def _get_s3_client():
+ """Get or initialize S3 client (lazy initialization, cached)."""
+ global _s3_client
+ if _s3_client is None:
+ try:
+ import boto3
+ from botocore.config import Config
+ except ImportError as e:
+ raise RuntimeError("boto3 required. Install with: pip install boto3") from e
+
+ _s3_client = boto3.client(
+ "s3",
+ endpoint_url=StorageConfig.S3_ENDPOINT,
+ aws_access_key_id=StorageConfig.S3_AUTH_USERNAME,
+ aws_secret_access_key=StorageConfig.S3_AUTH_PASSWORD,
+ region_name=StorageConfig.S3_REGION,
+ config=Config(signature_version="s3v4", s3={"addressing_style": "path"}),
+ )
+ return _s3_client
+
+
+def _ensure_bucket_exists(bucket: str):
+ """Ensure S3 bucket exists, creating it if necessary."""
+ s3 = _get_s3_client()
+ try:
+ s3.head_bucket(Bucket=bucket)
+ except s3.exceptions.NoSuchBucket:
+ logger.debug(f"Bucket {bucket} not found, creating it")
+ s3.create_bucket(Bucket=bucket)
+ logger.info(f"Created bucket: {bucket}")
+
+
+def create() -> str:
+ """
+ Creates a new largebinary reference with a unique S3 URI.
+
+ Returns:
+ S3 URI string (format: s3://bucket/key)
+ """
+ _ensure_bucket_exists(DEFAULT_BUCKET)
+ timestamp_ms = int(time.time() * 1000)
+ unique_id = uuid.uuid4()
+ object_key = f"objects/{timestamp_ms}/{unique_id}"
+ return f"s3://{DEFAULT_BUCKET}/{object_key}"
diff --git a/amber/src/main/python/pytexera/storage/large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
new file mode 100644
index 0000000000..af4f1a275c
--- /dev/null
+++ b/amber/src/main/python/pytexera/storage/large_binary_output_stream.py
@@ -0,0 +1,244 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+LargeBinaryOutputStream for streaming largebinary data to S3.
+
+Usage:
+ from pytexera import largebinary, LargeBinaryOutputStream
+
+ large_binary = largebinary()
+ with LargeBinaryOutputStream(large_binary) as out:
+ out.write(b"data")
+"""
+
+from typing import Optional, Union
+from io import IOBase
+from core.models.type.large_binary import largebinary
+from pytexera.storage import large_binary_manager
+import threading
+import queue
+
+# Constants
+_CHUNK_SIZE = 64 * 1024 # 64KB
+_QUEUE_TIMEOUT = 0.1
+
+
+class _QueueReader:
+ """File-like object that reads from a queue."""
+
+ def __init__(self, q: queue.Queue):
+ self._queue = q
+ self._buffer = b""
+ self._eof = False
+
+ def read(self, size=-1):
+ """Read bytes from the queue."""
+ if self._eof and not self._buffer:
+ return b""
+
+ # Collect chunks until we have enough data or reach EOF
+ chunks = [self._buffer] if self._buffer else []
+ total_size = len(self._buffer)
+ self._buffer = b""
+ needed = size if size != -1 else None
+
+ while not self._eof and (needed is None or total_size < needed):
+ try:
+ chunk = self._queue.get(timeout=_QUEUE_TIMEOUT)
+ if chunk is None: # EOF marker
+ self._eof = True
+ break
+ chunks.append(chunk)
+ total_size += len(chunk)
+ except queue.Empty:
+ continue
+
+ result = b"".join(chunks)
+
+ # If size was specified, split and buffer remainder
+ if needed is not None and len(result) > needed:
+ self._buffer = result[needed:]
+ result = result[:needed]
+
+ return result
+
+
+class LargeBinaryOutputStream(IOBase):
+ """
+ OutputStream for streaming largebinary data to S3.
+
+ Data is uploaded in the background using multipart upload as you write.
+ Call close() to complete the upload and ensure all data is persisted.
+
+ This class follows Python's standard I/O interface (io.IOBase).
+
+ Usage:
+ from pytexera import largebinary, LargeBinaryOutputStream
+
+ # Create a new largebinary and write to it
+ large_binary = largebinary()
+ with LargeBinaryOutputStream(large_binary) as out:
+ out.write(b"Hello, World!")
+ out.write(b"More data")
+ # large_binary is now ready to be added to tuples
+
+ Note: Not thread-safe. Do not access from multiple threads concurrently.
+ """
+
+ def __init__(self, large_binary: largebinary):
+ """
+ Initialize a LargeBinaryOutputStream.
+
+ Args:
+ large_binary: The largebinary reference to write to
+
+ Raises:
+ ValueError: If large_binary is None
+ """
+ super().__init__()
+ if large_binary is None:
+ raise ValueError("largebinary cannot be None")
+
+ self._large_binary = large_binary
+ self._bucket_name = large_binary.get_bucket_name()
+ self._object_key = large_binary.get_object_key()
+ self._closed = False
+
+ # Background upload thread state
+ self._queue: queue.Queue = queue.Queue(maxsize=_CHUNK_SIZE)
+ self._upload_exception: Optional[Exception] = None
+ self._upload_complete = threading.Event()
+ self._upload_thread: Optional[threading.Thread] = None
+ self._lock = threading.Lock()
+
+ def write(self, b: Union[bytes, bytearray]) -> int:
+ """
+ Write bytes to the stream.
+
+ Args:
+ b: Bytes to write
+
+ Returns:
+ Number of bytes written
+
+ Raises:
+ ValueError: If stream is closed
+ IOError: If previous upload failed
+ """
+ if self._closed:
+ raise ValueError("I/O operation on closed stream")
+
+ # Check if upload has failed
+ with self._lock:
+ if self._upload_exception is not None:
+ raise IOError(
+ f"Background upload failed: {self._upload_exception}"
+ ) from self._upload_exception
+
+ # Start upload thread on first write
+ if self._upload_thread is None:
+
+ def upload_worker():
+ try:
+ large_binary_manager._ensure_bucket_exists(self._bucket_name)
+ s3 = large_binary_manager._get_s3_client()
+ reader = _QueueReader(self._queue)
+ s3.upload_fileobj(reader, self._bucket_name, self._object_key)
+ except Exception as e:
+ with self._lock:
+ self._upload_exception = e
+ finally:
+ self._upload_complete.set()
+
+ self._upload_thread = threading.Thread(target=upload_worker, daemon=True)
+ self._upload_thread.start()
+
+ # Write data in chunks
+ data = bytes(b)
+ for offset in range(0, len(data), _CHUNK_SIZE):
+ self._queue.put(data[offset : offset + _CHUNK_SIZE], block=True)
+
+ return len(data)
+
+ def writable(self) -> bool:
+ """Return True if the stream can be written to."""
+ return not self._closed
+
+ def seekable(self) -> bool:
+ """Return False - this stream does not support seeking."""
+ return False
+
+ @property
+ def closed(self) -> bool:
+ """Return True if the stream is closed."""
+ return self._closed
+
+ def flush(self) -> None:
+ """
+ Flush the write buffer.
+
+ Note: This doesn't guarantee data is uploaded to S3 yet.
+ Call close() to ensure upload completion.
+ """
+ # No-op: data is already being consumed by the upload thread
+ pass
+
+ def close(self) -> None:
+ """
+ Close the stream and complete the S3 upload.
+ Blocks until upload is complete. Raises IOError if upload failed.
+
+ Raises:
+ IOError: If upload failed
+ """
+ if self._closed:
+ return
+
+ self._closed = True
+
+ # Signal EOF to upload thread and wait for completion
+ if self._upload_thread is not None:
+ self._queue.put(None, block=True) # EOF marker
+ self._upload_thread.join()
+ self._upload_complete.wait()
+
+ # Check for errors and cleanup if needed
+ with self._lock:
+ exception = self._upload_exception
+
+ if exception is not None:
+ self._cleanup_failed_upload()
+ raise IOError(f"Failed to complete upload: {exception}") from
exception
+
+ def _cleanup_failed_upload(self):
+ """Clean up a failed upload by deleting the S3 object."""
+ try:
+ s3 = large_binary_manager._get_s3_client()
+ s3.delete_object(Bucket=self._bucket_name, Key=self._object_key)
+ except Exception:
+ # Ignore cleanup errors - we're already handling an upload failure
+ pass
+
+ def __enter__(self):
+ """Context manager entry."""
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """Context manager exit - automatically cleanup."""
+ self.close()
+ return False
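
A minimal end-to-end sketch for the class above (not part of the diff; the source path is a placeholder, and the chunking simply mirrors the 64KB _CHUNK_SIZE):

```python
from pytexera import largebinary, LargeBinaryOutputStream

CHUNK = 64 * 1024                    # mirrors _CHUNK_SIZE above
SRC_PATH = "/tmp/big_input.bin"      # placeholder source file

large_binary = largebinary()         # allocate a new S3-backed handle
with LargeBinaryOutputStream(large_binary) as out, open(SRC_PATH, "rb") as src:
    while chunk := src.read(CHUNK):  # stream without loading the file fully
        out.write(chunk)
# leaving the with-block calls close(), which blocks until the upload finishes
```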
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py b/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py
new file mode 100644
index 0000000000..85bdbd13fa
--- /dev/null
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_input_stream.py
@@ -0,0 +1,222 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from unittest.mock import patch, MagicMock
+from io import BytesIO
+from core.models.type.large_binary import largebinary
+from pytexera.storage.large_binary_input_stream import LargeBinaryInputStream
+from pytexera.storage import large_binary_manager
+
+
+class TestLargeBinaryInputStream:
+ @pytest.fixture
+ def large_binary(self):
+ """Create a test largebinary."""
+ return largebinary("s3://test-bucket/path/to/object")
+
+ @pytest.fixture
+ def mock_s3_response(self):
+ """Create a mock S3 response with a BytesIO body."""
+ return {"Body": BytesIO(b"test data content")}
+
+ def test_init_with_valid_large_binary(self, large_binary):
+ """Test initialization with a valid largebinary."""
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ assert stream._large_binary == large_binary
+ assert stream._underlying is None
+ assert not stream._closed
+ finally:
+ stream.close()
+
+ def test_init_with_none_raises_error(self):
+ """Test that initializing with None raises ValueError."""
+ with pytest.raises(ValueError, match="largebinary cannot be None"):
+ LargeBinaryInputStream(None)
+
+ def test_lazy_init_downloads_from_s3(self, large_binary, mock_s3_response):
+ """Test that _lazy_init downloads from S3 on first read."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = mock_s3_response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ assert stream._underlying is None # Not initialized yet
+
+ # Trigger lazy init by reading
+ data = stream.read()
+ assert data == b"test data content"
+ assert stream._underlying is not None
+
+ # Verify S3 was called correctly
+ mock_s3_client.get_object.assert_called_once_with(
+ Bucket="test-bucket", Key="path/to/object"
+ )
+ finally:
+ stream.close()
+
+ def test_read_all(self, large_binary, mock_s3_response):
+ """Test reading all data."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = mock_s3_response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ data = stream.read()
+ assert data == b"test data content"
+ finally:
+ stream.close()
+
+ def test_read_partial(self, large_binary, mock_s3_response):
+ """Test reading partial data."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = mock_s3_response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ data = stream.read(4)
+ assert data == b"test"
+ finally:
+ stream.close()
+
+ def test_readline(self, large_binary):
+ """Test reading a line."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ response = {"Body": BytesIO(b"line1\nline2\nline3")}
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ line = stream.readline()
+ assert line == b"line1\n"
+ finally:
+ stream.close()
+
+ def test_readlines(self, large_binary):
+ """Test reading all lines."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ response = {"Body": BytesIO(b"line1\nline2\nline3")}
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ lines = stream.readlines()
+ assert lines == [b"line1\n", b"line2\n", b"line3"]
+ finally:
+ stream.close()
+
+ def test_readable(self, large_binary):
+ """Test readable() method."""
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ assert stream.readable() is True
+
+ stream.close()
+ assert stream.readable() is False
+ finally:
+ if not stream._closed:
+ stream.close()
+
+ def test_seekable(self, large_binary):
+ """Test seekable() method (should always return False)."""
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ assert stream.seekable() is False
+ finally:
+ stream.close()
+
+ def test_closed_property(self, large_binary):
+ """Test closed property."""
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ assert stream.closed is False
+
+ stream.close()
+ assert stream.closed is True
+ finally:
+ if not stream._closed:
+ stream.close()
+
+ def test_close(self, large_binary, mock_s3_response):
+ """Test closing the stream."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = mock_s3_response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ stream = LargeBinaryInputStream(large_binary)
+ stream.read(1) # Trigger lazy init
+ assert stream._underlying is not None
+
+ stream.close()
+ assert stream._closed is True
+ assert stream._underlying.closed
+
+ def test_context_manager(self, large_binary, mock_s3_response):
+ """Test using as context manager."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = mock_s3_response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ with LargeBinaryInputStream(large_binary) as stream:
+ data = stream.read()
+ assert data == b"test data content"
+ assert not stream._closed
+
+ # Stream should be closed after context exit
+ assert stream._closed
+
+ def test_iteration(self, large_binary):
+ """Test iteration over lines."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ response = {"Body": BytesIO(b"line1\nline2\nline3")}
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ stream = LargeBinaryInputStream(large_binary)
+ try:
+ lines = list(stream)
+ assert lines == [b"line1\n", b"line2\n", b"line3"]
+ finally:
+ stream.close()
+
+ def test_read_after_close_raises_error(self, large_binary, mock_s3_response):
+ """Test that reading after close raises ValueError."""
+ with patch.object(large_binary_manager, "_get_s3_client") as mock_get_s3_client:
+ mock_s3_client = MagicMock()
+ mock_s3_client.get_object.return_value = mock_s3_response
+ mock_get_s3_client.return_value = mock_s3_client
+
+ stream = LargeBinaryInputStream(large_binary)
+ stream.close()
+
+ with pytest.raises(ValueError, match="I/O operation on closed
stream"):
+ stream.read()
+ # Stream is already closed, no need to close again
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
new file mode 100644
index 0000000000..a657f244f3
--- /dev/null
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from unittest.mock import patch, MagicMock
+from pytexera.storage import large_binary_manager
+from core.storage.storage_config import StorageConfig
+
+
+class TestLargeBinaryManager:
+ @pytest.fixture(autouse=True)
+ def setup_storage_config(self):
+ """Initialize StorageConfig for tests."""
+ if not StorageConfig._initialized:
+ StorageConfig.initialize(
+ postgres_uri_without_scheme="localhost:5432/test",
+ postgres_username="test",
+ postgres_password="test",
+ table_result_namespace="test",
+ directory_path="/tmp/test",
+ commit_batch_size=1000,
+ s3_endpoint="http://localhost:9000",
+ s3_region="us-east-1",
+ s3_auth_username="minioadmin",
+ s3_auth_password="minioadmin",
+ )
+
+ def test_get_s3_client_initializes_once(self):
+ """Test that S3 client is initialized and cached."""
+ # Reset the client
+ large_binary_manager._s3_client = None
+
+ with patch("boto3.client") as mock_boto3_client:
+ mock_client = MagicMock()
+ mock_boto3_client.return_value = mock_client
+
+ # First call should create client
+ client1 = large_binary_manager._get_s3_client()
+ assert client1 == mock_client
+ assert mock_boto3_client.call_count == 1
+
+ # Second call should return cached client
+ client2 = large_binary_manager._get_s3_client()
+ assert client2 == mock_client
+ assert mock_boto3_client.call_count == 1 # Still 1, not 2
+
+ def test_get_s3_client_without_boto3_raises_error(self):
+ """Test that missing boto3 raises RuntimeError."""
+ large_binary_manager._s3_client = None
+
+ import sys
+
+ # Temporarily remove boto3 from sys.modules to simulate it not being installed
+ boto3_backup = sys.modules.pop("boto3", None)
+ try:
+ # Mock the import to raise ImportError
+ original_import = __import__
+
+ def mock_import(name, *args, **kwargs):
+ if name == "boto3":
+ raise ImportError("No module named boto3")
+ return original_import(name, *args, **kwargs)
+
+ with patch("builtins.__import__", side_effect=mock_import):
+ with pytest.raises(RuntimeError, match="boto3 required"):
+ large_binary_manager._get_s3_client()
+ finally:
+ # Restore boto3 if it was there
+ if boto3_backup is not None:
+ sys.modules["boto3"] = boto3_backup
+
+ def test_ensure_bucket_exists_when_bucket_exists(self):
+ """Test that existing bucket doesn't trigger creation."""
+ large_binary_manager._s3_client = None
+
+ with patch("boto3.client") as mock_boto3_client:
+ mock_client = MagicMock()
+ mock_boto3_client.return_value = mock_client
+ # head_bucket doesn't raise exception (bucket exists)
+ mock_client.head_bucket.return_value = None
+ mock_client.exceptions.NoSuchBucket = type("NoSuchBucket",
(Exception,), {})
+
+ large_binary_manager._ensure_bucket_exists("test-bucket")
+ mock_client.head_bucket.assert_called_once_with(Bucket="test-bucket")
+ mock_client.create_bucket.assert_not_called()
+
+ def test_ensure_bucket_exists_creates_bucket_when_missing(self):
+ """Test that missing bucket triggers creation."""
+ large_binary_manager._s3_client = None
+
+ with patch("boto3.client") as mock_boto3_client:
+ mock_client = MagicMock()
+ mock_boto3_client.return_value = mock_client
+ # head_bucket raises NoSuchBucket exception
+ no_such_bucket = type("NoSuchBucket", (Exception,), {})
+ mock_client.exceptions.NoSuchBucket = no_such_bucket
+ mock_client.head_bucket.side_effect = no_such_bucket()
+
+ large_binary_manager._ensure_bucket_exists("test-bucket")
+ mock_client.head_bucket.assert_called_once_with(Bucket="test-bucket")
+ mock_client.create_bucket.assert_called_once_with(Bucket="test-bucket")
+
+ def test_create_generates_unique_uri(self):
+ """Test that create() generates a unique S3 URI."""
+ large_binary_manager._s3_client = None
+
+ with patch("boto3.client") as mock_boto3_client:
+ mock_client = MagicMock()
+ mock_boto3_client.return_value = mock_client
+ mock_client.head_bucket.return_value = None
+ mock_client.exceptions.NoSuchBucket = type("NoSuchBucket",
(Exception,), {})
+
+ uri = large_binary_manager.create()
+
+ # Check URI format
+ assert uri.startswith("s3://")
+ assert uri.startswith(f"s3://{large_binary_manager.DEFAULT_BUCKET}/")
+ assert "objects/" in uri
+
+ # Verify bucket was checked/created
+ mock_client.head_bucket.assert_called_once_with(
+ Bucket=large_binary_manager.DEFAULT_BUCKET
+ )
+
+ def test_create_uses_default_bucket(self):
+ """Test that create() uses the default bucket."""
+ large_binary_manager._s3_client = None
+
+ with patch("boto3.client") as mock_boto3_client:
+ mock_client = MagicMock()
+ mock_boto3_client.return_value = mock_client
+ mock_client.head_bucket.return_value = None
+ mock_client.exceptions.NoSuchBucket = type("NoSuchBucket",
(Exception,), {})
+
+ uri = large_binary_manager.create()
+ assert large_binary_manager.DEFAULT_BUCKET in uri
diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py b/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py
new file mode 100644
index 0000000000..7ebcc9b4cf
--- /dev/null
+++ b/amber/src/main/python/pytexera/storage/test_large_binary_output_stream.py
@@ -0,0 +1,238 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import time
+from unittest.mock import patch, MagicMock
+from core.models.type.large_binary import largebinary
+from pytexera.storage.large_binary_output_stream import LargeBinaryOutputStream
+from pytexera.storage import large_binary_manager
+
+
+class TestLargeBinaryOutputStream:
+ @pytest.fixture
+ def large_binary(self):
+ """Create a test largebinary."""
+ return largebinary("s3://test-bucket/path/to/object")
+
+ def test_init_with_valid_large_binary(self, large_binary):
+ """Test initialization with a valid largebinary."""
+ stream = LargeBinaryOutputStream(large_binary)
+ assert stream._large_binary == large_binary
+ assert stream._bucket_name == "test-bucket"
+ assert stream._object_key == "path/to/object"
+ assert not stream._closed
+ assert stream._upload_thread is None
+
+ def test_init_with_none_raises_error(self):
+ """Test that initializing with None raises ValueError."""
+ with pytest.raises(ValueError, match="largebinary cannot be None"):
+ LargeBinaryOutputStream(None)
+
+ def test_write_starts_upload_thread(self, large_binary):
+ """Test that write() starts the upload thread."""
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as
mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+
+ stream = LargeBinaryOutputStream(large_binary)
+ assert stream._upload_thread is None
+
+ stream.write(b"test data")
+ # Thread may have already completed, so just check it was created
+ assert stream._upload_thread is not None
+
+ # Wait for thread to finish
+ stream.close()
+
+ def test_write_data(self, large_binary):
+ """Test writing data to the stream."""
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as
mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+
+ stream = LargeBinaryOutputStream(large_binary)
+ bytes_written = stream.write(b"test data")
+ assert bytes_written == len(b"test data")
+
+ stream.close()
+
+ def test_write_multiple_chunks(self, large_binary):
+ """Test writing multiple chunks of data."""
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as
mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+
+ stream = LargeBinaryOutputStream(large_binary)
+ stream.write(b"chunk1")
+ stream.write(b"chunk2")
+ stream.write(b"chunk3")
+
+ stream.close()
+
+ def test_writable(self, large_binary):
+ """Test writable() method."""
+ stream = LargeBinaryOutputStream(large_binary)
+ assert stream.writable() is True
+
+ stream.close()
+ assert stream.writable() is False
+
+ def test_seekable(self, large_binary):
+ """Test seekable() method (should always return False)."""
+ stream = LargeBinaryOutputStream(large_binary)
+ assert stream.seekable() is False
+
+ def test_closed_property(self, large_binary):
+ """Test closed property."""
+ stream = LargeBinaryOutputStream(large_binary)
+ assert stream.closed is False
+
+ stream.close()
+ assert stream.closed is True
+
+ def test_flush(self, large_binary):
+ """Test flush() method (should be a no-op)."""
+ stream = LargeBinaryOutputStream(large_binary)
+ # Should not raise any exception
+ stream.flush()
+
+ def test_close_completes_upload(self, large_binary):
+ """Test that close() completes the upload."""
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as
mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+
+ stream = LargeBinaryOutputStream(large_binary)
+ stream.write(b"test data")
+
+ # Close should wait for upload to complete
+ stream.close()
+
+ # Verify upload_fileobj was called
+ assert mock_s3.upload_fileobj.called
+
+ def test_context_manager(self, large_binary):
+ """Test using as context manager."""
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as
mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+
+ with LargeBinaryOutputStream(large_binary) as stream:
+ stream.write(b"test data")
+ assert not stream._closed
+
+ # Stream should be closed after context exit
+ assert stream._closed
+
+ def test_write_after_close_raises_error(self, large_binary):
+ """Test that writing after close raises ValueError."""
+ stream = LargeBinaryOutputStream(large_binary)
+ stream.close()
+
+ with pytest.raises(ValueError, match="I/O operation on closed stream"):
+ stream.write(b"data")
+
+ def test_close_handles_upload_error(self, large_binary):
+ """Test that close() raises IOError if upload fails."""
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as
mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+ mock_s3.upload_fileobj.side_effect = Exception("Upload failed")
+
+ stream = LargeBinaryOutputStream(large_binary)
+ stream.write(b"test data")
+
+ with pytest.raises(IOError, match="Failed to complete upload"):
+ stream.close()
+
+ def test_write_after_upload_error_raises_error(self, large_binary):
+ """Test that writing after upload error raises IOError."""
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as
mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+ mock_s3.upload_fileobj.side_effect = Exception("Upload failed")
+
+ stream = LargeBinaryOutputStream(large_binary)
+ stream.write(b"test data")
+
+ # Wait a bit for the error to be set
+ time.sleep(0.1)
+
+ with pytest.raises(IOError, match="Background upload failed"):
+ stream.write(b"more data")
+
+ def test_multiple_close_calls(self, large_binary):
+ """Test that multiple close() calls are safe."""
+ with (
+ patch.object(large_binary_manager, "_get_s3_client") as
mock_get_s3_client,
+ patch.object(
+ large_binary_manager, "_ensure_bucket_exists"
+ ) as mock_ensure_bucket,
+ ):
+ mock_s3 = MagicMock()
+ mock_get_s3_client.return_value = mock_s3
+ mock_ensure_bucket.return_value = None
+
+ stream = LargeBinaryOutputStream(large_binary)
+ stream.write(b"test data")
+ stream.close()
+ # Second close should not raise error
+ stream.close()
diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py
index c9594cc218..3ebf81c201 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -51,6 +51,10 @@ if __name__ == "__main__":
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
+ s3_endpoint,
+ s3_region,
+ s3_auth_username,
+ s3_auth_password,
) = sys.argv
init_loguru_logger(logger_level)
StorageConfig.initialize(
@@ -60,6 +64,10 @@ if __name__ == "__main__":
iceberg_table_namespace,
iceberg_file_storage_directory_path,
iceberg_table_commit_batch_size,
+ s3_endpoint,
+ s3_region,
+ s3_auth_username,
+ s3_auth_password,
)
# Setting R_HOME environment variable for R-UDF usage
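
For reference, a standalone sketch of the same initialization with the new S3 parameters, using the keyword arguments from the unit-test fixture above; all values are placeholders, since the worker receives the real ones via argv as shown in this hunk:

```python
from core.storage.storage_config import StorageConfig

StorageConfig.initialize(
    postgres_uri_without_scheme="localhost:5432/texera",  # placeholder
    postgres_username="texera",
    postgres_password="password",
    table_result_namespace="results",
    directory_path="/tmp/texera",
    commit_batch_size=1000,
    s3_endpoint="http://localhost:9000",   # MinIO endpoint
    s3_region="us-east-1",
    s3_auth_username="minioadmin",
    s3_auth_password="minioadmin",
)
```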
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index f32d227fc7..558b99c9b7 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -183,7 +183,11 @@ class PythonWorkflowWorker(
StorageConfig.icebergPostgresCatalogPassword,
StorageConfig.icebergTableResultNamespace,
StorageConfig.fileStorageDirectoryPath.toString,
- StorageConfig.icebergTableCommitBatchSize.toString
+ StorageConfig.icebergTableCommitBatchSize.toString,
+ StorageConfig.s3Endpoint,
+ StorageConfig.s3Region,
+ StorageConfig.s3Username,
+ StorageConfig.s3Password
)
).run(BasicIO.standard(false))
}
diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index ee7cbe5544..aa593cdcc6 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -311,7 +311,7 @@ class WorkflowService(
* 2. Clears URI references from the execution registry
* 3. Safely clears all result and console message documents
* 4. Expires Iceberg snapshots for runtime statistics
- * 5. Deletes big objects from MinIO
+ * 5. Deletes large binaries from MinIO
*
* @param eid The execution identity to clean up resources for
*/
diff --git a/build.sbt b/build.sbt
index 68c3ba231a..027775ff25 100644
--- a/build.sbt
+++ b/build.sbt
@@ -84,7 +84,19 @@ lazy val WorkflowExecutionService = (project in file("amber"))
"org.slf4j" % "slf4j-api" % "1.7.26",
"org.eclipse.jetty" % "jetty-server" % "9.4.20.v20190813",
"org.eclipse.jetty" % "jetty-servlet" % "9.4.20.v20190813",
- "org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813"
+ "org.eclipse.jetty" % "jetty-http" % "9.4.20.v20190813",
+ // Netty dependency overrides to ensure compatibility with Arrow 14.0.1
+ // Arrow requires Netty 4.1.96.Final to avoid NoSuchFieldError: chunkSize
+ "io.netty" % "netty-all" % "4.1.96.Final",
+ "io.netty" % "netty-buffer" % "4.1.96.Final",
+ "io.netty" % "netty-codec" % "4.1.96.Final",
+ "io.netty" % "netty-codec-http" % "4.1.96.Final",
+ "io.netty" % "netty-codec-http2" % "4.1.96.Final",
+ "io.netty" % "netty-common" % "4.1.96.Final",
+ "io.netty" % "netty-handler" % "4.1.96.Final",
+ "io.netty" % "netty-resolver" % "4.1.96.Final",
+ "io.netty" % "netty-transport" % "4.1.96.Final",
+ "io.netty" % "netty-transport-native-unix-common" % "4.1.96.Final"
),
libraryDependencies ++= Seq(
"com.squareup.okhttp3" % "okhttp" % "4.10.0" force () // Force usage of
OkHttp 4.10.0
diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt
index ab6b8f27c6..99870f03eb 100644
--- a/common/workflow-core/build.sbt
+++ b/common/workflow-core/build.sbt
@@ -115,6 +115,7 @@ libraryDependencies ++= Seq(
/////////////////////////////////////////////////////////////////////////////
// Arrow related
val arrowVersion = "14.0.1"
+val nettyVersion = "4.1.96.Final"
val arrowDependencies = Seq(
// https://mvnrepository.com/artifact/org.apache.arrow/flight-grpc
"org.apache.arrow" % "flight-grpc" % arrowVersion,
@@ -124,6 +125,22 @@ val arrowDependencies = Seq(
libraryDependencies ++= arrowDependencies
+// Netty dependency overrides to ensure compatibility with Arrow
+// Arrow 14.0.1 requires Netty 4.1.96.Final for proper memory allocation
+// The chunkSize field issue occurs when Netty versions are mismatched
+dependencyOverrides ++= Seq(
+ "io.netty" % "netty-all" % nettyVersion,
+ "io.netty" % "netty-buffer" % nettyVersion,
+ "io.netty" % "netty-codec" % nettyVersion,
+ "io.netty" % "netty-codec-http" % nettyVersion,
+ "io.netty" % "netty-codec-http2" % nettyVersion,
+ "io.netty" % "netty-common" % nettyVersion,
+ "io.netty" % "netty-handler" % nettyVersion,
+ "io.netty" % "netty-resolver" % nettyVersion,
+ "io.netty" % "netty-transport" % nettyVersion,
+ "io.netty" % "netty-transport-native-unix-common" % nettyVersion
+)
+
/////////////////////////////////////////////////////////////////////////////
// Iceberg-related Dependencies
/////////////////////////////////////////////////////////////////////////////
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
index 8e306c12a5..626047fda2 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/ArrowUtils.scala
@@ -26,7 +26,7 @@ import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
import org.apache.arrow.vector.types.FloatingPointPrecision
import org.apache.arrow.vector.types.TimeUnit.MILLISECOND
import org.apache.arrow.vector.types.pojo.ArrowType.PrimitiveType
-import org.apache.arrow.vector.types.pojo.{ArrowType, Field}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
import org.apache.arrow.vector.{
BigIntVector,
BitVector,
@@ -73,28 +73,28 @@ object ArrowUtils extends LazyLogging {
Tuple
.builder(schema)
.addSequentially(
- vectorSchemaRoot.getFieldVectors.asScala
- .map((fieldVector: FieldVector) => {
+ vectorSchemaRoot.getFieldVectors.asScala.zipWithIndex.map {
+ case (fieldVector: FieldVector, index: Int) =>
val value: AnyRef = fieldVector.getObject(rowIndex)
try {
- val arrowType = fieldVector.getField.getFieldType.getType
- val attributeType = toAttributeType(arrowType)
+ // Use the attribute type from the schema (which includes metadata)
+ // instead of deriving it from the Arrow type
+ val attributeType = schema.getAttributes(index).getType
AttributeTypeUtils.parseField(value, attributeType)
-
} catch {
case e: Exception =>
logger.warn("Caught error during parsing Arrow value back to
Texera value", e)
null
}
- })
- .toArray
+ }.toArray
)
.build()
}
/**
* Converts an Arrow Schema into Texera Schema.
+ * Checks field metadata to detect LARGE_BINARY types.
*
* @param arrowSchema The Arrow Schema to be converted.
* @return A Texera Schema.
@@ -102,7 +102,12 @@ object ArrowUtils extends LazyLogging {
def toTexeraSchema(arrowSchema: org.apache.arrow.vector.types.pojo.Schema): Schema =
Schema(
arrowSchema.getFields.asScala.map { field =>
- new Attribute(field.getName, toAttributeType(field.getType))
+ val isLargeBinary = Option(field.getMetadata)
+ .exists(m => m.containsKey("texera_type") && m.get("texera_type") ==
"LARGE_BINARY")
+
+ val attributeType =
+ if (isLargeBinary) AttributeType.LARGE_BINARY else toAttributeType(field.getType)
+ new Attribute(field.getName, attributeType)
}.toList
)
@@ -211,7 +216,7 @@ object ArrowUtils extends LazyLogging {
else
vector
.asInstanceOf[VarCharVector]
- .setSafe(index, value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+ .setSafe(index, value.toString.getBytes(StandardCharsets.UTF_8))
case _: ArrowType.Binary | _: ArrowType.LargeBinary =>
if (isNull) vector.asInstanceOf[VarBinaryVector].setNull(index)
else
@@ -227,19 +232,27 @@ object ArrowUtils extends LazyLogging {
/**
* Converts an Amber schema into Arrow schema.
+ * Stores AttributeType in field metadata to preserve LARGE_BINARY type information.
*
* @param schema The Texera Schema.
* @return An Arrow Schema.
*/
def fromTexeraSchema(schema: Schema): org.apache.arrow.vector.types.pojo.Schema = {
- val arrowFields = new util.ArrayList[Field]
-
- for (amberAttribute <- schema.getAttributes) {
- val name = amberAttribute.getName
- val field = Field.nullablePrimitive(name, fromAttributeType(amberAttribute.getType))
- arrowFields.add(field)
+ val arrowFields = schema.getAttributes.map { attribute =>
+ val metadata = if (attribute.getType == AttributeType.LARGE_BINARY) {
+ val map = new util.HashMap[String, String]()
+ map.put("texera_type", "LARGE_BINARY")
+ map
+ } else null
+
+ new Field(
+ attribute.getName,
+ new FieldType(true, fromAttributeType(attribute.getType), null, metadata),
+ null
+ )
}
- new org.apache.arrow.vector.types.pojo.Schema(arrowFields)
+
+ new org.apache.arrow.vector.types.pojo.Schema(util.Arrays.asList(arrowFields: _*))
}
/**
@@ -270,7 +283,7 @@ object ArrowUtils extends LazyLogging {
case AttributeType.BINARY =>
new ArrowType.Binary
- case AttributeType.STRING | AttributeType.ANY =>
+ case AttributeType.STRING | AttributeType.LARGE_BINARY | AttributeType.ANY =>
ArrowType.Utf8.INSTANCE
case _ =>
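
The same texera_type marker can be reproduced on the Arrow side from Python; the pyarrow sketch below is illustrative only and is not part of the committed code:

```python
import pyarrow as pa

# A LARGE_BINARY column travels as a Utf8 (string) field whose metadata
# carries the texera_type marker, matching fromTexeraSchema above.
schema = pa.schema(
    [
        pa.field("regular_string", pa.string()),
        pa.field(
            "large_binary_field",
            pa.string(),
            metadata={"texera_type": "LARGE_BINARY"},
        ),
    ]
)

meta = schema.field("large_binary_field").metadata  # pyarrow returns bytes keys/values
assert meta[b"texera_type"] == b"LARGE_BINARY"
```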
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
index edeaf7d4ae..6a97df8ca3 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala
@@ -20,14 +20,15 @@
package org.apache.texera.amber.util
import org.apache.texera.amber.core.tuple.AttributeTypeUtils.AttributeTypeException
-import org.apache.texera.amber.core.tuple.{AttributeType, Schema, Tuple}
+import org.apache.texera.amber.core.tuple.{AttributeType, LargeBinary, Schema, Tuple}
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
import org.apache.arrow.vector.VectorSchemaRoot
-import org.apache.arrow.vector.types.pojo.{ArrowType, Field}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit}
import org.scalatest.flatspec.AnyFlatSpec
import java.sql.Timestamp
+import java.util
import scala.jdk.CollectionConverters.IterableHasAsJava
class ArrowUtilsSpec extends AnyFlatSpec {
@@ -99,6 +100,9 @@ class ArrowUtilsSpec extends AnyFlatSpec {
// but not the other way around.
assert(ArrowUtils.fromAttributeType(AttributeType.ANY) == string)
+ // LARGE_BINARY is converted to ArrowType.Utf8 (same as STRING)
+ assert(ArrowUtils.fromAttributeType(AttributeType.LARGE_BINARY) == string)
+
}
it should "raise AttributeTypeException when converting unsupported types"
in {
@@ -239,4 +243,140 @@ class ArrowUtilsSpec extends AnyFlatSpec {
}
+ it should "convert from AttributeType to ArrowType for LARGE_BINARY
correctly" in {
+ // LARGE_BINARY is converted to ArrowType.Utf8 (stored as string)
+ assert(ArrowUtils.fromAttributeType(AttributeType.LARGE_BINARY) == string)
+ }
+
+ it should "convert Texera Schema with LARGE_BINARY to Arrow Schema with
metadata correctly" in {
+ val texeraSchemaWithLargeBinary = Schema()
+ .add("regular_string", AttributeType.STRING)
+ .add("large_binary_field", AttributeType.LARGE_BINARY)
+
+ val arrowSchema = ArrowUtils.fromTexeraSchema(texeraSchemaWithLargeBinary)
+
+ // Check that regular string field has no metadata
+ val regularStringField = arrowSchema.getFields.get(0)
+ assert(regularStringField.getName == "regular_string")
+ assert(regularStringField.getType == string)
+ assert(
+ regularStringField.getMetadata == null || !regularStringField.getMetadata.containsKey(
+ "texera_type"
+ )
+ )
+
+ // Check that LARGE_BINARY field has metadata
+ val largeBinaryField = arrowSchema.getFields.get(1)
+ assert(largeBinaryField.getName == "large_binary_field")
+ assert(largeBinaryField.getType == string) // LARGE_BINARY is stored as Utf8
+ assert(largeBinaryField.getMetadata != null)
+ assert(largeBinaryField.getMetadata.get("texera_type") == "LARGE_BINARY")
+ }
+
+ it should "convert Arrow Schema with LARGE_BINARY metadata to Texera Schema
correctly" in {
+ // Create Arrow schema with LARGE_BINARY metadata
+ val largeBinaryMetadata = new util.HashMap[String, String]()
+ largeBinaryMetadata.put("texera_type", "LARGE_BINARY")
+
+ val arrowSchemaWithLargeBinary = new org.apache.arrow.vector.types.pojo.Schema(
+ Array(
+ Field.nullablePrimitive("regular_string", string),
+ new Field(
+ "large_binary_field",
+ new FieldType(true, string, null, largeBinaryMetadata),
+ null
+ )
+ ).toList.asJava
+ )
+
+ val texeraSchema = ArrowUtils.toTexeraSchema(arrowSchemaWithLargeBinary)
+
+ assert(texeraSchema.getAttribute("regular_string").getName ==
"regular_string")
+ assert(texeraSchema.getAttribute("regular_string").getType ==
AttributeType.STRING)
+
+ assert(texeraSchema.getAttribute("large_binary_field").getName ==
"large_binary_field")
+ assert(texeraSchema.getAttribute("large_binary_field").getType ==
AttributeType.LARGE_BINARY)
+ }
+
+ it should "set and get Texera Tuple with LARGE_BINARY correctly" in {
+ val texeraSchemaWithLargeBinary = Schema()
+ .add("large_binary_field", AttributeType.LARGE_BINARY)
+ .add("regular_string", AttributeType.STRING)
+
+ val largeBinary = new LargeBinary("s3://test-bucket/path/to/object")
+ val tuple = Tuple
+ .builder(texeraSchemaWithLargeBinary)
+ .addSequentially(
+ Array(
+ largeBinary,
+ "regular string value"
+ )
+ )
+ .build()
+
+ val allocator: BufferAllocator = new RootAllocator()
+ val arrowSchema = ArrowUtils.fromTexeraSchema(texeraSchemaWithLargeBinary)
+ val vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, allocator)
+ vectorSchemaRoot.allocateNew()
+
+ // Set Tuple into the Vectors
+ ArrowUtils.appendTexeraTuple(tuple, vectorSchemaRoot)
+
+ // Verify the LARGE_BINARY is stored as string (URI) in Arrow
+ val storedValue = vectorSchemaRoot.getVector(0).getObject(0)
+ assert(storedValue.toString == "s3://test-bucket/path/to/object")
+
+ // Get the Tuple from the Vectors
+ val retrievedTuple = ArrowUtils.getTexeraTuple(0, vectorSchemaRoot)
+ assert(retrievedTuple.getField[LargeBinary](0) == largeBinary)
+ assert(retrievedTuple.getField[String](1) == "regular string value")
+ }
+
+ it should "handle null LARGE_BINARY values correctly" in {
+ val texeraSchemaWithLargeBinary = Schema()
+ .add("large_binary_field", AttributeType.LARGE_BINARY)
+
+ val tuple = Tuple
+ .builder(texeraSchemaWithLargeBinary)
+ .addSequentially(
+ Array(
+ null.asInstanceOf[LargeBinary]
+ )
+ )
+ .build()
+
+ val allocator: BufferAllocator = new RootAllocator()
+ val arrowSchema = ArrowUtils.fromTexeraSchema(texeraSchemaWithLargeBinary)
+ val vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, allocator)
+ vectorSchemaRoot.allocateNew()
+
+ // Set Tuple into the Vectors
+ ArrowUtils.appendTexeraTuple(tuple, vectorSchemaRoot)
+
+ // Verify null is stored correctly
+ assert(vectorSchemaRoot.getVector(0).getObject(0) == null)
+
+ // Get the Tuple from the Vectors
+ val retrievedTuple = ArrowUtils.getTexeraTuple(0, vectorSchemaRoot)
+ assert(retrievedTuple.getField[LargeBinary](0) == null)
+ }
+
+ it should "round-trip LARGE_BINARY schema conversion correctly" in {
+ val originalSchema = Schema()
+ .add("field1", AttributeType.STRING)
+ .add("field2", AttributeType.LARGE_BINARY)
+ .add("field3", AttributeType.INTEGER)
+ .add("field4", AttributeType.LARGE_BINARY)
+
+ // Convert to Arrow and back
+ val arrowSchema = ArrowUtils.fromTexeraSchema(originalSchema)
+ val roundTripSchema = ArrowUtils.toTexeraSchema(arrowSchema)
+
+ assert(roundTripSchema.getAttribute("field1").getType ==
AttributeType.STRING)
+ assert(roundTripSchema.getAttribute("field2").getType ==
AttributeType.LARGE_BINARY)
+ assert(roundTripSchema.getAttribute("field3").getType ==
AttributeType.INTEGER)
+ assert(roundTripSchema.getAttribute("field4").getType ==
AttributeType.LARGE_BINARY)
+ assert(roundTripSchema == originalSchema)
+ }
+
}