Dear readers,

I'm running into some unexpected behaviour in PyFlink when switching the
execution mode from process to thread. In thread mode, my `Row` is
converted to a plain tuple whenever I use a UDF in a map operation. This
conversion loses critical information, such as the field names.
Below is a minimal reproducible example (mostly taken from the documentation):

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")


# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)
table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
This results in the following exception:

    2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
    2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
    2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
    2024-03-28 16:32:10 at <string>.<lambda>(<string>:1)
    2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)

Note that in process mode this works perfectly fine. Is this expected
behaviour, and/or is there a workaround?
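In the meantime, a possible stopgap (a sketch only, not verified in thread mode) is to stop relying on attribute access and re-attach the field names inside the UDF. This assumes that the tuple handed over in thread mode keeps the field order of the declared input schema (`a`, then `b`); `InputRow` and `with_field_names` are hypothetical names chosen for illustration, not PyFlink API:

```python
from collections import namedtuple

# Hypothetical helper type mirroring the declared input schema
# ROW<a BIGINT, b BIGINT>. Assumption: the field order matches the schema.
InputRow = namedtuple("InputRow", ["a", "b"])


def with_field_names(value):
    """Re-attach field names to a positional record.

    Works for a plain tuple (what thread mode appears to pass in) and for
    any other iterable whose elements follow the schema order.
    """
    return InputRow._make(value)


def map_function(value):
    row = with_field_names(value)
    # In the real UDF, wrap these values in pyflink.common.Row(...)
    # as in the original map_function.
    return (row.a + 1, row.b * row.b)


print(map_function((2, 4)))  # (3, 16)
print(map_function((0, 0)))  # (1, 0)
```

The sketch only iterates over the input, so it does not depend on whether the UDF receives a `Row` or a tuple, but it silently breaks if the schema order ever changes.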

Kind regards,
Wouter
