Yun Gao created FLINK-25967:
-------------------------------

Summary: StreamingModeDataStreamTests.test_keyed_process_function_with_state failed on azure
Key: FLINK-25967
URL: https://issues.apache.org/jira/browse/FLINK-25967
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.15.0
Reporter: Yun Gao

{code:java}
2022-02-07T03:46:14.7065997Z Feb 07 03:46:14 =================================== FAILURES ===================================
2022-02-07T03:46:14.7077277Z Feb 07 03:46:14 _____ StreamingModeDataStreamTests.test_keyed_process_function_with_state ______
2022-02-07T03:46:14.7077773Z Feb 07 03:46:14
2022-02-07T03:46:14.7078301Z Feb 07 03:46:14 self = <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests testMethod=test_keyed_process_function_with_state>
2022-02-07T03:46:14.7078856Z Feb 07 03:46:14
2022-02-07T03:46:14.7079241Z Feb 07 03:46:14 def test_keyed_process_function_with_state(self):
2022-02-07T03:46:14.7079730Z Feb 07 03:46:14 self.env.get_config().set_auto_watermark_interval(2000)
2022-02-07T03:46:14.7080460Z Feb 07 03:46:14 self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
2022-02-07T03:46:14.7081786Z Feb 07 03:46:14 data_stream = self.env.from_collection([(1, 'hi', '1603708211000'),
2022-02-07T03:46:14.7082489Z Feb 07 03:46:14 (2, 'hello', '1603708224000'),
2022-02-07T03:46:14.7083172Z Feb 07 03:46:14 (3, 'hi', '1603708226000'),
2022-02-07T03:46:14.7083808Z Feb 07 03:46:14 (4, 'hello', '1603708289000'),
2022-02-07T03:46:14.7084439Z Feb 07 03:46:14 (5, 'hi', '1603708291000'),
2022-02-07T03:46:14.7085060Z Feb 07 03:46:14 (6, 'hello', '1603708293000')],
2022-02-07T03:46:14.7085549Z Feb 07 03:46:14 type_info=Types.ROW([Types.INT(), Types.STRING(),
2022-02-07T03:46:14.7086019Z Feb 07 03:46:14 Types.STRING()]))
2022-02-07T03:46:14.7086374Z Feb 07 03:46:14
2022-02-07T03:46:14.7086749Z Feb 07 03:46:14 class MyTimestampAssigner(TimestampAssigner):
2022-02-07T03:46:14.7087121Z Feb 07 03:46:14
2022-02-07T03:46:14.7087695Z Feb 07 03:46:14 def extract_timestamp(self, value, record_timestamp) -> int:
2022-02-07T03:46:14.7088140Z Feb 07 03:46:14 return int(value[2])
2022-02-07T03:46:14.7088469Z Feb 07 03:46:14
2022-02-07T03:46:14.7088848Z Feb 07 03:46:14 class MyProcessFunction(KeyedProcessFunction):
2022-02-07T03:46:14.7089220Z Feb 07 03:46:14
2022-02-07T03:46:14.7089537Z Feb 07 03:46:14 def __init__(self):
2022-02-07T03:46:14.7089916Z Feb 07 03:46:14 self.value_state = None
2022-02-07T03:46:14.7090294Z Feb 07 03:46:14 self.list_state = None
2022-02-07T03:46:14.7090676Z Feb 07 03:46:14 self.map_state = None
2022-02-07T03:46:14.7091117Z Feb 07 03:46:14
2022-02-07T03:46:14.7091482Z Feb 07 03:46:14 def open(self, runtime_context: RuntimeContext):
2022-02-07T03:46:14.7092223Z Feb 07 03:46:14 value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
2022-02-07T03:46:14.7092790Z Feb 07 03:46:14 self.value_state = runtime_context.get_state(value_state_descriptor)
2022-02-07T03:46:14.7093544Z Feb 07 03:46:14 list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
2022-02-07T03:46:14.7094102Z Feb 07 03:46:14 self.list_state = runtime_context.get_list_state(list_state_descriptor)
2022-02-07T03:46:14.7094883Z Feb 07 03:46:14 map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
2022-02-07T03:46:14.7095550Z Feb 07 03:46:14 state_ttl_config = StateTtlConfig \
2022-02-07T03:46:14.7095960Z Feb 07 03:46:14 .new_builder(Time.seconds(1)) \
2022-02-07T03:46:14.7096643Z Feb 07 03:46:14 .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
2022-02-07T03:46:14.7097104Z Feb 07 03:46:14 .set_state_visibility(
2022-02-07T03:46:14.7097563Z Feb 07 03:46:14 StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
2022-02-07T03:46:14.7098044Z Feb 07 03:46:14 .disable_cleanup_in_background() \
2022-02-07T03:46:14.7098425Z Feb 07 03:46:14 .build()
2022-02-07T03:46:14.7098835Z Feb 07 03:46:14 map_state_descriptor.enable_time_to_live(state_ttl_config)
2022-02-07T03:46:14.7099362Z Feb 07 03:46:14 self.map_state = runtime_context.get_map_state(map_state_descriptor)
2022-02-07T03:46:14.7099787Z Feb 07 03:46:14
2022-02-07T03:46:14.7100147Z Feb 07 03:46:14 def process_element(self, value, ctx):
2022-02-07T03:46:14.7100532Z Feb 07 03:46:14 import time
2022-02-07T03:46:14.7100974Z Feb 07 03:46:14 time.sleep(1)
2022-02-07T03:46:14.7101368Z Feb 07 03:46:14 current_value = self.value_state.value()
2022-02-07T03:46:14.7101799Z Feb 07 03:46:14 self.value_state.update(value[0])
2022-02-07T03:46:14.7102241Z Feb 07 03:46:14 current_list = [_ for _ in self.list_state.get()]
2022-02-07T03:46:14.7102676Z Feb 07 03:46:14 self.list_state.add(value[0])
2022-02-07T03:46:14.7103119Z Feb 07 03:46:14 map_entries = {k: v for k, v in self.map_state.items()}
2022-02-07T03:46:14.7103552Z Feb 07 03:46:14 keys = sorted(map_entries.keys())
2022-02-07T03:46:14.7104263Z Feb 07 03:46:14 map_entries_string = [str(k) + ': ' + str(map_entries[k]) for k in keys]
2022-02-07T03:46:14.7104977Z Feb 07 03:46:14 map_entries_string = '{' + ', '.join(map_entries_string) + '}'
2022-02-07T03:46:14.7105453Z Feb 07 03:46:14 self.map_state.put(value[0], value[1])
2022-02-07T03:46:14.7105882Z Feb 07 03:46:14 current_key = ctx.get_current_key()
2022-02-07T03:46:14.7106361Z Feb 07 03:46:14 yield "current key: {}, current value state: {}, current list state: {}, " \
2022-02-07T03:46:14.7106871Z Feb 07 03:46:14 "current map state: {}, current value: {}".format(str(current_key),
2022-02-07T03:46:14.7107344Z Feb 07 03:46:14 str(current_value),
2022-02-07T03:46:14.7107765Z Feb 07 03:46:14 str(current_list),
2022-02-07T03:46:14.7108185Z Feb 07 03:46:14 map_entries_string,
2022-02-07T03:46:14.7108598Z Feb 07 03:46:14 str(value))
2022-02-07T03:46:14.7108938Z Feb 07 03:46:14
2022-02-07T03:46:14.7109285Z Feb 07 03:46:14 def on_timer(self, timestamp, ctx):
2022-02-07T03:46:14.7109656Z Feb 07 03:46:14 pass
2022-02-07T03:46:14.7109958Z Feb 07 03:46:14
2022-02-07T03:46:14.7110370Z Feb 07 03:46:14 watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
2022-02-07T03:46:14.7110865Z Feb 07 03:46:14 .with_timestamp_assigner(MyTimestampAssigner())
2022-02-07T03:46:14.7111365Z Feb 07 03:46:14 data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
2022-02-07T03:46:14.7111844Z Feb 07 03:46:14 .key_by(lambda x: x[1], key_type=Types.STRING()) \
2022-02-07T03:46:14.7112334Z Feb 07 03:46:14 .process(MyProcessFunction(), output_type=Types.STRING()) \
2022-02-07T03:46:14.7112781Z Feb 07 03:46:14 .add_sink(self.test_sink)
2022-02-07T03:46:14.7113561Z Feb 07 03:46:14 self.env.execute('test time stamp assigner with keyed process function')
2022-02-07T03:46:14.7114051Z Feb 07 03:46:14 results = self.test_sink.get_results()
2022-02-07T03:46:14.7114548Z Feb 07 03:46:14 expected = ["current key: hi, current value state: None, current list state: [], "
2022-02-07T03:46:14.7115251Z Feb 07 03:46:14 "current map state: {}, current value: Row(f0=1, f1='hi', "
2022-02-07T03:46:14.7115862Z Feb 07 03:46:14 "f2='1603708211000')",
2022-02-07T03:46:14.7116293Z Feb 07 03:46:14 "current key: hello, current value state: None, "
2022-02-07T03:46:14.7116795Z Feb 07 03:46:14 "current list state: [], current map state: {}, current value: Row(f0=2,"
2022-02-07T03:46:14.7117458Z Feb 07 03:46:14 " f1='hello', f2='1603708224000')",
2022-02-07T03:46:14.7117937Z Feb 07 03:46:14 "current key: hi, current value state: 1, current list state: [1], "
2022-02-07T03:46:14.7118640Z Feb 07 03:46:14 "current map state: {1: hi}, current value: Row(f0=3, f1='hi', "
2022-02-07T03:46:14.7119348Z Feb 07 03:46:14 "f2='1603708226000')",
2022-02-07T03:46:14.7119814Z Feb 07 03:46:14 "current key: hello, current value state: 2, current list state: [2], "
2022-02-07T03:46:14.7120541Z Feb 07 03:46:14 "current map state: {2: hello}, current value: Row(f0=4, f1='hello', "
2022-02-07T03:46:14.7121161Z Feb 07 03:46:14 "f2='1603708289000')",
2022-02-07T03:46:14.7121720Z Feb 07 03:46:14 "current key: hi, current value state: 3, current list state: [1, 3], "
2022-02-07T03:46:14.7122444Z Feb 07 03:46:14 "current map state: {1: hi, 3: hi}, current value: Row(f0=5, f1='hi', "
2022-02-07T03:46:14.7123070Z Feb 07 03:46:14 "f2='1603708291000')",
2022-02-07T03:46:14.7123540Z Feb 07 03:46:14 "current key: hello, current value state: 4, current list state: [2, 4],"
2022-02-07T03:46:14.7124088Z Feb 07 03:46:14 " current map state: {2: hello, 4: hello}, current value: Row(f0=6, "
2022-02-07T03:46:14.7124739Z Feb 07 03:46:14 "f1='hello', f2='1603708293000')"]
2022-02-07T03:46:14.7125184Z Feb 07 03:46:14 > self.assert_equals_sorted(expected, results)
2022-02-07T03:46:14.7125537Z Feb 07 03:46:14
2022-02-07T03:46:14.7125922Z Feb 07 03:46:14 pyflink/datastream/tests/test_data_stream.py:683:
2022-02-07T03:46:14.7126404Z Feb 07 03:46:14 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2022-02-07T03:46:14.7126918Z Feb 07 03:46:14 pyflink/datastream/tests/test_data_stream.py:62: in assert_equals_sorted
2022-02-07T03:46:14.7127403Z Feb 07 03:46:14 self.assertEqual(expected, actual)
2022-02-07T03:46:14.7128357Z Feb 07 03:46:14 E AssertionError: Lists differ: ["cur[719 chars]te: {1: hi, 3: hi}, current value: Row(f0=5, f[172 chars]0')"] != ["cur[719 chars]te: {3: hi}, current value: Row(f0=5, f1='hi',[165 chars]0')"]
2022-02-07T03:46:14.7128969Z Feb 07 03:46:14 E
2022-02-07T03:46:14.7129299Z Feb 07 03:46:14 E First differing element 4:
2022-02-07T03:46:14.7168430Z Feb 07 03:46:14 E "curr[80 chars]te: {1: hi, 3: hi}, current value: Row(f0=5, f[23 chars]00')"
2022-02-07T03:46:14.7169423Z Feb 07 03:46:14 E "curr[80 chars]te: {3: hi}, current value: Row(f0=5, f1='hi',[16 chars]00')"
2022-02-07T03:46:14.7169864Z Feb 07 03:46:14 E
2022-02-07T03:46:14.7170276Z Feb 07 03:46:14 E Diff is 1211 characters long. Set self.maxDiff to None to see it.
2022-02-07T03:46:14.7170964Z Feb 07 03:46:14 =============================== warnings summary ===============================
2022-02-07T03:46:14.7171618Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_classpaths
2022-02-07T03:46:14.7172883Z Feb 07 03:46:14 /__w/1/s/flink-python/.tox/py36-cython/lib/python3.6/site-packages/future/standard_library/__init__.py:65: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
2022-02-07T03:46:14.7173901Z Feb 07 03:46:14 import imp
2022-02-07T03:46:14.7174207Z Feb 07 03:46:14
2022-02-07T03:46:14.7174734Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file
2022-02-07T03:46:14.7175827Z Feb 07 03:46:14 /__w/1/s/flink-python/pyflink/table/table_environment.py:1999: DeprecationWarning: Deprecated in 1.12. Use from_data_stream(DataStream, *Expression) instead.
2022-02-07T03:46:14.7212771Z Feb 07 03:46:14 DeprecationWarning)
2022-02-07T03:46:14.7213185Z Feb 07 03:46:14
2022-02-07T03:46:14.7213709Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_execute
2022-02-07T03:46:14.7214927Z Feb 07 03:46:14 /__w/1/s/flink-python/pyflink/table/table_environment.py:538: DeprecationWarning: Deprecated in 1.10. Use create_table instead.
2022-02-07T03:46:14.7215607Z Feb 07 03:46:14 warnings.warn("Deprecated in 1.10. Use create_table instead.", DeprecationWarning)
2022-02-07T03:46:14.7216284Z Feb 07 03:46:14
2022-02-07T03:46:14.7217229Z Feb 07 03:46:14 -- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
2022-02-07T03:46:14.7217782Z Feb 07 03:46:14 ============================= slowest 20 durations =============================
2022-02-07T03:46:14.7218383Z Feb 07 03:46:14 10.67s call pyflink/datastream/tests/test_connectors.py::ConnectorTests::test_stream_file_sink
2022-02-07T03:46:14.7219092Z Feb 07 03:46:14 10.18s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_process_function_with_state
2022-02-07T03:46:14.7219864Z Feb 07 03:46:14 9.85s call pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file
2022-02-07T03:46:14.7230675Z Feb 07 03:46:14 8.86s call pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
2022-02-07T03:46:14.7231521Z Feb 07 03:46:14 7.62s call pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_aggregating_state
2022-02-07T03:46:14.7232254Z Feb 07 03:46:14 6.19s call pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_execute_and_collect
2022-02-07T03:46:14.7232975Z Feb 07 03:46:14 5.55s call pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_partition_custom
2022-02-07T03:46:14.7233690Z Feb 07 03:46:14 5.37s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_execute_and_collect
2022-02-07T03:46:14.7234452Z Feb 07 03:46:14 5.01s call pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file_2
2022-02-07T03:46:14.7235303Z Feb 07 03:46:14 5.00s call pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_requirements_without_cached_directory
2022-02-07T03:46:14.7236194Z Feb 07 03:46:14 4.98s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_basic_co_operations
2022-02-07T03:46:14.7236909Z Feb 07 03:46:14 4.97s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_process
2022-02-07T03:46:14.7237661Z Feb 07 03:46:14 4.90s call pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_stream_env
2022-02-07T03:46:14.7238446Z Feb 07 03:46:14 4.80s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_basic_co_operations_with_output_type
2022-02-07T03:46:14.7239268Z Feb 07 03:46:14 4.23s call pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_requirements_with_cached_directory
2022-02-07T03:46:14.7240269Z Feb 07 03:46:14 4.07s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_reduce_with_state
2022-02-07T03:46:14.7240956Z Feb 07 03:46:14 4.05s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_map
2022-02-07T03:46:14.7241649Z Feb 07 03:46:14 3.96s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_flat_map
2022-02-07T03:46:14.7242335Z Feb 07 03:46:14 3.90s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_map
2022-02-07T03:46:14.7242990Z Feb 07 03:46:14 3.82s call pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_filter
2022-02-07T03:46:14.7243598Z Feb 07 03:46:14 =========================== short test summary info ============================
2022-02-07T03:46:14.7244251Z Feb 07 03:46:14 FAILED pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30817&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=6bb545dd-772d-5d8c-f258-f5085fba3295&l=23725

--
This message was sent by Atlassian Jira
(v8.20.1#820001)