This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git
commit d3e63db9f536de602682ca4973aad8361887918f
Author: Kunwoo Park <[email protected]>
AuthorDate: Fri Oct 17 12:11:51 2025 -0700

    Big Object Read for R UDF Operators
---
 .../core/architecture/packaging/output_manager.py  |   2 +-
 .../src/main/python/core/models/RTableExecutor.py  | 108 +++++++++++++++++++--
 .../src/main/python/core/models/RTupleExecutor.py  |  30 +++++-
 .../main/python/core/models/big_object_manager.R   |  99 +++++++++++++++++++
 .../main/python/core/models/big_object_pointer.R   |  51 ++++++++++
 amber/src/main/python/core/models/r_utils.py       |  18 ++++
 .../src/main/python/core/models/schema/__init__.py |   2 +
 .../python/core/models/schema/attribute_type.py    |   2 +-
 .../schema}/big_object_pointer.py                   |   2 +-
 amber/src/main/python/core/models/tuple.py         |   2 +-
 .../python/core/storage/iceberg/iceberg_utils.py   |   2 +-
 amber/src/main/python/pytexera/__init__.py         |   4 +-
 .../pytexera/{ => storage}/big_object_manager.py   |  19 ++-
 bin/deployment/computing-unit-master.dockerfile    |   4 +-
 14 files changed, 319 insertions(+), 26 deletions(-)

diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index eb027920ee..b2a6392dfe 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -259,7 +259,7 @@ class OutputManager:
         )
 
     def tuple_to_frame(self, tuples: typing.List[Tuple]) -> DataFrame:
