Elkhan Dadashov created FLINK-33188:
---------------------------------------

             Summary: PyFlink MapState with Types.ROW() throws exception
                 Key: FLINK-33188
                 URL: https://issues.apache.org/jira/browse/FLINK-33188
             Project: Flink
          Issue Type: Bug
          Components: API / Python, API / Type Serialization System
    Affects Versions: 1.17.1
            Reporter: Elkhan Dadashov


I'm trying to use MapState where each value is a list of 
pyflink.common.types.Row elements.
 
Wanted to check if anyone else faced the same issue while trying to use 
MapState in PyFlink with complex types.
 
Here is the code:
 
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import (
    KeyedCoProcessFunction,
    KeySelector,
    RuntimeContext,
)
from pyflink.datastream.state import (
    MapStateDescriptor,
    StateTtlConfig,
    ValueStateDescriptor,
    ListStateDescriptor
)
from pyflink.table import DataTypes, StreamTableEnvironment


class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
    """Minimal reproduction: a keyed co-process function that registers a MapState.

    The map state uses SQL timestamps as keys and lists of 12-field rows as
    values, with a state TTL configuration enabled.
    """

    def __init__(self):
        # Assigned in open(), once a runtime context is available.
        self.my_map_state = None

    def open(self, runtime_context: RuntimeContext):
        """Build the map state descriptor and obtain the state handle.

        :param runtime_context: runtime context used to look up the map state.
        """
        # TTL of 1 second, update type OnReadAndWrite, background cleanup disabled.
        state_ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
            .disable_cleanup_in_background()
            .build()
        )

        # NOTE: Types.ROW is kept on purpose; it is the element type that
        # triggers the reported exception (Types.TUPLE does not).
        my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            Types.LIST(Types.ROW([
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.BIG_INT()
            ]))
        )
        my_map_state_descriptor.enable_time_to_live(state_ttl_config)
        # The report states the job fails on this call at startup.
        self.my_map_state = runtime_context.get_map_state(my_map_state_descriptor)
 
But while running this code, it fails with this exception at job startup (at 
runtime_context.get_map_state(my_map_state_descriptor)), even without trying to 
add anything to the state.
 
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 127, in open
    self.open_func()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 296, in open_func
    process_function.open(runtime_context)
  File "/tmp/ipykernel_83481/1603226134.py", line 57, in open
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py", line 125, in get_map_state
    map_coder = from_type_info(state_descriptor.type_info)  # type: MapCoder
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 812, in from_type_info
    from_type_info(type_info._key_type_info), from_type_info(type_info._value_type_info))
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 809, in from_type_info
    return GenericArrayCoder(from_type_info(type_info.elem_type))
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 819, in from_type_info
    [f for f in type_info.get_field_names()])
  File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 377, in get_field_names
    j_field_names = self.get_java_type_info().getFieldNames()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 391, in get_java_type_info
    j_types_array = get_gateway()\
  File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 62, in get_gateway
    _gateway = launch_gateway()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 86, in launch_gateway
    raise Exception("It's launching the PythonGatewayServer during Python UDF execution "
Exception: It's launching the PythonGatewayServer during Python UDF execution 
which is unexpected. It usually happens when the job codes are in the top level 
of the Python script file and are not enclosed in a `if name == 'main'` 
statement.

If I switch from Types.ROW() to Types.TUPLE(), it works without any 
exception.
 
This works:
 
# Same map state descriptor as in the failing code, except that the list
# element type is declared with Types.TUPLE instead of Types.ROW.
my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            Types.LIST(Types.TUPLE([
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.BIG_INT()
            ]))
        )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to