Hi Flinkers,

I'm trying to use MapState where each value is a list of elements of type
<class 'pyflink.common.types.Row'>.

I wanted to check whether anyone else has run into the same issue when using
MapState with complex types in PyFlink.

Here is the code:

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import (
    KeyedCoProcessFunction,
    KeySelector,
    RuntimeContext,
)
from pyflink.datastream.state import (
    MapStateDescriptor,
    StateTtlConfig,
    ValueStateDescriptor,
    ListStateDescriptor
)
from pyflink.table import DataTypes, StreamTableEnvironment


class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
    """Keyed co-process function that keeps a TTL-enabled MapState.

    The state maps a SQL timestamp key to a list of 12-field rows
    (8 STRING fields, 3 SQL_TIMESTAMP fields and 1 BIG_INT field).
    Only the state setup is shown in this snippet; the element-processing
    callbacks are not included here.
    """

    def __init__(self):
        super().__init__()
        # Set in open() from the runtime context.
        self.my_map_state = None

    def open(self, runtime_context: RuntimeContext):
        """Build the map-state descriptor and obtain the MapState handle.

        :param runtime_context: the RuntimeContext PyFlink passes to open();
            used to look up the keyed MapState.
        """
        # 1-second TTL, refreshed on both read and write, with background
        # cleanup disabled.
        state_ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
            .disable_cleanup_in_background()
            .build()
        )

        my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            # NOTE(review): this unnamed Types.ROW is what fails at
            # get_map_state(). Per the traceback, building the state coder
            # calls RowTypeInfo.get_field_names(), which asks the JVM via
            # get_gateway(); inside the Python worker that raises
            # (FLINK-33188). Types.TUPLE with the same field types works.
            # Types.ROW_NAMED with explicit field names may also avoid the
            # JVM lookup -- unverified.
            Types.LIST(Types.ROW([
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.BIG_INT(),
            ])),
        )
        my_map_state_descriptor.enable_time_to_live(state_ttl_config)
        self.my_map_state = runtime_context.get_map_state(my_map_state_descriptor)

However, when I run this code it fails at job startup with the exception
below, raised from runtime_context.get_map_state(my_map_state_descriptor),
before anything has been added to the state.

File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in
pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation
.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
line 127, in open
self.open_func()
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
line 296, in open_func
process_function.open(runtime_context)
File "/tmp/ipykernel_83481/1603226134.py", line 57, in open
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py",
line 125, in get_map_state
map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
line 812, in from_type_info
from_type_info(type_info._key_type_info),
from_type_info(type_info._value_type_info))
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
line 809, in from_type_info
return GenericArrayCoder(from_type_info(type_info.elem_type))
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
line 819, in from_type_info
[f for f in type_info.get_field_names()])
File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py",
line 377, in get_field_names
j_field_names = self.get_java_type_info().getFieldNames()
File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py",
line 391, in get_java_type_info
j_types_array = get_gateway()\
File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py",
line 62, in get_gateway
_gateway = launch_gateway()
File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py",
line 86, in launch_gateway
raise Exception("It's launching the PythonGatewayServer during Python UDF
execution "
Exception: It's launching the PythonGatewayServer during Python UDF
execution which is unexpected. It usually happens when the job codes are in
the top level of the Python script file and are not enclosed in a `if name
== 'main'` statement.

If I switch from Types.ROW() to Types.TUPLE() (with the same field types), it works without any exception.

This works:

# Working variant: the same key type and field types as the failing
# descriptor, but the list elements are declared with Types.TUPLE rather
# than Types.ROW.
my_map_state_descriptor = MapStateDescriptor(
    "my_map_state",
    Types.SQL_TIMESTAMP(),
    Types.LIST(
        Types.TUPLE([
            Types.STRING(),
            Types.STRING(),
            Types.STRING(),
            Types.STRING(),
            Types.STRING(),
            Types.STRING(),
            Types.STRING(),
            Types.STRING(),
            Types.SQL_TIMESTAMP(),
            Types.SQL_TIMESTAMP(),
            Types.SQL_TIMESTAMP(),
            Types.BIG_INT(),
        ]),
    ),
)

I have also created a Jira ticket for this, FLINK-33188:
<https://issues.apache.org/jira/browse/FLINK-33188>

Thanks.

Reply via email to