I am getting the following error when I kill the spark driver and restart the job:
15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from
> file
> hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
> 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from
> file
> hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-1423589100000.bk
> java.io.IOException: java.lang.ClassNotFoundException:
> com.example.spark.streaming.reporting.live.jobs.Bucket
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
> at
> org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
Spark version is 1.2.0
The streaming job is executing every 10 seconds with the following steps:
1. Consuming JSON from a kafka topic called journeys and converting to
case classes
2. Filters resulting journeys stream based on a time attribute being set
3. FlatMaps journeys into separate time buckets of 15, 60 and 1440 minutes,
e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journey id), 360))
4. ReduceByKey, adding the HyperLogLogs together
5. UpdateStateByKey to add to the previous state's HyperLogLog
6. Then output results to Cassandra
I have made a sample app below to mimic the problem and put all classes
into one file; it is also attached to this email.
To get around the issue for the moment, I have removed the Bucket class and
stopped passing in a bucket array to the ActiveJourney class.
And instead I hard code all the time buckets I need in the ActiveJourney
class; this approach works and recovers from checkpointing but is not
extensible.
Can the Spark gurus explain why I get that ClassNotFoundException?
If you need any more information, please let me know.
Many thanks,
Conor
package com.example.spark.streaming.reporting.live.jobs
> import java.util.Date
> import scala.Array.canBuildFrom
> import scala.collection.mutable.MutableList
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.json4s.DefaultFormats
> import org.json4s.jackson.JsonMethods.parse
> import org.json4s.jvalue2extractable
> import org.json4s.string2JsonInput
> import com.example.spark.streaming.utils.MilliSecondUtils
> import com.example.spark.streaming.utils.constants.ColumnFamilies
> import com.example.spark.streaming.utils.constants.Constants
> import com.example.spark.streaming.utils.constants.Milliseconds
> import com.example.spark.streaming.utils.constants.SparkConfig
> import com.datastax.spark.connector.SomeColumns
> import com.datastax.spark.connector.streaming.toDStreamFunctions
> import com.datastax.spark.connector.toNamedColumnRef
> import com.twitter.algebird.HLL
> import com.twitter.algebird.HyperLogLogMonoid
// Json parsing classes
/** Holder for the optional `awayAt` timestamp carried by a journey update. */
case class JourneySet(
  awayAt: Date
)

/** Wrapper for the optional `$set` section of a journey message. */
case class JourneyCommand(
  $set: Option[JourneySet]
)

/** Identifying details of a journey; `_id` is the journey id. */
case class JourneyDetails(
  _id: String
)

/**
 * Top-level shape extracted from each JSON message.
 *
 * @param o  optional command section (may carry a `$set` with `awayAt`)
 * @param o2 optional details section (carries the journey `_id`)
 */
case class Journey(
  o: Option[JourneyCommand],
  o2: Option[JourneyDetails]
)
/**
 * Describes one time-bucket granularity used when aggregating journeys.
 *
 * @param bucketType   label of the bucket; used as the key prefix and to route results
 * @param roundDown    maps a timestamp in milliseconds to the start of its bucket
 * @param columnFamily name of the Cassandra table the bucket's totals are written to
 * @param size         bucket length (presumably milliseconds, matching `Milliseconds.*` -- confirm)
 * @param maxIntervals initial time-to-live, in batch intervals, for state created in this bucket
 */
case class Bucket(
  bucketType: String,
  roundDown: Long => Long,
  columnFamily: String,
  size: Long,
  maxIntervals: Int
)
>
/**
 * Running aggregate kept per (bucket type, bucket start time) key.
 *
 * The fields are declared `var` so that callers can update them in place.
 *
 * @param bucketType  label of the bucket this state belongs to
 * @param time        bucket start time as produced by the bucket's round-down function
 * @param hyperLogLog HLL sketch accumulating the journey ids seen for this bucket
 * @param ttl         remaining batch intervals before the state is dropped
 */
case class ActiveState(
  var bucketType: String,
  var time: Long,
  var hyperLogLog: HLL,
  var ttl: Int
)
/**
 * Entry point of the sample streaming job.
 *
 * Reads journey JSON messages from Kafka, hands them to [[ActiveJourney]] for bucketing and
 * aggregation, and checkpoints the streaming context so that a restarted driver can recover
 * it through `StreamingContext.getOrCreate`.
 */
object SampleJob {

