From: harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai] 
Sent: Thursday, April 14, 2022 4:04 PM
To: user-i...@flink.apache.org
Cc: harshit.varsh...@iktara.ai
Subject: Pyflink Kafka consumer error (version 1.14.4)

 

Dear Team,

 

I am new to pyflink and request for your support in issue I am facing with
Pyflink. I am using Pyflink version 1.14.4 & using reference code from
pyflink getting started pages. 

 

I am getting following error when using FlinkKafkaConsumer connector. 

: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy

Caused by: java.lang.NoSuchMethodError:
org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/ap
ache/flink/metrics/MetricGroup;

 

Below is my code for reference..

 

def streaming_square_roots():

    env = StreamExecutionEnvironment.get_execution_environment()

    # the sql connector for kafka is used here as it's a fat jar and could
avoid dependency issues

 
env.add_jars("file:///C:/Users/Admin/Desktop/test11/flink-sql-connector-kafk
a.jar <file:///C:\Users\Admin\Desktop\test11\flink-sql-connector-kafka.jar>
")

 

    deserialization_schema = SimpleStringSchema()

 

    kafka_consumer = FlinkKafkaConsumer(

        topics='new-numbers',

        deserialization_schema=deserialization_schema,

        properties={'bootstrap.servers': 'localhost:9092'})

 

    ds = env.add_source(kafka_consumer)

 

    ds.print()

 

 

    # 4. create sink and emit result to sink

 

    env.execute(job_name='streaming_square_roots')

 

 

if __name__ == '__main__':

    streaming_square_roots()

 

Thanks and Regards,

Harshit

 

Reply via email to