Dear Team,
I am new to PyFlink and would appreciate your help with an issue I am facing.
I am using PyFlink version 1.15.0, and my code is based on the PyFlink
reference examples.
These are the errors I am getting:
Traceback (most recent call last):
  File "E:\pythonProject16\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 470, in input_elements
    element = received.get(timeout=1)
  File "C:\Users\Admin\AppData\Local\Programs\Python\Python38\lib\queue.py", line 178, in get
    raise Empty
_queue.Empty
RuntimeError: Channel closed prematurely.
My code is:
import argparse
import json
import os
import pickle
import time
from datetime import datetime
from pathlib import Path
from typing import Iterable

import pandas as pd
from pyflink.common import (
    Encoder,
    JsonRowDeserializationSchema,
    JsonRowSerializationSchema,
    Row,
    SimpleStringSchema,
    Time,
    Types,
    WatermarkStrategy,
)
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import (
    CheckpointingMode,
    EmbeddedRocksDBStateBackend,
    FileSystemCheckpointStorage,
    HashMapStateBackend,
    KeyedProcessFunction,
    ProcessWindowFunction,
    RocksDBStateBackend,
    RuntimeContext,
    StreamExecutionEnvironment,
    WindowFunction,
)
from pyflink.datastream.connectors import (
    FileSink,
    FlinkKafkaConsumer,
    FlinkKafkaProducer,
    OutputFileConfig,
    RollingPolicy,
)
from pyflink.datastream.state import (
    ListStateDescriptor,
    StateTtlConfig,
    ValueStateDescriptor,
)
from pyflink.datastream.window import (
    TimeWindow,
    TumblingEventTimeWindows,
    TumblingProcessingTimeWindows,
)
from sklearn.preprocessing import LabelEncoder
class MyTimestampAssigner(TimestampAssigner):
    """Take each record's event timestamp from its first field (``time_stamp``)."""

    def extract_timestamp(self, value, record_timestamp) -> int:
        # The incoming record_timestamp is ignored; the first row field wins.
        # NOTE(review): Flink event time is epoch milliseconds — confirm the
        # unit of `time_stamp` (it is declared as DOUBLE in the schema).
        event_time = value[0]
        return int(event_time)
class CountWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    """Emit one ``(key, element_count)`` tuple per fired window."""

    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        # The window bounds are not used; only the number of elements matters.
        element_count = sum(1 for _ in inputs)
        return [(key, element_count)]
class Storage(KeyedProcessFunction):
    """Keep the latest count per key in keyed state and emit it with a timestamp.

    Each incoming element is expected to be a ``(key, count)`` tuple, as
    produced by ``CountWindowFunction``. The count is written to a keyed
    ``ValueState`` with a 7-day TTL, and ``(count, time.time())`` is emitted.
    """

    def __init__(self):
        super().__init__()
        # Assigned in open(); handle to the keyed ValueState holding the count.
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        """Register the TTL-enabled value state for the current key."""
        # NOTE(review): the state is declared FLOAT although the stored counts
        # are ints — consider Types.INT() / Types.LONG().
        state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
        state_ttl_config = StateTtlConfig \
            .new_builder(Time.days(7)) \
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
            .disable_cleanup_in_background() \
            .build()
        state_descriptor.enable_time_to_live(state_ttl_config)
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        """Store ``value[1]`` as the latest count and yield it with wall-clock time."""
        # Only the latest count is kept; the previously stored value is not
        # consulted. TODO(review): if a running total was intended, use
        #   current = (self.state.value() or 0) + value[1]
        current = value[1]
        self.state.update(current)
        yield current, time.time()
def _local_file_uri(file_name):
    """Return a ``file://`` URI for *file_name* located next to this script.

    ``Path.as_uri()`` produces forward slashes and percent-encoding
    (``file:///E:/dir/file``). Formatting a Windows path into
    ``"file:///{}"`` would embed raw backslashes instead.
    """
    base_dir = os.path.abspath(os.path.dirname(__file__))
    return Path(base_dir, file_name).as_uri()


