Yun Gao created FLINK-25967:
-------------------------------

             Summary: StreamingModeDataStreamTests.test_keyed_process_function_with_state failed on azure
                 Key: FLINK-25967
                 URL: https://issues.apache.org/jira/browse/FLINK-25967
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.15.0
            Reporter: Yun Gao



{code:java}
2022-02-07T03:46:14.7065997Z Feb 07 03:46:14 =================================== FAILURES ===================================
2022-02-07T03:46:14.7077277Z Feb 07 03:46:14 _____ StreamingModeDataStreamTests.test_keyed_process_function_with_state ______
2022-02-07T03:46:14.7077773Z Feb 07 03:46:14 
2022-02-07T03:46:14.7078301Z Feb 07 03:46:14 self = <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests testMethod=test_keyed_process_function_with_state>
2022-02-07T03:46:14.7078856Z Feb 07 03:46:14 
2022-02-07T03:46:14.7079241Z Feb 07 03:46:14     def test_keyed_process_function_with_state(self):
2022-02-07T03:46:14.7079730Z Feb 07 03:46:14         self.env.get_config().set_auto_watermark_interval(2000)
2022-02-07T03:46:14.7080460Z Feb 07 03:46:14         self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
2022-02-07T03:46:14.7081786Z Feb 07 03:46:14         data_stream = self.env.from_collection([(1, 'hi', '1603708211000'),
2022-02-07T03:46:14.7082489Z Feb 07 03:46:14                                                 (2, 'hello', '1603708224000'),
2022-02-07T03:46:14.7083172Z Feb 07 03:46:14                                                 (3, 'hi', '1603708226000'),
2022-02-07T03:46:14.7083808Z Feb 07 03:46:14                                                 (4, 'hello', '1603708289000'),
2022-02-07T03:46:14.7084439Z Feb 07 03:46:14                                                 (5, 'hi', '1603708291000'),
2022-02-07T03:46:14.7085060Z Feb 07 03:46:14                                                 (6, 'hello', '1603708293000')],
2022-02-07T03:46:14.7085549Z Feb 07 03:46:14                                                type_info=Types.ROW([Types.INT(), Types.STRING(),
2022-02-07T03:46:14.7086019Z Feb 07 03:46:14                                                                     Types.STRING()]))
2022-02-07T03:46:14.7086374Z Feb 07 03:46:14     
2022-02-07T03:46:14.7086749Z Feb 07 03:46:14         class MyTimestampAssigner(TimestampAssigner):
2022-02-07T03:46:14.7087121Z Feb 07 03:46:14     
2022-02-07T03:46:14.7087695Z Feb 07 03:46:14             def extract_timestamp(self, value, record_timestamp) -> int:
2022-02-07T03:46:14.7088140Z Feb 07 03:46:14                 return int(value[2])
2022-02-07T03:46:14.7088469Z Feb 07 03:46:14     
2022-02-07T03:46:14.7088848Z Feb 07 03:46:14         class MyProcessFunction(KeyedProcessFunction):
2022-02-07T03:46:14.7089220Z Feb 07 03:46:14     
2022-02-07T03:46:14.7089537Z Feb 07 03:46:14             def __init__(self):
2022-02-07T03:46:14.7089916Z Feb 07 03:46:14                 self.value_state = None
2022-02-07T03:46:14.7090294Z Feb 07 03:46:14                 self.list_state = None
2022-02-07T03:46:14.7090676Z Feb 07 03:46:14                 self.map_state = None
2022-02-07T03:46:14.7091117Z Feb 07 03:46:14     
2022-02-07T03:46:14.7091482Z Feb 07 03:46:14             def open(self, runtime_context: RuntimeContext):
2022-02-07T03:46:14.7092223Z Feb 07 03:46:14                 value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
2022-02-07T03:46:14.7092790Z Feb 07 03:46:14                 self.value_state = runtime_context.get_state(value_state_descriptor)
2022-02-07T03:46:14.7093544Z Feb 07 03:46:14                 list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
2022-02-07T03:46:14.7094102Z Feb 07 03:46:14                 self.list_state = runtime_context.get_list_state(list_state_descriptor)
2022-02-07T03:46:14.7094883Z Feb 07 03:46:14                 map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
2022-02-07T03:46:14.7095550Z Feb 07 03:46:14                 state_ttl_config = StateTtlConfig \
2022-02-07T03:46:14.7095960Z Feb 07 03:46:14                     .new_builder(Time.seconds(1)) \
2022-02-07T03:46:14.7096643Z Feb 07 03:46:14                     .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
2022-02-07T03:46:14.7097104Z Feb 07 03:46:14                     .set_state_visibility(
2022-02-07T03:46:14.7097563Z Feb 07 03:46:14                         StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
2022-02-07T03:46:14.7098044Z Feb 07 03:46:14                     .disable_cleanup_in_background() \
2022-02-07T03:46:14.7098425Z Feb 07 03:46:14                     .build()
2022-02-07T03:46:14.7098835Z Feb 07 03:46:14                 map_state_descriptor.enable_time_to_live(state_ttl_config)
2022-02-07T03:46:14.7099362Z Feb 07 03:46:14                 self.map_state = runtime_context.get_map_state(map_state_descriptor)
2022-02-07T03:46:14.7099787Z Feb 07 03:46:14     
2022-02-07T03:46:14.7100147Z Feb 07 03:46:14             def process_element(self, value, ctx):
2022-02-07T03:46:14.7100532Z Feb 07 03:46:14                 import time
2022-02-07T03:46:14.7100974Z Feb 07 03:46:14                 time.sleep(1)
2022-02-07T03:46:14.7101368Z Feb 07 03:46:14                 current_value = self.value_state.value()
2022-02-07T03:46:14.7101799Z Feb 07 03:46:14                 self.value_state.update(value[0])
2022-02-07T03:46:14.7102241Z Feb 07 03:46:14                 current_list = [_ for _ in self.list_state.get()]
2022-02-07T03:46:14.7102676Z Feb 07 03:46:14                 self.list_state.add(value[0])
2022-02-07T03:46:14.7103119Z Feb 07 03:46:14                 map_entries = {k: v for k, v in self.map_state.items()}
2022-02-07T03:46:14.7103552Z Feb 07 03:46:14                 keys = sorted(map_entries.keys())
2022-02-07T03:46:14.7104263Z Feb 07 03:46:14                 map_entries_string = [str(k) + ': ' + str(map_entries[k]) for k in keys]
2022-02-07T03:46:14.7104977Z Feb 07 03:46:14                 map_entries_string = '{' + ', '.join(map_entries_string) + '}'
2022-02-07T03:46:14.7105453Z Feb 07 03:46:14                 self.map_state.put(value[0], value[1])
2022-02-07T03:46:14.7105882Z Feb 07 03:46:14                 current_key = ctx.get_current_key()
2022-02-07T03:46:14.7106361Z Feb 07 03:46:14                 yield "current key: {}, current value state: {}, current list state: {}, " \
2022-02-07T03:46:14.7106871Z Feb 07 03:46:14                       "current map state: {}, current value: {}".format(str(current_key),
2022-02-07T03:46:14.7107344Z Feb 07 03:46:14                                                                         str(current_value),
2022-02-07T03:46:14.7107765Z Feb 07 03:46:14                                                                         str(current_list),
2022-02-07T03:46:14.7108185Z Feb 07 03:46:14                                                                         map_entries_string,
2022-02-07T03:46:14.7108598Z Feb 07 03:46:14                                                                         str(value))
2022-02-07T03:46:14.7108938Z Feb 07 03:46:14     
2022-02-07T03:46:14.7109285Z Feb 07 03:46:14             def on_timer(self, timestamp, ctx):
2022-02-07T03:46:14.7109656Z Feb 07 03:46:14                 pass
2022-02-07T03:46:14.7109958Z Feb 07 03:46:14     
2022-02-07T03:46:14.7110370Z Feb 07 03:46:14         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
2022-02-07T03:46:14.7110865Z Feb 07 03:46:14             .with_timestamp_assigner(MyTimestampAssigner())
2022-02-07T03:46:14.7111365Z Feb 07 03:46:14         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
2022-02-07T03:46:14.7111844Z Feb 07 03:46:14             .key_by(lambda x: x[1], key_type=Types.STRING()) \
2022-02-07T03:46:14.7112334Z Feb 07 03:46:14             .process(MyProcessFunction(), output_type=Types.STRING()) \
2022-02-07T03:46:14.7112781Z Feb 07 03:46:14             .add_sink(self.test_sink)
2022-02-07T03:46:14.7113561Z Feb 07 03:46:14         self.env.execute('test time stamp assigner with keyed process function')
2022-02-07T03:46:14.7114051Z Feb 07 03:46:14         results = self.test_sink.get_results()
2022-02-07T03:46:14.7114548Z Feb 07 03:46:14         expected = ["current key: hi, current value state: None, current list state: [], "
2022-02-07T03:46:14.7115251Z Feb 07 03:46:14                     "current map state: {}, current value: Row(f0=1, f1='hi', "
2022-02-07T03:46:14.7115862Z Feb 07 03:46:14                     "f2='1603708211000')",
2022-02-07T03:46:14.7116293Z Feb 07 03:46:14                     "current key: hello, current value state: None, "
2022-02-07T03:46:14.7116795Z Feb 07 03:46:14                     "current list state: [], current map state: {}, current value: Row(f0=2,"
2022-02-07T03:46:14.7117458Z Feb 07 03:46:14                     " f1='hello', f2='1603708224000')",
2022-02-07T03:46:14.7117937Z Feb 07 03:46:14                     "current key: hi, current value state: 1, current list state: [1], "
2022-02-07T03:46:14.7118640Z Feb 07 03:46:14                     "current map state: {1: hi}, current value: Row(f0=3, f1='hi', "
2022-02-07T03:46:14.7119348Z Feb 07 03:46:14                     "f2='1603708226000')",
2022-02-07T03:46:14.7119814Z Feb 07 03:46:14                     "current key: hello, current value state: 2, current list state: [2], "
2022-02-07T03:46:14.7120541Z Feb 07 03:46:14                     "current map state: {2: hello}, current value: Row(f0=4, f1='hello', "
2022-02-07T03:46:14.7121161Z Feb 07 03:46:14                     "f2='1603708289000')",
2022-02-07T03:46:14.7121720Z Feb 07 03:46:14                     "current key: hi, current value state: 3, current list state: [1, 3], "
2022-02-07T03:46:14.7122444Z Feb 07 03:46:14                     "current map state: {1: hi, 3: hi}, current value: Row(f0=5, f1='hi', "
2022-02-07T03:46:14.7123070Z Feb 07 03:46:14                     "f2='1603708291000')",
2022-02-07T03:46:14.7123540Z Feb 07 03:46:14                     "current key: hello, current value state: 4, current list state: [2, 4],"
2022-02-07T03:46:14.7124088Z Feb 07 03:46:14                     " current map state: {2: hello, 4: hello}, current value: Row(f0=6, "
2022-02-07T03:46:14.7124739Z Feb 07 03:46:14                     "f1='hello', f2='1603708293000')"]
2022-02-07T03:46:14.7125184Z Feb 07 03:46:14 >       self.assert_equals_sorted(expected, results)
2022-02-07T03:46:14.7125537Z Feb 07 03:46:14 
2022-02-07T03:46:14.7125922Z Feb 07 03:46:14 pyflink/datastream/tests/test_data_stream.py:683: 
2022-02-07T03:46:14.7126404Z Feb 07 03:46:14 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
2022-02-07T03:46:14.7126918Z Feb 07 03:46:14 pyflink/datastream/tests/test_data_stream.py:62: in assert_equals_sorted
2022-02-07T03:46:14.7127403Z Feb 07 03:46:14     self.assertEqual(expected, actual)
2022-02-07T03:46:14.7128357Z Feb 07 03:46:14 E   AssertionError: Lists differ: ["cur[719 chars]te: {1: hi, 3: hi}, current value: Row(f0=5, f[172 chars]0')"] != ["cur[719 chars]te: {3: hi}, current value: Row(f0=5, f1='hi',[165 chars]0')"]
2022-02-07T03:46:14.7128969Z Feb 07 03:46:14 E   
2022-02-07T03:46:14.7129299Z Feb 07 03:46:14 E   First differing element 4:
2022-02-07T03:46:14.7168430Z Feb 07 03:46:14 E   "curr[80 chars]te: {1: hi, 3: hi}, current value: Row(f0=5, f[23 chars]00')"
2022-02-07T03:46:14.7169423Z Feb 07 03:46:14 E   "curr[80 chars]te: {3: hi}, current value: Row(f0=5, f1='hi',[16 chars]00')"
2022-02-07T03:46:14.7169864Z Feb 07 03:46:14 E   
2022-02-07T03:46:14.7170276Z Feb 07 03:46:14 E   Diff is 1211 characters long. Set self.maxDiff to None to see it.
2022-02-07T03:46:14.7170964Z Feb 07 03:46:14 =============================== warnings summary ===============================
2022-02-07T03:46:14.7171618Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_classpaths
2022-02-07T03:46:14.7172883Z Feb 07 03:46:14   /__w/1/s/flink-python/.tox/py36-cython/lib/python3.6/site-packages/future/standard_library/__init__.py:65: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
2022-02-07T03:46:14.7173901Z Feb 07 03:46:14     import imp
2022-02-07T03:46:14.7174207Z Feb 07 03:46:14 
2022-02-07T03:46:14.7174734Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file
2022-02-07T03:46:14.7175827Z Feb 07 03:46:14   /__w/1/s/flink-python/pyflink/table/table_environment.py:1999: DeprecationWarning: Deprecated in 1.12. Use from_data_stream(DataStream, *Expression) instead.
2022-02-07T03:46:14.7212771Z Feb 07 03:46:14     DeprecationWarning)
2022-02-07T03:46:14.7213185Z Feb 07 03:46:14 
2022-02-07T03:46:14.7213709Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_execute
2022-02-07T03:46:14.7214927Z Feb 07 03:46:14   /__w/1/s/flink-python/pyflink/table/table_environment.py:538: DeprecationWarning: Deprecated in 1.10. Use create_table instead.
2022-02-07T03:46:14.7215607Z Feb 07 03:46:14     warnings.warn("Deprecated in 1.10. Use create_table instead.", DeprecationWarning)
2022-02-07T03:46:14.7216284Z Feb 07 03:46:14 
2022-02-07T03:46:14.7217229Z Feb 07 03:46:14 -- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
2022-02-07T03:46:14.7217782Z Feb 07 03:46:14 ============================= slowest 20 durations =============================
2022-02-07T03:46:14.7218383Z Feb 07 03:46:14 10.67s call     pyflink/datastream/tests/test_connectors.py::ConnectorTests::test_stream_file_sink
2022-02-07T03:46:14.7219092Z Feb 07 03:46:14 10.18s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_process_function_with_state
2022-02-07T03:46:14.7219864Z Feb 07 03:46:14 9.85s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file
2022-02-07T03:46:14.7230675Z Feb 07 03:46:14 8.86s call     pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
2022-02-07T03:46:14.7231521Z Feb 07 03:46:14 7.62s call     pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_aggregating_state
2022-02-07T03:46:14.7232254Z Feb 07 03:46:14 6.19s call     pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_execute_and_collect
2022-02-07T03:46:14.7232975Z Feb 07 03:46:14 5.55s call     pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_partition_custom
2022-02-07T03:46:14.7233690Z Feb 07 03:46:14 5.37s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_execute_and_collect
2022-02-07T03:46:14.7234452Z Feb 07 03:46:14 5.01s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file_2
2022-02-07T03:46:14.7235303Z Feb 07 03:46:14 5.00s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_requirements_without_cached_directory
2022-02-07T03:46:14.7236194Z Feb 07 03:46:14 4.98s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_basic_co_operations
2022-02-07T03:46:14.7236909Z Feb 07 03:46:14 4.97s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_process
2022-02-07T03:46:14.7237661Z Feb 07 03:46:14 4.90s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_stream_env
2022-02-07T03:46:14.7238446Z Feb 07 03:46:14 4.80s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_basic_co_operations_with_output_type
2022-02-07T03:46:14.7239268Z Feb 07 03:46:14 4.23s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_requirements_with_cached_directory
2022-02-07T03:46:14.7240269Z Feb 07 03:46:14 4.07s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_reduce_with_state
2022-02-07T03:46:14.7240956Z Feb 07 03:46:14 4.05s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_map
2022-02-07T03:46:14.7241649Z Feb 07 03:46:14 3.96s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_flat_map
2022-02-07T03:46:14.7242335Z Feb 07 03:46:14 3.90s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_map
2022-02-07T03:46:14.7242990Z Feb 07 03:46:14 3.82s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_filter
2022-02-07T03:46:14.7243598Z Feb 07 03:46:14 =========================== short test summary info ============================
2022-02-07T03:46:14.7244251Z Feb 07 03:46:14 FAILED pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
{code}
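
The assertion message in the log is truncated by unittest ("Diff is 1211 characters long. Set self.maxDiff to None to see it."). When re-running the test locally, setting `maxDiff = None` on the test class prints the full diff of both result lists. A minimal sketch, assuming the real `assert_equals_sorted` helper (test_data_stream.py:62) sorts both lists before calling `assertEqual`; the class `FullDiffExample` and this version of the helper are hypothetical stand-ins, not the PyFlink test code:

```python
import unittest


class FullDiffExample(unittest.TestCase):
    # None disables unittest's truncation of long failure diffs
    # ("Diff is N characters long. Set self.maxDiff to None to see it.").
    maxDiff = None

    def assert_equals_sorted(self, expected, actual):
        # Hypothetical stand-in for the PyFlink test helper: compare the two
        # lists independently of output order by sorting copies of them.
        self.assertEqual(sorted(expected, key=str), sorted(actual, key=str))


if __name__ == "__main__":
    case = FullDiffExample()  # the default method name 'runTest' need not exist
    try:
        case.assert_equals_sorted(
            ["current map state: {1: hi, 3: hi}"],
            ["current map state: {3: hi}"],
        )
    except AssertionError as err:
        print(err)  # full message, including the ndiff of both lists
```

Setting `maxDiff = None` only changes how much of the failure is printed; it does not affect whether the comparison passes.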

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30817&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=6bb545dd-772d-5d8c-f258-f5085fba3295&l=23725
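
For reference, the element that differs is the fifth record (key `hi`): the test expects the map state `{1: hi, 3: hi}`, while the run produced `{3: hi}`. In the test, the map state has a 1-second TTL (`OnReadAndWrite`, `ReturnExpiredIfNotCleanedUp`, background cleanup disabled) and every `process_element` call starts with `time.sleep(1)`. The sketch below only does the timing arithmetic under stated assumptions: all six records are handled sequentially by one subtask, each call takes exactly the one-second sleep, and state is accessed at that Python-side wall-clock time (PyFlink may cache or batch state access, so the real TTL timestamps can differ). `entry_ages`, `RECORDS`, `TTL_SECONDS` and `SLEEP_SECONDS` are hypothetical names; this is not PyFlink code.

```python
# Hypothetical model of the test's timing; this is not PyFlink code.
TTL_SECONDS = 1.0    # StateTtlConfig.new_builder(Time.seconds(1))
SLEEP_SECONDS = 1.0  # time.sleep(1) at the start of process_element

# (value[0], key) of the six test records, in input order.
RECORDS = [(1, "hi"), (2, "hello"), (3, "hi"),
           (4, "hello"), (5, "hi"), (6, "hello")]


def entry_ages(records, sleep_seconds=SLEEP_SECONDS):
    """Return (value, key, {map key: age}) for the map state read by each record.

    Assumes sequential processing by a single subtask, state access right after
    the sleep, and that every read or write refreshes an entry's timestamp
    (the most favourable case for OnReadAndWrite), so the ages are lower bounds.
    """
    last_access = {}  # key -> {map key: time of last read/write}
    now = 0.0
    rows = []
    for value, key in records:
        now += sleep_seconds
        entries = last_access.setdefault(key, {})
        # map_state.items(): record each entry's age, then refresh it on read.
        ages = {k: now - t for k, t in sorted(entries.items())}
        rows.append((value, key, ages))
        for k in entries:
            entries[k] = now
        # map_state.put(value[0], value[1]) writes a fresh entry.
        entries[value] = now
    return rows


if __name__ == "__main__":
    for value, key, ages in entry_ages(RECORDS):
        past_ttl = [k for k, age in ages.items() if age > TTL_SECONDS]
        print(value, key, ages, "past TTL:", past_ttl)
```

Under these assumptions, every map entry that the expected strings list for records 3 to 6 is already at least two seconds old when it is read, i.e. past the 1-second TTL. The expected output therefore seems to rely on expired entries still being returned because they have not been cleaned up yet, which would make the result sensitive to when that cleanup happens.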



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
