[ https://issues.apache.org/jira/browse/FLINK-31272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-31272:
-----------------------------------
    Labels: pull-request-available  (was: )

Duplicate operators appear in the StreamGraph for Python DataStream API jobs
----------------------------------------------------------------------------

                Key: FLINK-31272
                URL: https://issues.apache.org/jira/browse/FLINK-31272
            Project: Flink
         Issue Type: Bug
         Components: API / Python
   Affects Versions: 1.16.0
           Reporter: Dian Fu
           Priority: Major
             Labels: pull-request-available

For the following job:

{code}
import argparse
import json
import sys
import time
from typing import Iterable, cast

from pyflink.common import Types, Time, Encoder
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction, EmbeddedRocksDBStateBackend, \
    PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, ExternalizedCheckpointCleanup
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig
from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, TumblingProcessingTimeWindows, \
    ProcessingTimeTrigger


class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):

    def __init__(self, window_size: int):
        self._window_size = window_size
        self._count_state_descriptor = ReducingStateDescriptor(
            "count", lambda a, b: a + b, Types.LONG())

    @staticmethod
    def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
        return CountWithProcessTimeoutTrigger(window_size)

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
        count_state.add(1)
        # print("element arrive:", element, "count_state:", count_state.get(), window.max_timestamp(),
        #       ctx.get_current_watermark())
        if count_state.get() >= self._window_size:  # must fire & purge here!
            print("fire element count", element, count_state.get(), window.max_timestamp(),
                  ctx.get_current_watermark())
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        if timestamp >= window.end:
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        else:
            return TriggerResult.CONTINUE

    def on_processing_time(self,
                           timestamp: int,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext) -> TriggerResult:
        if timestamp >= window.end:
            return TriggerResult.CONTINUE
        else:
            print("fire with process_time:", timestamp)
            count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE

    def on_event_time(self,
                      timestamp: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.CONTINUE

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        count_state = ctx.get_partitioned_state(self._count_state_descriptor)
        count_state.clear()


def to_dict_map(v):
    time.sleep(1)
    dict_value = json.loads(v)
    return dict_value


def get_group_key(value, keys):
    group_key_values = []
    for key in keys:
        one_key_value = 'null'
        if key in value:
            list_value = value[key]
            if list_value:
                one_key_value = str(list_value[0])
        group_key_values.append(one_key_value)
    group_key = '_'.join(group_key_values)
    # print("group_key=", group_key)
    return group_key


class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, TimeWindow]):

    def __init__(self, uf):
        self._user_function = uf

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[dict]) -> Iterable[dict]:
        result_list = self._user_function.process_after_group_by_function(elements)
        return result_list


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)
    # process time
    env.get_config().set_auto_watermark_interval(0)

    state_backend = EmbeddedRocksDBStateBackend(True)
    state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    env.set_state_backend(state_backend)

    config = env.get_checkpoint_config()
    # config.set_checkpoint_storage(FileSystemCheckpointStorage("hdfs://ha-nn-uri/tmp/checkpoint/"))
    config.set_checkpoint_storage(FileSystemCheckpointStorage("file:///Users/10030122/Downloads/pyflink_checkpoint/"))
    config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
    config.set_checkpoint_interval(5000)
    config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    # define the source
    data_stream1 = env.from_collection(['{"user_id": ["0"], "goods_id": [0,0]}',
                                        '{"user_id": ["1"], "goods_id": [1,0]}',
                                        '{"user_id": ["2"], "goods_id": [2,0]}',
                                        '{"user_id": ["1"], "goods_id": [3,0]}',
                                        '{"user_id": ["2"], "goods_id": [4,0]}',
                                        '{"user_id": ["1"], "goods_id": [5,0]}',
                                        '{"user_id": ["2"], "goods_id": [6,0]}',
                                        '{"user_id": ["1"], "goods_id": [7,0]}',
                                        '{"user_id": ["2"], "goods_id": [8,0]}',
                                        '{"user_id": ["1"], "goods_id": [9,0]}',
                                        '{"user_id": ["2"], "goods_id": [10,0]}',
                                        '{"user_id": ["1"], "goods_id": [11,0]}',
                                        '{"user_id": ["2"], "goods_id": [12,0]}',
                                        '{"user_id": ["1"], "goods_id": [13,0]}',
                                        '{"user_id": ["2"], "goods_id": [14,0]}',
                                        '{"user_id": ["1"], "goods_id": [15,0]}',
                                        '{"user_id": ["2"], "goods_id": [16,0]}',
                                        '{"user_id": ["1"], "goods_id": [17,0]}',
                                        '{"user_id": ["2"], "goods_id": [18,0]}',
                                        '{"user_id": ["1"], "goods_id": [19,0]}',
                                        '{"user_id": ["2"], "goods_id": [20,0]}',
                                        '{"user_id": ["1"], "goods_id": [21,0]}',
                                        '{"user_id": ["2"], "goods_id": [22,0]}',
                                        '{"user_id": ["1"], "goods_id": [23,0]}',
                                        '{"user_id": ["2"], "goods_id": [24,0]}',
                                        '{"user_id": ["1"], "goods_id": [25,0]}',
                                        '{"user_id": ["2"], "goods_id": [26,0]}',
                                        '{"user_id": ["1"], "goods_id": [27,0]}',
                                        '{"user_id": ["2"], "goods_id": [28,0]}',
                                        '{"user_id": ["1"], "goods_id": [29,0]}',
                                        '{"user_id": ["2"], "goods_id": [30,0]}'])
    data_stream2 = env.from_collection(['{"user_id": ["0"], "goods_id": [0,0]}',
                                        '{"user_id": ["1"], "goods_id": [1,0]}',
                                        '{"user_id": ["2"], "goods_id": [2,0]}',
                                        '{"user_id": ["1"], "goods_id": [3,0]}',
                                        '{"user_id": ["2"], "goods_id": [4,0]}',
                                        '{"user_id": ["1"], "goods_id": [5,0]}',
                                        '{"user_id": ["2"], "goods_id": [6,0]}',
                                        '{"user_id": ["1"], "goods_id": [7,0]}',
                                        '{"user_id": ["2"], "goods_id": [8,0]}',
                                        '{"user_id": ["1"], "goods_id": [9,0]}',
                                        '{"user_id": ["2"], "goods_id": [10,0]}',
                                        '{"user_id": ["1"], "goods_id": [11,0]}',
                                        '{"user_id": ["2"], "goods_id": [12,0]}',
                                        '{"user_id": ["1"], "goods_id": [13,0]}',
                                        '{"user_id": ["2"], "goods_id": [14,0]}',
                                        '{"user_id": ["1"], "goods_id": [15,0]}',
                                        '{"user_id": ["2"], "goods_id": [16,0]}',
                                        '{"user_id": ["1"], "goods_id": [17,0]}',
                                        '{"user_id": ["2"], "goods_id": [18,0]}',
                                        '{"user_id": ["1"], "goods_id": [19,0]}',
                                        '{"user_id": ["2"], "goods_id": [20,0]}',
                                        '{"user_id": ["1"], "goods_id": [21,0]}',
                                        '{"user_id": ["2"], "goods_id": [22,0]}',
                                        '{"user_id": ["1"], "goods_id": [23,0]}',
                                        '{"user_id": ["2"], "goods_id": [24,0]}',
                                        '{"user_id": ["1"], "goods_id": [25,0]}',
                                        '{"user_id": ["2"], "goods_id": [26,0]}',
                                        '{"user_id": ["1"], "goods_id": [27,0]}',
                                        '{"user_id": ["2"], "goods_id": [28,0]}',
                                        '{"user_id": ["1"], "goods_id": [29,0]}',
                                        '{"user_id": ["2"], "goods_id": [30,0]}'])

    # group_keys = ['user_id', 'goods_id']
    group_keys = ['user_id']
    sink_to_file_flag = True

    data_stream = data_stream1.union(data_stream2)
    # user_function = __import__("UserFunction")
    ds = data_stream.map(lambda v: to_dict_map(v)) \
        .filter(lambda v: v) \
        .map(lambda v: v) \
        .key_by(lambda v: get_group_key(v, group_keys)) \
        .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
        .process(CountWindowProcessFunction(lambda v: v), Types.STRING())
    ds = ds.map(lambda v: v, Types.PRIMITIVE_ARRAY(Types.BYTE()))

    base_path = "/tmp/1.txt"
    encoder = Encoder.simple_string_encoder()
    file_sink_builder = FileSink.for_row_format(base_path, encoder)
    file_sink = file_sink_builder \
        .with_bucket_check_interval(1000) \
        .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
        .with_output_file_config(
            OutputFileConfig.builder().with_part_prefix("pre").with_part_suffix("suf").build()) \
        .build()
    ds.sink_to(file_sink)

    # submit for execution
    env.execute()
{code}

