This is an automated email from the ASF dual-hosted git repository.

yiconghuang pushed a commit to branch feat/drop-support-for-r-udf
in repository https://gitbox.apache.org/repos/asf/texera.git

commit fcf936e70166aef37cf378d7502209cb12b4af07
Author: Yicong Huang <[email protected]>
AuthorDate: Fri Nov 28 18:03:48 2025 -0800

    chore: drop support for RUDF
---
 .github/workflows/github-action-build.yml          |   2 +-
 amber/r-requirements.txt                           |  19 -
 .../core/architecture/managers/executor_manager.py |  31 +-
 .../src/main/python/core/models/RTableExecutor.py  | 132 -----
 .../src/main/python/core/models/RTupleExecutor.py  | 167 -------
 amber/src/main/python/core/models/r_utils.py       |  79 ---
 .../main/python/core/models/test_RTableExecutor.py | 539 ---------------------
 7 files changed, 8 insertions(+), 961 deletions(-)

diff --git a/.github/workflows/github-action-build.yml 
b/.github/workflows/github-action-build.yml
index fabf68053e..bb6c65a41d 100644
--- a/.github/workflows/github-action-build.yml
+++ b/.github/workflows/github-action-build.yml
@@ -157,4 +157,4 @@ jobs:
           cd amber/src/main/python && flake8 && black . --check
       - name: Test with pytest
         run: |
