Hello All -
I'm using Apache Spark Structured Streaming to read data from a Kafka topic
and do some processing. I'm using a watermark to account for late-arriving
records, and the code works fine.
Here is the working (sample) code:
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window, max, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("Sliding Window Demo") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.sql.shuffle.partitions", 1) \
.getOrCreate()
stock_schema = StructType([
StructField("LogType", StringType()),
StructField("CreatedTime", StringType()),
StructField("Type", StringType()),
StructField("Amount", IntegerType()),
StructField("BrokerCode", StringType())
])
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "trades") \
.option("startingOffsets", "earliest") \
.load()
value_df = kafka_df.select(from_json(col("value").cast("string"), stock_schema).alias("value"))
trade_df = value_df.select("value.*") \
    .withColumn("CreatedTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("Buy", expr("case when Type == 'BUY' then Amount else 0 end")) \
    .withColumn("Sell", expr("case when Type == 'SELL' then Amount else 0 end"))
window_agg_df = trade_df \
    .withWatermark("CreatedTime", "10 minute") \
    .groupBy(window(col("CreatedTime"), "10 minute")) \
    .agg({"Buy": "sum", "Sell": "sum"}) \
    .withColumnRenamed("sum(Buy)", "TotalBuy") \
    .withColumnRenamed("sum(Sell)", "TotalSell")
output_df = window_agg_df.select("window.start", "window.end", "TotalBuy", "TotalSell")
window_query = output_df.writeStream \
.format("console") \
.outputMode("append") \
.option("checkpointLocation", "chk-point-dir-mar28") \
.trigger(processingTime="30 second") \
.start()
window_query.awaitTermination()
```
Currently, I'm processing a single LogType; the requirement is to process
multiple LogTypes in the same flow. The LogTypes will be config-driven (not
hard-coded). The objective is to have generic code that can process all
LogTypes.
As an example, for LogType X, I will need to group by columns col1 and col2
and get the sum of the values 'sent' and 'received'. For LogType Y, the groupBy
columns will remain the same, but the sum will be on column col3 instead.
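To make that concrete, here is a minimal sketch of the kind of config I have in mind; the structure and all the names in it (X, Y, col1, col2, col3, sent, received) are illustrative placeholders, not the actual config:
```
# Hypothetical, illustrative config: each LogType maps to its
# groupBy columns and the columns whose values should be summed.
LOGTYPE_CONFIG = {
    "X": {"groupby_cols": ["col1", "col2"], "sum_cols": ["sent", "received"]},
    "Y": {"groupby_cols": ["col1", "col2"], "sum_cols": ["col3"]},
}
```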
Without the watermark, I can look at the LogType and do the processing in
batch mode (using foreachBatch). However, with the watermark, I'm unable to
figure out how to do the processing based on LogType.
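For reference, the foreachBatch variant looks roughly like this (a simplified sketch driven by the hypothetical config above; the console sink and the column names are placeholders):
```
from pyspark.sql.functions import col, sum as sum_

def process_micro_batch(batch_df, batch_id):
    # For each configured LogType, filter the micro-batch and apply
    # that LogType's groupBy/sum spec from LOGTYPE_CONFIG.
    for log_type, conf in LOGTYPE_CONFIG.items():
        subset_df = batch_df.filter(col("LogType") == log_type)
        agg_exprs = [sum_(c).alias("Total_" + c) for c in conf["sum_cols"]]
        result_df = subset_df.groupBy(*conf["groupby_cols"]).agg(*agg_exprs)
        result_df.write.format("console").save()  # placeholder sink

batch_query = trade_df.writeStream \
    .foreachBatch(process_micro_batch) \
    .option("checkpointLocation", "chk-point-dir-mar28") \
    .trigger(processingTime="30 second") \
    .start()
```
This works, but each micro-batch is aggregated on its own, so there is no event-time window or watermark-based handling of late records.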
Any inputs on this?
Here is the Stack Overflow question for this:
https://stackoverflow.com/questions/76547349/apache-spark-with-watermark-processing-data-different-logtypes-in-same-kafka-t
tia!