Hi Flinkers, I'm trying to use MapState, where the value will be a list of <class 'pyflink.common.types.Row'> type elements.
Wanted to check if anyone else faced the same issue while trying to use MapState in PyFlink with complex types.

Here is the code:

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import (
    KeyedCoProcessFunction,
    KeySelector,
    RuntimeContext,
)
from pyflink.datastream.state import (
    MapStateDescriptor,
    StateTtlConfig,
    ValueStateDescriptor,
    ListStateDescriptor
)
from pyflink.table import DataTypes, StreamTableEnvironment


class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
    def __init__(self):
        self.my_map_state = None

    def open(self, runtime_context: RuntimeContext):
        state_ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
            .disable_cleanup_in_background()
            .build()
        )

        my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            Types.LIST(Types.ROW([
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.BIG_INT()
            ]))
        )
        my_map_state_descriptor.enable_time_to_live(state_ttl_config)
        self.my_map_state = runtime_context.get_map_state(my_map_state_descriptor)

But while running this code, it fails with this exception at job startup (at runtime_context.get_map_state(my_map_state_descriptor)), even without trying to add anything to the state.

  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 127, in open
    self.open_func()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 296, in open_func
    process_function.open(runtime_context)
  File "/tmp/ipykernel_83481/1603226134.py", line 57, in open
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py", line 125, in get_map_state
    map_coder = from_type_info(state_descriptor.type_info)  # type: MapCoder
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 812, in from_type_info
    from_type_info(type_info._key_type_info), from_type_info(type_info._value_type_info))
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 809, in from_type_info
    return GenericArrayCoder(from_type_info(type_info.elem_type))
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 819, in from_type_info
    [f for f in type_info.get_field_names()])
  File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 377, in get_field_names
    j_field_names = self.get_java_type_info().getFieldNames()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 391, in get_java_type_info
    j_types_array = get_gateway()\
  File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 62, in get_gateway
    _gateway = launch_gateway()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 86, in launch_gateway
    raise Exception("It's launching the PythonGatewayServer during Python UDF execution "
Exception: It's launching the PythonGatewayServer during Python UDF execution which is unexpected. It usually happens when the job codes are in the top level of the Python script file and are not enclosed in a `if name == 'main'` statement.

If I switch from Types.ROW to Types.TUPLE(), it works without any exception.

This works:

my_map_state_descriptor = MapStateDescriptor(
    "my_map_state",
    Types.SQL_TIMESTAMP(),
    Types.LIST(Types.TUPLE([
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.SQL_TIMESTAMP(),
        Types.SQL_TIMESTAMP(),
        Types.SQL_TIMESTAMP(),
        Types.BIG_INT()
    ]))
)

I have also created a Jira ticket for this, FLINK-33188 <https://issues.apache.org/jira/browse/FLINK-33188>

Thanks.