Thanks everyone, that was the problem. The "create new streaming context" function was supposed to set up the stream processing pipeline as well as the checkpoint directory; I had missed the checkpoint setup entirely. With that done, everything works as expected.
For the benefit of others, here is my final, working version of the code:

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import com.typesafe.config.Config
import io.confluent.kafka.serializers.KafkaAvroDecoder
import kafka.serializer.StringDecoder
import org.apache.avro.generic.GenericData
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
// application-specific imports (RawLog, VideoView, VideoEngagement,
// VideoImpression, Normalizer, TacomaHelper) omitted

object RawLogProcessor extends Logging {

  import TacomaHelper._

  val checkpointDir = "/tmp/checkpointDir_tacoma"
  var ssc: Option[StreamingContext] = None

  def createSparkConf(config: Config): SparkConf = {
    val sparkConf = new SparkConf()
    config.entrySet.asScala
      .map(kv => kv.getKey -> kv.getValue)
      .foreach { case (k, v) => sparkConf.set(s"spark.$k", unquote(v.render())) }
    sparkConf.registerKryoClasses(Array(classOf[VideoView], classOf[RawLog],
      classOf[VideoEngagement], classOf[VideoImpression]))
    sparkConf
  }

  // returns a function of type `() => StreamingContext`, as required by StreamingContext.getOrCreate
  def createContext(sparkConfig: Config, kafkaConf: Config)(f: StreamingContext => StreamingContext) = () => {
    val batchDurationSecs = sparkConfig.getDuration("streaming.batch_duration", TimeUnit.SECONDS)
    val sparkConf = createSparkConf(sparkConfig)

    // create the streaming context and set its checkpoint directory
    val streamingContext = new StreamingContext(sparkConf, Durations.seconds(batchDurationSecs))
    streamingContext.checkpoint(checkpointDir)

    // apply the pipeline function f to the streaming context
    f(streamingContext)
  }

  def createNewContext(sparkConf: Config, kafkaConf: Config, f: StreamingContext => StreamingContext) = {
    logInfo("Create new Spark streamingContext with provided pipeline function")
    StreamingContext.getOrCreate(
      checkpointPath = checkpointDir,
      creatingFunc = createContext(sparkConf, kafkaConf)(f),
      createOnError = true)
  }

  def apply(sparkConfig: Config, kafkaConf: Config): StreamingContext = {
    rawlogTopic = kafkaConf.getString("rawlog.topic")
    kafkaParams = kafkaConf.entrySet.asScala
      .map(kv => kv.getKey -> unquote(kv.getValue.render()))
      .toMap

    if (ssc.isEmpty) {
      ssc = Some(createNewContext(sparkConfig, kafkaConf, setupPipeline))
    }
    ssc.get
  }

  var rawlogTopic: String = "qa-rawlog"
  var kafkaParams: Map[String, String] = Map()

  def setupPipeline(streamingContext: StreamingContext): StreamingContext = {
    logInfo("Creating new kafka rawlog stream")
    // TODO: extract this and pass it around somehow
    val rawlogDStream = KafkaUtils.createDirectStream[String, Object, StringDecoder, KafkaAvroDecoder](
      streamingContext, kafkaParams, Set(rawlogTopic))

    logInfo("adding step to parse kafka stream into RawLog types (Normalizer)")
    val eventStream = rawlogDStream
      .map { case (key, rawlogVal) =>
        val record = rawlogVal.asInstanceOf[GenericData.Record]
        val rlog = RawLog.newBuilder()
          .setId(record.get("id").asInstanceOf[String])
          .setAccount(record.get("account").asInstanceOf[String])
          .setEvent(record.get("event").asInstanceOf[String])
          .setTimestamp(record.get("timestamp").asInstanceOf[Long])
          .setUserAgent(record.get("user_agent").asInstanceOf[String])
          .setParams(record.get("params").asInstanceOf[java.util.Map[String, String]])
          .build()
        val norm = Normalizer(rlog)
        (key, rlog.getEvent, norm)
      }

    logInfo("Adding step to filter out VideoView only events and cache them")
    val videoViewStream = eventStream
      .filter(_._2 == "video_view")
      .filter(_._3.isDefined)
      .map((z) => (z._1, z._3.get))
      .map((z) => (z._1, z._2.asInstanceOf[VideoView]))
      .cache()

    // repartition by account
    logInfo("repartition videoView by account and calculate stats")
    videoViewStream.map((v) => (v._2.getAccount, 1))
      .filter(_._1 != null)
      .window(Durations.seconds(20))
      .reduceByKey(_ + _)
      .print()

    // repartition by (deviceType, deviceOS)
    logInfo("repartition videoView by (DeviceType, DeviceOS) and calculate stats")
    videoViewStream.map((v) => ((v._2.getDeviceType, v._2.getDeviceOs), 1))
      .reduceByKeyAndWindow(_ + _, Durations.seconds(10))
      .print()

    streamingContext
  }
}
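The driver that kicks this off is then only a few lines. A minimal sketch, assuming Typesafe Config -- the section names "spark" and "kafka" below are placeholders for however you lay out your configuration, not necessarily my actual layout:

import com.typesafe.config.ConfigFactory

object RawLogProcessorMain {
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load()
    // RawLogProcessor.apply returns either a freshly created StreamingContext
    // or one recovered from the checkpoint directory
    val ssc = RawLogProcessor(conf.getConfig("spark"), conf.getConfig("kafka"))
    ssc.start()
    ssc.awaitTermination()
  }
}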
- Ankur

On 13/05/2015 23:52, NB wrote:
> The data pipeline (DAG) should not be added to the StreamingContext
> in the case of a recovery scenario. The pipeline metadata is
> recovered from the checkpoint folder. That is one thing you will
> need to fix in your code. Also, I don't think the
> ssc.checkpoint(folder) call should be made in case of the
> recovery.
>
> The idiom to follow is to set up the DAG in the creatingFunc and
> not outside of it. This will ensure that if a new context is being
> created, i.e. the checkpoint folder does not exist, the DAG will get
> added to it and then checkpointed. Once a recovery happens, this
> function is not invoked but everything is recreated from the
> checkpointed data.
>
> Hope this helps,
> NB
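P.S. For anyone else who lands on this thread, the skeleton of the idiom NB describes is roughly the following. This is a stripped-down sketch with placeholder app name, checkpoint path, source, and durations, not the production code above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointIdiomSketch {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/example-checkpoint"

    // everything -- the checkpoint() call and the DAG -- is built inside the creating function
    def creatingFunc(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf().setAppName("checkpoint-example"), Seconds(10))
      ssc.checkpoint(checkpointDir)
      // define the pipeline here, e.g.
      ssc.socketTextStream("localhost", 9999).count().print()
      ssc
    }

    // cold start (no checkpoint dir yet): creatingFunc() builds the context and DAG;
    // restart: both are recovered from checkpointDir and creatingFunc() is not invoked
    val ssc = StreamingContext.getOrCreate(checkpointDir, creatingFunc _)
    ssc.start()
    ssc.awaitTermination()
  }
}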