Hi team,

I am trying to read records from a Kafka topic, and below is my very basic
code so far:

from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer


class SourceData(object):
    def __init__(self, env):
        self.env = env
        self.env.add_jars("file:///flink-sql-connector-kafka-1.17.1.jar")
        self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
        self.env.set_parallelism(1)

    def get_data(self):
        source = FlinkKafkaConsumer(
            topics="test-topic1",
            deserialization_schema=SimpleStringSchema(),
            properties={
                "bootstrap.servers": "localhost:9092",
                "auto.offset.reset": "earliest",
            }
        )
        self.env \
            .add_source(source) \
            .print()
        self.env.execute("source")

SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data()


Now, neither adding the jar via code (add_jars, as sketched below) nor
placing it in the pyflink/lib directory works.
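
For completeness, this is roughly how I am calling add_jars (a minimal
sketch assuming the same imports as above; the path is just a placeholder
for wherever the connector jar actually sits on my machine):

import os

# add_jars() expects an absolute file:// URL; this path is only a placeholder
jar_path = os.path.abspath("flink-sql-connector-kafka-1.17.1.jar")

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file://" + jar_path)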


Can someone help me resolve this? I have been stuck for hours.
KafkaSource.builder() also doesn't work; my attempt at that is sketched
below.
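
This is roughly the KafkaSource.builder() variant I tried (a minimal
sketch reusing the same environment, topic, and broker as above; the group
id is just a placeholder):

from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

# same topic and broker as in the FlinkKafkaConsumer version above
source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("test-topic1") \
    .set_group_id("test-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source").print()
env.execute("kafka-source")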


I tried many of the solutions available on Stack Overflow, but no luck.

Python: 3.10

Java: 11

Flink: 1.17.1


J.
