Hi Team,

you can ignore this thread. I was able to resolve this.

J.

On Mon, Oct 2, 2023 at 8:40 PM joshua perez <joshp19...@gmail.com> wrote:

> Hi team,
>
> I am trying to read the records from the Kafka topic and below is my very
> basic code as of now
>
> from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
> from pyflink.datastream.stream_execution_environment import 
> StreamExecutionEnvironment, RuntimeExecutionMode
> from pyflink.common import SimpleStringSchema
>
>
> class SourceData(object):
>     def __init__(self, env):
>         self.env = env
>         self.env.add_jars("file:///flink-sql-connector-kafka-1.17.1.jar")
>         self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>         self.env.set_parallelism(1)
>
>     def get_data(self):
>         source = FlinkKafkaConsumer(
>             topics="test-topic1",
>             deserialization_schema=SimpleStringSchema(),
>             properties={
>                 "bootstrap.servers": "localhost:9092",
>                 "auto.offset.reset": "earliest",
>             }
>         )
>         self.env \
>             .add_source(source) \
>             .print()
>         self.env.execute("source")
>
> print(SourceData(StreamExecutionEnvironment.get_execution_environment()).get_data())
>
>
> Now, when I try to add the jars via code or if I put the jars inside pyflink 
> > lib directory, nothing works.
>
>
> Can someone help me to resolve my problem? I am stuck since hours. 
> KafkaSource.builder() also doesn't work.
>
>
> I tried out many solutions available on stackoverflow but no luck.
>
> My python version is 3.10
>
> Java: 11
>
> Flink: 1.17.1
>
>
> J.
>
>
>
>

Reply via email to