I am getting the following error when I kill the Spark driver and restart the job:
15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
java.io.IOException: java.lang.ClassNotFoundException: com.example.spark.streaming.reporting.live.jobs.Bucket
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
        at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)

The Spark version is 1.2.0.

The streaming job executes every 10 seconds with the following steps:

1. Consumes JSON from a Kafka topic called journeys and converts it to case classes
2. Filters the resulting journeys stream on the time attribute being set
3. FlatMaps journeys into separate time buckets of 15, 60 and 1440 minutes, e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journey id), 360))
4. ReduceByKey, adding the HyperLogLogs together
5. UpdateStateByKey, adding to the previous state's HyperLogLog
6. Outputs the results to Cassandra

I have pasted a sample app below that mimics the problem, with all classes in one file; it is also attached here: SampleJob.scala <http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala>

To get around the issue for the moment, I have removed the Bucket class and stopped passing a bucket array into the ActiveJourney class. Instead I hard-code all the time buckets I need inside the ActiveJourney class (roughly as sketched below); this approach works and recovers from checkpointing, but it is not extensible.

Can the Spark gurus explain why I get that ClassNotFoundException?

If you need any more information, please let me know.
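For illustration, the workaround looks roughly like this (a simplified sketch rather than the exact code I run; the tuple layout is just one way to hard-code the buckets, and MilliSecondUtils, ColumnFamilies and Milliseconds are the same helpers used in the full sample below):

// Workaround sketch: no Bucket case class and no constructor argument.
// The three time buckets are hard-coded inside ActiveJourney itself.
class ActiveJourney extends Serializable {

  // (bucketType, roundDown, columnFamily, sizeMillis, maxIntervals)
  private val buckets = Seq(
    ("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90),
    ("HOUR", MilliSecondUtils.roundDownToNearestHour, ColumnFamilies.Visits_60, Milliseconds.Hour, 360),
    ("DAY", MilliSecondUtils.roundDownToNearestDay, ColumnFamilies.Visits_1440, Milliseconds.Day, 8640))

  // process/filter/transform/action/bucketize are the same as in the sample
  // below, except that they read the tuple fields instead of Bucket members.
}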
Much thanks,
Conor


package com.example.spark.streaming.reporting.live.jobs

import java.util.Date

import scala.Array.canBuildFrom
import scala.collection.mutable.MutableList

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jvalue2extractable
import org.json4s.string2JsonInput

import com.example.spark.streaming.utils.MilliSecondUtils
import com.example.spark.streaming.utils.constants.ColumnFamilies
import com.example.spark.streaming.utils.constants.Constants
import com.example.spark.streaming.utils.constants.Milliseconds
import com.example.spark.streaming.utils.constants.SparkConfig

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming.toDStreamFunctions
import com.datastax.spark.connector.toNamedColumnRef

import com.twitter.algebird.HLL
import com.twitter.algebird.HyperLogLogMonoid

// Json parsing classes
case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
case class JourneyDetails(_id: String)
case class JourneyCommand($set: Option[JourneySet])
case class JourneySet(awayAt: Date)

// Class not found bucket
case class Bucket(bucketType: String, roundDown: (Long) => Long, columnFamily: String, size: Long, maxIntervals: Int)

// used for updateStateByKey
case class ActiveState(var bucketType: String, var time: Long, var hyperLogLog: HLL, var ttl: Int)

object SampleJob {

  private final val Name = this.getClass().getSimpleName()

  def main(args: Array[String]) {

    if (args.length < 8) {
      System.err.println(s"Usage: $Name <environment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
      System.exit(1)
    }
    System.out.print(args)

    val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri, cassandra, intervalSeconds) = args

    val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().getPackage().getImplementationVersion()

    def functionToCreateContext(): StreamingContext = {

      // how many buckets
      val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
      val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour, ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
      val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay, ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)

      val activeJourneys = new ActiveJourney(Array(fifteen, hour, day))

      val sparkConf = new SparkConf()
        .setAppName(Name)
        .set(SparkConfig.SparkMesosCoarse, Constants.True)
        .set(SparkConfig.SparkCleanerTtl, "300")
        .set(SparkConfig.SparkDriverMemory, "128m")
        .set(SparkConfig.SparkExecutorMemory, "128m")
        .set(SparkConfig.SparkDriverMaxResultSize, "128m")
        .set(SparkConfig.SparkDefaultParallelism, "3")
        .set(SparkConfig.SparkCoresMax, "2")
        .set(SparkConfig.SparkStreamingUnpersist, Constants.True)
        .set(SparkConfig.SparkStreamingBlockInterval, "5000")
        .set(SparkConfig.SparkCassandraConnectionHost, cassandra)

      val scc = new StreamingContext(sparkConf, Seconds(intervalSeconds.toInt))
      scc.checkpoint(checkpointDirectory)

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(scc, zkQuorum, group, topicMap).map(_._2)

      // convert from Json to Scala classes
      val journeys = lines.map(line => {
        // lossless is used to extract milliseconds
        implicit val formats = DefaultFormats.lossless
        // 'parse' function comes from the json4s lib
        parse(line).extract[Journey]
      })

      activeJourneys.process(journeys)

      scc
    }

    val scc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    sys.ShutdownHookThread {
      System.err.println(s"Gracefully stopping $Name Spark Streaming Application")
      scc.stop(stopSparkContext = true, stopGracefully = true)
      System.err.println(s"$Name streaming job stopped")
    }

    scc.start()
    scc.awaitTermination()
  }
}

class ActiveJourney(val buckets: Array[Bucket]) extends Serializable {

  val BIT_SIZE = 12
  val hll = new HyperLogLogMonoid(BIT_SIZE)

  def process(journeys: DStream[Journey]) {
    action(transform(journeys.filter(this.filter)))
  }

  def filter(journey: Journey): Boolean = {
    val awayAt = journey.o.getOrElse(new JourneyCommand(Some(new JourneySet(null)))).$set.getOrElse(new JourneySet(null)).awayAt
    val isSettingAwayAt = awayAt != null
    isSettingAwayAt
  }

  def transform(stream: DStream[Journey]): DStream[ActiveState] = {
    val bucketCounts = stream.flatMap(bucketize(_)).reduceByKey((x, y) => {
      x.hyperLogLog += y.hyperLogLog
      x
    }).updateStateByKey(updateFunction).map(_._2)
    bucketCounts
  }

  def action(stream: DStream[ActiveState]) = {
    buckets.foreach(bucket => {
      val filteredActiveJourneys = stream.filter(_.bucketType == bucket.bucketType)
      val cassandraFormat = filteredActiveJourneys.map(total => ("website", new Date(total.time), "website", total.hyperLogLog.estimatedSize.toInt))
      cassandraFormat.saveToCassandra("reporting", bucket.columnFamily, SomeColumns("key", "ts", "key_type", "total"))
    })
  }

  def updateFunction(journeys: Seq[ActiveState], state: Option[ActiveState]) = {
    val currentState = state.getOrElse(journeys(0))
    journeys.foreach(x => {
      currentState.hyperLogLog += x.hyperLogLog
    })
    currentState.ttl -= 1
    if (currentState.ttl < 0) {
      None
    } else {
      Some(currentState)
    }
  }

  def bucketize(journey: Journey): MutableList[(String, ActiveState)] = {
    var timeBuckets = MutableList.empty[(String, ActiveState)]
    buckets.foreach(bucket => {
      var timeBucket = bucket.roundDown(journey.o.get.$set.get.awayAt.getTime())
      var journeyState = (bucket.bucketType + timeBucket, ActiveState(bucket.bucketType, timeBucket, hll(journey.o2.get._id.getBytes()), bucket.maxIntervals))
      timeBuckets += journeyState
    })
    timeBuckets
  }
}
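For reference, this is roughly how the job gets launched (example values only; the master URL, assembly jar name, zkQuorum, group and Cassandra host below are placeholders, while the topic, HDFS URI and 10-second interval match the job above):

spark-submit \
  --class com.example.spark.streaming.reporting.live.jobs.SampleJob \
  --master <master-url> \
  sample-job-assembly.jar \
  dev zk1.vagrant:2181 reporting-group journeys 1 hdfs://hdfs-namenode.vagrant:9000 cassandra.vagrant 10

The eight arguments map to <environment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>, in the order expected by the usage string in main.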