-        from core.storage.big_object_pointer import BigObjectPointer
+        from core.models.schema.big_object_pointer import BigObjectPointer
 
         return DataFrame(
             frame=Table.from_pydict(
diff --git a/amber/src/main/python/core/models/RTableExecutor.py b/amber/src/main/python/core/models/RTableExecutor.py
index db8610e6f2..a72451012b 100644
--- a/amber/src/main/python/core/models/RTableExecutor.py
+++ b/amber/src/main/python/core/models/RTableExecutor.py
@@ -23,7 +23,14 @@ from rpy2.robjects.conversion import localconverter as local_converter
 from rpy2_arrow.arrow import rarrow_to_py_table, converter as arrow_converter
 from typing import Iterator, Optional, Union
 
-from core.models import ArrowTableTupleProvider, Tuple, TupleLike, Table, TableLike
+from core.models import (
+    ArrowTableTupleProvider,
+    Tuple,
+    TupleLike,
+    Table,
+    TableLike,
+    r_utils,
+)
 from core.models.operator import SourceOperator, TableOperator
 
 
@@ -45,6 +52,36 @@ class RTableExecutor(TableOperator):
         """
     )
 
+    # BigObject column conversion helpers
+    _wrap_big_object_cols = robjects.r(
+        """
+        function(df, cols) {
+          for (col in cols) {
+            df[[col]] <- new_BigObjectPointerColumn(lapply(df[[col]], function(uri) {
+              if (is.na(uri) || is.null(uri) || nchar(uri) == 0) NA else BigObjectPointer$new(uri)
+            }))
+          }
+          df
+        }
+        """
+    )
+
+    _unwrap_big_object_cols = robjects.r(
+        """
+        function(df, cols) {
+          for (col in cols) {
+            vals <- if (inherits(df[[col]], "BigObjectPointerColumn")) unclass(df[[col]]) else df[[col]]
+            df[[col]] <- sapply(vals, function(obj) {
+              if (is.na(obj) || is.null(obj)) NA_character_
+              else if (inherits(obj, "BigObjectPointer")) obj$uri
+              else as.character(obj)
+            })
+          }
+          df
+        }
+        """
+    )
+
     def __init__(self, r_code: str):
         """
         Initialize the RTableExecutor with R code.
@@ -67,20 +104,71 @@ class RTableExecutor(TableOperator):
         :return: Iterator[Optional[TableLike]], producing one TableLike object
             at a time, or None.
""" - input_pyarrow_table = pa.Table.from_pandas(table) + from core.models.schema.big_object_pointer import BigObjectPointer + from core.models.schema import Schema, AttributeType + from core.models.schema.attribute_type import FROM_ARROW_MAPPING + + # Step 1: Identify and serialize BigObjectPointer → URI strings for input + input_big_object_cols = [ + col + for col in table.columns + if len(table[col]) > 0 and isinstance(table[col].iloc[0], BigObjectPointer) + ] + + if input_big_object_cols: + table = table.copy() + for col in input_big_object_cols: + table[col] = table[col].apply( + lambda x: x.uri if isinstance(x, BigObjectPointer) else x + ) + + # Step 2: Python → R conversion and execution with local_converter(arrow_converter): - input_r_dataframe = RTableExecutor._arrow_to_r_dataframe( - input_pyarrow_table + # Convert pandas → Arrow → R dataframe + arrow_table = pa.Table.from_pandas(table) + r_df = RTableExecutor._arrow_to_r_dataframe(arrow_table) + + # Wrap input BIG_OBJECT columns as BigObjectPointer for R UDF + if input_big_object_cols: + r_df = RTableExecutor._wrap_big_object_cols(r_df, input_big_object_cols) + + # Execute user's R function + r_df = self._func(r_df, port) + + # Detect which OUTPUT columns are BIG_OBJECT + output_big_object_cols = list( + robjects.r( + 'function(df) names(df)[sapply(df, inherits, "BigObjectPointerColumn")]' + )(r_df) ) - output_r_dataframe = self._func(input_r_dataframe, port) - output_rarrow_table = RTableExecutor._r_dataframe_to_arrow( - output_r_dataframe + + # Unwrap output BIG_OBJECT columns to URI strings for Python + if output_big_object_cols: + r_df = RTableExecutor._unwrap_big_object_cols( + r_df, output_big_object_cols + ) + + # Convert R dataframe → Arrow → pandas + arrow_table = rarrow_to_py_table(RTableExecutor._r_dataframe_to_arrow(r_df)) + + # Step 3: Build output schema with correct BIG_OBJECT type information + output_schema = Schema() + for col in arrow_table.column_names: + attr_type = ( + AttributeType.BIG_OBJECT + if col in output_big_object_cols + else FROM_ARROW_MAPPING[arrow_table.schema.field(col).type.id] ) - output_pyarrow_table = rarrow_to_py_table(output_rarrow_table) + output_schema.add(col, attr_type) - for field_accessor in ArrowTableTupleProvider(output_pyarrow_table): + # Step 4: Create Arrow table with metadata and yield Tuples + arrow_with_metadata = pa.Table.from_pandas( + arrow_table.to_pandas(), schema=output_schema.as_arrow_schema() + ) + + for field_accessor in ArrowTableTupleProvider(arrow_with_metadata): yield Tuple( - {name: field_accessor for name in output_pyarrow_table.column_names} + {name: field_accessor for name in arrow_with_metadata.column_names} ) diff --git a/amber/src/main/python/core/models/RTupleExecutor.py b/amber/src/main/python/core/models/RTupleExecutor.py index 67f05fb2d3..3b0a966597 100644 --- a/amber/src/main/python/core/models/RTupleExecutor.py +++ b/amber/src/main/python/core/models/RTupleExecutor.py @@ -85,8 +85,17 @@ class RTupleExecutor(TupleOperatorV2): non_binary_tuple_schema: pa.Schema = ( non_binary_tuple._schema.as_arrow_schema() ) + + # Serialize BigObjectPointer to URI string for Arrow + from core.models.schema.big_object_pointer import BigObjectPointer + + serialized_dict = { + k: (v.uri if isinstance(v, BigObjectPointer) else v) + for k, v in non_binary_tuple.as_dict().items() + } + non_binary_pyarrow_array: pa.StructArray = pa.array( - [non_binary_tuple.as_dict()], + [serialized_dict], type=pa.struct(non_binary_tuple_schema), ) @@ -113,6 +122,25 @@ class 
                 )
             )
 
+            # Wrap URI strings → R BigObjectPointer objects for BIG_OBJECT fields
+            from core.models.schema.attribute_type import AttributeType
+
+            pointer_class = robjects.r("BigObjectPointer")
+            wrapped = {
+                name: (
+                    pointer_class(input_r_list.rx2(name)[0])
+                    if (
+                        input_r_list.rx2(name)
+                        and isinstance(input_r_list.rx2(name)[0], str)
+                        and tuple_._schema.get_attr_type(name)
+                        == AttributeType.BIG_OBJECT
+                    )
+                    else input_r_list.rx2(name)
+                )
+                for name in input_r_list.names
+            }
+            input_r_list = robjects.vectors.ListVector(wrapped)
+
             output_r_generator: rpy2.robjects.SignatureTranslatedFunction = self._func(
                 input_r_list, port
             )
diff --git a/amber/src/main/python/core/models/big_object_manager.R b/amber/src/main/python/core/models/big_object_manager.R
new file mode 100644
index 0000000000..c365c11141
--- /dev/null
+++ b/amber/src/main/python/core/models/big_object_manager.R
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# BigObjectManager - Manager for reading big objects from S3
+#
+# This file defines the BigObjectManager API and BigObjectStream class.
+# Users call BigObjectManager$open() in their R UDF code to read big objects.
+
+# Load required packages
+if (!require("aws.s3", quietly = TRUE)) {
+  warning("Package 'aws.s3' not installed. Install with: install.packages('aws.s3')")
+}
+
+# BigObjectStream Reference Class
+# Provides stream-like access to big object content
+BigObjectStream <- setRefClass("BigObjectStream",
+  fields = list(conn = "ANY", uri = "character", is_closed = "logical"),
+  methods = list(
+    initialize = function(raw_bytes, uri_val) {
+      conn <<- rawConnection(raw_bytes, open = "rb")
+      uri <<- uri_val
+      is_closed <<- FALSE
+    },
+    read = function(n = -1L) {
+      if (is_closed) stop("Stream is closed")
+      readBin(conn, "raw", if (n == -1L) 1e9 else n)
+    },
+    close = function() {
+      if (!is_closed) {
+        base::close(conn)
+        is_closed <<- TRUE
+      }
+    },
+    finalize = function() close()
+  )
+)
+
+# BigObjectManager API
+# Main interface for accessing big objects from S3
+BigObjectManager <- list(
+  open = function(pointer_or_uri) {
+    # Extract from list if needed (for backward compatibility)
+    if (is.list(pointer_or_uri) && length(pointer_or_uri) == 1) {
+      pointer_or_uri <- pointer_or_uri[[1]]
+    }
+
+    # Get URI string
+    uri <- if (inherits(pointer_or_uri, "BigObjectPointer")) {
+      pointer_or_uri$uri
+    } else if (is.character(pointer_or_uri)) {
+      pointer_or_uri
+    } else {
+      stop("Expected BigObjectPointer or character URI")
+    }
+
+    if (!grepl("^s3://", uri)) stop(paste("Invalid S3 URI:", uri))
+
+    # Parse s3://bucket/key
+    parts <- strsplit(sub("^s3://", "", uri), "/", fixed = TRUE)[[1]]
+    if (length(parts) < 2) stop(paste("Invalid S3 URI format:", uri))
+
+    # Configure S3 credentials from environment variables
+    Sys.setenv(
+      AWS_ACCESS_KEY_ID = Sys.getenv("STORAGE_S3_AUTH_USERNAME", "texera_minio"),
+      AWS_SECRET_ACCESS_KEY = Sys.getenv("STORAGE_S3_AUTH_PASSWORD", "password"),
+      AWS_S3_ENDPOINT = Sys.getenv("STORAGE_S3_ENDPOINT", "localhost:9000"),
+      AWS_DEFAULT_REGION = Sys.getenv("STORAGE_S3_REGION", "us-west-2")
+    )
+
+    # Fetch object from S3
+    raw_bytes <- tryCatch(
+      aws.s3::get_object(
+        object = paste(parts[-1], collapse = "/"),
+        bucket = parts[1],
+        region = Sys.getenv("AWS_DEFAULT_REGION"),
+        base_url = Sys.getenv("AWS_S3_ENDPOINT"),
+        use_https = grepl("^https://", Sys.getenv("AWS_S3_ENDPOINT"))
+      ),
+      error = function(e) stop(paste("Failed to open", uri, ":", conditionMessage(e)))
+    )
+
+    BigObjectStream$new(raw_bytes, uri)
+  }
+)
+
diff --git a/amber/src/main/python/core/models/big_object_pointer.R b/amber/src/main/python/core/models/big_object_pointer.R
new file mode 100644
index 0000000000..d75284a3af
--- /dev/null
+++ b/amber/src/main/python/core/models/big_object_pointer.R
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# BigObjectPointer - Reference to a large object stored in S3
+#
+# This file defines the BigObjectPointer Reference Class, which represents
+# a pointer to a big object. Users receive BigObjectPointer objects in their
+# R UDF code when working with BIG_OBJECT type fields.
+
+BigObjectPointer <- setRefClass("BigObjectPointer",
+  fields = list(uri = "character"),
+  methods = list(
+    initialize = function(uri_val) {
+      if (!grepl("^s3://", uri_val)) stop(paste("Invalid S3 URI:", uri_val))
+      uri <<- uri_val
+    }
+  )
+)
+
+# BigObjectPointerColumn - S3 class for natural table$col[i] syntax
+#
+# R stores object columns as list-columns (require [[i]]), but we want [i] to work
+# like other columns. This S3 class overrides [ to return elements directly.
+
+new_BigObjectPointerColumn <- function(x) {
+  structure(x, class = c("BigObjectPointerColumn", "list"))
+}
+
+`[.BigObjectPointerColumn` <- function(x, i) {
+  result <- unclass(x)[i]
+  if (length(i) == 1) result[[1]] else new_BigObjectPointerColumn(result)
+}
+
+`[[.BigObjectPointerColumn` <- function(x, i) unclass(x)[[i]]
+
+length.BigObjectPointerColumn <- function(x) length(unclass(x))
diff --git a/amber/src/main/python/core/models/r_utils.py b/amber/src/main/python/core/models/r_utils.py
index d1f6e512bd..f0516cab99 100644
--- a/amber/src/main/python/core/models/r_utils.py
+++ b/amber/src/main/python/core/models/r_utils.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 import rpy2
 import rpy2.rinterface as rinterface
 import rpy2.robjects as robjects
@@ -24,6 +25,15 @@ from core.models import Tuple
 
 warnings.filterwarnings(action="ignore", category=UserWarning, module=r"rpy2*")
 
+# Auto-load R BigObject user-facing API when this module is imported
+# Load order: pointer → manager (dependency order)
+_r_files = [
+    os.path.join(os.path.dirname(__file__), "big_object_pointer.R"),
+    os.path.join(os.path.dirname(__file__), "big_object_manager.R"),
+]
+for r_file in _r_files:
+    robjects.r(f'source("{r_file}")')
+
 
 def convert_r_to_py(value: rpy2.robjects):
     """
@@ -31,6 +41,14 @@ def convert_r_to_py(value: rpy2.robjects):
     :return: A Python representation of the value, if convertable.
         If not, it returns the value itself
     """
+    # Convert R BigObjectPointer to Python BigObjectPointer
+    if hasattr(value, "rclass") and value.rclass and "BigObjectPointer" in value.rclass:
+        from core.models.schema.big_object_pointer import BigObjectPointer
+
+        uri_field = robjects.r("function(obj) obj$uri")(value)
+        uri = str(uri_field[0]) if len(uri_field) > 0 else str(uri_field)
+        return BigObjectPointer(uri)
+
     if isinstance(value, robjects.vectors.BoolVector):
         return bool(value[0])
     if isinstance(value, robjects.vectors.IntVector):
diff --git a/amber/src/main/python/core/models/schema/__init__.py b/amber/src/main/python/core/models/schema/__init__.py
index 7c8b57bb31..a21430a3d2 100644
--- a/amber/src/main/python/core/models/schema/__init__.py
+++ b/amber/src/main/python/core/models/schema/__init__.py
@@ -16,12 +16,14 @@
 # under the License.
 from .attribute_type import AttributeType
+from .big_object_pointer import BigObjectPointer
 from .field import Field
 from .schema import Schema
 
 __all__ = [
     "AttributeType",
+    "BigObjectPointer",
     "Field",
     "Schema",
 ]
diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py
index 63c952d1f8..f3f6e50f38 100644
--- a/amber/src/main/python/core/models/schema/attribute_type.py
+++ b/amber/src/main/python/core/models/schema/attribute_type.py
@@ -20,7 +20,7 @@ import pyarrow as pa
 from bidict import bidict
 from enum import Enum
 from pyarrow import lib
-from core.storage.big_object_pointer import BigObjectPointer
+from core.models.schema.big_object_pointer import BigObjectPointer
 
 
 class AttributeType(Enum):
diff --git a/amber/src/main/python/core/storage/big_object_pointer.py b/amber/src/main/python/core/models/schema/big_object_pointer.py
similarity index 95%
rename from amber/src/main/python/core/storage/big_object_pointer.py
rename to amber/src/main/python/core/models/schema/big_object_pointer.py
index 4aac9d3a63..e846f8411d 100644
--- a/amber/src/main/python/core/storage/big_object_pointer.py
+++ b/amber/src/main/python/core/models/schema/big_object_pointer.py
@@ -17,7 +17,7 @@
 """
 BigObjectPointer represents a reference to a large object stored externally (e.g., S3).
 
-This is a storage reference class used throughout the system for handling big objects.
+This is a schema type class used throughout the system for handling BIG_OBJECT attribute types.
 """
diff --git a/amber/src/main/python/core/models/tuple.py b/amber/src/main/python/core/models/tuple.py
index f88d75b4fe..6369b6dba2 100644
--- a/amber/src/main/python/core/models/tuple.py
+++ b/amber/src/main/python/core/models/tuple.py
@@ -29,7 +29,7 @@ from pympler import asizeof
 from typing import Any, List, Iterator, Callable
 from typing_extensions import Protocol, runtime_checkable
 
-from core.storage.big_object_pointer import BigObjectPointer
+from core.models.schema.big_object_pointer import BigObjectPointer
 from .schema.attribute_type import TO_PYOBJECT_MAPPING, AttributeType
 from .schema.field import Field
 from .schema.schema import Schema
diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
index 3c23df73b0..c8e0db9e93 100644
--- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
+++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py
@@ -243,7 +243,7 @@ def amber_tuples_to_arrow_table(
     Converts a list of amber tuples to a pyarrow table for serialization.
     Handles BIG_OBJECT field name encoding and serialization.
""" - from core.storage.big_object_pointer import BigObjectPointer + from core.models.schema.big_object_pointer import BigObjectPointer # Build data dict using Iceberg schema field names (encoded) data_dict = {} diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index 2b3d59be10..f65eefac9d 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -21,13 +21,14 @@ from typing import Iterator, Optional, Union from pyamber import * from .storage.dataset_file_document import DatasetFileDocument +from .storage.big_object_manager import BigObjectManager from .udf.udf_operator import ( UDFOperatorV2, UDFTableOperator, UDFBatchOperator, UDFSourceOperator, ) -from .big_object_manager import BigObjectManager +from core.models.schema.big_object_pointer import BigObjectPointer __all__ = [ "State", @@ -43,6 +44,7 @@ __all__ = [ "UDFSourceOperator", "DatasetFileDocument", "BigObjectManager", + "BigObjectPointer", # export external tools to be used "overrides", "logger", diff --git a/amber/src/main/python/pytexera/big_object_manager.py b/amber/src/main/python/pytexera/storage/big_object_manager.py similarity index 88% rename from amber/src/main/python/pytexera/big_object_manager.py rename to amber/src/main/python/pytexera/storage/big_object_manager.py index f61549fb5f..a59129c2c9 100644 --- a/amber/src/main/python/pytexera/big_object_manager.py +++ b/amber/src/main/python/pytexera/storage/big_object_manager.py @@ -19,36 +19,39 @@ import os from typing import BinaryIO -from core.storage.big_object_pointer import BigObjectPointer +from core.models.schema.big_object_pointer import BigObjectPointer class BigObjectStream: - """Stream for reading big objects (matches Scala BigObjectStream).""" + """Stream for reading big objects (matches Scala/R BigObjectStream).""" def __init__(self, body: BinaryIO, pointer: BigObjectPointer): self._body = body self._pointer = pointer self._closed = False - def read(self, amt=None): + def read(self, n: int = -1) -> bytes: + """Read n bytes from stream (-1 = read all).""" if self._closed: raise ValueError("I/O operation on closed stream") - return self._body.read(amt) + if n == -1: + return self._body.read() + return self._body.read(n) def close(self): + """Close the stream.""" if not self._closed: self._closed = True self._body.close() def __enter__(self): + """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit - automatically cleanup.""" self.close() - - @property - def closed(self): - return self._closed + return False class BigObjectManager: diff --git a/bin/deployment/computing-unit-master.dockerfile b/bin/deployment/computing-unit-master.dockerfile index d44fe9703d..74e0d52e71 100644 --- a/bin/deployment/computing-unit-master.dockerfile +++ b/bin/deployment/computing-unit-master.dockerfile @@ -93,7 +93,9 @@ RUN Rscript -e "options(repos = c(CRAN = 'https://cran.r-project.org')); \ install.packages('remotes'); \ remotes::install_version('arrow', version='14.0.2.1', \ repos='https://cran.r-project.org', upgrade='never'); \ - cat('R arrow version: ', as.character(packageVersion('arrow')), '\n')" + cat('R arrow version: ', as.character(packageVersion('arrow')), '\n')" && \ + Rscript -e "options(repos = c(CRAN = 'https://cran.r-project.org')); \ + install.packages('aws.s3')"; ENV LD_LIBRARY_PATH=/usr/local/lib/R/lib:$LD_LIBRARY_PATH # Copy the built texera binary from the build phase
