I am getting the following error when I kill the spark driver and restart
the job:

15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from
file
hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from
file
hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
java.io.IOException: java.lang.ClassNotFoundException:
com.example.spark.streaming.reporting.live.jobs.Bucket
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
at
org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)



Spark version is 1.2.0

The streaming job is executing every 10 seconds with the following steps:

  1. Consuming JSON from a kafka topic called journeys and converting to
  case classes
  2. Filters resulting journeys stream based on a time attribute being set
  3. FlatMaps journeys into separate time buckets 15, 60 and 1440 minutes
  e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000,
hyperLogLog(journey
  id), 360) )
  4. ReduceByKey adding hyperloglogs
  5. UpdateStateByKey to add to previous states hyperloglog
  6. Then output results to Cassandra


I have pasted in a sample app below to mimic the problem and put all classes
into one file, it is also attached here  SampleJob.scala
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala>
  

To get around the issue for the moment, I have removed the Bucket class and
stopped passing in a bucket array to the ActiveJourney class.
And instead I hard code all the time buckets I need in the ActiveJourney
class; this approach works and recovers from checkpointing but is not
extensible.

Can the Spark gurus explain why I get that ClassNotFound exception?

Need any more information, please let me know.

Much thanks,
Conor



package com.example.spark.streaming.reporting.live.jobs
import java.util.Date
import scala.Array.canBuildFrom
import scala.collection.mutable.MutableList
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jvalue2extractable
import org.json4s.string2JsonInput
import com.example.spark.streaming.utils.MilliSecondUtils
import com.example.spark.streaming.utils.constants.ColumnFamilies
import com.example.spark.streaming.utils.constants.Constants
import com.example.spark.streaming.utils.constants.Milliseconds
import com.example.spark.streaming.utils.constants.SparkConfig
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming.toDStreamFunctions
import com.datastax.spark.connector.toNamedColumnRef
import com.twitter.algebird.HLL
import com.twitter.algebird.HyperLogLogMonoid
// Json parsing classes
case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
case class JourneyDetails(_id: String)
case class JourneyCommand($set: Option[JourneySet])
case class JourneySet(awayAt: Date)
// Class not found bucket
case class Bucket(val bucketType: String, val roundDown: (Long) => Long, val
columnFamily: String, val size: Long, val maxIntervals: Int)

// used for updateStateByKey
case class ActiveState(var bucketType: String, var time: Long, var
hyperLogLog: HLL, var ttl: Int)

object SampleJob {
 private final val Name = this.getClass().getSimpleName()
 def main(args: Array[String]) {
   if (args.length < 8) {
     System.err.println(s"Usage: $Name <enviroment> <zkQuorum> <group>
<topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
     System.exit(1)
   }
   System.out.print(args)
   val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri,
cassandra, intervalSeconds) = args
   val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().
getPackage().getImplementationVersion()
   def functionToCreateContext(): StreamingContext = {

     // how many buckets
     val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.
roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.
FifteenMinutes, 90)
     val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour,
ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
     val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay,
ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)
     val activeJourneys = new ActiveJourney(Array(fifteen,hour,day))
     val sparkConf = new SparkConf()
       .setAppName(Name)
       .set(SparkConfig.SparkMesosCoarse, Constants.True)
       .set(SparkConfig.SparkCleanerTtl, "300")
       .set(SparkConfig.SparkDriverMemory, "128m")
       .set(SparkConfig.SparkExecutorMemory, "128m")
       .set(SparkConfig.SparkDriverMaxResultSize, "128m")
       .set(SparkConfig.SparkDefaultParallelism, "3")
       .set(SparkConfig.SparkCoresMax, "2")
       .set(SparkConfig.SparkStreamingUnpersist, Constants.True)
       .set(SparkConfig.SparkStreamingBlockInterval, "5000")
       .set(SparkConfig.SparkCassandraConnectionHost, cassandra)
     val scc = new StreamingContext(sparkConf, Seconds(intervalSeconds
.toInt))
     scc.checkpoint(checkpointDirectory)
     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
     val lines = KafkaUtils.createStream(scc, zkQuorum, group, topicMap).
map(_._2)
     // convert from Json to Scala classes
     val journeys = lines.map(line => {
       // lossless is used to extract milliseconds
       implicit val formats = DefaultFormats.lossless
       // 'parse' function comes from the json4s lib
       parse(line).extract[Journey]
     })
     activeJourneys.process(journeys)
     scc
   }
   val scc = StreamingContext.getOrCreate(checkpointDirectory,
functionToCreateContext _)
   sys.ShutdownHookThread {
     System.err.println(s"Gracefully stopping $Name Spark Streaming
Application")
     scc.stop(stopSparkContext = true, stopGracefully = true)
     System.err.println(s"$Name streaming job stopped")
   }
   scc.start()
   scc.awaitTermination()
 }
}
class ActiveJourney(val buckets: Array[Bucket]) extends Serializable {
 val BIT_SIZE = 12
 val hll = new HyperLogLogMonoid(BIT_SIZE)

 def process(journeys: DStream[Journey]) {
   action(transform(journeys.filter(this.filter)))
 }
 def filter(journey: Journey): Boolean = {
   val awayAt = journey.o.getOrElse(new JourneyCommand(Some(new JourneySet(
null)))).$set.getOrElse(new JourneySet(null)).awayAt
   val isSettingAwayAt = awayAt != null
   isSettingAwayAt
 }
 def transform(stream: DStream[Journey]): DStream[ActiveState] = {
   val bucketCounts = stream.flatMap(bucketize(_)).reduceByKey((x, y) => {
     x.hyperLogLog += y.hyperLogLog
     x
   }).updateStateByKey(updateFunction).map(_._2)
   bucketCounts
 }
 def action(stream: DStream[ActiveState]) = {
   buckets.foreach(bucket => {
     val filteredActiveJourneys = stream.filter(_.bucketType == bucket.
bucketType)
     val cassandraFormat = filteredActiveJourneys.map(total => ("website",
new Date(total.time), "website", total.hyperLogLog.estimatedSize.toInt));
     cassandraFormat.saveToCassandra("reporting", bucket.columnFamily,
SomeColumns("key", "ts", "key_type", "total"))
   })
 }
 def updateFunction(journeys: Seq[ActiveState], state: Option[ActiveState])
= {
   val currentState = state.getOrElse(journeys(0))
   journeys.foreach(x => {
     currentState.hyperLogLog += x.hyperLogLog
   })
   currentState.ttl -= 1
   if (currentState.ttl < 0) {
     None
   } else {
     Some(currentState)
   }
 }
 def bucketize(journey: Journey): MutableList[(String, ActiveState)] = {
   var timeBuckets = MutableList.empty[(String, ActiveState)]
   buckets.foreach(bucket => {
     var timeBucket = bucket.roundDown(journey.o.get.$set.get.awayAt.
getTime())
     var journeyState = (bucket.bucketType + timeBucket, ActiveState(bucket
.bucketType, timeBucket, hll(journey.o2.get._id.getBytes()), bucket.
maxIntervals))
     timeBuckets += journeyState
   })
   timeBuckets
 }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-job-throwing-ClassNotFound-exception-when-recovering-from-checkpointing-tp21582.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to