After digging into the flink-python code, it seems that if `PYFLINK_GATEWAY_DISABLED` is set to false as an environment variable, then using Types.LIST(Types.ROW([...])) works without any issue once the Java Gateway is launched.
It was unexpected that a local Flink run requires setting this flag to false explicitly. This is a workaround for this issue: def open(self, runtime_context: RuntimeContext): state_ttl_config = ( StateTtlConfig.new_builder(Time.seconds(1)) .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) .disable_cleanup_in_background() .build() ) import os os.environ["PYFLINK_GATEWAY_DISABLED"] = "0" On Wed, Oct 4, 2023 at 1:48 PM Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote: > Hi Flinkers, > > I'm trying to use MapState, where the value will be a list of <class > 'pyflink.common.types.Row'> type elements. > > Wanted to check if anyone else faced the same issue while trying to use > MapState in PyFlink with complex types. > > Here is the code: > > from pyflink.common import Time > from pyflink.common.typeinfo import Types > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.datastream.functions import ( > KeyedCoProcessFunction, > KeySelector, > RuntimeContext, > ) > from pyflink.datastream.state import ( > MapStateDescriptor, > StateTtlConfig, > ValueStateDescriptor, > ListStateDescriptor > ) > from pyflink.table import DataTypes, StreamTableEnvironment > > > class MyKeyedCoProcessFunction(KeyedCoProcessFunction): > def __init__(self): > self.my_map_state = None > > def open(self, runtime_context: RuntimeContext): > state_ttl_config = ( > StateTtlConfig.new_builder(Time.seconds(1)) > .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) > .disable_cleanup_in_background() > .build() > ) > > my_map_state_descriptor = MapStateDescriptor( > "my_map_state", > Types.SQL_TIMESTAMP(), > Types.LIST(Types.ROW([ > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.SQL_TIMESTAMP(), > Types.SQL_TIMESTAMP(), > Types.SQL_TIMESTAMP(), > Types.BIG_INT() > ])) > ) > my_map_state_descriptor.enable_time_to_live(state_ttl_config) > self.my_map_state = > 
runtime_context.get_map_state(my_map_state_descriptor) > > But while running this code, it fails with this exception at job startup > (at runtime_context.get_map_state(my_map_state_descriptor)), even without > trying to add anything to the state. > > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in > pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation > .__init__ > File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in > pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ > File > "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", > line 127, in open > self.open_func() > File > "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", > line 296, in open_func > process_function.open(runtime_context) > File "/tmp/ipykernel_83481/1603226134.py", line 57, in open > File > "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py", > line 125, in get_map_state > map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder > File > "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", > line 812, in from_type_info > from_type_info(type_info._key_type_info), > from_type_info(type_info._value_type_info)) > File > "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", > line 809, in from_type_info > return GenericArrayCoder(from_type_info(type_info.elem_type)) > File > "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", > line 819, in from_type_info > [f for f in type_info.get_field_names()]) > File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", > line 377, in get_field_names > j_field_names = self.get_java_type_info().getFieldNames() > File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", > line 391, in get_java_type_info > j_types_array = 
get_gateway()\ > File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", > line 62, in get_gateway > _gateway = launch_gateway() > File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", > line 86, in launch_gateway > raise Exception("It's launching the PythonGatewayServer during Python UDF > execution " > Exception: It's launching the PythonGatewayServer during Python UDF > execution which is unexpected. It usually happens when the job codes are in > the top level of the Python script file and are not enclosed in a `if __name__ > == '__main__'` statement. > > If I switch from Types.ROW to Types.TUPLE() it works without any exception. > > This works: > > my_map_state_descriptor = MapStateDescriptor( > "my_map_state", > Types.SQL_TIMESTAMP(), > Types.LIST(Types.TUPLE([ > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.STRING(), > Types.SQL_TIMESTAMP(), > Types.SQL_TIMESTAMP(), > Types.SQL_TIMESTAMP(), > Types.BIG_INT() > ])) > ) > > Also created Jira-FLINK-33188 > <https://issues.apache.org/jira/browse/FLINK-33188> > > Thanks. >