From: harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai]
Sent: Thursday, April 14, 2022 4:04 PM
To: user-i...@flink.apache.org
Cc: harshit.varsh...@iktara.ai
Subject: Pyflink Kafka consumer error (version 1.14.4)
Dear Team,
I am new to pyflink and request for your support in issue I am facing with
Pyflink. I am using Pyflink version 1.14.4 & using reference code from
pyflink getting started pages.
I am getting following error when using FlinkKafkaConsumer connector.
: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
Caused by: java.lang.NoSuchMethodError:
org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/ap
ache/flink/metrics/MetricGroup;
Below is my code for reference..
def streaming_square_roots():
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could
avoid dependency issues
env.add_jars("file:///C:/Users/Admin/Desktop/test11/flink-sql-connector-kafk
a.jar <file:///C:\Users\Admin\Desktop\test11\flink-sql-connector-kafka.jar>
")
deserialization_schema = SimpleStringSchema()
kafka_consumer = FlinkKafkaConsumer(
topics='new-numbers',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092'})
ds = env.add_source(kafka_consumer)
ds.print()
# 4. create sink and emit result to sink
env.execute(job_name='streaming_square_roots')
if __name__ == '__main__':
streaming_square_roots()
Thanks and Regards,
Harshit