You can try this:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val kafkaReadStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", topicName)
  .option("startingOffsets", startingOffsetsMode)
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

kafkaReadStream
  .writeStream
  .foreachBatch((df: DataFrame, batchId: Long) => sendToSink(df, batchId))
  .trigger(Trigger.ProcessingTime(s"${triggerProcessingTime} seconds"))
  .option("checkpointLocation", checkpoint_path)
  .start()
  .awaitTermination()
Notice the function sendToSink. The foreachBatch method ensures that
sendToSink is called for each micro-batch, regardless of whether the
DataFrame contains data or not. Let us look at that function:
import org.apache.spark.sql.functions._

def sendToSink(df: DataFrame, batchId: Long): Unit = {
  if (!df.isEmpty) {
    println(s"From sendToSink, batchId is $batchId, at ${java.time.LocalDateTime.now()}")
    // df.show(100, false)
    df.persist()
    // Write to BigQuery batch table
    // s.writeTableToBQ(df, "append",
    //   config.getString("MDVariables.targetDataset"),
    //   config.getString("MDVariables.targetTable"))
    df.unpersist()
    // println("wrote to DB")
  } else {
    println("DataFrame df is empty")
  }
}
If the DataFrame is empty, it simply prints a message saying so. You can of
course adapt it for your own case.
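Since you want some code to run on every trigger even when the batch is
empty, one way is to move that logic outside the isEmpty check. Below is a
minimal sketch of that idea; the helpers runEveryTrigger and writeBatchToSink
are placeholders of my own, not anything from your code, so substitute your
real logic there:

import org.apache.spark.sql.DataFrame

// Called by foreachBatch for every micro-batch; the per-trigger work runs
// unconditionally, the data write only when there is data.
def sendToSink(df: DataFrame, batchId: Long): Unit = {
  // Hypothetical bookkeeping that must run on every trigger,
  // e.g. heartbeat, metrics, housekeeping.
  runEveryTrigger(batchId)

  if (!df.isEmpty) {
    df.persist()
    writeBatchToSink(df)   // hypothetical sink write
    df.unpersist()
  } else {
    println(s"Batch $batchId is empty at ${java.time.LocalDateTime.now()}")
  }
}

// Placeholder implementations, just so the sketch compiles.
def runEveryTrigger(batchId: Long): Unit =
  println(s"Trigger fired for batch $batchId at ${java.time.LocalDateTime.now()}")

def writeBatchToSink(df: DataFrame): Unit =
  df.show(20, truncate = false)   // replace with your real sink write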
HTH
Mich Talebzadeh
On Thu, 21 Mar 2024 at 23:14, Рамик И <[email protected]> wrote:
>
> Hi!
> I want to execute code inside foreachBatch that will trigger regardless of
> whether there is data in the batch or not.
>
>
> val kafkaReadStream = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", broker)
>   .option("subscribe", topicName)
>   .option("startingOffsets", startingOffsetsMode)
>   .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
>   .load()
>
> kafkaReadStream
>   .writeStream
>   .trigger(Trigger.ProcessingTime(s"$triggerProcessingTime seconds"))
>   .foreachBatch {
>     ....
>   }
>   .start()
>   .awaitTermination()
>