  // For a Scala `object` the simple class name carries a trailing '$' (visible in the
  // checkpoint path ".../reporting/SampleJob$0.1.0"). It is left untouched so that existing
  // checkpoint directories keep matching.
  private final val Name = this.getClass().getSimpleName()

  // Number of positional arguments destructured in `main`.
  private final val ExpectedArgCount = 8

  /**
   * Starts (or recovers) the streaming context and blocks until it terminates.
   *
   * @param args exactly eight values: environment, zkQuorum, group, topics (comma separated),
   *             numThreads, hdfsUri, cassandra host, intervalSeconds
   */
  def main(args: Array[String]): Unit = {
    // An exact-count check is required: the pattern below only matches arrays of length 8, so
    // a "fewer than 8" guard would let 9+ arguments through to a scala.MatchError.
    if (args.length != ExpectedArgCount) {
      System.err.println(s"Usage: $Name <enviroment> <zkQuorum> <group> <topics> <numThreads> <hdfsUri> <cassandra> <intervalSeconds>")
      System.exit(1)
    }
    // Print the argument values; printing the array itself only shows its identity string.
    System.out.println(args.mkString(" "))

    // The first argument (environment) is accepted but not used by this sample.
    val Array(_, zkQuorum, group, topics, numThreads, hdfsUri, cassandra, intervalSeconds) = args

    val checkpointDirectory =
      hdfsUri + "/reporting/" + Name + getClass().getPackage().getImplementationVersion()

    // Builds a brand-new context. Only invoked by getOrCreate when no usable checkpoint exists,
    // so the whole DStream graph must be defined inside this function.
    def functionToCreateContext(): StreamingContext = {
      // how many buckets
      val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour,
        ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
      val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour,
        ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
      val day = Bucket("DAY", MilliSecondUtils.roundDownToNearestDay,
        ColumnFamilies.Visits_1440, Milliseconds.Day, 8640)
      val activeJourneys = new ActiveJourney(Array(fifteen, hour, day))

      val sparkConf = new SparkConf()
        .setAppName(Name)
        .set(SparkConfig.SparkMesosCoarse, Constants.True)
        .set(SparkConfig.SparkCleanerTtl, "300")
        .set(SparkConfig.SparkDriverMemory, "128m")
        .set(SparkConfig.SparkExecutorMemory, "128m")
        .set(SparkConfig.SparkDriverMaxResultSize, "128m")
        .set(SparkConfig.SparkDefaultParallelism, "3")
        .set(SparkConfig.SparkCoresMax, "2")
        .set(SparkConfig.SparkStreamingUnpersist, Constants.True)
        .set(SparkConfig.SparkStreamingBlockInterval, "5000")
        .set(SparkConfig.SparkCassandraConnectionHost, cassandra)

      val ssc = new StreamingContext(sparkConf, Seconds(intervalSeconds.toInt))
      ssc.checkpoint(checkpointDirectory)

      // Every topic is consumed with the same number of receiver threads.
      val threadsPerTopic = numThreads.toInt
      val topicMap = topics.split(",").map((_, threadsPerTopic)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

      // convert from Json to Scala classes
      val journeys = lines.map { line =>
        // lossless is used to extract milliseconds
        implicit val formats = DefaultFormats.lossless
        // 'parse' function comes from the json4s lib
        parse(line).extract[Journey]
      }

      activeJourneys.process(journeys)
      ssc
    }

    val scc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Stop gracefully on JVM shutdown.
    sys.ShutdownHookThread {
      System.err.println(s"Gracefully stopping $Name Spark Streaming Application")
      scc.stop(stopSparkContext = true, stopGracefully = true)
      System.err.println(s"$Name streaming job stopped")
    }

    scc.start()
    scc.awaitTermination()
  }
}
/**
 * Buckets journeys by time, counts distinct journey ids per bucket with HyperLogLog, keeps a
 * running state per bucket key and writes the estimated totals to Cassandra.
 *
 * Instances are captured by the closures passed to the DStream operations (`filter`,
 * `bucketize`, `updateFunction`), so they are serialized together with the DStream graph.
 *
 * @param buckets the bucket definitions to aggregate into. The array itself is `@transient`:
 *                it is available on the instance that was constructed, but is `null` on
 *                deserialized copies. All internal logic uses a private `List` copy instead.
 *                NOTE(review): this keeps an `Array[Bucket]` out of the checkpointed graph; the
 *                reported `ClassNotFoundException` for `Bucket` on checkpoint recovery is
 *                believed to be the array-class loading problem tracked as SPARK-5569 --
 *                confirm against the Spark version in use.
 */
class ActiveJourney(@transient val buckets: Array[Bucket]) extends Serializable {
  val BIT_SIZE = 12
  val hll = new HyperLogLogMonoid(BIT_SIZE)