-          cd amber/src/main/python && pytest -sv 
--ignore=core/models/test_RTableExecutor.py
+          cd amber/src/main/python && pytest -sv
diff --git a/amber/r-requirements.txt b/amber/r-requirements.txt
deleted file mode 100644
index 1216cdce4f..0000000000
--- a/amber/r-requirements.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-rpy2==3.5.11
-rpy2-arrow==0.0.8
\ No newline at end of file
diff --git 
a/amber/src/main/python/core/architecture/managers/executor_manager.py 
b/amber/src/main/python/core/architecture/managers/executor_manager.py
index 9693041975..286d7bcaf0 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -132,30 +132,13 @@ class ExecutorManager:
         :param language: The language of the operator code.
         :return:
         """
-        if language == "r-tuple":
-            # Have to import it here and not at the top in case R_HOME from 
udf.conf
-            # is not defined, otherwise an error will occur
-            # If R_HOME is not defined and rpy2 cannot find the
-            # R_HOME environment variable, an error will occur here
-            from core.models.RTupleExecutor import RTupleSourceExecutor, 
RTupleExecutor
-
-            self.executor = (
-                RTupleSourceExecutor(code) if is_source else 
RTupleExecutor(code)
-            )
-        elif language == "r-table":
-            # Have to import it here and not at the top in case R_HOME from 
udf.conf
-            # is not defined, otherwise an error will occur
-            # If R_HOME is not defined and rpy2 cannot find the
-            # R_HOME environment variable, an error will occur here
-            from core.models.RTableExecutor import RTableSourceExecutor, 
RTableExecutor
-
-            self.executor = (
-                RTableSourceExecutor(code) if is_source else 
RTableExecutor(code)
-            )
-        else:
-            executor: type(Operator) = self.load_executor_definition(code)
-            self.executor = executor()
-            self.executor.is_source = is_source
+        assert language in [
+            "r-tuple",
+            "r-table",
+        ], f"Language {language} is not by default. Please consult third party 
plugin."
+        executor: type(Operator) = self.load_executor_definition(code)
+        self.executor = executor()
+        self.executor.is_source = is_source
         assert (
             isinstance(self.executor, SourceOperator) == 
self.executor.is_source
         ), "Please use SourceOperator API for source operators."
diff --git a/amber/src/main/python/core/models/RTableExecutor.py 
b/amber/src/main/python/core/models/RTableExecutor.py
deleted file mode 100644
index db8610e6f2..0000000000
--- a/amber/src/main/python/core/models/RTableExecutor.py
+++ /dev/null
@@ -1,132 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pyarrow as pa
-import rpy2.robjects as robjects
-import typing
-from rpy2.robjects import default_converter
-from rpy2.robjects.conversion import localconverter as local_converter
-from rpy2_arrow.arrow import rarrow_to_py_table, converter as arrow_converter
-from typing import Iterator, Optional, Union
-
-from core.models import ArrowTableTupleProvider, Tuple, TupleLike, Table, 
TableLike
-from core.models.operator import SourceOperator, TableOperator
-
-
-class RTableExecutor(TableOperator):
-    """
-    An executor that can execute R code on Arrow tables.
-    """
-
-    is_source = False
-
-    _arrow_to_r_dataframe = robjects.r(
-        "function(table) { return (as.data.frame(table)) }"
-    )
-
-    _r_dataframe_to_arrow = robjects.r(
-        """
-        library(arrow)
-        function(df) { return (arrow::as_arrow_table(df)) }
-        """
-    )
-
-    def __init__(self, r_code: str):
-        """
-        Initialize the RTableExecutor with R code.
-
-        Args:
-            r_code (str): R code to be executed.
-        """
-        super().__init__()
-        with local_converter(default_converter):
-            self._func: typing.Callable[[pa.Table], pa.Table] = 
robjects.r(r_code)
-
-    def process_table(self, table: Table, port: int) -> 
Iterator[Optional[TableLike]]:
-        """
-        Process an input Table using the provided R function.
-        The Table is represented as a pandas.DataFrame.
-
-        :param table: Table, a table to be processed.
-        :param port: int, input port index of the current Tuple.
-            Currently unused in R-UDF
-        :return: Iterator[Optional[TableLike]], producing one TableLike object 
at a
-        time, or None.
-        """
-        input_pyarrow_table = pa.Table.from_pandas(table)
-        with local_converter(arrow_converter):
-            input_r_dataframe = RTableExecutor._arrow_to_r_dataframe(
-                input_pyarrow_table
-            )
-            output_r_dataframe = self._func(input_r_dataframe, port)
-            output_rarrow_table = RTableExecutor._r_dataframe_to_arrow(
-                output_r_dataframe
-            )
-            output_pyarrow_table = rarrow_to_py_table(output_rarrow_table)
-
-        for field_accessor in ArrowTableTupleProvider(output_pyarrow_table):
-            yield Tuple(
-                {name: field_accessor for name in 
output_pyarrow_table.column_names}
-            )
-
-
-class RTableSourceExecutor(SourceOperator):
-    """
-    A source operator that produces an R Table or Table-like object using R 
code.
-    """
-
-    is_source = True
-    _source_output_to_arrow = robjects.r(
-        """
-    library(arrow)
-    function(source_output) {
-        return (arrow::as_arrow_table(as.data.frame(source_output)))
-    }
-    """
-    )
-
-    def __init__(self, r_code: str):
-        """
-        Initialize the RTableSourceExecutor with R code.
-
-        Args:
-            r_code (str): R code to be executed.
-        """
-        super().__init__()
-        # Use the local converter from rpy2 to load in the R function given by 
the user
-        with local_converter(default_converter):
-            self._func = robjects.r(r_code)
-
-    def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
-        """
-        Produce Table using the provided R function.
-        Used by the source operator only.
-
-        :return: Iterator[Union[TupleLike, TableLike, None]], producing
-            one TupleLike object, one TableLike object, or None, at a time.
-        """
-        with local_converter(arrow_converter):
-            output_table = self._func()
-            output_rarrow_table = RTableSourceExecutor._source_output_to_arrow(
-                output_table
-            )
-            output_pyarrow_table = rarrow_to_py_table(output_rarrow_table)
-
-        for field_accessor in ArrowTableTupleProvider(output_pyarrow_table):
-            yield Tuple(
-                {name: field_accessor for name in 
output_pyarrow_table.column_names}
-            )
diff --git a/amber/src/main/python/core/models/RTupleExecutor.py 
b/amber/src/main/python/core/models/RTupleExecutor.py
deleted file mode 100644
index 67f05fb2d3..0000000000
--- a/amber/src/main/python/core/models/RTupleExecutor.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import pickle
-import pyarrow as pa
-import rpy2
-import rpy2.robjects as robjects
-import warnings
-from rpy2.robjects import default_converter
-from rpy2.robjects.conversion import localconverter as local_converter
-from rpy2_arrow.arrow import converter as arrow_converter
-from typing import Iterator, Optional, Union
-
-from core.models import Tuple, TupleLike, TableLike, r_utils
-from core.models.operator import SourceOperator, TupleOperatorV2
-
-warnings.filterwarnings(action="ignore", category=UserWarning, module=r"rpy2*")
-
-
-class RTupleExecutor(TupleOperatorV2):
-    """
-    An operator that can execute R code on R Lists (R's representation of a 
Tuple)
-    """
-
-    is_source = False
-
-    _combine_binary_and_non_binary_lists = robjects.r(
-        """
-        function(non_binary_list, binary_list) {
-            non_binary_list <- as.list(non_binary_list$as_vector())
-            return (c(non_binary_list, binary_list))
-        }
-        """
-    )
-
-    def __init__(self, r_code: str):
-        """
-        Initialize the RTupleExecutor with R code.
-
-        Args:
-            r_code (str): R code to be executed.
-        """
-        super().__init__()
-        # Use the local converter from rpy2 to load in the R function given by 
the user
-        with local_converter(default_converter):
-            self._func = robjects.r(r_code)
-
-    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
-        """
-        Process an input Tuple from the given link.
-
-        :param tuple_: Tuple, a Tuple from an input port to be processed.
-        :param port: int, input port index of the current Tuple.
-        :return: Iterator[Optional[TupleLike]], producing one TupleLike object 
at a
-            time, or None.
-        """
-        with local_converter(arrow_converter):
-            input_schema: pa.Schema = tuple_._schema.as_arrow_schema()
-            input_fields: list[str] = [field.name for field in input_schema]
-            non_binary_fields: list[str] = [
-                field.name for field in input_schema if field.type != 
pa.binary()
-            ]
-            binary_fields: list[str] = [
-                field.name for field in input_schema if field.type == 
pa.binary()
-            ]
-
-            non_binary_pyarrow_array: pa.StructArray = pa.array([], 
type=pa.struct([]))
-            if non_binary_fields:
-                non_binary_tuple: Tuple = 
tuple_.get_partial_tuple(non_binary_fields)
-                non_binary_tuple_schema: pa.Schema = (
-                    non_binary_tuple._schema.as_arrow_schema()
-                )
-                non_binary_pyarrow_array: pa.StructArray = pa.array(
-                    [non_binary_tuple.as_dict()],
-                    type=pa.struct(non_binary_tuple_schema),
-                )
-
-            binary_r_list: dict[str, object] = {}
-            if binary_fields:
-                binary_tuple: Tuple = tuple_.get_partial_tuple(binary_fields)
-                for k, v in binary_tuple.as_dict().items():
-                    if isinstance(v, bytes):
-                        binary_r_list[k] = pickle.loads(v[10:])
-                    elif isinstance(v, datetime.datetime):
-                        binary_r_list[k] = 
robjects.vectors.POSIXct.sexp_from_datetime(
-                            [v]
-                        )
-                    else:
-                        binary_r_list[k] = v
-
-            binary_r_list: rpy2.robjects.ListVector = 
robjects.vectors.ListVector(
-                binary_r_list
-            )
-
-            input_r_list: rpy2.robjects.ListVector = (
-                RTupleExecutor._combine_binary_and_non_binary_lists(
-                    non_binary_pyarrow_array, binary_r_list
-                )
-            )
-
-            output_r_generator: rpy2.robjects.SignatureTranslatedFunction = 
self._func(
-                input_r_list, port
-            )
-
-            while True:
-                output_py_tuple: Tuple = r_utils.extract_tuple_from_r(
-                    output_r_generator, False, input_fields
-                )
-                yield output_py_tuple if output_py_tuple is not None else None
-                if output_py_tuple is None:
-                    break
-
-
-class RTupleSourceExecutor(SourceOperator):
-    """
-    A source operator that produces a generator that yields R Lists using R 
code.
-    R Lists are R's representation of a Tuple
-    """
-
-    is_source = True
-
-    def __init__(self, r_code: str):
-        """
-        Initialize the RTupleSourceExecutor with R code.
-
-        Args:
-            r_code (str): R code to be executed.
-        """
-        super().__init__()
-        # Use the local converter from rpy2 to load in the R function given by 
the user
-        with local_converter(default_converter):
-            self._func = robjects.r(r_code)
-
-    def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]:
-        """
-        Produce Tuples using the provided R generator returned by the UDF.
-        The returned R generator is an iterator
-        that yields R Lists (R's representation of Tuple)
-        Used by the source operator only.
-
-        :return: Iterator[Union[TupleLike, TableLike, None]], producing
-            one TupleLike object, one TableLike object, or None, at a time.
-        """
-        with local_converter(arrow_converter):
-            output_r_generator: rpy2.robjects.SignatureTranslatedFunction = 
self._func()
-            while True:
-                output_py_tuple: Tuple = r_utils.extract_tuple_from_r(
-                    output_r_generator, True
-                )
-                yield output_py_tuple if output_py_tuple is not None else None
-                if output_py_tuple is None:
-                    break
diff --git a/amber/src/main/python/core/models/r_utils.py 
b/amber/src/main/python/core/models/r_utils.py
deleted file mode 100644
index d1f6e512bd..0000000000
--- a/amber/src/main/python/core/models/r_utils.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import rpy2
-import rpy2.rinterface as rinterface
-import rpy2.robjects as robjects
-import warnings
-
-from core.models import Tuple
-
-warnings.filterwarnings(action="ignore", category=UserWarning, module=r"rpy2*")
-
-
-def convert_r_to_py(value: rpy2.robjects):
-    """
-    :param value: A value that is from one of rpy2's many types (from 
rpy2.robjects)
-    :return: A Python representation of the value, if convertable.
-        If not, it returns the value itself
-    """
-    if isinstance(value, robjects.vectors.BoolVector):
-        return bool(value[0])
-    if isinstance(value, robjects.vectors.IntVector):
-        return int(value[0])
-    if isinstance(value, robjects.vectors.FloatVector):
-        if isinstance(value, robjects.vectors.POSIXct):
-            return next(value.iter_localized_datetime())
-        else:
-            return float(value[0])
-    if isinstance(value, robjects.vectors.StrVector):
-        return str(value[0])
-    return value
-
-
-def extract_tuple_from_r(
-    output_r_generator: rpy2.robjects.SignatureTranslatedFunction,
-    source_operator: bool,
-    input_fields: [None, list[str]] = None,
-) -> [Tuple, None]:
-    output_r_tuple: rpy2.robjects.ListVector = output_r_generator()
-    if (
-        isinstance(output_r_tuple, rinterface.SexpSymbol)
-        and str(output_r_tuple) == ".__exhausted__."
-    ) or isinstance(output_r_tuple.names, rpy2.rinterface_lib.sexp.NULLType):
-        return None
-
-    output_python_dict: dict[str, object] = {}
-    if source_operator:
-        output_python_dict = {
-            key: output_r_tuple.rx2(key) for key in output_r_tuple.names
-        }
-    else:
-        diff_fields: list[str] = [
-            field_name
-            for field_name in output_r_tuple.names
-            if field_name not in input_fields
-        ]
-        output_python_dict: dict[str, object] = {
-            key: output_r_tuple.rx2(key) for key in (input_fields + 
diff_fields)
-        }
-
-    output_python_dict: dict[str, object] = {
-        key: convert_r_to_py(value) for key, value in 
output_python_dict.items()
-    }
-
-    return Tuple(output_python_dict)
diff --git a/amber/src/main/python/core/models/test_RTableExecutor.py 
b/amber/src/main/python/core/models/test_RTableExecutor.py
deleted file mode 100644
index 1e584ab40d..0000000000
--- a/amber/src/main/python/core/models/test_RTableExecutor.py
+++ /dev/null
@@ -1,539 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pandas
-import pytest
-import rpy2.rinterface_lib.embedded
-
-from core.models import Tuple, Table
-from core.models.RTableExecutor import RTableSourceExecutor, RTableExecutor
-
-
-class TestRTableExecutor:
-    @pytest.fixture
-    def source_executor_empty(self):
-        return """
-        function() {
-            df <- data.frame(
-                col1 = character(),
-                col2 = numeric(),
-                col3 = logical()
-                )
-            return (df)
-        }
-        """
-
-    @pytest.fixture
-    def source_executor_NA(self):
-        # This should work with no issues
-        # since you can store NA in a data.frame
-        # and then convert the data.frame to Arrow Table with
-        # one Tuple with value Tuple({"source_output": None})
-        return """
-        function() {
-            return (NA)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_return_NA(self):
-        # This should fail since the conversion back
-        # to Arrow will be impossible
-        return """
-        function(table, port) {
-            return (NA)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_simple_return(self):
-        return """
-        function(table, port) {
-            return (table)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_empty_add_row(self):
-        return """
-        function(table, port) {
-            new_row <- data.frame(
-                col1 = "TEST",
-                col2 = 12.3,
-                col3 = TRUE
-              )
-            table <- rbind(table, new_row)
-            return (table)
-        }
-        """
-
-    @pytest.fixture
-    def source_executor_null_values(self):
-        return """
-        function() {
-            df <- data.frame(
-                col1 = character(),
-                col2 = numeric(),
-                col3 = logical()
-                )
-            df[1:3,] <- NA
-            return (df)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_null_values_return(self):
-        return """
-        function(table, port) {
-            return (table)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_null_values_add_row(self):
-        return """
-        function(table, port) {
-            new_row <- data.frame(
-                col1 = NA,
-                col2 = NA,
-                col3 = NA
-              )
-            table <- rbind(table, new_row)
-            return (table)
-        }
-        """
-
-    @pytest.fixture
-    def target_tuples_null_values(self):
-        tuple_1 = Tuple({"col1": None, "col2": None, "col3": None})
-        tuple_2 = Tuple({"col1": None, "col2": None, "col3": None})
-        tuple_3 = Tuple({"col1": None, "col2": None, "col3": None})
-        return [tuple_1, tuple_2, tuple_3]
-
-    @pytest.fixture
-    def pandas_target_df_simple(self):
-        data = {
-            "Name": ["Alice", "Bob", "Charlie"],
-            "Age": [25, 30, 35],
-            "City": ["New York", "Los Angeles", "Chicago"],
-        }
-        df = pandas.DataFrame(data)
-        return df
-
-    @pytest.fixture
-    def target_tuples_simple(self, pandas_target_df_simple):
-        tuples = []
-        for index, row in pandas_target_df_simple.iterrows():
-            tuples.append(Tuple(row))
-        return tuples
-
-    @pytest.fixture
-    def source_executor_simple(self):
-        return """
-        function() {
-            df <- data.frame(
-              Name = c("Alice", "Bob", "Charlie"),
-              Age = c(25, 30, 35),
-              City = c("New York", "Los Angeles", "Chicago")
-              )
-            return (df)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_simple_extract_row(self):
-        return """
-        function(table, port) {
-            bob_row <- table[table$Name == "Bob", ]
-            return (bob_row)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_simple_update_row(self):
-        return """
-        function(table, port) {
-            table[table$Name == "Bob", "Age"] <- 18
-            return (table)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_simple_add_row(self):
-        return """
-        function(table, port) {
-            new_row <- list(Name = "Test", Age = 0, City = "Irvine")
-            new_df <- rbind(table, new_row)
-            return (new_df)
-        }
-        """
-
-    @pytest.fixture
-    def source_executor_df_fail(self):
-        # This Source UDF should raise a TypeError since it cannot
-        # be converted into a Table-like object
-        return """
-        function() {
-            glm_model <- glm(mpg ~ wt, data = mtcars, family = gaussian)
-            return (glm_model)
-        }
-        """
-
-    @pytest.fixture
-    def target_tuples_like_type(self):
-        tuple_1 = Tuple({"C.1": 1, "C.2": 2, "C.3": 3})
-        tuple_2 = Tuple({"C.1": 11, "C.2": 12, "C.3": 13})
-        return [tuple_1, tuple_2]
-
-    @pytest.fixture
-    def source_executor_df_like_type(self):
-        return """
-        function() {
-            mdat <- matrix(c(1,2,3, 11,12,13), nrow = 2, ncol = 3, byrow = 
TRUE,
-                dimnames = list(c("row1", "row2"),
-                c("C.1", "C.2", "C.3")))
-            return (mdat)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_df_like_type_add_row(self):
-        return """
-        function(table, port) {
-            # Adding a new row
-            new_row <- c(4, 5, 6)
-            table <- rbind(table, new_row)
-
-            return (table)
-        }
-        """
-
-    @pytest.fixture
-    def udf_executor_df_like_type_add_col(self):
-        return """
-        function(table, port) {
-            # Adding a new col
-            new_col <- c("AAA", "BBB")
-            table <- cbind(table, new_col)
-
-            return (table)
-        }
-        """
-
-    def test_source_executor_empty(self, source_executor_empty):
-        source_executor = RTableSourceExecutor(source_executor_empty)
-        output = source_executor.produce()
-        tuples = [tup for tup in output]
-        assert len(tuples) == 0
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table([])
-
-    def test_source_executor_NA(self, source_executor_NA):
-        source_executor = RTableSourceExecutor(source_executor_NA)
-        output = source_executor.produce()
-        tuples = [tup for tup in output]
-        assert len(tuples) == 1
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table([Tuple({"source_output": None})])
-
-    def test_udf_executor_return_NA_fail(
-        self, source_executor_empty, udf_executor_return_NA
-    ):
-        source_executor = RTableSourceExecutor(source_executor_empty)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        with pytest.raises(rpy2.rinterface_lib.embedded.RRuntimeError) as _:
-            udf_executor = RTableExecutor(udf_executor_return_NA)
-            output = udf_executor.process_table(input_tbl, 0)
-            tuples = [out for out in output]
-            assert tuples is None
-
-    def test_udf_executor_empty_return(
-        self, source_executor_empty, udf_executor_simple_return
-    ):
-        source_executor = RTableSourceExecutor(source_executor_empty)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_simple_return)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        assert len(tuples) == 0
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table([])
-        assert output_tbl == input_tbl
-
-    def test_udf_executor_empty_add_row(
-        self, source_executor_empty, udf_executor_empty_add_row
-    ):
-        source_executor = RTableSourceExecutor(source_executor_empty)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_empty_add_row)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        target_tuple = Tuple({"col1": "TEST", "col2": 12.3, "col3": True})
-        assert len(tuples) == 1
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table([target_tuple])
-
-    def test_source_executor_null_values(
-        self, source_executor_null_values, target_tuples_null_values
-    ):
-        source_executor = RTableSourceExecutor(source_executor_null_values)
-        output = source_executor.produce()
-        tuples = [tup for tup in output]
-        assert len(tuples) == 3
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples_null_values)
-
-    def test_udf_executor_null_values_return(
-        self,
-        source_executor_null_values,
-        udf_executor_null_values_return,
-        target_tuples_null_values,
-    ):
-        source_executor = RTableSourceExecutor(source_executor_null_values)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_null_values_return)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        assert len(tuples) == 3
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples_null_values)
-
-    def test_udf_executor_null_values_add_row(
-        self,
-        source_executor_null_values,
-        udf_executor_null_values_add_row,
-        target_tuples_null_values,
-    ):
-        source_executor = RTableSourceExecutor(source_executor_null_values)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_null_values_add_row)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        target_tuple = Tuple({"col1": None, "col2": None, "col3": None})
-        assert len(tuples) == 4
-        assert tuples[3] == target_tuple
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples_null_values + [target_tuple])
-
-    def test_source_executor_simple(self, source_executor_simple, 
target_tuples_simple):
-        source_executor = RTableSourceExecutor(source_executor_simple)
-        output = source_executor.produce()
-
-        tuples = [tup for tup in output]
-        assert len(tuples) == 3
-
-        for idx, v in enumerate(tuples):
-            assert v == target_tuples_simple[idx]
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples_simple)
-
-    def test_udf_executor_simple(
-        self, source_executor_simple, udf_executor_simple_return, 
target_tuples_simple
-    ):
-        source_executor = RTableSourceExecutor(source_executor_simple)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_simple_return)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        assert len(tuples) == 3
-
-        for idx, v in enumerate(tuples):
-            assert v == target_tuples_simple[idx]
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples_simple)
-        assert output_tbl == input_tbl
-
-    def test_udf_executor_simple_extract_row(
-        self,
-        source_executor_simple,
-        udf_executor_simple_extract_row,
-        target_tuples_simple,
-    ):
-        source_executor = RTableSourceExecutor(source_executor_simple)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_simple_extract_row)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        target_tuple = Tuple({"Name": "Bob", "Age": 30, "City": "Los Angeles"})
-        assert len(tuples) == 1
-        assert tuples[0] == target_tuple
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table([target_tuple])
-
-    def test_udf_executor_simple_update_row(
-        self,
-        source_executor_simple,
-        udf_executor_simple_update_row,
-        target_tuples_simple,
-    ):
-        source_executor = RTableSourceExecutor(source_executor_simple)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_simple_update_row)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        target_tuple = Tuple({"Name": "Bob", "Age": 18, "City": "Los Angeles"})
-        assert len(tuples) == 3
-
-        for idx, v in enumerate(tuples):
-            if idx == 1:
-                assert v == target_tuple
-            else:
-                assert v == target_tuples_simple[idx]
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(
-            [target_tuples_simple[0], target_tuple, target_tuples_simple[2]]
-        )
-
-    def test_udf_executor_simple_add_row(
-        self, source_executor_simple, udf_executor_simple_add_row, 
target_tuples_simple
-    ):
-        source_executor = RTableSourceExecutor(source_executor_simple)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_simple_add_row)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        target_tuple = Tuple({"Name": "Test", "Age": 0, "City": "Irvine"})
-        assert len(tuples) == 4
-
-        for idx, v in enumerate(tuples):
-            if idx == len(tuples) - 1:
-                assert v == target_tuple
-            else:
-                assert v == target_tuples_simple[idx]
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(
-            [tup for tup in target_tuples_simple] + [target_tuple]
-        )
-
-    def test_source_executor_fail(self, source_executor_df_fail):
-        source_executor = RTableSourceExecutor(source_executor_df_fail)
-        with pytest.raises(rpy2.rinterface_lib.embedded.RRuntimeError) as _:
-            output = source_executor.produce()
-            output = [out for out in output]
-
-    def test_source_executor_df_like_type(
-        self, source_executor_df_like_type, target_tuples_like_type
-    ):
-        source_executor = RTableSourceExecutor(source_executor_df_like_type)
-        output = source_executor.produce()
-
-        tuples = [tup for tup in output]
-        assert len(tuples) == 2
-
-        for idx, v in enumerate(tuples):
-            assert v == target_tuples_like_type[idx]
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples_like_type)
-
-    def test_udf_executor_df_like_type(
-        self,
-        source_executor_df_like_type,
-        udf_executor_simple_return,
-        target_tuples_like_type,
-    ):
-        source_executor = RTableSourceExecutor(source_executor_df_like_type)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_simple_return)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        assert len(tuples) == 2
-
-        for idx, v in enumerate(tuples):
-            assert v == target_tuples_like_type[idx]
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples_like_type)
-        assert output_tbl == input_tbl
-
-    def test_udf_executor_df_like_type_add_row(
-        self,
-        source_executor_df_like_type,
-        udf_executor_df_like_type_add_row,
-        target_tuples_like_type,
-    ):
-        source_executor = RTableSourceExecutor(source_executor_df_like_type)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_df_like_type_add_row)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        target_tuple = Tuple({"C.1": 4, "C.2": 5, "C.3": 6})
-        assert len(tuples) == 3
-
-        for idx, v in enumerate(tuples):
-            if idx == len(tuples) - 1:
-                assert v == target_tuple
-            else:
-                assert v == target_tuples_like_type[idx]
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples_like_type + [target_tuple])
-
-    def test_udf_executor_df_like_type_add_col(
-        self, source_executor_df_like_type, udf_executor_df_like_type_add_col
-    ):
-        source_executor = RTableSourceExecutor(source_executor_df_like_type)
-        input_tbl = Table([tup for tup in source_executor.produce()])
-
-        udf_executor = RTableExecutor(udf_executor_df_like_type_add_col)
-        output = udf_executor.process_table(input_tbl, 0)
-
-        tuples = [tup for tup in output]
-        target_tuples = [
-            Tuple({"C.1": 1, "C.2": 2, "C.3": 3, "new_col": "AAA"}),
-            Tuple({"C.1": 11, "C.2": 12, "C.3": 13, "new_col": "BBB"}),
-        ]
-
-        assert len(tuples) == 2
-        for idx, v in enumerate(tuples):
-            assert v == target_tuples[idx]
-
-        output_tbl = Table(tuples)
-        assert output_tbl == Table(target_tuples)


Reply via email to