[ 
https://issues.apache.org/jira/browse/FLINK-31272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-31272:
----------------------------
    Release Note: Python DataStream API jobs may contain duplicate operators 
in versions 1.15.0 ~ 1.15.3 and 1.16.0 ~ 1.16.1. This issue has been addressed 
as of 1.15.4, 1.16.2 and 1.17.0. For jobs which are not affected by this 
issue, there are no backward compatibility issues. However, for jobs which are 
affected, it may not be possible to restore from savepoints generated by 
versions 1.15.0 ~ 1.15.3 and 1.16.0 ~ 1.16.1.

> Duplicate operators appear in the StreamGraph for Python DataStream API jobs
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-31272
>                 URL: https://issues.apache.org/jira/browse/FLINK-31272
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.15.0
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.16.2, 1.15.5
>
>
> For the following job:
> {code}
> import argparse
> import json
> import sys
> import time
> from typing import Iterable, cast
> from pyflink.common import Types, Time, Encoder
> from pyflink.datastream import StreamExecutionEnvironment, 
> ProcessWindowFunction, EmbeddedRocksDBStateBackend, \
>     PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, 
> ExternalizedCheckpointCleanup
> from pyflink.datastream.connectors.file_system import FileSink, 
> RollingPolicy, OutputFileConfig
> from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
> from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, 
> TumblingProcessingTimeWindows, \
>     ProcessingTimeTrigger
> class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):
>     def __init__(self, window_size: int):
>         self._window_size = window_size
>         self._count_state_descriptor = ReducingStateDescriptor(
>             "count", lambda a, b: a + b, Types.LONG())
>     @staticmethod
>     def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
>         return CountWithProcessTimeoutTrigger(window_size)
>     def on_element(self,
>                    element: T,
>                    timestamp: int,
>                    window: TimeWindow,
>                    ctx: 'Trigger.TriggerContext') -> TriggerResult:
>         count_state = cast(ReducingState, 
> ctx.get_partitioned_state(self._count_state_descriptor))
>         count_state.add(1)
>         # print("element arrive:", element, "count_state:", 
> count_state.get(), window.max_timestamp(),
>         #       ctx.get_current_watermark())
>         if count_state.get() >= self._window_size:  # 必须fire&purge!!!!
>             print("fire element count", element, count_state.get(), 
> window.max_timestamp(),
>                   ctx.get_current_watermark())
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>         if timestamp >= window.end:
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>         else:
>             return TriggerResult.CONTINUE
>     def on_processing_time(self,
>                            timestamp: int,
>                            window: TimeWindow,
>                            ctx: Trigger.TriggerContext) -> TriggerResult:
>         if timestamp >= window.end:
>             return TriggerResult.CONTINUE
>         else:
>             print("fire with process_time:", timestamp)
>             count_state = cast(ReducingState, 
> ctx.get_partitioned_state(self._count_state_descriptor))
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>     def on_event_time(self,
>                       timestamp: int,
>                       window: TimeWindow,
>                       ctx: 'Trigger.TriggerContext') -> TriggerResult:
>         return TriggerResult.CONTINUE
>     def clear(self,
>               window: TimeWindow,
>               ctx: 'Trigger.TriggerContext') -> None:
>         count_state = ctx.get_partitioned_state(self._count_state_descriptor)
>         count_state.clear()
> def to_dict_map(v):
>     time.sleep(1)
>     dict_value = json.loads(v)
>     return dict_value
> def get_group_key(value, keys):
>     group_key_values = []
>     for key in keys:
>         one_key_value = 'null'
>         if key in value:
>             list_value = value[key]
>             if list_value:
>                 one_key_value = str(list_value[0])
>         group_key_values.append(one_key_value)
>     group_key = '_'.join(group_key_values)
>     # print("group_key=", group_key)
>     return group_key
> class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, 
> TimeWindow]):
>     def __init__(self, uf):
>         self._user_function = uf
>     def process(self,
>                 key: str,
>                 context: ProcessWindowFunction.Context[TimeWindow],
>                 elements: Iterable[dict]) -> Iterable[dict]:
>         result_list = 
> self._user_function.process_after_group_by_function(elements)
>         return result_list
> if __name__ == '__main__':
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--output',
>         dest='output',
>         required=False,
>         help='Output file to write results to.')
>     argv = sys.argv[1:]
>     known_args, _ = parser.parse_known_args(argv)
>     output_path = known_args.output
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # write all the data to one file
>     env.set_parallelism(1)
>     # process time
>     env.get_config().set_auto_watermark_interval(0)
>     state_backend = EmbeddedRocksDBStateBackend(True)
>     
> state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>     env.set_state_backend(state_backend)
>     config = env.get_checkpoint_config()
>     # 
> config.set_checkpoint_storage(FileSystemCheckpointStorage("hdfs://ha-nn-uri/tmp/checkpoint/"))
>     
> config.set_checkpoint_storage(FileSystemCheckpointStorage("file:///Users/10030122/Downloads/pyflink_checkpoint/"))
>     config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
>     config.set_checkpoint_interval(5000)
>     
> config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
>     # define the source
>     data_stream1 = env.from_collection(['{"user_id": ["0"], "goods_id": 
> [0,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [1,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [2,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [3,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [4,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [5,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [6,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [7,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [8,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [9,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [10,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [11,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [12,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [13,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [14,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [15,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [16,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [17,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [18,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [19,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [20,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [21,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [22,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [23,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [24,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [25,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [26,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [27,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [28,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [29,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [30,0]}'])
>     data_stream2 = env.from_collection(['{"user_id": ["0"], "goods_id": 
> [0,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [1,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [2,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [3,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [4,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [5,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [6,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [7,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [8,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [9,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [10,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [11,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [12,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [13,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [14,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [15,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [16,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [17,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [18,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [19,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [20,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [21,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [22,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [23,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [24,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [25,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [26,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [27,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [28,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [29,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [30,0]}'])
>     # group_keys = ['user_id', 'goods_id']
>     group_keys = ['user_id']
>     sink_to_file_flag = True
>     data_stream = data_stream1.union(data_stream2)
>     # user_function = __import__("UserFunction")
>     ds = data_stream.map(lambda v: to_dict_map(v)) \
>         .filter(lambda v: v) \
>         .map(lambda v: v) \
>         .key_by(lambda v: get_group_key(v, group_keys)) \
>         .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
>         .process(CountWindowProcessFunction(lambda v: v), Types.STRING())
>     ds = ds.map(lambda v: v, Types.PRIMITIVE_ARRAY(Types.BYTE()))
>     base_path = "/tmp/1.txt"
>     encoder = Encoder.simple_string_encoder()
>     file_sink_builder = FileSink.for_row_format(base_path, encoder)
>     file_sink = file_sink_builder \
>         .with_bucket_check_interval(1000) \
>         .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
>         .with_output_file_config(
>         
> OutputFileConfig.builder().with_part_prefix("pre").with_part_suffix("suf").build())
>  \
>         .build()
>     ds.sink_to(file_sink)
>     # submit for execution
>     env.execute()
> {code}
> The stream graph is as follows:
> {code}
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: Collection Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Collection Source",
>     "parallelism" : 1
>   }, {
>     "id" : 2,
>     "type" : "Source: Collection Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Collection Source",
>     "parallelism" : 1
>   }, {
>     "id" : 9,
>     "type" : "TumblingProcessingTimeWindows",
>     "pact" : "Operator",
>     "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), 
> ProcessingTimeTrigger, CountWindowProcessFunction)",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 15,
>       "ship_strategy" : "HASH",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 10,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 9,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 15,
>     "type" : "Map, Filter, Map, _stream_key_by_map_operator",
>     "pact" : "Operator",
>     "contents" : "Map, Filter, Map, _stream_key_by_map_operator",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     }, {
>       "id" : 2,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 16,
>     "type" : "TumblingProcessingTimeWindows, Map",
>     "pact" : "Operator",
>     "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), 
> ProcessingTimeTrigger, CountWindowProcessFunction)",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 15,
>       "ship_strategy" : "HASH",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 18,
>     "type" : "Sink: Writer",
>     "pact" : "Operator",
>     "contents" : "Sink: Writer",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 10,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 20,
>     "type" : "Sink: Committer",
>     "pact" : "Operator",
>     "contents" : "Sink: Committer",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 18,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The plan is incorrect: the TumblingProcessingTimeWindows operator appears 
> twice (node ids 9 and 16), although the job defines only one window.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to