There are a number of flaws here.

You have defined a processing-time trigger in Spark Structured Streaming
(SSS) as below:

trigger(processingTime='4 minutes')


With this setting, SSS starts a micro-batch every 4 minutes. This is known
as the micro-batch interval. If the previous batch finished in 1 minute,
SSS waits (4 - 1 = 3) minutes before processing again. If the previous
batch took 5 minutes to finish, there is a potential backlog, and SSS
starts the next micro-batch immediately after the previous one finishes.
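
As a rough illustration of the timing described above, here is a minimal
sketch in plain Python. It is a simplified model of the behaviour, not
Spark's scheduler code, and the function name wait_before_next_batch is
made up for this example.

```python
def wait_before_next_batch(interval_s: float, batch_duration_s: float) -> float:
    """Seconds of idle time before the next micro-batch starts (simplified model)."""
    # If the batch overran the interval, there is nothing left to wait for,
    # so the next micro-batch starts immediately.
    return max(0.0, interval_s - batch_duration_s)


interval = 4 * 60  # corresponds to trigger(processingTime='4 minutes')
for duration_min in (1, 4, 5):
    wait = wait_before_next_batch(interval, duration_min * 60)
    print(f"batch took {duration_min} min -> idle for {wait / 60:.0f} min")
```

The 5-minute case returns zero wait, which is the backlog situation: the
job never idles and can only catch up if later batches run faster than
the interval.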


Now foreachBatch(convertToDictForEachBatch) applies custom write logic to
each micro-batch through the function convertToDictForEachBatch. The
function passed to foreachBatch must accept 2 parameters: first, the
micro-batch as a DataFrame or Dataset, and second, a unique id for each
batch. However, your function convertToDictForEachBatch is defined as below:

def convertToDictForEachBatch(df, batchId):

    # checks for event in topic - events_topic and further processing
    # takes place if there is new data in the topic
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

The df argument (the micro-batch DataFrame) is never used in this code. So
what are you checking here? What happens if df is empty?
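
To make the point concrete, here is a minimal sketch of a callback that
inspects the micro-batch before doing anything else. The function name
and the boolean return value are only for illustration (foreachBatch
ignores what the callback returns), and the processing step is left as
a placeholder. It relies only on DataFrame.take(), so it is written
without importing Spark.

```python
def convert_to_dict_for_each_batch(df, batch_id):
    """Hypothetical foreachBatch callback that actually uses the micro-batch."""
    # take(1) fetches at most one row, so this is a cheap emptiness check.
    if len(df.take(1)) == 0:
        print(f"batch {batch_id}: no new rows, skipping")
        return False

    # Placeholder: the real per-batch processing of df would go here.
    print(f"batch {batch_id}: processing new rows")
    return True
```

It would be passed to the stream in the same way as the existing
function, i.e. .foreachBatch(convert_to_dict_for_each_batch).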

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 12 Mar 2022 at 09:33, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> How do you check if new data is in the topic and what happens if not?
>
> On Sat, 12 Mar 2022 at 00:40, karan alang <karan.al...@gmail.com> wrote:
>
>> Hello All,
>>
>> I have a structured Streaming program, which reads data from Kafka topic,
>> and does some processing, and finally puts data into target Kafka Topic.
>>
>> Note : the processing is done in function -
>> convertToDictForEachBatch(), which is called using -
>> foreachBatch(convertToDictForEachBatch)
>>
>> As part of the processing, it reads another Kafka Topic (events_topic),
>> and if there is New record(s) after the last read, it does some additional
>> processing - reloads data from BigQuery table, and persists it.
>>
>> Here is the code :
>>
>> ```
>>
>> df_stream = spark.readStream.format('kafka') \
>>         .option("kafka.security.protocol", "SSL") \
>>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>>         .option("kafka.ssl.keystore.location", ssl_keystore_location) \
>>         .option("kafka.ssl.keystore.password", ssl_keystore_password) \
>>         .option("kafka.bootstrap.servers",kafkaBrokers)\
>>         .option("subscribe", topic) \
>>         .option("kafka.group.id", consumerGroupId)\
>>         .option("startingOffsets", "latest") \
>>         .option("failOnDataLoss", "false") \
>>         .option("maxOffsetsPerTrigger", 10000) \
>>         .load()
>>
>>
>>     print(" df_stream -> ", df_stream)
>>     query = df_stream.selectExpr("CAST(value AS STRING)", 
>> "timestamp").writeStream \
>>         .outputMode("append") \
>>         .trigger(processingTime='4 minutes') \
>>         .option("numRows",10000)\
>>         .option("truncate", "false") \
>>         .option("checkpointLocation", checkpoint) \
>>         .foreachBatch(convertToDictForEachBatch) \
>>         .start()
>>
>>     query.awaitTermination()
>>
>> ```
>>
>> ```
>> # called from - foreachBatch
>> def convertToDictForEachBatch(df, batchId):
>>
>>     # checks for event in topic - events_topic and further processing takes
>>     # place if there is new data in the topic
>>     events = spark.read.format('kafka') \
>>         .option("kafka.bootstrap.servers", kafkaBrokers) \
>>         .option("kafka.security.protocol", "SSL") \
>>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>>         .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
>>         .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
>>         .option("subscribe", topic_reloadpred) \
>>         .option("kafka.group.id", consumerGroupId_reloadpred) \
>>         .load()
>>
>>     # events is passed to a function, and processing is done if new events
>>     # are generated
>>
>> ```
>>
>> What is the best way to achieve this? The current code is reading the
>> entire data in the Kafka topic; I need it to read only the new data.
>>
>> Additional Details in stackoverflow :
>>
>>
>> https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic
>>
>>
>> tia!
>>