The stream graph is as follows:

{code}
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Collection Source",
    "pact" : "Data Source",
    "contents" : "Source: Collection Source",
    "parallelism" : 1
  }, {
    "id" : 2,
> "type" : "Source: Collection Source", > "pact" : "Data Source", > "contents" : "Source: Collection Source", > "parallelism" : 1 > }, { > "id" : 9, > "type" : "TumblingProcessingTimeWindows", > "pact" : "Operator", > "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), > ProcessingTimeTrigger, CountWindowProcessFunction)", > "parallelism" : 1, > "predecessors" : [ { > "id" : 15, > "ship_strategy" : "HASH", > "side" : "second" > } ] > }, { > "id" : 10, > "type" : "Map", > "pact" : "Operator", > "contents" : "Map", > "parallelism" : 1, > "predecessors" : [ { > "id" : 9, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 15, > "type" : "Map, Filter, Map, _stream_key_by_map_operator", > "pact" : "Operator", > "contents" : "Map, Filter, Map, _stream_key_by_map_operator", > "parallelism" : 1, > "predecessors" : [ { > "id" : 1, > "ship_strategy" : "FORWARD", > "side" : "second" > }, { > "id" : 2, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 16, > "type" : "TumblingProcessingTimeWindows, Map", > "pact" : "Operator", > "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), > ProcessingTimeTrigger, CountWindowProcessFunction)", > "parallelism" : 1, > "predecessors" : [ { > "id" : 15, > "ship_strategy" : "HASH", > "side" : "second" > } ] > }, { > "id" : 18, > "type" : "Sink: Writer", > "pact" : "Operator", > "contents" : "Sink: Writer", > "parallelism" : 1, > "predecessors" : [ { > "id" : 10, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > }, { > "id" : 20, > "type" : "Sink: Committer", > "pact" : "Operator", > "contents" : "Sink: Committer", > "parallelism" : 1, > "predecessors" : [ { > "id" : 18, > "ship_strategy" : "FORWARD", > "side" : "second" > } ] > } ] > } > {code} > The plan is incorrect as we can see that TumblingProcessingTimeWindows > appears twice. -- This message was sent by Atlassian Jira (v8.20.10#820010)