Dear Team,
I am new to pyflink and request for your support in issue I am facing with Pyflink. I am using Pyflink version 1.14.4 & using reference code from pyflink getting started pages. I am getting following error . ImportError: cannot import name 'TumblingEventTimeWindows' from 'pyflink.datastream.window' (C:\Users\Admin\PycharmProjects\pythonProject8\venv\lib\site-packages\pyflin k\datastream\window.py) Below is my code for reference.. import sys import argparse from typing import Iterable from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy from pyflink.common import Types, WatermarkStrategy, Time, Encoder from pyflink.common.watermark_strategy import TimestampAssigner from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow class MyTimestampAssigner(TimestampAssigner): def extract_timestamp(self, value, record_timestamp) -> int: return int(value[1]) class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]): def process(self, key: str, context: ProcessWindowFunction.Context[TimeWindow], elements: Iterable[tuple]) -> Iterable[tuple]: return [(key, context.window().start, context.window().end, len([e for e in elements]))] def clear(self, context: ProcessWindowFunction.Context) -> None: pass if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( '--output', dest='output', required=False, help='Output file to write results to.') argv = sys.argv[1:] known_args, _ = parser.parse_known_args(argv) output_path = known_args.output env = StreamExecutionEnvironment.get_execution_environment() # write all the data to one file env.set_parallelism(1) # define the source data_stream = env.from_collection([ ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9), ('hi', 15)], type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # define the watermark strategy watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ .with_timestamp_assigner(MyTimestampAssigner()) ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ .key_by(lambda x: x[0], key_type=Types.STRING()) \ .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \ .process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()])) # define the sink if output_path is not None: ds.sink_to( sink=FileSink.for_row_format( base_path=output_path, encoder=Encoder.simple_string_encoder()) .with_output_file_config( OutputFileConfig.builder() .with_part_prefix("prefix") .with_part_suffix(".ext") .build()) .with_rolling_policy(RollingPolicy.default_rolling_policy()) .build() ) else: print("Printing result to stdout. Use --output to specify output path.") ds.print() # submit for execution env.execute() Thanks and Regards, Harshit