Dear readers, I'm running into some unexpected behaviour in PyFlink when switching the execution mode from process to thread. In thread mode, my `Row` is converted to a tuple whenever I use a UDF in a map operation. This conversion loses critical information such as column names. Below is a minimal working example (mostly taken from the documentation):
```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")

# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)

table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```

This results in the following exception:

2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
2024-03-28 16:32:10     at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10     at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10     at <string>.<lambda>(<string>:1)
2024-03-28 16:32:10     at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)

Note that in process mode this works perfectly fine. Is this expected behaviour and/or is there a workaround?

Kind regards,
Wouter