Hi Team, you can ignore this thread. I was able to resolve this.
J. On Mon, Oct 2, 2023 at 8:40 PM joshua perez <joshp19...@gmail.com> wrote: > Hi team, > > I am trying to read the records from the Kafka topic and below is my very > basic code as of now > > from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer > from pyflink.datastream.stream_execution_environment import > StreamExecutionEnvironment, RuntimeExecutionMode > from pyflink.common import SimpleStringSchema > > > class SourceData(object): > def __init__(self, env): > self.env = env > self.env.add_jars("file:///flink-sql-connector-kafka-1.17.1.jar") > self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING) > self.env.set_parallelism(1) > > def get_data(self): > source = FlinkKafkaConsumer( > topics="test-topic1", > deserialization_schema=SimpleStringSchema(), > properties={ > "bootstrap.servers": "localhost:9092", > "auto.offset.reset": "earliest", > } > ) > self.env \ > .add_source(source) \ > .print() > self.env.execute("source") > > print(SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data()) > > > Now, when I try to add the jars via code or if I put the jars inside pyflink > > lib directory, nothing works. > > > Can someone help me to resolve my problem? I am stuck since hours. > KafkaSource.builder() also doesn't work. > > > I tried out many solutions available on stackoverflow but no luck. > > My python version is 3.10 > > Java: 11 > > Flink: 1.17.1 > > > J. > > > >