Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-13 Thread karan alang
>>> # called from - foreachBatch
>>> def convertToDictForEachBatch(df, batchId):
>>>
>>>     # checks for event in topic - events_topic, and further processing takes
>>>     # place if there is new data in the topic
>>>     events = spark.read.format('kafka...
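
A minimal sketch of one way to make that batch read inside foreachBatch incremental, assuming a single-partition events_topic and a placeholder broker address; the driver-side offset bookkeeping (the last_offsets dict) is a hypothetical illustration, not code from the thread:

```
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_

spark = SparkSession.builder.appName("events-check").getOrCreate()

# offsets already consumed, per topic/partition; -2 means "earliest" on the first run
last_offsets = {"events_topic": {"0": -2}}

def convertToDictForEachBatch(df, batchId):
    # batch read of the control topic, bounded to records not yet seen
    events = (spark.read.format("kafka")
              .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder
              .option("subscribe", "events_topic")
              .option("startingOffsets", json.dumps(last_offsets))
              .option("endingOffsets", "latest")
              .load())
    if events.count() > 0:
        pass  # a new control event arrived: reload the model, refresh config, etc.
    # remember the next offset to read per partition (max seen + 1);
    # foreachBatch runs on the driver, so mutating last_offsets here is safe
    for row in events.groupBy("partition").agg(max_("offset").alias("o")).collect():
        last_offsets["events_topic"][str(row["partition"])] = row["o"] + 1
```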

Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-12 Thread Mich Talebzadeh
rap.servers", kafkaBrokers) \ >> .option("kafka.security.protocol", "SSL") \ >> .option("kafka.ssl.truststore.location", ssl_truststore_location) \ >> .option("kafka.ssl.truststore.password", ssl

Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-12 Thread Mich Talebzadeh
.option("kafka.ssl.keystore.password", > ssl_keystore_password_reloadpred) \ > .option("subscribe", topic_reloadpred) \ > .option("kafka.group.id", consumerGroupId_reloadpred) \ > .load() > > # events is passed

StructuredStreaming - processing data based on new events in Kafka topic

2022-03-11 Thread karan alang
```
... .option("subscribe", topic_reloadpred) \
    .option("kafka.group.id", consumerGroupId_reloadpred) \
    .load()

# events is passed to a function, and processing is done if new events are generated
```

What is the best way to achieve this? The current code reads the entire data in the Kafka topic; I need it to read only the new data. Additional details on Stack Overflow:
https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic
tia!
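
A minimal sketch of the usual fix for this (one reading of the question, not the thread's final answer): consume the topic with readStream plus a checkpoint location, so Spark tracks offsets itself and each micro-batch hands foreachBatch only the records that arrived since the previous one. The broker address and checkpoint path are placeholders:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("incremental-events").getOrCreate()

events = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9093")  # placeholder
          .option("subscribe", "events_topic")
          .option("startingOffsets", "latest")  # first run only: skip old data
          .load())

def process(df, batchId):
    # df contains only the records that arrived since the last micro-batch
    df.selectExpr("CAST(value AS STRING) AS value").show(truncate=False)

(events.writeStream
       .foreachBatch(process)
       .option("checkpointLocation", "/tmp/events_chk")  # placeholder path
       .start()
       .awaitTermination())
```

Note that on a restart the checkpoint, not startingOffsets, decides where reading resumes; startingOffsets only applies the first time a query starts.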