Hello Users,
I am using Spark 3.0.1 Structured Streaming with PySpark.
My use case:
I receive a large number of records from Kafka (essentially metadata containing
the location of the actual data). I need to read that metadata from Kafka and
process it.
The processing consists of reading the location of the actual data from the
metadata, fetching the actual data, and applying some operation to it.
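What I have tried:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("metadata-stream").getOrCreate()

def fetch_actual_data(location):
    # apply some operation on the actual data stored at `location`
    pass

def process_events(row):
    # row.value holds one Kafka message (the metadata) cast to STRING
    fetch_actual_data(row.value)
    # many more steps

# KAFKA_URL, KAFKA_TOPICS and START_OFFSET are defined elsewhere
df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_URL)
      .option("subscribe", KAFKA_TOPICS)
      .option("startingOffsets", START_OFFSET)
      .load()
      .selectExpr("CAST(value AS STRING)"))

query = (df.writeStream.foreach(process_events)
         .option("checkpointLocation", "/opt/checkpoint")
         .trigger(processingTime="30 seconds")
         .start())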
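For reference, DataStreamWriter.foreach in PySpark accepts either a plain function (as above) or an object with open(partition_id, epoch_id), process(row) and close(error) methods. Spark pickles that object and each task gets a fresh copy, so open() is the natural place for per-partition setup. The sketch below is illustrative only: MetadataWriter and the fetch_actual_data stub are hypothetical names, and treating row.value as the data location is an assumption about the metadata format.

```python
def fetch_actual_data(location):
    # Hypothetical placeholder: fetch the data at `location` and operate on it.
    return location


class MetadataWriter:
    """Hypothetical writer object for DataStreamWriter.foreach().

    Spark calls open() once per partition and epoch, process() once per
    row, and close() when that partition is finished.
    """

    def open(self, partition_id, epoch_id):
        # Per-partition setup (e.g. opening a client) belongs here.
        self.partition_id = partition_id
        self.processed = 0
        return True  # True tells Spark to process this partition's rows

    def process(self, row):
        # row.value is the Kafka message cast to STRING (the metadata).
        fetch_actual_data(row.value)
        self.processed += 1

    def close(self, error):
        # error is None if processing succeeded; per-partition cleanup goes here.
        pass
```

It would be wired in the same way as the function version: `df.writeStream.foreach(MetadataWriter()).option("checkpointLocation", "/opt/checkpoint").trigger(processingTime="30 seconds").start()`.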
My questions:
1. Will this foreach run across different executor processes? My
understanding is that in Spark, foreach generally runs on a single executor.
2. I receive a very large number of records from Kafka, and the code above
calls process_events once for every single message. If I switch to
foreachBatch, will that be more efficient?
Kind Regards,
Sachit Murarka