I have a scenario in which we need to calculate 'charges' for a stream of
events. The details are as follows:



1. Each event contains an eventTime, a facet, and units.

2. A free quantity is specified per facet; it must offset the earliest events,
ordered by eventTime.

3. Prices are also specified per facet.

4. Events that arrive within the same minute can be considered equivalent (for
reduced state maintenance), and the free units need to be distributed
proportionally across all of them (see the sketch after this list).
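
To illustrate point 4, here is a minimal sketch of the proportional split I
have in mind. The names (Ev, splitFree) are illustrative only and are not part
of the sample code below:

case class Ev(id: String, units: Double)

// Distribute `free` units across the events of one minute,
// proportionally to each event's share of the minute's total units.
def splitFree(events: Seq[Ev], free: Double): Seq[(Ev, Double)] = {
  val total = events.map(_.units).sum
  events.map(e => e -> math.min(e.units, free * e.units / total))
}

// splitFree(Seq(Ev("a", 10), Ev("b", 30)), 20.0)
//   -> "a" gets 5.0 free units, "b" gets 15.0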



I was hoping to make this work in the following manner using Spark Structured
Streaming:



1. Aggregate events at the minute level per facet, using the window function.

2. Join the aggregates with the price and free quantity.

3. Group by facet.

4. flatMapGroups by facet to sort the aggregates by window start time and
apply the free units and prices (a worked example follows this list).
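
As a worked example, with the F1 price of 30.0 and the 100.0 free units used
in the sample code below (the per-minute unit counts are made up for
illustration), step 4 should produce:

// minute t0: 60 units -> 60 free,  0 charged, charge    0.0
// minute t1: 80 units -> 40 free, 40 charged, charge 1200.0 (40 * 30.0)
// minute t2: 50 units ->  0 free, 50 charged, charge 1500.0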



What I am noticing is that the output of #4 is only the aggregates for which
new events came in during the micro-batch, not all the aggregates since the
watermark. For example, if minute blocks t0 and t1 were emitted earlier and a
new event arrives for t2, flatMapGroups sees only the t2 block, so the free
units are applied from scratch to t2.

**Qn: How can I fix this code to get all the aggregates since the watermark,
or from a previous watermark?**



Could someone help? Sample code is below.

Thanks



---------------

package test

import java.sql.Timestamp
import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, udf, window}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}



case class UsageEvent(Id: String, facetId: String, Units: Double, timeStamp: Timestamp)
case class FacetPricePoints(facetId: String, Price: Double, FreeUnits: Double)
case class UsageBlock(facetId: String, start: Timestamp, Units: Double)
case class UsageBlockWithPrice(facetId: String, start: Timestamp, Units: Double, Price: Double, FreeUnits: Double)
case class UsageBlockWithCharge(facetId: String, start: Timestamp, Units: Double, Price: Double, FreeUnits: Double, ChargedUnits: Double, Charge: Double)



object TestProcessing {

  // Synthesize a usage event for facet F1 from the rate source's timestamp and value.
  def getUsageEventStream(ts: Timestamp, units: String): UsageEvent =
    UsageEvent(UUID.randomUUID().toString, "F1", units.toInt % 20, ts)

  implicit def ordered: Ordering[Timestamp] = new Ordering[Timestamp] {
    def compare(x: Timestamp, y: Timestamp): Int = x compareTo y
  }

  // Sort a facet's minute blocks by start time and apply the free units FIFO:
  // the earliest blocks consume the free quantity; the remainder is charged.
  def ChargeUsageBlock(Key: String, Value: Iterator[UsageBlockWithPrice]): Iterator[UsageBlockWithCharge] = {
    val usageBlocks = Value.toList.sortBy(ub => ub.start)
    var freeUnits = 0.0
    var freeUnitsSet = false
    val newUe = for (ue <- usageBlocks)
      yield {
        // Take the facet's free quantity from the first (earliest) block only.
        freeUnits = if (!freeUnitsSet) ue.FreeUnits else freeUnits
        freeUnitsSet = true
        val freeUnitsInBlock = if (freeUnits > ue.Units) ue.Units else freeUnits
        val chargedUnits = ue.Units - freeUnitsInBlock
        freeUnits -= freeUnitsInBlock // todo: need to specify precision and rounding
        UsageBlockWithCharge(ue.facetId, ue.start, ue.Units, ue.Price, freeUnitsInBlock, chargedUnits, chargedUnits * ue.Price)
      }
    newUe.iterator
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Test").getOrCreate()
    import spark.implicits._

    val stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    // Static per-facet price and free quantity lookup.
    val prices = List(FacetPricePoints("F1", 30.0, 100.0)).toDF()

    val getUsageEventStreamUDF = udf((ts: Timestamp, units: String) => getUsageEventStream(ts, units)) // .where($"Units" < 2).

    val usageEventsRaw = stream
      .withColumn("Usage", getUsageEventStreamUDF(stream("timestamp"), stream("value")))
      .select("Usage.*").as[UsageEvent]
      .dropDuplicates("Id")
      .withWatermark("timeStamp", "1 hour")

    // 1. Aggregate to one block per facet per minute.
    val aggUsage = usageEventsRaw
      .groupBy($"facetId", window($"timeStamp", "1 minute"))
      .agg(sum($"Units") as "Units")
      .selectExpr("facetId", "window.start", "Units").as[UsageBlock]

    val fifoRate = (Key: String, Value: Iterator[UsageBlockWithPrice]) => ChargeUsageBlock(Key, Value)

    // 2.-4. Join with prices, group by facet, sort and charge.
    // Problem: this applies freeUnits for every minute instead of just applying it
    // once, because each trigger only hands the updated minute blocks to flatMapGroups.
    val aggUsageCharge = aggUsage
      .joinWith(prices, prices.col("facetId") === aggUsage.col("facetId"))
      .select("_1.*", "_2.Price", "_2.FreeUnits").as[UsageBlockWithPrice]
      .groupByKey(x => x.facetId)
      .flatMapGroups(fifoRate)
      .withWatermark("start", "1 hour")

    val fin = aggUsageCharge.writeStream
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode(OutputMode.Update)
      .format("console")
      .start()

    fin.awaitTermination()
  }
}




