Hi Mich,

The code I sent for the function 'convertToDictForEachBatch' is not the
complete code.
It does use the DF to do a bunch of transformations/operations.

Specific to the problem I sent the email for :
One piece of the code reloads the prediction data from Bigquery based on
the 'event' in topic, the event indicates that the prediction data in
Bigquery is changed.
Here is the code with comments, hope that clarifies.

```

# called from - foreachbatch
def convertToDictForEachBatch(df, batchId):

   # Uses the dataframe to do processing of data, that code is Not
added, since it is not relevant to this question

   # Additional processing i.e. reloading of prediction data from Big
query, into DataFrame - based on event in Kafka topic
   # checks for event in topic - topic_reloadpred and further
processing takes place if there is new data in the topic

# requirement : read data from topic - topic_reloadpred, then check if
there are additional rows added, if yes - call method to reload data
from BigQuery

    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location",
ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password",
ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

# if event dataframe had new records(from last read time), call method
to reload data from BigQuery

reloadDataFromBigQuery()

```
The requirement is to identify that new rows have been added to the topic -
topic_reloadpred, and then reload data from BigQuery to dataframe, if
required.
(pls note - the data loaded from BigQuery is persisted ie df.persist(), and
changes in-frequently)

One idea is to store the maxOffset read from each batch read from
topic_reloadpred, and when the next batch is read - compare that with
'stored' maxOffset,
to determine if new records have been added to the topic.

What is the best way to fulfill this requirement ?

regds,
Karan Alang







On Sat, Mar 12, 2022 at 12:42 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> There are a number of flaws here.
>
> You have defined your trigger based processing time within Spark
> Structured Streaming (SSS) as below
>
> trigger(processingTime='4 minutes')
>
>
> SSS will trigger every 4 minutes, in other words within a micro-batch of 4
> minutes. This is what is known as micro-batch interval. The way this works
> is that SSS is actioned every 4 minutes. If the previous batch finished in
> 1 minute, then SSS will wait for (4-1 = 3) minutes before processing again.
> If the previous processing took 5 minutes to finish, then we have a
> potential backlog and SSS will process immediately after the previous job
> finishes (in other words it kicks off the next micro-batch).
>
>
> Now the function foreachBatch(convertToDictForEachBatch) performs custom
> write logic on each micro-batch through convertToDictForEachBatch
> function. foreachBatch(convertToDictForEachBatch) expects 2 parameters,
> first: micro-batch as DataFrame or Dataset and second: unique id for each
> batch. However, in calling the function convertToDictForEachBatch as
> below
>
> def convertToDictForEachBatch(df, batchId):
>
>     # checks for event in topic - events_topic and further processing takes 
> place if there is new data in the topic
>     events = spark.read.format('kafka') \
>         .option("kafka.bootstrap.servers", kafkaBrokers) \
>         .option("kafka.security.protocol", "SSL") \
>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>         .option("kafka.ssl.keystore.location", 
> ssl_keystore_location_reloadpred) \
>         .option("kafka.ssl.keystore.password", 
> ssl_keystore_password_reloadpred) \
>         .option("subscribe", topic_reloadpred) \
>         .option("kafka.group.id", consumerGroupId_reloadpred) \
>         .load()
>
> There is no use case for df -> DataFrame in the code? So what are you 
> checking here? What happens if df is empty?
>
> HTH
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 12 Mar 2022 at 09:33, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> How do you check if new data is in the topic and what happens if not?
>>
>> On Sat, 12 Mar 2022 at 00:40, karan alang <karan.al...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> I have a structured Streaming program, which reads data from Kafka
>>> topic, and does some processing, and finally puts data into target Kafka
>>> Topic.
>>>
>>> Note : the processing is donee topic in function -
>>> convertToDictForEachBatch(), which is called using -
>>> foreachBatch(convertToDictForEachBatcha is in th)
>>>
>>> As part of the processing, it reads another Kafka Topic (events_topic),
>>> and if there is New record(s) after the last read, it does some additional
>>> processing - reloads data from BigQuery table, and persists it.
>>>
>>> Here is the code :
>>>
>>> ```
>>>
>>> df_stream = spark.readStream.format('kafka') \
>>>         .option("kafka.security.protocol", "SSL") \
>>>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>>>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>>>         .option("kafka.ssl.keystore.location", ssl_keystore_location) \
>>>         .option("kafka.ssl.keystore.password", ssl_keystore_password) \
>>>         .option("kafka.bootstrap.servers",kafkaBrokers)\
>>>         .option("subscribe", topic) \
>>>         .option("kafka.group.id", consumerGroupId)\
>>>         .option("startingOffsets", "latest") \
>>>         .option("failOnDataLoss", "false") \
>>>         .option("maxOffsetsPerTrigger", 10000) \
>>>         .load()
>>>
>>>
>>>     print(" df_stream -> ", df_stream)
>>>     query = df_stream.selectExpr("CAST(value AS STRING)", 
>>> "timestamp").writeStream \
>>>         .outputMode("append") \
>>>         .trigger(processingTime='4 minutes') \
>>>         .option("numRows",10000)\
>>>         .option("truncate", "false") \
>>>         .option("checkpointLocation", checkpoint) \
>>>         .foreachBatch(convertToDictForEachBatch) \
>>>         .start()
>>>
>>>     query.awaitTermination()
>>>
>>> ```
>>>
>>> # called from - foreachbatch
>>> def convertToDictForEachBatch(df, batchId):
>>>
>>>     # checks for event in topic - events_topic and further processing takes 
>>> place if there is new data in the topic
>>>     events = spark.read.format('kafka') \
>>>         .option("kafka.bootstrap.servers", kafkaBrokers) \
>>>         .option("kafka.security.protocol", "SSL") \
>>>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>>>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>>>         .option("kafka.ssl.keystore.location", 
>>> ssl_keystore_location_reloadpred) \
>>>         .option("kafka.ssl.keystore.password", 
>>> ssl_keystore_password_reloadpred) \
>>>         .option("subscribe", topic_reloadpred) \
>>>         .option("kafka.group.id", consumerGroupId_reloadpred) \
>>>         .load()
>>>
>>>     # events is passed to a function, and processing is done if new events 
>>> are generated
>>>
>>> ```
>>>
>>> What is the best way to achieve this ? The current code is reading the
>>> entire data in the kafka topic, i need it to read only the new data.
>>>
>>> Additional Details in stackoverflow :
>>>
>>>
>>> https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic
>>>
>>>
>>> tia!
>>>
>> --
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>

Reply via email to