Working on a POC to determine whether Spark is a good fit for our use case. I have a stream of readings from IoT devices being published to a Kafka topic. I need to consume those readings along with recently consumed readings for the same device to determine the quality of the current reading. For example, is the current reading changing too fast (i.e., did it change by more than three times the standard deviation of the last 20 readings)? Another example might be: over the last 2 hours, has there been at least a minimal amount of variation in the reading, or is it "stuck"?
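In plain Python, ignoring Spark for a moment, the kind of logic I have in mind looks roughly like this (just a sketch; the function names and thresholds are placeholders I made up):

from statistics import pstdev

def rate_of_change_ok(current_value, recent_values):
    # recent_values: prior readings for the same device, oldest to newest.
    # Flag the reading if it moved more than 3x the standard deviation
    # of the last 20 readings.
    window = recent_values[-20:]
    if len(window) < 2:
        return True  # not enough history to judge yet
    sigma = pstdev(window)
    return abs(current_value - window[-1]) <= 3 * sigma

def not_stuck(last_two_hours_values, min_variation=1):
    # Require at least a minimal amount of variation over the last 2 hours,
    # otherwise consider the reading "stuck". min_variation is a placeholder.
    if not last_two_hours_values:
        return True
    return (max(last_two_hours_values) - min(last_two_hours_values)) >= min_variation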
These quality checks are pretty specific and don't fit well into the typical SQL paradigm as far as I can tell, so I am expecting to need some UDFs and/or UDAFs to accomplish this. The part I am getting stuck on, though, is getting hold of the previous related readings so that I have all the information I need to perform the quality evaluation. I am basically attempting to enrich a stream of readings with quality analysis, and that analysis is dependent on previous readings.

*Does this seem like a good fit for Spark or should I be looking at alternatives?*

*Does it seem possible to do this with pyspark or should I be looking at Java/Scala to get access to mapGroupsWithState/flatMapGroupsWithState?*

Thank you so much for your help!

---------

/If you want to read more....../

*Reference:*

schema = StructType() \
    .add("id", StringType()) \
    .add("readingValue", IntegerType()) \
    .add("readingTime", TimestampType()) \
    .add("uid", StringType())

dfRaw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrapServers) \
    .option(subscribeType, topics) \
    .load()

df = dfRaw \
    .select(from_json(col("value").cast("string"), schema).alias("parsedValue")) \
    .withColumn("eventTime", col("parsedValue.readingTime"))

dfWithWatermark = df.withWatermark("eventTime", "10 seconds").alias("current")

*A few things I have tried:*

Join the dataframe to itself using a fixed interval

dfSelfJoined = dfWithWatermark.alias("leftDf").join(
    dfWithWatermark.alias("rightDf"),
    expr("""
        leftDf.parsedValue.id = rightDf.parsedValue.id AND
        leftDf.parsedValue.uid != rightDf.parsedValue.uid AND
        leftDf.eventTime >= rightDf.eventTime AND
        rightDf.eventTime >= leftDf.eventTime - interval 10 minutes
    """)
)

This does what you might think: it outputs a row for each match between the current reading and any readings within the last 10 minutes of that reading. The only problem is I don't want a row for each match; I want them aggregated, so a quality check can see all of the recent readings at once (I have sketched that idea at the end of this mail)...

dfSelfJoined \
    .groupBy("leftDf.parsedValue.uid", "leftDf.eventTime") \
    .agg(collect_list(col("rightDf.parsedValue"))) \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .option('truncate', 'false') \
    .start()

This aggregates them, but there is no output until, I assume, we have crossed the watermark. I suppose this is expected. Is there any harm in setting the watermark to 0 seconds so rows are appended immediately? Do I have any guarantee that there will be no late data if all the data is published to Kafka with the same key, in order?

Join the dataframe to an aggregated stream of recent readings

dfRecent = dfWithWatermark \
    .groupBy(window("eventTime", "10 minutes", "5 minutes"), "parsedValue.id") \
    .agg(max(col("parsedValue.readingTime")).alias("maxTime"),
         collect_list(col("parsedValue")).alias("values")) \
    .alias("recent") \
    .withColumn("secondsFromStart", col("maxTime").cast("Long") - col("window.start").cast("Long")) \
    .withColumn("secondsToEnd", col("window.end").cast("Long") - col("maxTime").cast("Long")) \
    .filter(col("secondsFromStart") >= col("secondsToEnd"))

dfWithWatermark \
    .join(dfRecent, expr("current.parsedValue.id == recent.id")) \
    .writeStream \
    .outputMode('append') \
    .format('console') \
    .option('truncate', 'false') \
    .start()

This makes sense in my head :-) but nothing is ever output. Reading the docs some more, I concluded that joining a streaming aggregation to another stream is not supported.

I have been investigating this for a few days now and would greatly appreciate another opinion. Thanks for reading!
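For reference, this is roughly how I pictured the quality evaluation plugging into the self-join/collect_list attempt above. I have not run this end to end; quality_flags, the output schema, and the thresholds are placeholders of mine, and the checks are a condensed version of the logic described at the top of this mail:

from pyspark.sql.functions import udf, col, collect_list
from pyspark.sql.types import StructType, BooleanType

qualitySchema = StructType() \
    .add("rateOfChangeOk", BooleanType()) \
    .add("notStuck", BooleanType())

@udf(returnType=qualitySchema)
def quality_flags(current, recent):
    # current: the struct of the reading being evaluated
    # recent: collect_list of structs for prior readings of the same device
    if not recent:
        return (True, True)  # no history yet, nothing to flag
    ordered = [r.readingValue for r in sorted(recent, key=lambda r: r.readingTime)]
    window = ordered[-20:]
    mean = sum(window) / len(window)
    sigma = (sum((v - mean) ** 2 for v in window) / len(window)) ** 0.5
    rate_ok = abs(current.readingValue - window[-1]) <= 3 * sigma
    stuck_ok = (max(ordered) - min(ordered)) >= 1  # placeholder threshold
    return (rate_ok, stuck_ok)

dfQuality = dfSelfJoined \
    .groupBy(col("leftDf.parsedValue").alias("current"),
             col("leftDf.eventTime").alias("eventTime")) \
    .agg(collect_list(col("rightDf.parsedValue")).alias("recent")) \
    .withColumn("quality", quality_flags(col("current"), col("recent")))

This would still have the same append-mode/watermark delay issue as the aggregation above, of course.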
-Nathan