  // Serializable, array-free copy of the bucket definitions used by every closure.
  private val bucketList: List[Bucket] = buckets.toList

  /** Wires the full pipeline: filter, aggregate and store the given journeys. */
  def process(journeys: DStream[Journey]): Unit =
    action(transform(journeys.filter(this.filter)))

  /**
   * Keeps only journeys whose command carries a `$set` with a non-null `awayAt`.
   *
   * @return `true` when `awayAt` is being set on the journey
   */
  def filter(journey: Journey): Boolean = {
    val awayAt = for {
      command <- journey.o
      set     <- command.$set
      at      <- Option(set.awayAt) // awayAt may be null when absent from the JSON
    } yield at
    awayAt.isDefined
  }

  /**
   * Expands journeys into per-bucket entries, merges the sketches of each batch by key and
   * folds them into the running per-key state.
   *
   * @return the stream of current states (keys dropped)
   */
  def transform(stream: DStream[Journey]): DStream[ActiveState] =
    stream
      .flatMap(bucketize(_))
      // Build a new value instead of mutating an argument of the reduce function.
      .reduceByKey((x, y) => x.copy(hyperLogLog = x.hyperLogLog + y.hyperLogLog))
      .updateStateByKey(updateFunction)
      .map(_._2)

  /**
   * For every bucket, writes the estimated distinct count of each state of that bucket type to
   * the bucket's table in the "reporting" keyspace.
   */
  def action(stream: DStream[ActiveState]): Unit =
    bucketList.foreach { bucket =>
      // Capture only the label in the filter closure rather than the whole Bucket.
      val bucketType = bucket.bucketType
      val filteredActiveJourneys = stream.filter(_.bucketType == bucketType)
      val cassandraFormat = filteredActiveJourneys.map { total =>
        ("website", new Date(total.time), "website", total.hyperLogLog.estimatedSize.toInt)
      }
      cassandraFormat.saveToCassandra("reporting", bucket.columnFamily,
        SomeColumns("key", "ts", "key_type", "total"))
    }

  /**
   * State-update function for `updateStateByKey`.
   *
   * Merges the sketches of this batch into the previous state (or into the first new value
   * when there is no previous state), then decrements the time-to-live.
   *
   * @param journeys values for the key produced in the current batch
   * @param state    previous state for the key, if any
   * @return the updated state, or `None` once the time-to-live drops below zero (or when
   *         there is neither a previous state nor a new value)
   */
  def updateFunction(journeys: Seq[ActiveState], state: Option[ActiveState]): Option[ActiveState] = {
    // The first element (previous state, else first new value) supplies bucketType/time/ttl.
    val merged = (state.toSeq ++ journeys).reduceOption { (acc, next) =>
      acc.copy(hyperLogLog = acc.hyperLogLog + next.hyperLogLog)
    }
    merged
      .map(current => current.copy(ttl = current.ttl - 1))
      .filter(_.ttl >= 0)
  }

  /**
   * Produces one `(key, state)` entry per bucket for the given journey, where the key is the
   * bucket type followed by the rounded-down `awayAt` time.
   *
   * Journeys lacking `awayAt` or the details section yield no entries instead of throwing.
   */
  def bucketize(journey: Journey): MutableList[(String, ActiveState)] = {
    val required = for {
      command <- journey.o
      set     <- command.$set
      awayAt  <- Option(set.awayAt)
      details <- journey.o2
    } yield (awayAt.getTime(), details._id)

    val entries = required.toList.flatMap { case (awayAtMillis, journeyId) =>
      bucketList.map { bucket =>
        val timeBucket = bucket.roundDown(awayAtMillis)
        val state = ActiveState(bucket.bucketType, timeBucket, hll(journeyId.getBytes()),
          bucket.maxIntervals)
        (bucket.bucketType + timeBucket, state)
      }
    }
    MutableList(entries: _*)
  }
}
>
SampleJob.scala
Description: Binary data
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