def _build_deserialization_schema():
    """Return the JSON row schema for the purchase records read from Kafka."""
    field_names = ["time_stamp",
                   "Bill_number", "Store_Code",
                   "itemdescription", "Item_code",
                   "Gross_Price", "Discount",
                   "Net_Price",
                   "purchaseorReturn",
                   "Membership_No",
                   "Billing_Date", "Billing_Time"]
    field_types = [Types.DOUBLE(),
                   Types.STRING(), Types.INT(), Types.STRING(),
                   Types.STRING(), Types.FLOAT(),
                   Types.FLOAT(), Types.FLOAT(),
                   Types.STRING(),
                   Types.STRING(), Types.STRING(),
                   Types.STRING()]
    return JsonRowDeserializationSchema.builder() \
        .type_info(type_info=Types.ROW_NAMED(field_names, field_types)) \
        .build()


def write_to_kafka(*, topic='test', bootstrap_servers='192.168.1.37:9092',
                   group_id='test_group1',
                   kafka_jar='flink-sql-connector-kafka-1.15.0.jar'):
    """Build and run a job that counts Kafka records in 1-second windows.

    Despite the name, nothing is written to Kafka. The job:

    * consumes JSON rows from *topic*;
    * prints ``(key, count)`` per 1-second processing-time window;
    * prints ``(count, wall-clock time)`` emitted by ``Storage``.

    Args:
        topic: Kafka topic to consume.
        bootstrap_servers: Kafka ``bootstrap.servers`` value.
        group_id: Kafka consumer ``group.id``.
        kafka_jar: File name, next to this script, of the Kafka SQL connector
            jar. Its version must match the installed PyFlink (1.15.0). Flink
            1.15 connector artifacts carry no Scala suffix.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    env.enable_checkpointing(1000)
    # With a 5 s minimum pause, checkpoints are effectively spaced >= 5 s
    # apart despite the 1 s interval above.
    env.get_checkpoint_config().set_min_pause_between_checkpoints(5000)
    # EmbeddedRocksDBStateBackend is bundled with the Flink/PyFlink runtime (it
    # is constructed here before any jar is added). Do NOT add a separate
    # flink-statebackend-rocksdb jar: the previous 1.0.0 jar belonged to a
    # different Flink version and can conflict with the bundled classes.
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    env.get_checkpoint_config().set_checkpointing_mode(
        CheckpointingMode.AT_LEAST_ONCE)
    env.get_checkpoint_config().set_checkpoint_storage(
        FileSystemCheckpointStorage(_local_file_uri('checkpoint-dir11')))

    # The connector jar must be registered before FlinkKafkaConsumer is created.
    env.add_jars(_local_file_uri(kafka_jar))

    kafka_consumer = FlinkKafkaConsumer(
        topics=topic,
        deserialization_schema=_build_deserialization_schema(),
        properties={'bootstrap.servers': bootstrap_servers,
                    'group.id': group_id})
    ds = env.add_source(kafka_consumer)

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())

    # Every record is mapped to the single key '/root', so this is a global
    # count per window.
    # NOTE(review): timestamps/watermarks are assigned, but the window uses
    # processing time, so they do not affect windowing. Use
    # TumblingEventTimeWindows if event time was intended (then confirm that
    # MyTimestampAssigner returns epoch milliseconds).
    counts = ds.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: '/root', key_type=Types.STRING()) \
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) \
        .apply(CountWindowFunction(),
               Types.TUPLE([Types.STRING(), Types.INT()]))
    counts.print()

    latest = counts.key_by(lambda value: value[0]) \
        .process(Storage())
    latest.print()

    env.execute('write_to_kafka')
if __name__ == "__main__":
    # Script entry point: print a start banner, then build and run the job.
    print("start writing data to kafka")
    write_to_kafka()