Yun Gao created FLINK-25967:
-------------------------------
Summary:
StreamingModeDataStreamTests.test_keyed_process_function_with_state failed on
Azure
Key: FLINK-25967
URL: https://issues.apache.org/jira/browse/FLINK-25967
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.15.0
Reporter: Yun Gao
{code:java}
2022-02-07T03:46:14.7065997Z Feb 07 03:46:14
=================================== FAILURES ===================================
2022-02-07T03:46:14.7077277Z Feb 07 03:46:14 _____
StreamingModeDataStreamTests.test_keyed_process_function_with_state ______
2022-02-07T03:46:14.7077773Z Feb 07 03:46:14
2022-02-07T03:46:14.7078301Z Feb 07 03:46:14 self =
<pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests
testMethod=test_keyed_process_function_with_state>
2022-02-07T03:46:14.7078856Z Feb 07 03:46:14
2022-02-07T03:46:14.7079241Z Feb 07 03:46:14 def
test_keyed_process_function_with_state(self):
2022-02-07T03:46:14.7079730Z Feb 07 03:46:14
self.env.get_config().set_auto_watermark_interval(2000)
2022-02-07T03:46:14.7080460Z Feb 07 03:46:14
self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
2022-02-07T03:46:14.7081786Z Feb 07 03:46:14 data_stream =
self.env.from_collection([(1, 'hi', '1603708211000'),
2022-02-07T03:46:14.7082489Z Feb 07 03:46:14
(2, 'hello', '1603708224000'),
2022-02-07T03:46:14.7083172Z Feb 07 03:46:14
(3, 'hi', '1603708226000'),
2022-02-07T03:46:14.7083808Z Feb 07 03:46:14
(4, 'hello', '1603708289000'),
2022-02-07T03:46:14.7084439Z Feb 07 03:46:14
(5, 'hi', '1603708291000'),
2022-02-07T03:46:14.7085060Z Feb 07 03:46:14
(6, 'hello', '1603708293000')],
2022-02-07T03:46:14.7085549Z Feb 07 03:46:14
type_info=Types.ROW([Types.INT(), Types.STRING(),
2022-02-07T03:46:14.7086019Z Feb 07 03:46:14
Types.STRING()]))
2022-02-07T03:46:14.7086374Z Feb 07 03:46:14
2022-02-07T03:46:14.7086749Z Feb 07 03:46:14 class
MyTimestampAssigner(TimestampAssigner):
2022-02-07T03:46:14.7087121Z Feb 07 03:46:14
2022-02-07T03:46:14.7087695Z Feb 07 03:46:14 def
extract_timestamp(self, value, record_timestamp) -> int:
2022-02-07T03:46:14.7088140Z Feb 07 03:46:14 return
int(value[2])
2022-02-07T03:46:14.7088469Z Feb 07 03:46:14
2022-02-07T03:46:14.7088848Z Feb 07 03:46:14 class
MyProcessFunction(KeyedProcessFunction):
2022-02-07T03:46:14.7089220Z Feb 07 03:46:14
2022-02-07T03:46:14.7089537Z Feb 07 03:46:14 def __init__(self):
2022-02-07T03:46:14.7089916Z Feb 07 03:46:14 self.value_state =
None
2022-02-07T03:46:14.7090294Z Feb 07 03:46:14 self.list_state =
None
2022-02-07T03:46:14.7090676Z Feb 07 03:46:14 self.map_state =
None
2022-02-07T03:46:14.7091117Z Feb 07 03:46:14
2022-02-07T03:46:14.7091482Z Feb 07 03:46:14 def open(self,
runtime_context: RuntimeContext):
2022-02-07T03:46:14.7092223Z Feb 07 03:46:14
value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
2022-02-07T03:46:14.7092790Z Feb 07 03:46:14 self.value_state =
runtime_context.get_state(value_state_descriptor)
2022-02-07T03:46:14.7093544Z Feb 07 03:46:14
list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
2022-02-07T03:46:14.7094102Z Feb 07 03:46:14 self.list_state =
runtime_context.get_list_state(list_state_descriptor)
2022-02-07T03:46:14.7094883Z Feb 07 03:46:14
map_state_descriptor = MapStateDescriptor('map_state', Types.INT(),
Types.STRING())
2022-02-07T03:46:14.7095550Z Feb 07 03:46:14 state_ttl_config =
StateTtlConfig \
2022-02-07T03:46:14.7095960Z Feb 07 03:46:14
.new_builder(Time.seconds(1)) \
2022-02-07T03:46:14.7096643Z Feb 07 03:46:14
.set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
2022-02-07T03:46:14.7097104Z Feb 07 03:46:14
.set_state_visibility(
2022-02-07T03:46:14.7097563Z Feb 07 03:46:14
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
2022-02-07T03:46:14.7098044Z Feb 07 03:46:14
.disable_cleanup_in_background() \
2022-02-07T03:46:14.7098425Z Feb 07 03:46:14 .build()
2022-02-07T03:46:14.7098835Z Feb 07 03:46:14
map_state_descriptor.enable_time_to_live(state_ttl_config)
2022-02-07T03:46:14.7099362Z Feb 07 03:46:14 self.map_state =
runtime_context.get_map_state(map_state_descriptor)
2022-02-07T03:46:14.7099787Z Feb 07 03:46:14
2022-02-07T03:46:14.7100147Z Feb 07 03:46:14 def
process_element(self, value, ctx):
2022-02-07T03:46:14.7100532Z Feb 07 03:46:14 import time
2022-02-07T03:46:14.7100974Z Feb 07 03:46:14 time.sleep(1)
2022-02-07T03:46:14.7101368Z Feb 07 03:46:14 current_value =
self.value_state.value()
2022-02-07T03:46:14.7101799Z Feb 07 03:46:14
self.value_state.update(value[0])
2022-02-07T03:46:14.7102241Z Feb 07 03:46:14 current_list = [_
for _ in self.list_state.get()]
2022-02-07T03:46:14.7102676Z Feb 07 03:46:14
self.list_state.add(value[0])
2022-02-07T03:46:14.7103119Z Feb 07 03:46:14 map_entries = {k:
v for k, v in self.map_state.items()}
2022-02-07T03:46:14.7103552Z Feb 07 03:46:14 keys =
sorted(map_entries.keys())
2022-02-07T03:46:14.7104263Z Feb 07 03:46:14 map_entries_string
= [str(k) + ': ' + str(map_entries[k]) for k in keys]
2022-02-07T03:46:14.7104977Z Feb 07 03:46:14 map_entries_string
= '{' + ', '.join(map_entries_string) + '}'
2022-02-07T03:46:14.7105453Z Feb 07 03:46:14
self.map_state.put(value[0], value[1])
2022-02-07T03:46:14.7105882Z Feb 07 03:46:14 current_key =
ctx.get_current_key()
2022-02-07T03:46:14.7106361Z Feb 07 03:46:14 yield "current
key: {}, current value state: {}, current list state: {}, " \
2022-02-07T03:46:14.7106871Z Feb 07 03:46:14 "current map
state: {}, current value: {}".format(str(current_key),
2022-02-07T03:46:14.7107344Z Feb 07 03:46:14
str(current_value),
2022-02-07T03:46:14.7107765Z Feb 07 03:46:14
str(current_list),
2022-02-07T03:46:14.7108185Z Feb 07 03:46:14
map_entries_string,
2022-02-07T03:46:14.7108598Z Feb 07 03:46:14
str(value))
2022-02-07T03:46:14.7108938Z Feb 07 03:46:14
2022-02-07T03:46:14.7109285Z Feb 07 03:46:14 def on_timer(self,
timestamp, ctx):
2022-02-07T03:46:14.7109656Z Feb 07 03:46:14 pass
2022-02-07T03:46:14.7109958Z Feb 07 03:46:14
2022-02-07T03:46:14.7110370Z Feb 07 03:46:14 watermark_strategy =
WatermarkStrategy.for_monotonous_timestamps() \
2022-02-07T03:46:14.7110865Z Feb 07 03:46:14
.with_timestamp_assigner(MyTimestampAssigner())
2022-02-07T03:46:14.7111365Z Feb 07 03:46:14
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
2022-02-07T03:46:14.7111844Z Feb 07 03:46:14 .key_by(lambda x:
x[1], key_type=Types.STRING()) \
2022-02-07T03:46:14.7112334Z Feb 07 03:46:14
.process(MyProcessFunction(), output_type=Types.STRING()) \
2022-02-07T03:46:14.7112781Z Feb 07 03:46:14
.add_sink(self.test_sink)
2022-02-07T03:46:14.7113561Z Feb 07 03:46:14 self.env.execute('test
time stamp assigner with keyed process function')
2022-02-07T03:46:14.7114051Z Feb 07 03:46:14 results =
self.test_sink.get_results()
2022-02-07T03:46:14.7114548Z Feb 07 03:46:14 expected = ["current key:
hi, current value state: None, current list state: [], "
2022-02-07T03:46:14.7115251Z Feb 07 03:46:14 "current map
state: {}, current value: Row(f0=1, f1='hi', "
2022-02-07T03:46:14.7115862Z Feb 07 03:46:14
"f2='1603708211000')",
2022-02-07T03:46:14.7116293Z Feb 07 03:46:14 "current key:
hello, current value state: None, "
2022-02-07T03:46:14.7116795Z Feb 07 03:46:14 "current list
state: [], current map state: {}, current value: Row(f0=2,"
2022-02-07T03:46:14.7117458Z Feb 07 03:46:14 " f1='hello',
f2='1603708224000')",
2022-02-07T03:46:14.7117937Z Feb 07 03:46:14 "current key:
hi, current value state: 1, current list state: [1], "
2022-02-07T03:46:14.7118640Z Feb 07 03:46:14 "current map
state: {1: hi}, current value: Row(f0=3, f1='hi', "
2022-02-07T03:46:14.7119348Z Feb 07 03:46:14
"f2='1603708226000')",
2022-02-07T03:46:14.7119814Z Feb 07 03:46:14 "current key:
hello, current value state: 2, current list state: [2], "
2022-02-07T03:46:14.7120541Z Feb 07 03:46:14 "current map
state: {2: hello}, current value: Row(f0=4, f1='hello', "
2022-02-07T03:46:14.7121161Z Feb 07 03:46:14
"f2='1603708289000')",
2022-02-07T03:46:14.7121720Z Feb 07 03:46:14 "current key:
hi, current value state: 3, current list state: [1, 3], "
2022-02-07T03:46:14.7122444Z Feb 07 03:46:14 "current map
state: {1: hi, 3: hi}, current value: Row(f0=5, f1='hi', "
2022-02-07T03:46:14.7123070Z Feb 07 03:46:14
"f2='1603708291000')",
2022-02-07T03:46:14.7123540Z Feb 07 03:46:14 "current key:
hello, current value state: 4, current list state: [2, 4],"
2022-02-07T03:46:14.7124088Z Feb 07 03:46:14 " current map
state: {2: hello, 4: hello}, current value: Row(f0=6, "
2022-02-07T03:46:14.7124739Z Feb 07 03:46:14 "f1='hello',
f2='1603708293000')"]
2022-02-07T03:46:14.7125184Z Feb 07 03:46:14 >
self.assert_equals_sorted(expected, results)
2022-02-07T03:46:14.7125537Z Feb 07 03:46:14
2022-02-07T03:46:14.7125922Z Feb 07 03:46:14
pyflink/datastream/tests/test_data_stream.py:683:
2022-02-07T03:46:14.7126404Z Feb 07 03:46:14 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2022-02-07T03:46:14.7126918Z Feb 07 03:46:14
pyflink/datastream/tests/test_data_stream.py:62: in assert_equals_sorted
2022-02-07T03:46:14.7127403Z Feb 07 03:46:14 self.assertEqual(expected,
actual)
2022-02-07T03:46:14.7128357Z Feb 07 03:46:14 E AssertionError: Lists differ:
["cur[719 chars]te: {1: hi, 3: hi}, current value: Row(f0=5, f[172 chars]0')"]
!= ["cur[719 chars]te: {3: hi}, current value: Row(f0=5, f1='hi',[165
chars]0')"]
2022-02-07T03:46:14.7128969Z Feb 07 03:46:14 E
2022-02-07T03:46:14.7129299Z Feb 07 03:46:14 E First differing element 4:
2022-02-07T03:46:14.7168430Z Feb 07 03:46:14 E "curr[80 chars]te: {1: hi, 3:
hi}, current value: Row(f0=5, f[23 chars]00')"
2022-02-07T03:46:14.7169423Z Feb 07 03:46:14 E "curr[80 chars]te: {3: hi},
current value: Row(f0=5, f1='hi',[16 chars]00')"
2022-02-07T03:46:14.7169864Z Feb 07 03:46:14 E
2022-02-07T03:46:14.7170276Z Feb 07 03:46:14 E Diff is 1211 characters long.
Set self.maxDiff to None to see it.
2022-02-07T03:46:14.7170964Z Feb 07 03:46:14 ===============================
warnings summary ===============================
2022-02-07T03:46:14.7171618Z Feb 07 03:46:14
pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_classpaths
2022-02-07T03:46:14.7172883Z Feb 07 03:46:14
/__w/1/s/flink-python/.tox/py36-cython/lib/python3.6/site-packages/future/standard_library/__init__.py:65:
DeprecationWarning: the imp module is deprecated in favour of importlib; see
the module's documentation for alternative uses
2022-02-07T03:46:14.7173901Z Feb 07 03:46:14 import imp
2022-02-07T03:46:14.7174207Z Feb 07 03:46:14
2022-02-07T03:46:14.7174734Z Feb 07 03:46:14
pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file
2022-02-07T03:46:14.7175827Z Feb 07 03:46:14
/__w/1/s/flink-python/pyflink/table/table_environment.py:1999:
DeprecationWarning: Deprecated in 1.12. Use from_data_stream(DataStream,
*Expression) instead.
2022-02-07T03:46:14.7212771Z Feb 07 03:46:14 DeprecationWarning)
2022-02-07T03:46:14.7213185Z Feb 07 03:46:14
2022-02-07T03:46:14.7213709Z Feb 07 03:46:14
pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_execute
2022-02-07T03:46:14.7214927Z Feb 07 03:46:14
/__w/1/s/flink-python/pyflink/table/table_environment.py:538:
DeprecationWarning: Deprecated in 1.10. Use create_table instead.
2022-02-07T03:46:14.7215607Z Feb 07 03:46:14 warnings.warn("Deprecated in
1.10. Use create_table instead.", DeprecationWarning)
2022-02-07T03:46:14.7216284Z Feb 07 03:46:14
2022-02-07T03:46:14.7217229Z Feb 07 03:46:14 -- Docs:
https://docs.pytest.org/en/stable/how-to/capture-warnings.html
2022-02-07T03:46:14.7217782Z Feb 07 03:46:14 =============================
slowest 20 durations =============================
2022-02-07T03:46:14.7218383Z Feb 07 03:46:14 10.67s call
pyflink/datastream/tests/test_connectors.py::ConnectorTests::test_stream_file_sink
2022-02-07T03:46:14.7219092Z Feb 07 03:46:14 10.18s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_process_function_with_state
2022-02-07T03:46:14.7219864Z Feb 07 03:46:14 9.85s call
pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file
2022-02-07T03:46:14.7230675Z Feb 07 03:46:14 8.86s call
pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
2022-02-07T03:46:14.7231521Z Feb 07 03:46:14 7.62s call
pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_aggregating_state
2022-02-07T03:46:14.7232254Z Feb 07 03:46:14 6.19s call
pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_execute_and_collect
2022-02-07T03:46:14.7232975Z Feb 07 03:46:14 5.55s call
pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_partition_custom
2022-02-07T03:46:14.7233690Z Feb 07 03:46:14 5.37s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_execute_and_collect
2022-02-07T03:46:14.7234452Z Feb 07 03:46:14 5.01s call
pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file_2
2022-02-07T03:46:14.7235303Z Feb 07 03:46:14 5.00s call
pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_requirements_without_cached_directory
2022-02-07T03:46:14.7236194Z Feb 07 03:46:14 4.98s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_basic_co_operations
2022-02-07T03:46:14.7236909Z Feb 07 03:46:14 4.97s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_process
2022-02-07T03:46:14.7237661Z Feb 07 03:46:14 4.90s call
pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_stream_env
2022-02-07T03:46:14.7238446Z Feb 07 03:46:14 4.80s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_basic_co_operations_with_output_type
2022-02-07T03:46:14.7239268Z Feb 07 03:46:14 4.23s call
pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_requirements_with_cached_directory
2022-02-07T03:46:14.7240269Z Feb 07 03:46:14 4.07s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_reduce_with_state
2022-02-07T03:46:14.7240956Z Feb 07 03:46:14 4.05s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_map
2022-02-07T03:46:14.7241649Z Feb 07 03:46:14 3.96s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_flat_map
2022-02-07T03:46:14.7242335Z Feb 07 03:46:14 3.90s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_map
2022-02-07T03:46:14.7242990Z Feb 07 03:46:14 3.82s call
pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_filter
2022-02-07T03:46:14.7243598Z Feb 07 03:46:14 =========================== short
test summary info ============================
2022-02-07T03:46:14.7244251Z Feb 07 03:46:14 FAILED
pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30817&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=6bb545dd-772d-5d8c-f258-f5085fba3295&l=23725
--
This message was sent by Atlassian Jira
(v8.20.1#820001)