Dear Team,
I am new to PyFlink and would appreciate your support with an issue I am facing. I am using PyFlink 1.15.0, and my code is based on the PyFlink reference examples.

The error I am getting is:

Traceback (most recent call last):
  File "E:\pythonProject16\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 470, in input_elements
    element = received.get(timeout=1)
  File "C:\Users\Admin\AppData\Local\Programs\Python\Python38\lib\queue.py", line 178, in get
    raise Empty
_queue.Empty
RuntimeError: Channel closed prematurely.

My code is:

import json
import os
import time
from datetime import datetime
from pyflink.common import SimpleStringSchema, JsonRowDeserializationSchema, Types, JsonRowSerializationSchema
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction, HashMapStateBackend, CheckpointingMode, \
    FileSystemCheckpointStorage, KeyedProcessFunction, RuntimeContext, EmbeddedRocksDBStateBackend, RocksDBStateBackend
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor
from sklearn.preprocessing import LabelEncoder
import pickle
import pandas as pd
from pyflink.common import Row
import argparse
from typing import Iterable
from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        # the first field of the row (time_stamp) is used as the event timestamp
        return int(value[0])


class CountWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        # return [(key, result)]: one (key, count) tuple per window
        return [(key, len([e for e in inputs]))]


class Storage(KeyedProcessFunction):
    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.days(7)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = 0
        current = value[1]
        self.state.update(current)
        yield current, time.time()


def write_to_kafka():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.enable_checkpointing(1000)
    env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
    # env.get_checkpoint_config().enable_unaligned_checkpoints()
    check = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'checkpoint-dir11')
    env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///{}".format(check)))

    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))
    rocksdb_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                               'flink-statebackend-rocksdb_2.11-1.0.0.jar')
    env.add_jars("file:///{}".format(rocksdb_jar))

    # deserialization_schema = SimpleStringSchema()
    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(type_info=Types.ROW_NAMED(
            ["time_stamp", "Bill_number", "Store_Code", "itemdescription", "Item_code",
             "Gross_Price", "Discount", "Net_Price", "purchaseorReturn", "Membership_No",
             "Billing_Date", "Billing_Time"],
            [Types.DOUBLE(), Types.STRING(), Types.INT(), Types.STRING(), Types.STRING(),
             Types.FLOAT(), Types.FLOAT(), Types.FLOAT(), Types.STRING(), Types.STRING(),
             Types.STRING(), Types.STRING()])).build()

    kafka_consumer = FlinkKafkaConsumer(
        topics='test',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '192.168.1.37:9092', 'group.id': 'test_group1'})

    ds = env.add_source(kafka_consumer)
    # ds.print()

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())

    # ds = ds.map(lambda x: eval(x))
    ds1 = ds.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: '/root', key_type=Types.STRING()) \
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) \
        .apply(CountWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
    ds1.print()

    ds4 = ds1.key_by(lambda value: value[0]) \
        .process(Storage())
    ds4.print()

    # ds1 = ds.map(mlfunc)
    # ds1.print()
    env.execute('write_to_kafka')


if __name__ == '__main__':
    print("start writing data to kafka")
    write_to_kafka()
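In case it helps, the messages on the 'test' topic are expected to be JSON objects matching the ROW_NAMED schema passed to JsonRowDeserializationSchema above. The snippet below only illustrates the shape of one such record; the field values are made-up placeholders, not my real data:

import json

# Illustrative record matching the declared schema; all values are placeholders.
sample_record = {
    "time_stamp": 1652769000.0,        # DOUBLE, also read by MyTimestampAssigner
    "Bill_number": "B0001",            # STRING
    "Store_Code": 101,                 # INT
    "itemdescription": "sample item",  # STRING
    "Item_code": "I123",               # STRING
    "Gross_Price": 100.0,              # FLOAT
    "Discount": 10.0,                  # FLOAT
    "Net_Price": 90.0,                 # FLOAT
    "purchaseorReturn": "P",           # STRING
    "Membership_No": "M001",           # STRING
    "Billing_Date": "2022-05-17",      # STRING
    "Billing_Time": "10:30:00"         # STRING
}

print(json.dumps(sample_record))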