Hi Kadiyala,
I think there is a typo in your email:
Flink version 1.7.1
Maybe you meant 1.17.1?
About the error: the reason your code can't run successfully is that
the class
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer" has
been deprecated since Flink 1.14 and is no longer shipped in the
1.17.1 connector jar.
You need to use "KafkaSource" and "KafkaSink" instead.
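For example, here is a minimal sketch of the KafkaSource side (the
bootstrap server, topic, credentials and jar path are placeholders
taken from your snippet; note that add_jars() expects an absolute
file:/// URL):

from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, \
    KafkaOffsetsInitializer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

env = StreamExecutionEnvironment.get_execution_environment()
# add_jars() needs an absolute path (placeholder below)
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")

deserialization_schema = JsonRowDeserializationSchema.Builder() \
    .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
    .build()

source = KafkaSource.builder() \
    .set_bootstrap_servers("bootstrap-server") \
    .set_topics("TOPIC_NAME") \
    .set_group_id("python-group-1") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(deserialization_schema) \
    .set_property("security.protocol", "SASL_SSL") \
    .set_property("sasl.mechanism", "PLAIN") \
    .set_property("sasl.jaas.config",
                  'org.apache.kafka.common.security.plain.PlainLoginModule '
                  'required username="API_KEY" password="API_SECRET";') \
    .build()

# from_source() replaces the old add_source(FlinkKafkaConsumer) path
stream = env.from_source(source, WatermarkStrategy.no_watermarks(),
                         "Kafka Source")
stream.print()
env.execute()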
Regards,
Leo
On 2023/6/7 23:27, Kadiyala, Ruthvik via user wrote:
Hi,
Please find below the code I have been using to consume a Kafka stream
that is hosted on Confluent. It returns an error regarding the jar
files. Please find the error below the code snippet. Let me know what
I am doing wrong. I am running this on Docker with Flink Version:
1.7.1.
Code:
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
import glob
import os
import sys
import logging
# Set up the execution environment
env = StreamExecutionEnvironment.get_execution_environment()
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format="%(message)s")
# the sql connector for kafka is used here as it's a fat jar and could
# avoid dependency issues
env.add_jars("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_classpaths("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_jars("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")
env.add_classpaths("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")
# Set up the Confluent Cloud Kafka configuration
kafka_config = {
    'bootstrap.servers': 'bootstrap-server',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.jaas.config':
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="API_KEY" password="API_SECRET";'
}
topic = 'TOPIC_NAME'
deserialization_schema = JsonRowDeserializationSchema.Builder() \
    .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
    .build()
# Set up the Kafka consumer properties
consumer_props = {
    'bootstrap.servers': kafka_config['bootstrap.servers'],
    'security.protocol': kafka_config['security.protocol'],
    'sasl.mechanism': kafka_config['sasl.mechanism'],
    'sasl.jaas.config': kafka_config['sasl.jaas.config'],
    'group.id': 'python-group-1'
}
# Create a Kafka consumer
kafka_consumer = FlinkKafkaConsumer(
    topics=topic,  # Kafka topic
    deserialization_schema=deserialization_schema,
    properties=consumer_props,  # Consumer properties
)
kafka_consumer.set_start_from_earliest()
# Add the Kafka consumer as a source to the execution environment
stream = env.add_source(kafka_consumer)
# Define your data processing logic here
# For example, you can print the stream to the console
stream.print()
# Execute the job
env.execute()
Error:

Traceback (most recent call last):
File "/home/pyflink/test.py", line 45, in <module>
kafka_consumer = FlinkKafkaConsumer(
File
"/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py",
line 203, in __init__
j_flink_kafka_consumer = _get_kafka_consumer(topics, properties,
deserialization_schema,
File
"/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py",
line 161, in _get_kafka_consumer
j_flink_kafka_consumer = j_consumer_clz(topics,
File
"/usr/local/lib/python3.10/dist-packages/pyflink/util/exceptions.py",
line 185, in wrapped_call
raise TypeError(
*TypeError: Could not found the Java class
'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The
Java dependencies could be specified via command line argument
'--jarfile' or the config option 'pipeline.jars'*
Cheers & Regards,
Ruthvik Kadiyala