[ 
https://issues.apache.org/jira/browse/FLINK-31272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31272:
-----------------------------------
    Labels: pull-request-available  (was: )

> Duplicate operators appear in the StreamGraph for Python DataStream API jobs
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-31272
>                 URL: https://issues.apache.org/jira/browse/FLINK-31272
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>
> For the following job:
> {code}
> import argparse
> import json
> import sys
> import time
> from typing import Iterable, cast
> from pyflink.common import Types, Time, Encoder
> from pyflink.datastream import StreamExecutionEnvironment, 
> ProcessWindowFunction, EmbeddedRocksDBStateBackend, \
>     PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, 
> ExternalizedCheckpointCleanup
> from pyflink.datastream.connectors.file_system import FileSink, 
> RollingPolicy, OutputFileConfig
> from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
> from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, 
> TumblingProcessingTimeWindows, \
>     ProcessingTimeTrigger
> class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):
>     def __init__(self, window_size: int):
>         self._window_size = window_size
>         self._count_state_descriptor = ReducingStateDescriptor(
>             "count", lambda a, b: a + b, Types.LONG())
>     @staticmethod
>     def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
>         return CountWithProcessTimeoutTrigger(window_size)
>     def on_element(self,
>                    element: T,
>                    timestamp: int,
>                    window: TimeWindow,
>                    ctx: 'Trigger.TriggerContext') -> TriggerResult:
>         count_state = cast(ReducingState, 
> ctx.get_partitioned_state(self._count_state_descriptor))
>         count_state.add(1)
>         # print("element arrive:", element, "count_state:", count_state.get(), window.max_timestamp(),
>         #       ctx.get_current_watermark())
>         if count_state.get() >= self._window_size:  # must fire & purge!
>             print("fire element count", element, count_state.get(), 
> window.max_timestamp(),
>                   ctx.get_current_watermark())
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>         if timestamp >= window.end:
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>         else:
>             return TriggerResult.CONTINUE
>     def on_processing_time(self,
>                            timestamp: int,
>                            window: TimeWindow,
>                            ctx: Trigger.TriggerContext) -> TriggerResult:
>         if timestamp >= window.end:
>             return TriggerResult.CONTINUE
>         else:
>             print("fire with process_time:", timestamp)
>             count_state = cast(ReducingState, 
> ctx.get_partitioned_state(self._count_state_descriptor))
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>     def on_event_time(self,
>                       timestamp: int,
>                       window: TimeWindow,
>                       ctx: 'Trigger.TriggerContext') -> TriggerResult:
>         return TriggerResult.CONTINUE
>     def clear(self,
>               window: TimeWindow,
>               ctx: 'Trigger.TriggerContext') -> None:
>         count_state = ctx.get_partitioned_state(self._count_state_descriptor)
>         count_state.clear()
> def to_dict_map(v):
>     time.sleep(1)
>     dict_value = json.loads(v)
>     return dict_value
> def get_group_key(value, keys):
>     group_key_values = []
>     for key in keys:
>         one_key_value = 'null'
>         if key in value:
>             list_value = value[key]
>             if list_value:
>                 one_key_value = str(list_value[0])
>         group_key_values.append(one_key_value)
>     group_key = '_'.join(group_key_values)
>     # print("group_key=", group_key)
>     return group_key
> class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, 
> TimeWindow]):
>     def __init__(self, uf):
>         self._user_function = uf
>     def process(self,
>                 key: str,
>                 context: ProcessWindowFunction.Context[TimeWindow],
>                 elements: Iterable[dict]) -> Iterable[dict]:
>         result_list = 
> self._user_function.process_after_group_by_function(elements)
>         return result_list
> if __name__ == '__main__':
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--output',
>         dest='output',
>         required=False,
>         help='Output file to write results to.')
>     argv = sys.argv[1:]
>     known_args, _ = parser.parse_known_args(argv)
>     output_path = known_args.output
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # write all the data to one file
>     env.set_parallelism(1)
>     # process time
>     env.get_config().set_auto_watermark_interval(0)
>     state_backend = EmbeddedRocksDBStateBackend(True)
>     
> state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>     env.set_state_backend(state_backend)
>     config = env.get_checkpoint_config()
>     # config.set_checkpoint_storage(FileSystemCheckpointStorage("hdfs://ha-nn-uri/tmp/checkpoint/"))
>     
> config.set_checkpoint_storage(FileSystemCheckpointStorage("file:///Users/10030122/Downloads/pyflink_checkpoint/"))
>     config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
>     config.set_checkpoint_interval(5000)
>     
> config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
>     # define the source
>     data_stream1 = env.from_collection(['{"user_id": ["0"], "goods_id": 
> [0,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [1,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [2,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [3,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [4,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [5,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [6,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [7,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [8,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [9,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [10,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [11,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [12,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [13,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [14,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [15,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [16,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [17,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [18,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [19,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [20,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [21,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [22,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [23,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [24,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [25,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [26,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [27,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [28,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [29,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [30,0]}'])
>     data_stream2 = env.from_collection(['{"user_id": ["0"], "goods_id": 
> [0,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [1,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [2,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [3,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [4,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [5,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [6,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [7,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [8,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [9,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [10,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [11,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [12,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [13,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [14,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [15,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [16,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [17,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [18,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [19,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [20,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [21,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [22,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [23,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [24,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [25,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [26,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [27,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [28,0]}',
>                                         '{"user_id": ["1"], "goods_id": 
> [29,0]}',
>                                         '{"user_id": ["2"], "goods_id": 
> [30,0]}'])
>     # group_keys = ['user_id', 'goods_id']
>     group_keys = ['user_id']
>     sink_to_file_flag = True
>     data_stream = data_stream1.union(data_stream2)
>     # user_function = __import__("UserFunction")
>     ds = data_stream.map(lambda v: to_dict_map(v)) \
>         .filter(lambda v: v) \
>         .map(lambda v: v) \
>         .key_by(lambda v: get_group_key(v, group_keys)) \
>         .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
>         .process(CountWindowProcessFunction(lambda v: v), Types.STRING())
>     ds = ds.map(lambda v: v, Types.PRIMITIVE_ARRAY(Types.BYTE()))
>     base_path = "/tmp/1.txt"
>     encoder = Encoder.simple_string_encoder()
>     file_sink_builder = FileSink.for_row_format(base_path, encoder)
>     file_sink = file_sink_builder \
>         .with_bucket_check_interval(1000) \
>         .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
>         .with_output_file_config(
>         
> OutputFileConfig.builder().with_part_prefix("pre").with_part_suffix("suf").build())
>  \
>         .build()
>     ds.sink_to(file_sink)
>     # submit for execution
>     env.execute()
> {code}
> The stream graph is as follows:
> {code}
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: Collection Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Collection Source",
>     "parallelism" : 1
>   }, {
>     "id" : 2,
>     "type" : "Source: Collection Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Collection Source",
>     "parallelism" : 1
>   }, {
>     "id" : 9,
>     "type" : "TumblingProcessingTimeWindows",
>     "pact" : "Operator",
>     "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), 
> ProcessingTimeTrigger, CountWindowProcessFunction)",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 15,
>       "ship_strategy" : "HASH",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 10,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 9,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 15,
>     "type" : "Map, Filter, Map, _stream_key_by_map_operator",
>     "pact" : "Operator",
>     "contents" : "Map, Filter, Map, _stream_key_by_map_operator",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     }, {
>       "id" : 2,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 16,
>     "type" : "TumblingProcessingTimeWindows, Map",
>     "pact" : "Operator",
>     "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), 
> ProcessingTimeTrigger, CountWindowProcessFunction)",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 15,
>       "ship_strategy" : "HASH",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 18,
>     "type" : "Sink: Writer",
>     "pact" : "Operator",
>     "contents" : "Sink: Writer",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 10,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 20,
>     "type" : "Sink: Committer",
>     "pact" : "Operator",
>     "contents" : "Sink: Committer",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 18,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The plan is incorrect: the TumblingProcessingTimeWindows operator appears
> twice, once as node 9 and again as node 16 (chained with Map).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to