Hello folks,

I'm running Spark Streaming from the latest Spark 1.6.0-SNAPSHOT and using
the new trackStateByKey API. I'm trying to do something fairly simple that
requires keeping state across mini-batches, and I'd like to know whether it
can be done.

I have two types of data to join, left-side and right-side, which arrive in
batches. I want to accumulate both sides across mini-batches and, on each
mini-batch, JOIN left-side and right-side over ALL the history I have so far
(up to my timeout). When I dump all my data at once (an unrealistic case),
trackStateByKey works: all the left-side and right-side data land in the
same mini-batch, so everything is available for the join. However, when the
left-side and right-side data arrive in chunks over different mini-batches,
only the left and right data in THAT mini-batch are actually "available" to
the JOIN. The history is NOT used in the join, because those keys were not
touched by that mini-batch's input.
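
To make the behaviour concrete, here is a minimal plain-Scala sketch of what
I am seeing. It does not use Spark at all: step, join and the string payloads
are hypothetical names made up for illustration, and the model simply assumes
that each mini-batch emits records only for the keys it touched.

```scala
// Plain-Scala model of the behaviour described above; no Spark involved.
// All names here are hypothetical and exist only for illustration.
object TouchedKeysJoinSketch {
  type Key = (String, String) // stands in for (JoinKey1, JoinKey2)

  // Fold one mini-batch into the accumulated state. Returns
  // (new accumulated state, records emitted for this batch).
  // Assumption: only keys present in the batch are emitted.
  def step(state: Map[Key, String],
           batch: Seq[(Key, String)]): (Map[Key, String], Map[Key, String]) = {
    val emitted = batch.toMap
    (state ++ emitted, emitted)
  }

  // Inner join of two keyed collections on their keys.
  def join(left: Map[Key, String],
           right: Map[Key, String]): Map[Key, (String, String)] =
    left.collect { case (k, l) if right.contains(k) => (k, (l, right(k))) }

  def main(args: Array[String]): Unit = {
    val k: Key = ("a", "b")

    // Mini-batch 1: only left-side data arrives.
    val (leftState1, _) = step(Map.empty, Seq(k -> "L"))
    val (rightState1, _) = step(Map.empty, Seq.empty)

    // Mini-batch 2: only right-side data arrives.
    val (leftState2, leftEmitted2) = step(leftState1, Seq.empty)
    val (rightState2, rightEmitted2) = step(rightState1, Seq(k -> "R"))

    // Joining the emitted records finds nothing: the left key was not
    // touched in mini-batch 2, so it is missing from leftEmitted2.
    println(join(leftEmitted2, rightEmitted2))

    // Joining the accumulated state finds the match I actually want.
    println(join(leftState2, rightState2))
  }
}
```

In this model the first join comes back empty and the second contains the
one matching row, which is the gap between what I get and what I expected.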

Maybe I'm doing something wrong? I thought that trackStateByKey would let
me accumulate my input data (subject to a timeout) across separate
mini-batches and let me do my big join.

Here's sample code of what I'm doing. I keep state in stateLEFT and
stateRIGHT, and the join key is a tuple of (JoinKey1, JoinKey2). The strange
part is that my history of state is NOT available when I do the join in the
code. I am NOT using timeouts right now.


val trackStateFn =
  (batchTime: Time,
   stringJson: RawJSON,
   keyWithParsedJson: Option[((JoinKey1, JoinKey2), JValue)],
   state: State[((JoinKey1, JoinKey2), JValue)]) => {
    if (state.isTimingOut) None
    else {
      keyWithParsedJson.foreach(parsed => state.update(parsed))
      keyWithParsedJson // Just return the data ready for joining
    }

  }

 val sparkConf = new SparkConf().setAppName("Denormalizer")

 val ssc = new StreamingContext(sparkConf, Seconds(10))
 ssc.checkpoint("/tmp/denormStateful/")

 val rawLEFT: DStream[String] =
   ssc.textFileStream(dirLEFT).window(Seconds(30))
 val rawRIGHT: DStream[String] =
   ssc.textFileStream(dirRIGHT).window(Seconds(30))

 implicit val formats = DefaultFormats //++ JodaTimeSerializers.all

 val jsonLEFT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] =
   rawLEFT.map(transformParseJoinKeys)
 val jsonRIGHT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] =
   rawRIGHT.map(transformParseJoinKeys)

 // TODO add timeout, maybe initial state?
 val stateLEFT: TrackStateDStream[RawJSON, ((JoinKey1, JoinKey2), JValue),
   ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue)] =
   jsonLEFT.trackStateByKey(StateSpec.function(trackStateFn))
 val stateRIGHT: TrackStateDStream[RawJSON, ((JoinKey1, JoinKey2), JValue),
   ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue)] =
   jsonRIGHT.trackStateByKey(StateSpec.function(trackStateFn))

 val jsonJoinedBoth: DStream[((JoinKey1, JoinKey2), (JValue, JValue))] =
   stateLEFT.join(stateRIGHT)

 val jsonPretty = jsonJoinedBoth.map { tuple =>
   // tuple._2 is (left JValue, right JValue)
   mergeLEFTandRIGHT(tuple._2._1, tuple._2._2)
 }

 jsonPretty.foreachRDD { rdd =>
   val count = rdd.count() // count once and reuse it for the log line
   if (count > 0) {
     val timestamp = (System.currentTimeMillis() % 100000000).toString
     val fileName = s"denorm-stateful-$timestamp.out"
     rdd.saveAsTextFile(fileName)
     println(s"Output $count lines of data to $fileName")
   }
 }

 ssc.start()
 ssc.awaitTermination()
