This is an automated email from the ASF dual-hosted git repository. yiconghuang pushed a commit to branch feat/drop-support-for-r-udf in repository https://gitbox.apache.org/repos/asf/texera.git
commit fcf936e70166aef37cf378d7502209cb12b4af07 Author: Yicong Huang <[email protected]> AuthorDate: Fri Nov 28 18:03:48 2025 -0800 chore: drop support for RUDF --- .github/workflows/github-action-build.yml | 2 +- amber/r-requirements.txt | 19 - .../core/architecture/managers/executor_manager.py | 31 +- .../src/main/python/core/models/RTableExecutor.py | 132 ----- .../src/main/python/core/models/RTupleExecutor.py | 167 ------- amber/src/main/python/core/models/r_utils.py | 79 --- .../main/python/core/models/test_RTableExecutor.py | 539 --------------------- 7 files changed, 8 insertions(+), 961 deletions(-) diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index fabf68053e..bb6c65a41d 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -157,4 +157,4 @@ jobs: cd amber/src/main/python && flake8 && black . --check - name: Test with pytest run: | - cd amber/src/main/python && pytest -sv --ignore=core/models/test_RTableExecutor.py + cd amber/src/main/python && pytest -sv diff --git a/amber/r-requirements.txt b/amber/r-requirements.txt deleted file mode 100644 index 1216cdce4f..0000000000 --- a/amber/r-requirements.txt +++ /dev/null @@ -1,19 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. - -rpy2==3.5.11 -rpy2-arrow==0.0.8 \ No newline at end of file diff --git a/amber/src/main/python/core/architecture/managers/executor_manager.py b/amber/src/main/python/core/architecture/managers/executor_manager.py index 9693041975..286d7bcaf0 100644 --- a/amber/src/main/python/core/architecture/managers/executor_manager.py +++ b/amber/src/main/python/core/architecture/managers/executor_manager.py @@ -132,30 +132,13 @@ class ExecutorManager: :param language: The language of the operator code. :return: """ - if language == "r-tuple": - # Have to import it here and not at the top in case R_HOME from udf.conf - # is not defined, otherwise an error will occur - # If R_HOME is not defined and rpy2 cannot find the - # R_HOME environment variable, an error will occur here - from core.models.RTupleExecutor import RTupleSourceExecutor, RTupleExecutor - - self.executor = ( - RTupleSourceExecutor(code) if is_source else RTupleExecutor(code) - ) - elif language == "r-table": - # Have to import it here and not at the top in case R_HOME from udf.conf - # is not defined, otherwise an error will occur - # If R_HOME is not defined and rpy2 cannot find the - # R_HOME environment variable, an error will occur here - from core.models.RTableExecutor import RTableSourceExecutor, RTableExecutor - - self.executor = ( - RTableSourceExecutor(code) if is_source else RTableExecutor(code) - ) - else: - executor: type(Operator) = self.load_executor_definition(code) - self.executor = executor() - self.executor.is_source = is_source + assert language not in [ + "r-tuple", + "r-table", + ], f"Language {language} is not supported by default. Please consult third party plugin." 
+ executor: type(Operator) = self.load_executor_definition(code) + self.executor = executor() + self.executor.is_source = is_source assert ( isinstance(self.executor, SourceOperator) == self.executor.is_source ), "Please use SourceOperator API for source operators." diff --git a/amber/src/main/python/core/models/RTableExecutor.py b/amber/src/main/python/core/models/RTableExecutor.py deleted file mode 100644 index db8610e6f2..0000000000 --- a/amber/src/main/python/core/models/RTableExecutor.py +++ /dev/null @@ -1,132 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import pyarrow as pa -import rpy2.robjects as robjects -import typing -from rpy2.robjects import default_converter -from rpy2.robjects.conversion import localconverter as local_converter -from rpy2_arrow.arrow import rarrow_to_py_table, converter as arrow_converter -from typing import Iterator, Optional, Union - -from core.models import ArrowTableTupleProvider, Tuple, TupleLike, Table, TableLike -from core.models.operator import SourceOperator, TableOperator - - -class RTableExecutor(TableOperator): - """ - An executor that can execute R code on Arrow tables. 
- """ - - is_source = False - - _arrow_to_r_dataframe = robjects.r( - "function(table) { return (as.data.frame(table)) }" - ) - - _r_dataframe_to_arrow = robjects.r( - """ - library(arrow) - function(df) { return (arrow::as_arrow_table(df)) } - """ - ) - - def __init__(self, r_code: str): - """ - Initialize the RTableExecutor with R code. - - Args: - r_code (str): R code to be executed. - """ - super().__init__() - with local_converter(default_converter): - self._func: typing.Callable[[pa.Table], pa.Table] = robjects.r(r_code) - - def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: - """ - Process an input Table using the provided R function. - The Table is represented as a pandas.DataFrame. - - :param table: Table, a table to be processed. - :param port: int, input port index of the current Tuple. - Currently unused in R-UDF - :return: Iterator[Optional[TableLike]], producing one TableLike object at a - time, or None. - """ - input_pyarrow_table = pa.Table.from_pandas(table) - with local_converter(arrow_converter): - input_r_dataframe = RTableExecutor._arrow_to_r_dataframe( - input_pyarrow_table - ) - output_r_dataframe = self._func(input_r_dataframe, port) - output_rarrow_table = RTableExecutor._r_dataframe_to_arrow( - output_r_dataframe - ) - output_pyarrow_table = rarrow_to_py_table(output_rarrow_table) - - for field_accessor in ArrowTableTupleProvider(output_pyarrow_table): - yield Tuple( - {name: field_accessor for name in output_pyarrow_table.column_names} - ) - - -class RTableSourceExecutor(SourceOperator): - """ - A source operator that produces an R Table or Table-like object using R code. - """ - - is_source = True - _source_output_to_arrow = robjects.r( - """ - library(arrow) - function(source_output) { - return (arrow::as_arrow_table(as.data.frame(source_output))) - } - """ - ) - - def __init__(self, r_code: str): - """ - Initialize the RTableSourceExecutor with R code. - - Args: - r_code (str): R code to be executed. 
- """ - super().__init__() - # Use the local converter from rpy2 to load in the R function given by the user - with local_converter(default_converter): - self._func = robjects.r(r_code) - - def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]: - """ - Produce Table using the provided R function. - Used by the source operator only. - - :return: Iterator[Union[TupleLike, TableLike, None]], producing - one TupleLike object, one TableLike object, or None, at a time. - """ - with local_converter(arrow_converter): - output_table = self._func() - output_rarrow_table = RTableSourceExecutor._source_output_to_arrow( - output_table - ) - output_pyarrow_table = rarrow_to_py_table(output_rarrow_table) - - for field_accessor in ArrowTableTupleProvider(output_pyarrow_table): - yield Tuple( - {name: field_accessor for name in output_pyarrow_table.column_names} - ) diff --git a/amber/src/main/python/core/models/RTupleExecutor.py b/amber/src/main/python/core/models/RTupleExecutor.py deleted file mode 100644 index 67f05fb2d3..0000000000 --- a/amber/src/main/python/core/models/RTupleExecutor.py +++ /dev/null @@ -1,167 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -import datetime -import pickle -import pyarrow as pa -import rpy2 -import rpy2.robjects as robjects -import warnings -from rpy2.robjects import default_converter -from rpy2.robjects.conversion import localconverter as local_converter -from rpy2_arrow.arrow import converter as arrow_converter -from typing import Iterator, Optional, Union - -from core.models import Tuple, TupleLike, TableLike, r_utils -from core.models.operator import SourceOperator, TupleOperatorV2 - -warnings.filterwarnings(action="ignore", category=UserWarning, module=r"rpy2*") - - -class RTupleExecutor(TupleOperatorV2): - """ - An operator that can execute R code on R Lists (R's representation of a Tuple) - """ - - is_source = False - - _combine_binary_and_non_binary_lists = robjects.r( - """ - function(non_binary_list, binary_list) { - non_binary_list <- as.list(non_binary_list$as_vector()) - return (c(non_binary_list, binary_list)) - } - """ - ) - - def __init__(self, r_code: str): - """ - Initialize the RTupleExecutor with R code. - - Args: - r_code (str): R code to be executed. - """ - super().__init__() - # Use the local converter from rpy2 to load in the R function given by the user - with local_converter(default_converter): - self._func = robjects.r(r_code) - - def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: - """ - Process an input Tuple from the given link. - - :param tuple_: Tuple, a Tuple from an input port to be processed. - :param port: int, input port index of the current Tuple. - :return: Iterator[Optional[TupleLike]], producing one TupleLike object at a - time, or None. 
- """ - with local_converter(arrow_converter): - input_schema: pa.Schema = tuple_._schema.as_arrow_schema() - input_fields: list[str] = [field.name for field in input_schema] - non_binary_fields: list[str] = [ - field.name for field in input_schema if field.type != pa.binary() - ] - binary_fields: list[str] = [ - field.name for field in input_schema if field.type == pa.binary() - ] - - non_binary_pyarrow_array: pa.StructArray = pa.array([], type=pa.struct([])) - if non_binary_fields: - non_binary_tuple: Tuple = tuple_.get_partial_tuple(non_binary_fields) - non_binary_tuple_schema: pa.Schema = ( - non_binary_tuple._schema.as_arrow_schema() - ) - non_binary_pyarrow_array: pa.StructArray = pa.array( - [non_binary_tuple.as_dict()], - type=pa.struct(non_binary_tuple_schema), - ) - - binary_r_list: dict[str, object] = {} - if binary_fields: - binary_tuple: Tuple = tuple_.get_partial_tuple(binary_fields) - for k, v in binary_tuple.as_dict().items(): - if isinstance(v, bytes): - binary_r_list[k] = pickle.loads(v[10:]) - elif isinstance(v, datetime.datetime): - binary_r_list[k] = robjects.vectors.POSIXct.sexp_from_datetime( - [v] - ) - else: - binary_r_list[k] = v - - binary_r_list: rpy2.robjects.ListVector = robjects.vectors.ListVector( - binary_r_list - ) - - input_r_list: rpy2.robjects.ListVector = ( - RTupleExecutor._combine_binary_and_non_binary_lists( - non_binary_pyarrow_array, binary_r_list - ) - ) - - output_r_generator: rpy2.robjects.SignatureTranslatedFunction = self._func( - input_r_list, port - ) - - while True: - output_py_tuple: Tuple = r_utils.extract_tuple_from_r( - output_r_generator, False, input_fields - ) - yield output_py_tuple if output_py_tuple is not None else None - if output_py_tuple is None: - break - - -class RTupleSourceExecutor(SourceOperator): - """ - A source operator that produces a generator that yields R Lists using R code. 
- R Lists are R's representation of a Tuple - """ - - is_source = True - - def __init__(self, r_code: str): - """ - Initialize the RTupleSourceExecutor with R code. - - Args: - r_code (str): R code to be executed. - """ - super().__init__() - # Use the local converter from rpy2 to load in the R function given by the user - with local_converter(default_converter): - self._func = robjects.r(r_code) - - def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]: - """ - Produce Tuples using the provided R generator returned by the UDF. - The returned R generator is an iterator - that yields R Lists (R's representation of Tuple) - Used by the source operator only. - - :return: Iterator[Union[TupleLike, TableLike, None]], producing - one TupleLike object, one TableLike object, or None, at a time. - """ - with local_converter(arrow_converter): - output_r_generator: rpy2.robjects.SignatureTranslatedFunction = self._func() - while True: - output_py_tuple: Tuple = r_utils.extract_tuple_from_r( - output_r_generator, True - ) - yield output_py_tuple if output_py_tuple is not None else None - if output_py_tuple is None: - break diff --git a/amber/src/main/python/core/models/r_utils.py b/amber/src/main/python/core/models/r_utils.py deleted file mode 100644 index d1f6e512bd..0000000000 --- a/amber/src/main/python/core/models/r_utils.py +++ /dev/null @@ -1,79 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import rpy2 -import rpy2.rinterface as rinterface -import rpy2.robjects as robjects -import warnings - -from core.models import Tuple - -warnings.filterwarnings(action="ignore", category=UserWarning, module=r"rpy2*") - - -def convert_r_to_py(value: rpy2.robjects): - """ - :param value: A value that is from one of rpy2's many types (from rpy2.robjects) - :return: A Python representation of the value, if convertable. - If not, it returns the value itself - """ - if isinstance(value, robjects.vectors.BoolVector): - return bool(value[0]) - if isinstance(value, robjects.vectors.IntVector): - return int(value[0]) - if isinstance(value, robjects.vectors.FloatVector): - if isinstance(value, robjects.vectors.POSIXct): - return next(value.iter_localized_datetime()) - else: - return float(value[0]) - if isinstance(value, robjects.vectors.StrVector): - return str(value[0]) - return value - - -def extract_tuple_from_r( - output_r_generator: rpy2.robjects.SignatureTranslatedFunction, - source_operator: bool, - input_fields: [None, list[str]] = None, -) -> [Tuple, None]: - output_r_tuple: rpy2.robjects.ListVector = output_r_generator() - if ( - isinstance(output_r_tuple, rinterface.SexpSymbol) - and str(output_r_tuple) == ".__exhausted__." 
- ) or isinstance(output_r_tuple.names, rpy2.rinterface_lib.sexp.NULLType): - return None - - output_python_dict: dict[str, object] = {} - if source_operator: - output_python_dict = { - key: output_r_tuple.rx2(key) for key in output_r_tuple.names - } - else: - diff_fields: list[str] = [ - field_name - for field_name in output_r_tuple.names - if field_name not in input_fields - ] - output_python_dict: dict[str, object] = { - key: output_r_tuple.rx2(key) for key in (input_fields + diff_fields) - } - - output_python_dict: dict[str, object] = { - key: convert_r_to_py(value) for key, value in output_python_dict.items() - } - - return Tuple(output_python_dict) diff --git a/amber/src/main/python/core/models/test_RTableExecutor.py b/amber/src/main/python/core/models/test_RTableExecutor.py deleted file mode 100644 index 1e584ab40d..0000000000 --- a/amber/src/main/python/core/models/test_RTableExecutor.py +++ /dev/null @@ -1,539 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -import pandas -import pytest -import rpy2.rinterface_lib.embedded - -from core.models import Tuple, Table -from core.models.RTableExecutor import RTableSourceExecutor, RTableExecutor - - -class TestRTableExecutor: - @pytest.fixture - def source_executor_empty(self): - return """ - function() { - df <- data.frame( - col1 = character(), - col2 = numeric(), - col3 = logical() - ) - return (df) - } - """ - - @pytest.fixture - def source_executor_NA(self): - # This should work with no issues - # since you can store NA in a data.frame - # and then convert the data.frame to Arrow Table with - # one Tuple with value Tuple({"source_output": None}) - return """ - function() { - return (NA) - } - """ - - @pytest.fixture - def udf_executor_return_NA(self): - # This should fail since the conversion back - # to Arrow will be impossible - return """ - function(table, port) { - return (NA) - } - """ - - @pytest.fixture - def udf_executor_simple_return(self): - return """ - function(table, port) { - return (table) - } - """ - - @pytest.fixture - def udf_executor_empty_add_row(self): - return """ - function(table, port) { - new_row <- data.frame( - col1 = "TEST", - col2 = 12.3, - col3 = TRUE - ) - table <- rbind(table, new_row) - return (table) - } - """ - - @pytest.fixture - def source_executor_null_values(self): - return """ - function() { - df <- data.frame( - col1 = character(), - col2 = numeric(), - col3 = logical() - ) - df[1:3,] <- NA - return (df) - } - """ - - @pytest.fixture - def udf_executor_null_values_return(self): - return """ - function(table, port) { - return (table) - } - """ - - @pytest.fixture - def udf_executor_null_values_add_row(self): - return """ - function(table, port) { - new_row <- data.frame( - col1 = NA, - col2 = NA, - col3 = NA - ) - table <- rbind(table, new_row) - return (table) - } - """ - - @pytest.fixture - def target_tuples_null_values(self): - tuple_1 = Tuple({"col1": None, "col2": None, "col3": None}) - tuple_2 = Tuple({"col1": None, "col2": 
None, "col3": None}) - tuple_3 = Tuple({"col1": None, "col2": None, "col3": None}) - return [tuple_1, tuple_2, tuple_3] - - @pytest.fixture - def pandas_target_df_simple(self): - data = { - "Name": ["Alice", "Bob", "Charlie"], - "Age": [25, 30, 35], - "City": ["New York", "Los Angeles", "Chicago"], - } - df = pandas.DataFrame(data) - return df - - @pytest.fixture - def target_tuples_simple(self, pandas_target_df_simple): - tuples = [] - for index, row in pandas_target_df_simple.iterrows(): - tuples.append(Tuple(row)) - return tuples - - @pytest.fixture - def source_executor_simple(self): - return """ - function() { - df <- data.frame( - Name = c("Alice", "Bob", "Charlie"), - Age = c(25, 30, 35), - City = c("New York", "Los Angeles", "Chicago") - ) - return (df) - } - """ - - @pytest.fixture - def udf_executor_simple_extract_row(self): - return """ - function(table, port) { - bob_row <- table[table$Name == "Bob", ] - return (bob_row) - } - """ - - @pytest.fixture - def udf_executor_simple_update_row(self): - return """ - function(table, port) { - table[table$Name == "Bob", "Age"] <- 18 - return (table) - } - """ - - @pytest.fixture - def udf_executor_simple_add_row(self): - return """ - function(table, port) { - new_row <- list(Name = "Test", Age = 0, City = "Irvine") - new_df <- rbind(table, new_row) - return (new_df) - } - """ - - @pytest.fixture - def source_executor_df_fail(self): - # This Source UDF should raise a TypeError since it cannot - # be converted into a Table-like object - return """ - function() { - glm_model <- glm(mpg ~ wt, data = mtcars, family = gaussian) - return (glm_model) - } - """ - - @pytest.fixture - def target_tuples_like_type(self): - tuple_1 = Tuple({"C.1": 1, "C.2": 2, "C.3": 3}) - tuple_2 = Tuple({"C.1": 11, "C.2": 12, "C.3": 13}) - return [tuple_1, tuple_2] - - @pytest.fixture - def source_executor_df_like_type(self): - return """ - function() { - mdat <- matrix(c(1,2,3, 11,12,13), nrow = 2, ncol = 3, byrow = TRUE, - dimnames = 
list(c("row1", "row2"), - c("C.1", "C.2", "C.3"))) - return (mdat) - } - """ - - @pytest.fixture - def udf_executor_df_like_type_add_row(self): - return """ - function(table, port) { - # Adding a new row - new_row <- c(4, 5, 6) - table <- rbind(table, new_row) - - return (table) - } - """ - - @pytest.fixture - def udf_executor_df_like_type_add_col(self): - return """ - function(table, port) { - # Adding a new col - new_col <- c("AAA", "BBB") - table <- cbind(table, new_col) - - return (table) - } - """ - - def test_source_executor_empty(self, source_executor_empty): - source_executor = RTableSourceExecutor(source_executor_empty) - output = source_executor.produce() - tuples = [tup for tup in output] - assert len(tuples) == 0 - - output_tbl = Table(tuples) - assert output_tbl == Table([]) - - def test_source_executor_NA(self, source_executor_NA): - source_executor = RTableSourceExecutor(source_executor_NA) - output = source_executor.produce() - tuples = [tup for tup in output] - assert len(tuples) == 1 - - output_tbl = Table(tuples) - assert output_tbl == Table([Tuple({"source_output": None})]) - - def test_udf_executor_return_NA_fail( - self, source_executor_empty, udf_executor_return_NA - ): - source_executor = RTableSourceExecutor(source_executor_empty) - input_tbl = Table([tup for tup in source_executor.produce()]) - - with pytest.raises(rpy2.rinterface_lib.embedded.RRuntimeError) as _: - udf_executor = RTableExecutor(udf_executor_return_NA) - output = udf_executor.process_table(input_tbl, 0) - tuples = [out for out in output] - assert tuples is None - - def test_udf_executor_empty_return( - self, source_executor_empty, udf_executor_simple_return - ): - source_executor = RTableSourceExecutor(source_executor_empty) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_simple_return) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - assert len(tuples) == 0 - - 
output_tbl = Table(tuples) - assert output_tbl == Table([]) - assert output_tbl == input_tbl - - def test_udf_executor_empty_add_row( - self, source_executor_empty, udf_executor_empty_add_row - ): - source_executor = RTableSourceExecutor(source_executor_empty) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_empty_add_row) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - target_tuple = Tuple({"col1": "TEST", "col2": 12.3, "col3": True}) - assert len(tuples) == 1 - - output_tbl = Table(tuples) - assert output_tbl == Table([target_tuple]) - - def test_source_executor_null_values( - self, source_executor_null_values, target_tuples_null_values - ): - source_executor = RTableSourceExecutor(source_executor_null_values) - output = source_executor.produce() - tuples = [tup for tup in output] - assert len(tuples) == 3 - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples_null_values) - - def test_udf_executor_null_values_return( - self, - source_executor_null_values, - udf_executor_null_values_return, - target_tuples_null_values, - ): - source_executor = RTableSourceExecutor(source_executor_null_values) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_null_values_return) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - assert len(tuples) == 3 - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples_null_values) - - def test_udf_executor_null_values_add_row( - self, - source_executor_null_values, - udf_executor_null_values_add_row, - target_tuples_null_values, - ): - source_executor = RTableSourceExecutor(source_executor_null_values) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_null_values_add_row) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup 
for tup in output] - target_tuple = Tuple({"col1": None, "col2": None, "col3": None}) - assert len(tuples) == 4 - assert tuples[3] == target_tuple - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples_null_values + [target_tuple]) - - def test_source_executor_simple(self, source_executor_simple, target_tuples_simple): - source_executor = RTableSourceExecutor(source_executor_simple) - output = source_executor.produce() - - tuples = [tup for tup in output] - assert len(tuples) == 3 - - for idx, v in enumerate(tuples): - assert v == target_tuples_simple[idx] - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples_simple) - - def test_udf_executor_simple( - self, source_executor_simple, udf_executor_simple_return, target_tuples_simple - ): - source_executor = RTableSourceExecutor(source_executor_simple) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_simple_return) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - assert len(tuples) == 3 - - for idx, v in enumerate(tuples): - assert v == target_tuples_simple[idx] - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples_simple) - assert output_tbl == input_tbl - - def test_udf_executor_simple_extract_row( - self, - source_executor_simple, - udf_executor_simple_extract_row, - target_tuples_simple, - ): - source_executor = RTableSourceExecutor(source_executor_simple) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_simple_extract_row) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - target_tuple = Tuple({"Name": "Bob", "Age": 30, "City": "Los Angeles"}) - assert len(tuples) == 1 - assert tuples[0] == target_tuple - - output_tbl = Table(tuples) - assert output_tbl == Table([target_tuple]) - - def test_udf_executor_simple_update_row( - self, - 
source_executor_simple, - udf_executor_simple_update_row, - target_tuples_simple, - ): - source_executor = RTableSourceExecutor(source_executor_simple) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_simple_update_row) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - target_tuple = Tuple({"Name": "Bob", "Age": 18, "City": "Los Angeles"}) - assert len(tuples) == 3 - - for idx, v in enumerate(tuples): - if idx == 1: - assert v == target_tuple - else: - assert v == target_tuples_simple[idx] - - output_tbl = Table(tuples) - assert output_tbl == Table( - [target_tuples_simple[0], target_tuple, target_tuples_simple[2]] - ) - - def test_udf_executor_simple_add_row( - self, source_executor_simple, udf_executor_simple_add_row, target_tuples_simple - ): - source_executor = RTableSourceExecutor(source_executor_simple) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_simple_add_row) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - target_tuple = Tuple({"Name": "Test", "Age": 0, "City": "Irvine"}) - assert len(tuples) == 4 - - for idx, v in enumerate(tuples): - if idx == len(tuples) - 1: - assert v == target_tuple - else: - assert v == target_tuples_simple[idx] - - output_tbl = Table(tuples) - assert output_tbl == Table( - [tup for tup in target_tuples_simple] + [target_tuple] - ) - - def test_source_executor_fail(self, source_executor_df_fail): - source_executor = RTableSourceExecutor(source_executor_df_fail) - with pytest.raises(rpy2.rinterface_lib.embedded.RRuntimeError) as _: - output = source_executor.produce() - output = [out for out in output] - - def test_source_executor_df_like_type( - self, source_executor_df_like_type, target_tuples_like_type - ): - source_executor = RTableSourceExecutor(source_executor_df_like_type) - output = source_executor.produce() - - 
tuples = [tup for tup in output] - assert len(tuples) == 2 - - for idx, v in enumerate(tuples): - assert v == target_tuples_like_type[idx] - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples_like_type) - - def test_udf_executor_df_like_type( - self, - source_executor_df_like_type, - udf_executor_simple_return, - target_tuples_like_type, - ): - source_executor = RTableSourceExecutor(source_executor_df_like_type) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_simple_return) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - assert len(tuples) == 2 - - for idx, v in enumerate(tuples): - assert v == target_tuples_like_type[idx] - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples_like_type) - assert output_tbl == input_tbl - - def test_udf_executor_df_like_type_add_row( - self, - source_executor_df_like_type, - udf_executor_df_like_type_add_row, - target_tuples_like_type, - ): - source_executor = RTableSourceExecutor(source_executor_df_like_type) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_df_like_type_add_row) - output = udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - target_tuple = Tuple({"C.1": 4, "C.2": 5, "C.3": 6}) - assert len(tuples) == 3 - - for idx, v in enumerate(tuples): - if idx == len(tuples) - 1: - assert v == target_tuple - else: - assert v == target_tuples_like_type[idx] - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples_like_type + [target_tuple]) - - def test_udf_executor_df_like_type_add_col( - self, source_executor_df_like_type, udf_executor_df_like_type_add_col - ): - source_executor = RTableSourceExecutor(source_executor_df_like_type) - input_tbl = Table([tup for tup in source_executor.produce()]) - - udf_executor = RTableExecutor(udf_executor_df_like_type_add_col) - output = 
udf_executor.process_table(input_tbl, 0) - - tuples = [tup for tup in output] - target_tuples = [ - Tuple({"C.1": 1, "C.2": 2, "C.3": 3, "new_col": "AAA"}), - Tuple({"C.1": 11, "C.2": 12, "C.3": 13, "new_col": "BBB"}), - ] - - assert len(tuples) == 2 - for idx, v in enumerate(tuples): - assert v == target_tuples[idx] - - output_tbl = Table(tuples) - assert output_tbl == Table(target_tuples)
