This is an automated email from the ASF dual-hosted git repository.
chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 034cf7c844 chore: Drop Runtime Support for R-UDF (#4090)
034cf7c844 is described below
commit 034cf7c8440d5ae295a4280cb2cb2396798b1aff
Author: Yicong Huang <[email protected]>
AuthorDate: Fri Dec 5 20:47:56 2025 -0800
chore: Drop Runtime Support for R-UDF (#4090)
### What changes were proposed in this PR?
Removed official support for R-UDF. The frontend is unchanged, but at
execution time users will receive an error stating that R-UDF is not
officially supported. We plan to move R-UDF support to a third-party-hosted
repository so that users can install it as a plugin.
### Any related issues, documentation, discussions?
This change is needed because the R-UDF runtime requires `rpy2`, whose
license is not compatible with Apache licensing.
resolves #4084
### How was this PR tested?
Added test suite `TestExecutorManager`.
### Was this PR authored or co-authored using generative AI tooling?
Yes; the tests were generated with Cursor.
---------
Co-authored-by: Yicong Huang <[email protected]>
Co-authored-by: Chen Li <[email protected]>
---
.github/workflows/github-action-build.yml | 2 +-
amber/r-requirements.txt | 19 -
.../core/architecture/managers/executor_manager.py | 31 +-
.../architecture/managers/test_executor_manager.py | 174 +++++++
.../src/main/python/core/models/RTableExecutor.py | 132 -----
.../src/main/python/core/models/RTupleExecutor.py | 167 -------
amber/src/main/python/core/models/r_utils.py | 79 ---
.../main/python/core/models/test_RTableExecutor.py | 539 ---------------------
8 files changed, 182 insertions(+), 961 deletions(-)
diff --git a/.github/workflows/github-action-build.yml
b/.github/workflows/github-action-build.yml
index 5264bd454b..ff533a109f 100644
--- a/.github/workflows/github-action-build.yml
+++ b/.github/workflows/github-action-build.yml
@@ -157,4 +157,4 @@ jobs:
cd amber/src/main/python && flake8 && black . --check
- name: Test with pytest
run: |
- cd amber/src/main/python && pytest -sv --ignore=core/models/test_RTableExecutor.py
+ cd amber/src/main/python && pytest -sv
diff --git a/amber/r-requirements.txt b/amber/r-requirements.txt
deleted file mode 100644
index 1216cdce4f..0000000000
--- a/amber/r-requirements.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-rpy2==3.5.11
-rpy2-arrow==0.0.8
\ No newline at end of file
diff --git
a/amber/src/main/python/core/architecture/managers/executor_manager.py
b/amber/src/main/python/core/architecture/managers/executor_manager.py
index 9693041975..fc07ce74f5 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -132,30 +132,13 @@ class ExecutorManager:
:param language: The language of the operator code.
:return:
"""
- if language == "r-tuple":
- # Have to import it here and not at the top in case R_HOME from
udf.conf
- # is not defined, otherwise an error will occur
- # If R_HOME is not defined and rpy2 cannot find the
- # R_HOME environment variable, an error will occur here
- from core.models.RTupleExecutor import RTupleSourceExecutor,
RTupleExecutor
-
- self.executor = (
- RTupleSourceExecutor(code) if is_source else
RTupleExecutor(code)
- )
- elif language == "r-table":
- # Have to import it here and not at the top in case R_HOME from
udf.conf
- # is not defined, otherwise an error will occur
- # If R_HOME is not defined and rpy2 cannot find the
- # R_HOME environment variable, an error will occur here
- from core.models.RTableExecutor import RTableSourceExecutor,
RTableExecutor
-
- self.executor = (
- RTableSourceExecutor(code) if is_source else
RTableExecutor(code)
- )
- else:
- executor: type(Operator) = self.load_executor_definition(code)
- self.executor = executor()
- self.executor.is_source = is_source
+ assert language not in [
+ "r-tuple",
+ "r-table",
+ ], "R language is not supported by default. Please consult third party plugin."
+ executor: type(Operator) = self.load_executor_definition(code)
+ self.executor = executor()
+ self.executor.is_source = is_source
assert (
isinstance(self.executor, SourceOperator) ==
self.executor.is_source
), "Please use SourceOperator API for source operators."
diff --git
a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
new file mode 100644
index 0000000000..7728c509b4
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from core.architecture.managers.executor_manager import ExecutorManager
+
+
+# Sample operator code for testing
+SAMPLE_OPERATOR_CODE = """
+from pytexera import *
+
+class TestOperator(UDFOperatorV2):
+ def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+ yield tuple_
+"""
+
+SAMPLE_SOURCE_OPERATOR_CODE = """
+from pytexera import *
+
+class TestSourceOperator(UDFSourceOperator):
+ def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
+ yield Tuple({"test": "data"})
+"""
+
+
+class TestExecutorManager:
+ """Test suite for ExecutorManager, focusing on R UDF support removal."""
+
+ @pytest.fixture
+ def executor_manager(self):
+ """Create a fresh ExecutorManager instance for each test."""
+ manager = ExecutorManager()
+ yield manager
+ # Cleanup: close the temp filesystem
+ if hasattr(manager, "_fs"):
+ manager.close()
+
+ def test_initialization(self, executor_manager):
+ """Test that ExecutorManager initializes correctly."""
+ assert executor_manager.executor is None
+ assert executor_manager.operator_module_name is None
+ assert executor_manager.executor_version == 0
+
+ def test_reject_r_tuple_language(self, executor_manager):
+ """Test that 'r-tuple' language is rejected with AssertionError."""
+ with pytest.raises(AssertionError) as exc_info:
+ executor_manager.initialize_executor(
+ code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-tuple"
+ )
+
+ # Verify the error message mentions R UDF support has been dropped
+ assert "not supported" in str(exc_info.value) or "dropped" in str(
+ exc_info.value
+ )
+
+ def test_reject_r_table_language(self, executor_manager):
+ """Test that 'r-table' language is rejected with AssertionError."""
+ with pytest.raises(AssertionError) as exc_info:
+ executor_manager.initialize_executor(
+ code=SAMPLE_OPERATOR_CODE, is_source=False, language="r-table"
+ )
+
+ # Verify the error message mentions R UDF support has been dropped
+ assert "not supported" in str(exc_info.value) or "dropped" in str(
+ exc_info.value
+ )
+
+ def test_accept_python_language_regular_operator(self, executor_manager):
+ """Test that 'python' language is accepted for regular operators."""
+ # This should not raise any assertion error
+ executor_manager.initialize_executor(
+ code=SAMPLE_OPERATOR_CODE, is_source=False, language="python"
+ )
+
+ # Verify executor was initialized
+ assert executor_manager.executor is not None
+ assert executor_manager.operator_module_name == "udf-v1"
+ assert executor_manager.executor_version == 1
+ assert executor_manager.executor.is_source is False
+
+ def test_accept_python_language_source_operator(self, executor_manager):
+ """Test that 'python' language is accepted for source operators."""
+ # This should not raise any assertion error
+ executor_manager.initialize_executor(
+ code=SAMPLE_SOURCE_OPERATOR_CODE, is_source=True, language="python"
+ )
+
+ # Verify executor was initialized
+ assert executor_manager.executor is not None
+ assert executor_manager.operator_module_name == "udf-v1"
+ assert executor_manager.executor_version == 1
+ assert executor_manager.executor.is_source is True
+
+ def test_reject_other_unsupported_languages(self, executor_manager):
+ """Test that other arbitrary languages still work (no R-specific check)."""
+ # Languages other than r-tuple and r-table should be allowed to pass
+ # the assertion, though they may fail at code execution
+ try:
+ executor_manager.initialize_executor(
+ code=SAMPLE_OPERATOR_CODE,
+ is_source=False,
+ language="javascript", # arbitrary language
+ )
+ # If we get here, the assertion passed (which is correct behavior)
+ # But the code execution might fail, which is fine
+ except AssertionError:
+ # Should NOT raise AssertionError for non-R languages
+ pytest.fail("Should not raise AssertionError for non-R languages")
+ except Exception:
+ # Other exceptions (like import errors) are expected and acceptable
+ pass
+
+ def test_gen_module_file_name_increments(self, executor_manager):
+ """Test that module file names increment correctly."""
+ module1, file1 = executor_manager.gen_module_file_name()
+ assert module1 == "udf-v1"
+ assert file1 == "udf-v1.py"
+
+ module2, file2 = executor_manager.gen_module_file_name()
+ assert module2 == "udf-v2"
+ assert file2 == "udf-v2.py"
+
+ module3, file3 = executor_manager.gen_module_file_name()
+ assert module3 == "udf-v3"
+ assert file3 == "udf-v3.py"
+
+ def test_is_concrete_operator_static_method(self):
+ """Test the is_concrete_operator static method."""
+ from core.models import TupleOperatorV2
+
+ # Should return True for concrete operator classes
+ # Note: We can't easily test with actual concrete classes here without imports
+ # This test just verifies the method exists and is callable
+ assert hasattr(ExecutorManager, "is_concrete_operator")
+ assert callable(ExecutorManager.is_concrete_operator)
+
+ # Test with non-class
+ assert ExecutorManager.is_concrete_operator("not a class") is False
+ assert ExecutorManager.is_concrete_operator(123) is False
+
+ # Test with abstract base classes (TupleOperatorV2 has abstract methods)
+ assert ExecutorManager.is_concrete_operator(TupleOperatorV2) is False
+
+ def test_regular_operator_is_not_source(self, executor_manager):
+ """Test that regular operator with is_source=False works correctly."""
+ executor_manager.initialize_executor(
+ code=SAMPLE_OPERATOR_CODE, is_source=False, language="python"
+ )
+ assert executor_manager.executor.is_source is False
+
+ def test_source_operator_mismatch_raises_error(self, executor_manager):
+ """Test that mismatched source operator flag raises AssertionError."""
+ with pytest.raises(AssertionError) as exc_info:
+ executor_manager.initialize_executor(
+ code=SAMPLE_OPERATOR_CODE,
+ is_source=True, # Wrong: regular operator but marked as source
+ language="python",
+ )
+ assert "SourceOperator API" in str(exc_info.value)
diff --git a/amber/src/main/python/core/models/RTableExecutor.py
b/amber/src/main/python/core/models/RTableExecutor.py
deleted file mode 100644
index db8610e6f2..0000000000
--- a/amber/src/main/python/core/models/RTableExecutor.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pyarrow as pa
-import rpy2.robjects as robjects
-import typing
-from rpy2.robjects import default_converter
-from rpy2.robjects.conversion import localconverter as local_converter
-from rpy2_arrow.arrow import rarrow_to_py_table, converter as arrow_converter
-from typing import Iterator, Optional, Union
-
-from core.models import ArrowTableTupleProvider, Tuple, TupleLike, Table,
TableLike
-from core.models.operator import SourceOperator, TableOperator
-
-
-class RTableExecutor(TableOperator):
- """
- An executor that can execute R code on Arrow tables.
- """
-
- is_source = False
-
- _arrow_to_r_dataframe = robjects.r(
- "function(table) { return (as.data.frame(table)) }"
- )
-
- _r_dataframe_to_arrow = robjects.r(
- """
- library(arrow)
- function(df) { return (arrow::as_arrow_table(df)) }
- """
- )
-
- def __init__(self, r_code: str):
- """
- Initialize the RTableExecutor with R code.
-
- Args:
- r_code (str): R code to be executed.
- """
- super().__init__()
- with local_converter(default_converter):
- self._func: typing.Callable[[pa.Table], pa.Table] =
robjects.r(r_code)
-
- def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]]:
- """
- Process an input Table using the provided R function.
- The Table is represented as a pandas.DataFrame.
-
- :param table: Table, a table to be processed.
- :param port: int, input port index of the current Tuple.
- Currently unused in R-UDF
- :return: Iterator[Optional[TableLike]], producing one TableLike object
at a
- time, or None.
- """
- input_pyarrow_table = pa.Table.from_pandas(table)
- with local_converter(arrow_converter):
- input_r_dataframe = RTableExecutor._arrow_to_r_dataframe(
- input_pyarrow_table
- )
- output_r_dataframe = self._func(input_r_dataframe, port)
- output_rarrow_table = RTableExecutor._r_dataframe_to_arrow(
- output_r_dataframe
- )
- output_pyarrow_table = rarrow_to_py_table(output_rarrow_table)
-
- for field_accessor in ArrowTableTupleProvider(output_pyarrow_table):
- yield Tuple(
- {name: field_accessor for name in
output_pyarrow_table.column_names}
- )
-
-
-class RTableSourceExecutor(SourceOperator):
- """
- A source operator that produces an R Table or Table-like object using R
code.
- """
-
- is_source = True
- _source_output_to_arrow = robjects.r(
- """
- library(arrow)
- function(source_output) {
- return (arrow::as_arrow_table(as.data.frame(source_output)))
- }
- """
- )
-
- def __init__(self, r_code: str):
- """
- Initialize the RTableSourceExecutor with R code.
-
- Args:
- r_code (str): R code to be executed.
- """
- super().__init__()
- # Use the local converter from rpy2 to load in the R function given by
the user
- with local_converter(default_converter):
- self._func = robjects.r(r_code)
-
- def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
- """
- Produce Table using the provided R function.
- Used by the source operator only.
-
- :return: Iterator[Union[TupleLike, TableLike, None]], producing
- one TupleLike object, one TableLike object, or None, at a time.
- """
- with local_converter(arrow_converter):
- output_table = self._func()
- output_rarrow_table = RTableSourceExecutor._source_output_to_arrow(
- output_table
- )
- output_pyarrow_table = rarrow_to_py_table(output_rarrow_table)
-
- for field_accessor in ArrowTableTupleProvider(output_pyarrow_table):
- yield Tuple(
- {name: field_accessor for name in
output_pyarrow_table.column_names}
- )
diff --git a/amber/src/main/python/core/models/RTupleExecutor.py
b/amber/src/main/python/core/models/RTupleExecutor.py
deleted file mode 100644
index 67f05fb2d3..0000000000
--- a/amber/src/main/python/core/models/RTupleExecutor.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import pickle
-import pyarrow as pa
-import rpy2
-import rpy2.robjects as robjects
-import warnings
-from rpy2.robjects import default_converter
-from rpy2.robjects.conversion import localconverter as local_converter
-from rpy2_arrow.arrow import converter as arrow_converter
-from typing import Iterator, Optional, Union
-
-from core.models import Tuple, TupleLike, TableLike, r_utils
-from core.models.operator import SourceOperator, TupleOperatorV2
-
-warnings.filterwarnings(action="ignore", category=UserWarning, module=r"rpy2*")
-
-
-class RTupleExecutor(TupleOperatorV2):
- """
- An operator that can execute R code on R Lists (R's representation of a
Tuple)
- """
-
- is_source = False
-
- _combine_binary_and_non_binary_lists = robjects.r(
- """
- function(non_binary_list, binary_list) {
- non_binary_list <- as.list(non_binary_list$as_vector())
- return (c(non_binary_list, binary_list))
- }
- """
- )
-
- def __init__(self, r_code: str):
- """
- Initialize the RTupleExecutor with R code.
-
- Args:
- r_code (str): R code to be executed.
- """
- super().__init__()
- # Use the local converter from rpy2 to load in the R function given by
the user
- with local_converter(default_converter):
- self._func = robjects.r(r_code)
-
- def process_tuple(self, tuple_: Tuple, port: int) ->
Iterator[Optional[TupleLike]]:
- """
- Process an input Tuple from the given link.
-
- :param tuple_: Tuple, a Tuple from an input port to be processed.
- :param port: int, input port index of the current Tuple.
- :return: Iterator[Optional[TupleLike]], producing one TupleLike object
at a
- time, or None.
- """
- with local_converter(arrow_converter):
- input_schema: pa.Schema = tuple_._schema.as_arrow_schema()
- input_fields: list[str] = [field.name for field in input_schema]
- non_binary_fields: list[str] = [
- field.name for field in input_schema if field.type !=
pa.binary()
- ]
- binary_fields: list[str] = [
- field.name for field in input_schema if field.type ==
pa.binary()
- ]
-
- non_binary_pyarrow_array: pa.StructArray = pa.array([],
type=pa.struct([]))
- if non_binary_fields:
- non_binary_tuple: Tuple =
tuple_.get_partial_tuple(non_binary_fields)
- non_binary_tuple_schema: pa.Schema = (
- non_binary_tuple._schema.as_arrow_schema()
- )
- non_binary_pyarrow_array: pa.StructArray = pa.array(
- [non_binary_tuple.as_dict()],
- type=pa.struct(non_binary_tuple_schema),
- )
-
- binary_r_list: dict[str, object] = {}
- if binary_fields:
- binary_tuple: Tuple = tuple_.get_partial_tuple(binary_fields)
- for k, v in binary_tuple.as_dict().items():
- if isinstance(v, bytes):
- binary_r_list[k] = pickle.loads(v[10:])
- elif isinstance(v, datetime.datetime):
- binary_r_list[k] =
robjects.vectors.POSIXct.sexp_from_datetime(
- [v]
- )
- else:
- binary_r_list[k] = v
-
- binary_r_list: rpy2.robjects.ListVector =
robjects.vectors.ListVector(
- binary_r_list
- )
-
- input_r_list: rpy2.robjects.ListVector = (
- RTupleExecutor._combine_binary_and_non_binary_lists(
- non_binary_pyarrow_array, binary_r_list
- )
- )
-
- output_r_generator: rpy2.robjects.SignatureTranslatedFunction =
self._func(
- input_r_list, port
- )
-
- while True:
- output_py_tuple: Tuple = r_utils.extract_tuple_from_r(
- output_r_generator, False, input_fields
- )
- yield output_py_tuple if output_py_tuple is not None else None
- if output_py_tuple is None:
- break
-
-
-class RTupleSourceExecutor(SourceOperator):
- """
- A source operator that produces a generator that yields R Lists using R
code.
- R Lists are R's representation of a Tuple
- """
-
- is_source = True
-
- def __init__(self, r_code: str):
- """
- Initialize the RTupleSourceExecutor with R code.
-
- Args:
- r_code (str): R code to be executed.
- """
- super().__init__()
- # Use the local converter from rpy2 to load in the R function given by
the user
- with local_converter(default_converter):
- self._func = robjects.r(r_code)
-
- def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
- """
- Produce Tuples using the provided R generator returned by the UDF.
- The returned R generator is an iterator
- that yields R Lists (R's representation of Tuple)
- Used by the source operator only.
-
- :return: Iterator[Union[TupleLike, TableLike, None]], producing
- one TupleLike object, one TableLike object, or None, at a time.
- """
- with local_converter(arrow_converter):
- output_r_generator: rpy2.robjects.SignatureTranslatedFunction =
self._func()
- while True:
- output_py_tuple: Tuple = r_utils.extract_tuple_from_r(
- output_r_generator, True
- )
- yield output_py_tuple if output_py_tuple is not None else None
- if output_py_tuple is None:
- break
diff --git a/amber/src/main/python/core/models/r_utils.py
b/amber/src/main/python/core/models/r_utils.py
deleted file mode 100644
index d1f6e512bd..0000000000
--- a/amber/src/main/python/core/models/r_utils.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import rpy2
-import rpy2.rinterface as rinterface
-import rpy2.robjects as robjects
-import warnings
-
-from core.models import Tuple
-
-warnings.filterwarnings(action="ignore", category=UserWarning, module=r"rpy2*")
-
-
-def convert_r_to_py(value: rpy2.robjects):
- """
- :param value: A value that is from one of rpy2's many types (from
rpy2.robjects)
- :return: A Python representation of the value, if convertable.
- If not, it returns the value itself
- """
- if isinstance(value, robjects.vectors.BoolVector):
- return bool(value[0])
- if isinstance(value, robjects.vectors.IntVector):
- return int(value[0])
- if isinstance(value, robjects.vectors.FloatVector):
- if isinstance(value, robjects.vectors.POSIXct):
- return next(value.iter_localized_datetime())
- else:
- return float(value[0])
- if isinstance(value, robjects.vectors.StrVector):
- return str(value[0])
- return value
-
-
-def extract_tuple_from_r(
- output_r_generator: rpy2.robjects.SignatureTranslatedFunction,
- source_operator: bool,
- input_fields: [None, list[str]] = None,
-) -> [Tuple, None]:
- output_r_tuple: rpy2.robjects.ListVector = output_r_generator()
- if (
- isinstance(output_r_tuple, rinterface.SexpSymbol)
- and str(output_r_tuple) == ".__exhausted__."
- ) or isinstance(output_r_tuple.names, rpy2.rinterface_lib.sexp.NULLType):
- return None
-
- output_python_dict: dict[str, object] = {}
- if source_operator:
- output_python_dict = {
- key: output_r_tuple.rx2(key) for key in output_r_tuple.names
- }
- else:
- diff_fields: list[str] = [
- field_name
- for field_name in output_r_tuple.names
- if field_name not in input_fields
- ]
- output_python_dict: dict[str, object] = {
- key: output_r_tuple.rx2(key) for key in (input_fields +
diff_fields)
- }
-
- output_python_dict: dict[str, object] = {
- key: convert_r_to_py(value) for key, value in
output_python_dict.items()
- }
-
- return Tuple(output_python_dict)
diff --git a/amber/src/main/python/core/models/test_RTableExecutor.py
b/amber/src/main/python/core/models/test_RTableExecutor.py
deleted file mode 100644
index 1e584ab40d..0000000000
--- a/amber/src/main/python/core/models/test_RTableExecutor.py
+++ /dev/null
@@ -1,539 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pandas
-import pytest
-import rpy2.rinterface_lib.embedded
-
-from core.models import Tuple, Table
-from core.models.RTableExecutor import RTableSourceExecutor, RTableExecutor
-
-
-class TestRTableExecutor:
- @pytest.fixture
- def source_executor_empty(self):
- return """
- function() {
- df <- data.frame(
- col1 = character(),
- col2 = numeric(),
- col3 = logical()
- )
- return (df)
- }
- """
-
- @pytest.fixture
- def source_executor_NA(self):
- # This should work with no issues
- # since you can store NA in a data.frame
- # and then convert the data.frame to Arrow Table with
- # one Tuple with value Tuple({"source_output": None})
- return """
- function() {
- return (NA)
- }
- """
-
- @pytest.fixture
- def udf_executor_return_NA(self):
- # This should fail since the conversion back
- # to Arrow will be impossible
- return """
- function(table, port) {
- return (NA)
- }
- """
-
- @pytest.fixture
- def udf_executor_simple_return(self):
- return """
- function(table, port) {
- return (table)
- }
- """
-
- @pytest.fixture
- def udf_executor_empty_add_row(self):
- return """
- function(table, port) {
- new_row <- data.frame(
- col1 = "TEST",
- col2 = 12.3,
- col3 = TRUE
- )
- table <- rbind(table, new_row)
- return (table)
- }
- """
-
- @pytest.fixture
- def source_executor_null_values(self):
- return """
- function() {
- df <- data.frame(
- col1 = character(),
- col2 = numeric(),
- col3 = logical()
- )
- df[1:3,] <- NA
- return (df)
- }
- """
-
- @pytest.fixture
- def udf_executor_null_values_return(self):
- return """
- function(table, port) {
- return (table)
- }
- """
-
- @pytest.fixture
- def udf_executor_null_values_add_row(self):
- return """
- function(table, port) {
- new_row <- data.frame(
- col1 = NA,
- col2 = NA,
- col3 = NA
- )
- table <- rbind(table, new_row)
- return (table)
- }
- """
-
- @pytest.fixture
- def target_tuples_null_values(self):
- tuple_1 = Tuple({"col1": None, "col2": None, "col3": None})
- tuple_2 = Tuple({"col1": None, "col2": None, "col3": None})
- tuple_3 = Tuple({"col1": None, "col2": None, "col3": None})
- return [tuple_1, tuple_2, tuple_3]
-
- @pytest.fixture
- def pandas_target_df_simple(self):
- data = {
- "Name": ["Alice", "Bob", "Charlie"],
- "Age": [25, 30, 35],
- "City": ["New York", "Los Angeles", "Chicago"],
- }
- df = pandas.DataFrame(data)
- return df
-
- @pytest.fixture
- def target_tuples_simple(self, pandas_target_df_simple):
- tuples = []
- for index, row in pandas_target_df_simple.iterrows():
- tuples.append(Tuple(row))
- return tuples
-
- @pytest.fixture
- def source_executor_simple(self):
- return """
- function() {
- df <- data.frame(
- Name = c("Alice", "Bob", "Charlie"),
- Age = c(25, 30, 35),
- City = c("New York", "Los Angeles", "Chicago")
- )
- return (df)
- }
- """
-
- @pytest.fixture
- def udf_executor_simple_extract_row(self):
- return """
- function(table, port) {
- bob_row <- table[table$Name == "Bob", ]
- return (bob_row)
- }
- """
-
- @pytest.fixture
- def udf_executor_simple_update_row(self):
- return """
- function(table, port) {
- table[table$Name == "Bob", "Age"] <- 18
- return (table)
- }
- """
-
- @pytest.fixture
- def udf_executor_simple_add_row(self):
- return """
- function(table, port) {
- new_row <- list(Name = "Test", Age = 0, City = "Irvine")
- new_df <- rbind(table, new_row)
- return (new_df)
- }
- """
-
- @pytest.fixture
- def source_executor_df_fail(self):
- # This Source UDF should raise a TypeError since it cannot
- # be converted into a Table-like object
- return """
- function() {
- glm_model <- glm(mpg ~ wt, data = mtcars, family = gaussian)
- return (glm_model)
- }
- """
-
- @pytest.fixture
- def target_tuples_like_type(self):
- tuple_1 = Tuple({"C.1": 1, "C.2": 2, "C.3": 3})
- tuple_2 = Tuple({"C.1": 11, "C.2": 12, "C.3": 13})
- return [tuple_1, tuple_2]
-
- @pytest.fixture
- def source_executor_df_like_type(self):
- return """
- function() {
- mdat <- matrix(c(1,2,3, 11,12,13), nrow = 2, ncol = 3, byrow =
TRUE,
- dimnames = list(c("row1", "row2"),
- c("C.1", "C.2", "C.3")))
- return (mdat)
- }
- """
-
- @pytest.fixture
- def udf_executor_df_like_type_add_row(self):
- return """
- function(table, port) {
- # Adding a new row
- new_row <- c(4, 5, 6)
- table <- rbind(table, new_row)
-
- return (table)
- }
- """
-
- @pytest.fixture
- def udf_executor_df_like_type_add_col(self):
- return """
- function(table, port) {
- # Adding a new col
- new_col <- c("AAA", "BBB")
- table <- cbind(table, new_col)
-
- return (table)
- }
- """
-
- def test_source_executor_empty(self, source_executor_empty):
- source_executor = RTableSourceExecutor(source_executor_empty)
- output = source_executor.produce()
- tuples = [tup for tup in output]
- assert len(tuples) == 0
-
- output_tbl = Table(tuples)
- assert output_tbl == Table([])
-
- def test_source_executor_NA(self, source_executor_NA):
- source_executor = RTableSourceExecutor(source_executor_NA)
- output = source_executor.produce()
- tuples = [tup for tup in output]
- assert len(tuples) == 1
-
- output_tbl = Table(tuples)
- assert output_tbl == Table([Tuple({"source_output": None})])
-
- def test_udf_executor_return_NA_fail(
- self, source_executor_empty, udf_executor_return_NA
- ):
- source_executor = RTableSourceExecutor(source_executor_empty)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- with pytest.raises(rpy2.rinterface_lib.embedded.RRuntimeError) as _:
- udf_executor = RTableExecutor(udf_executor_return_NA)
- output = udf_executor.process_table(input_tbl, 0)
- tuples = [out for out in output]
- assert tuples is None
-
- def test_udf_executor_empty_return(
- self, source_executor_empty, udf_executor_simple_return
- ):
- source_executor = RTableSourceExecutor(source_executor_empty)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_simple_return)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- assert len(tuples) == 0
-
- output_tbl = Table(tuples)
- assert output_tbl == Table([])
- assert output_tbl == input_tbl
-
- def test_udf_executor_empty_add_row(
- self, source_executor_empty, udf_executor_empty_add_row
- ):
- source_executor = RTableSourceExecutor(source_executor_empty)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_empty_add_row)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- target_tuple = Tuple({"col1": "TEST", "col2": 12.3, "col3": True})
- assert len(tuples) == 1
-
- output_tbl = Table(tuples)
- assert output_tbl == Table([target_tuple])
-
- def test_source_executor_null_values(
- self, source_executor_null_values, target_tuples_null_values
- ):
- source_executor = RTableSourceExecutor(source_executor_null_values)
- output = source_executor.produce()
- tuples = [tup for tup in output]
- assert len(tuples) == 3
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples_null_values)
-
- def test_udf_executor_null_values_return(
- self,
- source_executor_null_values,
- udf_executor_null_values_return,
- target_tuples_null_values,
- ):
- source_executor = RTableSourceExecutor(source_executor_null_values)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_null_values_return)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- assert len(tuples) == 3
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples_null_values)
-
- def test_udf_executor_null_values_add_row(
- self,
- source_executor_null_values,
- udf_executor_null_values_add_row,
- target_tuples_null_values,
- ):
- source_executor = RTableSourceExecutor(source_executor_null_values)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_null_values_add_row)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- target_tuple = Tuple({"col1": None, "col2": None, "col3": None})
- assert len(tuples) == 4
- assert tuples[3] == target_tuple
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples_null_values + [target_tuple])
-
- def test_source_executor_simple(self, source_executor_simple,
target_tuples_simple):
- source_executor = RTableSourceExecutor(source_executor_simple)
- output = source_executor.produce()
-
- tuples = [tup for tup in output]
- assert len(tuples) == 3
-
- for idx, v in enumerate(tuples):
- assert v == target_tuples_simple[idx]
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples_simple)
-
- def test_udf_executor_simple(
- self, source_executor_simple, udf_executor_simple_return,
target_tuples_simple
- ):
- source_executor = RTableSourceExecutor(source_executor_simple)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_simple_return)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- assert len(tuples) == 3
-
- for idx, v in enumerate(tuples):
- assert v == target_tuples_simple[idx]
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples_simple)
- assert output_tbl == input_tbl
-
- def test_udf_executor_simple_extract_row(
- self,
- source_executor_simple,
- udf_executor_simple_extract_row,
- target_tuples_simple,
- ):
- source_executor = RTableSourceExecutor(source_executor_simple)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_simple_extract_row)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- target_tuple = Tuple({"Name": "Bob", "Age": 30, "City": "Los Angeles"})
- assert len(tuples) == 1
- assert tuples[0] == target_tuple
-
- output_tbl = Table(tuples)
- assert output_tbl == Table([target_tuple])
-
- def test_udf_executor_simple_update_row(
- self,
- source_executor_simple,
- udf_executor_simple_update_row,
- target_tuples_simple,
- ):
- source_executor = RTableSourceExecutor(source_executor_simple)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_simple_update_row)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- target_tuple = Tuple({"Name": "Bob", "Age": 18, "City": "Los Angeles"})
- assert len(tuples) == 3
-
- for idx, v in enumerate(tuples):
- if idx == 1:
- assert v == target_tuple
- else:
- assert v == target_tuples_simple[idx]
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(
- [target_tuples_simple[0], target_tuple, target_tuples_simple[2]]
- )
-
- def test_udf_executor_simple_add_row(
- self, source_executor_simple, udf_executor_simple_add_row,
target_tuples_simple
- ):
- source_executor = RTableSourceExecutor(source_executor_simple)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_simple_add_row)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- target_tuple = Tuple({"Name": "Test", "Age": 0, "City": "Irvine"})
- assert len(tuples) == 4
-
- for idx, v in enumerate(tuples):
- if idx == len(tuples) - 1:
- assert v == target_tuple
- else:
- assert v == target_tuples_simple[idx]
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(
- [tup for tup in target_tuples_simple] + [target_tuple]
- )
-
- def test_source_executor_fail(self, source_executor_df_fail):
- source_executor = RTableSourceExecutor(source_executor_df_fail)
- with pytest.raises(rpy2.rinterface_lib.embedded.RRuntimeError) as _:
- output = source_executor.produce()
- output = [out for out in output]
-
- def test_source_executor_df_like_type(
- self, source_executor_df_like_type, target_tuples_like_type
- ):
- source_executor = RTableSourceExecutor(source_executor_df_like_type)
- output = source_executor.produce()
-
- tuples = [tup for tup in output]
- assert len(tuples) == 2
-
- for idx, v in enumerate(tuples):
- assert v == target_tuples_like_type[idx]
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples_like_type)
-
- def test_udf_executor_df_like_type(
- self,
- source_executor_df_like_type,
- udf_executor_simple_return,
- target_tuples_like_type,
- ):
- source_executor = RTableSourceExecutor(source_executor_df_like_type)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_simple_return)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- assert len(tuples) == 2
-
- for idx, v in enumerate(tuples):
- assert v == target_tuples_like_type[idx]
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples_like_type)
- assert output_tbl == input_tbl
-
- def test_udf_executor_df_like_type_add_row(
- self,
- source_executor_df_like_type,
- udf_executor_df_like_type_add_row,
- target_tuples_like_type,
- ):
- source_executor = RTableSourceExecutor(source_executor_df_like_type)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_df_like_type_add_row)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- target_tuple = Tuple({"C.1": 4, "C.2": 5, "C.3": 6})
- assert len(tuples) == 3
-
- for idx, v in enumerate(tuples):
- if idx == len(tuples) - 1:
- assert v == target_tuple
- else:
- assert v == target_tuples_like_type[idx]
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples_like_type + [target_tuple])
-
- def test_udf_executor_df_like_type_add_col(
- self, source_executor_df_like_type, udf_executor_df_like_type_add_col
- ):
- source_executor = RTableSourceExecutor(source_executor_df_like_type)
- input_tbl = Table([tup for tup in source_executor.produce()])
-
- udf_executor = RTableExecutor(udf_executor_df_like_type_add_col)
- output = udf_executor.process_table(input_tbl, 0)
-
- tuples = [tup for tup in output]
- target_tuples = [
- Tuple({"C.1": 1, "C.2": 2, "C.3": 3, "new_col": "AAA"}),
- Tuple({"C.1": 11, "C.2": 12, "C.3": 13, "new_col": "BBB"}),
- ]
-
- assert len(tuples) == 2
- for idx, v in enumerate(tuples):
- assert v == target_tuples[idx]
-
- output_tbl = Table(tuples)
- assert output_tbl == Table(target_tuples)