```
# called from - foreachBatch
def convertToDictForEachBatch(df, batchId):

    # checks for events in topic - events_topic; further processing takes
    # place only if there is new data in the topic
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # events is passed to a function, and processing is done only if new
    # events were generated
```
What is the best way to achieve this? The current code reads all of the data already in the Kafka topic; I need it to read only the new data.
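One direction I have been considering (just a rough sketch, not a working fix) is to persist the last read offsets per partition myself and feed them back through the batch reader's `startingOffsets` option, so each invocation only pulls records that arrived since the previous batch. The `OFFSETS_PATH` location and the `read_saved_offsets` / `save_offsets` helpers below are hypothetical placeholders, and the sketch assumes every partition of the topic has received at least one record before offsets are first saved, since Spark requires a `startingOffsets` JSON to list all partitions of the subscribed topic:

```
import json
from pyspark.sql import functions as F

# Hypothetical location for the persisted offsets; a durable store
# (e.g. a file on distributed storage) would be used in practice.
OFFSETS_PATH = "/tmp/events_topic_offsets.json"

def read_saved_offsets():
    # Returns {"<partition>": <next offset to read>} or None on the first run.
    try:
        with open(OFFSETS_PATH) as f:
            return json.load(f)
    except FileNotFoundError:
        return None

def save_offsets(offsets):
    with open(OFFSETS_PATH, "w") as f:
        json.dump(offsets, f)

def convertToDictForEachBatch(df, batchId):
    saved = read_saved_offsets()
    # First run: read the whole topic; later runs: start after the saved offsets.
    starting = "earliest" if saved is None else json.dumps({topic_reloadpred: saved})

    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("startingOffsets", starting) \
        .option("endingOffsets", "latest") \
        .load()

    if events.rdd.isEmpty():
        return  # no new events since the last batch

    # ... existing processing of the new events goes here ...

    # Remember (max offset + 1) per partition so the next batch starts after it.
    new_next = {str(r["partition"]): r["max_offset"] + 1
                for r in events.groupBy("partition")
                               .agg(F.max("offset").alias("max_offset"))
                               .collect()}
    merged = dict(saved or {})
    merged.update(new_next)
    save_offsets(merged)
```

If this is roughly the right idea, the offsets would presumably need to live somewhere more durable than a local file so they survive restarts, but I am not sure whether manual offset tracking is the intended pattern here or whether there is a cleaner built-in option.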
Additional details on Stack Overflow:
https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic
tia!