There are a number of flaws here. You have defined your trigger-based processing time within Spark Structured Streaming (SSS) as below:

    trigger(processingTime='4 minutes')

SSS will trigger every 4 minutes, in other words with a micro-batch interval of 4 minutes. This is what is known as the micro-batch interval. The way this works is that SSS is actioned every 4 minutes. If the previous batch finished in 1 minute, then SSS will wait for (4 - 1 = 3) minutes before processing again. If the previous batch took 5 minutes to finish, then we have a potential backlog and SSS will process immediately after the previous batch finishes (in other words it kicks off the next micro-batch straight away).

Now, foreachBatch(convertToDictForEachBatch) performs your custom write logic on each micro-batch through the convertToDictForEachBatch function. That function is passed two parameters: first, the micro-batch as a DataFrame or Dataset, and second, a unique id for each batch. However, looking at your definition of convertToDictForEachBatch below

def convertToDictForEachBatch(df, batchId):
    # checks for event in topic - events_topic and further processing takes place if there is new data in the topic
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

the micro-batch DataFrame df is not used anywhere in the body of the function. So what are you actually checking here, and what happens if df is empty?
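To make that concrete, below is a minimal sketch of the kind of guard I have in mind, not your actual logic. It assumes the names from your own code (spark, kafkaBrokers, topic_reloadpred), omits the SSL options for brevity, needs Spark 3.3+ for DataFrame.isEmpty(), and reload_from_bigquery() is a hypothetical placeholder for your reload logic.

```
def convertToDictForEachBatch(df, batchId):
    # Decide explicitly what to do when the micro-batch from the main topic
    # is empty, rather than re-reading the events topic regardless.
    if df.isEmpty():   # Spark 3.3+; on older versions use df.rdd.isEmpty()
        print(f"batch {batchId}: no new records on the main topic, nothing to do")
        return

    # Batch (non-streaming) read of the events topic; SSL options omitted for brevity.
    events = (spark.read.format("kafka")
              .option("kafka.bootstrap.servers", kafkaBrokers)
              .option("subscribe", topic_reloadpred)
              .load())

    # Only reload from BigQuery when the events topic actually returned records.
    if events.take(1):
        reload_from_bigquery()   # hypothetical placeholder for your reload logic

    # ... and then do something with df itself ...
```

Whether returning early on an empty micro-batch is the right behaviour is for you to decide, but it should be an explicit decision in the code.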
HTH

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.


On Sat, 12 Mar 2022 at 09:33, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> How do you check if new data is in the topic and what happens if not?
>
> On Sat, 12 Mar 2022 at 00:40, karan alang <karan.al...@gmail.com> wrote:
>
>> Hello All,
>>
>> I have a Structured Streaming program which reads data from a Kafka topic,
>> does some processing, and finally puts data into a target Kafka topic.
>>
>> Note: the processing is done in the function convertToDictForEachBatch(),
>> which is called using foreachBatch(convertToDictForEachBatch).
>>
>> As part of the processing, it reads another Kafka topic (events_topic),
>> and if there are new record(s) after the last read, it does some additional
>> processing - reloads data from a BigQuery table and persists it.
>>
>> Here is the code :
>>
>> ```
>> df_stream = spark.readStream.format('kafka') \
>>     .option("kafka.security.protocol", "SSL") \
>>     .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>>     .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>>     .option("kafka.ssl.keystore.location", ssl_keystore_location) \
>>     .option("kafka.ssl.keystore.password", ssl_keystore_password) \
>>     .option("kafka.bootstrap.servers", kafkaBrokers) \
>>     .option("subscribe", topic) \
>>     .option("kafka.group.id", consumerGroupId) \
>>     .option("startingOffsets", "latest") \
>>     .option("failOnDataLoss", "false") \
>>     .option("maxOffsetsPerTrigger", 10000) \
>>     .load()
>>
>> print(" df_stream -> ", df_stream)
>>
>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
>>     .outputMode("append") \
>>     .trigger(processingTime='4 minutes') \
>>     .option("numRows", 10000) \
>>     .option("truncate", "false") \
>>     .option("checkpointLocation", checkpoint) \
>>     .foreachBatch(convertToDictForEachBatch) \
>>     .start()
>>
>> query.awaitTermination()
>> ```
>>
>> ```
>> # called from - foreachBatch
>> def convertToDictForEachBatch(df, batchId):
>>
>>     # checks for events in topic - events_topic; further processing takes
>>     # place if there is new data in the topic
>>     events = spark.read.format('kafka') \
>>         .option("kafka.bootstrap.servers", kafkaBrokers) \
>>         .option("kafka.security.protocol", "SSL") \
>>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>>         .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
>>         .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
>>         .option("subscribe", topic_reloadpred) \
>>         .option("kafka.group.id", consumerGroupId_reloadpred) \
>>         .load()
>>
>>     # events is passed to a function, and processing is done if new events
>>     # are generated
>> ```
>>
>> What is the best way to achieve this? The current code is reading the
>> entire data in the Kafka topic; I need it to read only the new data.
>>
>> Additional details on Stack Overflow:
>> https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic
>>
>> tia!
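As to the question in the quoted message above about reading only the new data from the events topic on every micro-batch: one possible approach (only a sketch, not a recommendation) is to remember the last offsets you have already processed and pass them back as startingOffsets on the next batch read. This assumes the spark session and kafkaBrokers variable from your code; OFFSETS_PATH and read_new_events are hypothetical names, the SSL options are omitted for brevity, and the offsets are kept in a local JSON file purely for illustration (in practice you would persist them somewhere durable).

```
import json
import os

from pyspark.sql import functions as F

OFFSETS_PATH = "/tmp/events_topic_offsets.json"   # hypothetical location; use durable storage in practice


def load_starting_offsets(topic):
    # First run: no saved offsets yet, so read the topic from the beginning once.
    if not os.path.exists(OFFSETS_PATH):
        return "earliest"
    with open(OFFSETS_PATH) as f:
        saved = json.load(f)                      # e.g. {"0": 125, "1": 98}
    return json.dumps({topic: saved})             # startingOffsets JSON format


def save_next_offsets(events_df):
    # Next starting offset per partition = highest offset read + 1.
    rows = (events_df
            .groupBy("partition")
            .agg(F.max("offset").alias("max_offset"))
            .collect())
    offsets = {str(r["partition"]): r["max_offset"] + 1 for r in rows}
    if offsets:                                   # no-op when nothing was read
        with open(OFFSETS_PATH, "w") as f:
            json.dump(offsets, f)


def read_new_events(topic):
    # Batch read of only the records added since the last saved offsets.
    # NOTE: when explicit offsets are supplied, Spark expects an offset for
    # every partition of the topic; partitions never seen before would need
    # to be added with -2 (earliest) by hand.
    events = (spark.read.format("kafka")
              .option("kafka.bootstrap.servers", kafkaBrokers)
              .option("subscribe", topic)
              .option("startingOffsets", load_starting_offsets(topic))
              .load())
    save_next_offsets(events)
    return events
```

Inside convertToDictForEachBatch you would then call events = read_new_events(topic_reloadpred) and only trigger the BigQuery reload when events actually contains rows.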