Hello folks, I'm on the latest Spark 1.6.0-SNAPSHOT, using Spark Streaming with the new trackStateByKey API. I'm trying to do something fairly simple that requires keeping state across mini-batches, so I'm trying to see whether it can be done.
I basically have two types of data to join, left-side and right-side, that arrive in batches. I want to accumulate the left-side and right-side data across mini-batches, and on each mini-batch I want to JOIN left-side and right-side over ALL the history I have so far (up to the point of my timeout).

However, when I tried trackStateByKey, it only works if I dump all my data at once (an unrealistic case): all the left-side and right-side data land in the same mini-batch, so everything is available for the join within that single mini-batch. When I instead let the left-side and right-side data arrive in chunks over different mini-batches, only the left and right data in THAT mini-batch are actually "available" to me for the JOIN operation. The history is NOT being used in the join, because those keys were not touched by the chunk of input data in the current mini-batch.

Maybe I'm doing something wrong? I thought trackStateByKey would let me accumulate my input data (subject to a timeout) across separate mini-batches and then do my big join.

Here's sample code of what I'm doing. I keep state in stateLEFT and stateRIGHT, and the join key is a tuple of (JoinKey1, JoinKey2). The strange part is that my history of state is NOT available when I do the join below. I am NOT using timeouts right now.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, TrackStateDStream}
import org.json4s.{DefaultFormats, JValue}

// RawJSON, JoinKey1, JoinKey2 are type aliases defined elsewhere in my code,
// as are dirLEFT/dirRIGHT and the helpers transformParseJoinKeys and mergeLEFTandRIGHT.

// Tracking function: store the latest parsed record for the key in the state,
// and also emit it so it is available downstream for the join.
val trackStateFn = (batchTime: Time,
                    stringJson: RawJSON,
                    keyWithParsedJson: Option[((JoinKey1, JoinKey2), JValue)],
                    state: State[((JoinKey1, JoinKey2), JValue)]) => {
  if (state.isTimingOut) {
    None
  } else {
    keyWithParsedJson.foreach(parsed => state.update(parsed))
    keyWithParsedJson // just return the data, ready for joining
  }
}

val sparkConf = new SparkConf().setAppName("Denormalizer")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("/tmp/denormStateful/")

val rawLEFT: DStream[String] = ssc.textFileStream(dirLEFT).window(Seconds(30))
val rawRIGHT: DStream[String] = ssc.textFileStream(dirRIGHT).window(Seconds(30))

implicit val formats = DefaultFormats // ++ JodaTimeSerializers.all

val jsonLEFT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] = rawLEFT.map(transformParseJoinKeys)
val jsonRIGHT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] = rawRIGHT.map(transformParseJoinKeys)

// TODO add timeout, maybe initial state?
val stateLEFT: TrackStateDStream[RawJSON, ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue)] =
  jsonLEFT.trackStateByKey(StateSpec.function(trackStateFn))
val stateRIGHT: TrackStateDStream[RawJSON, ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue)] =
  jsonRIGHT.trackStateByKey(StateSpec.function(trackStateFn))

// Join the two emitted streams on the (JoinKey1, JoinKey2) tuple.
val jsonJoinedBoth: DStream[((JoinKey1, JoinKey2), (JValue, JValue))] = stateLEFT.join(stateRIGHT)

val jsonPretty = jsonJoinedBoth.map { tuple => mergeLEFTandRIGHT(tuple._2._1, tuple._2._2) }

jsonPretty.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    val timestamp = (System.currentTimeMillis() % 100000000).toString
    val fileName = s"denorm-stateful-$timestamp.out"
    rdd.saveAsTextFile(fileName)
    println(s"Output ${rdd.count} lines of data to $fileName")
  }
}

ssc.start()
ssc.awaitTermination()
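One thing I wasn't sure about: if I understand the snapshot API correctly, TrackStateDStream also has a stateSnapshots() method that is supposed to expose the FULL accumulated state on every batch interval, not only the keys touched by the current mini-batch. Here's a rough sketch of what I was expecting to be able to do (the method name may be off; this is just to illustrate the intent, reusing the types from the code above):

// Sketch only -- assuming stateSnapshots() returns a DStream[(key, stateValue)]
// containing the FULL accumulated state at every batch interval.
val allLEFT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] = stateLEFT.stateSnapshots()
val allRIGHT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] = stateRIGHT.stateSnapshots()

// Re-key by the (JoinKey1, JoinKey2) tuple and join the whole history,
// instead of only the records emitted for this mini-batch.
val joinedAllHistory: DStream[((JoinKey1, JoinKey2), (JValue, JValue))] =
  allLEFT.map(_._2).join(allRIGHT.map(_._2))

Is joining the snapshots the intended way to get at the history, or should the stream emitted by trackStateByKey already be carrying it?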