I have a simple Spark Streaming application that reads data from RabbitMQ
and performs aggregations over window intervals of 1 minute and 1 hour,
with a batch interval of 30 s.
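As a quick sanity check on these durations: Spark Streaming requires a window's length and its slide interval to be multiples of the batch interval. With a 30 s batch, the 1-minute window therefore spans 2 batches and the 1-hour window spans 120. The sketch below works that out. The slide interval is not stated above, so it is assumed here to equal the batch interval, and `batches_per_window` is a hypothetical helper, not a Spark API.

```python
from typing import Optional


def batches_per_window(window_s: int, batch_s: int, slide_s: Optional[int] = None) -> int:
    """Return how many batches one window covers (durations in seconds).

    Hypothetical helper that mirrors Spark Streaming's rule that a window's
    length and slide interval must both be multiples of the batch interval.
    """
    # Assumption: with no explicit slide interval, slide == batch interval.
    if slide_s is None:
        slide_s = batch_s
    if window_s % batch_s != 0:
        raise ValueError(f"window {window_s}s is not a multiple of batch {batch_s}s")
    if slide_s % batch_s != 0:
        raise ValueError(f"slide {slide_s}s is not a multiple of batch {batch_s}s")
    return window_s // batch_s


BATCH_S = 30  # batch interval described in the post
for label, window_s in [("1 min", 60), ("1 hour", 3600)]:
    print(f"{label} window = {batches_per_window(window_s, BATCH_S)} batches")
```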
I have a three-node setup. To enable checkpointing, I mounted the same
directory on all worker nodes using sshfs and use it as the checkpoint
directory.
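Since every node has to see the same checkpoint files, one way to confirm that the sshfs mount really behaves as a single shared directory is to have each node drop a marker file into it and then list the markers from any node. This is a minimal sketch: the `_mount_check` subdirectory and the `write_marker`/`list_markers` helpers are hypothetical names, and the directory path is passed on the command line rather than taken from the post.

```python
import socket
import sys
from pathlib import Path
from typing import List

MARKER_SUBDIR = "_mount_check"  # hypothetical subdirectory, kept apart from Spark's own files


def write_marker(shared_dir: str, node: str) -> Path:
    """Write a marker file named after this node into the shared directory."""
    marker_dir = Path(shared_dir) / MARKER_SUBDIR
    marker_dir.mkdir(exist_ok=True)
    marker = marker_dir / node
    marker.write_text(f"written by {node}\n")
    return marker


def list_markers(shared_dir: str) -> List[str]:
    """Return the names of the nodes whose markers are visible from here."""
    marker_dir = Path(shared_dir) / MARKER_SUBDIR
    if not marker_dir.is_dir():
        return []
    return sorted(p.name for p in marker_dir.iterdir() if p.is_file())


def main(argv: List[str]) -> None:
    if len(argv) != 2:
        print(f"usage: {argv[0]} /path/to/shared/checkpoint/dir")
        return
    write_marker(argv[1], socket.gethostname())
    print("nodes visible in shared dir:", list_markers(argv[1]))


if __name__ == "__main__":
    main(sys.argv)
```

Running it once on each of the three nodes should leave every node listing all three hostnames; if a node only sees its own marker, its mount is not actually shared.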
When I run the Spark Streaming app for the first time, it works fine: I can
see the results printed on the console and checkpoints being written to the
network directory.
But when I run the job a second time, it fails with the following
exception:
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
The same exception keeps repeating.
I am not pushing much data to RabbitMQ: on the first run I send fewer than
100 events, and for the second run I stopped the producer process, so no new
messages are sent to RabbitMQ.
I have tried setting "spark.streaming.unpersist" to "true".
My setup has 3 nodes, each with one core allocated to Spark, and the
executor memory per node is 512 MB.
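For reference, the resource arithmetic of this setup is 3 nodes × 1 core = 3 cores and 3 × 512 MB = 1536 MB of executor memory in total. Assuming the RabbitMQ input is receiver-based (the post does not say which connector is used), each receiver runs as a long-lived task that occupies one core for the lifetime of the application, which leaves the remaining cores for processing batches. The sketch below only does that bookkeeping; `ClusterSetup` is a hypothetical class, and both the single receiver and one executor per node are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusterSetup:
    nodes: int
    cores_per_node: int
    executor_memory_mb: int  # per node; assumes one executor per node
    receivers: int           # assumption: number of receiver-based input streams

    @property
    def total_cores(self) -> int:
        return self.nodes * self.cores_per_node

    @property
    def total_executor_memory_mb(self) -> int:
        return self.nodes * self.executor_memory_mb

    @property
    def processing_cores(self) -> int:
        # Each receiver pins one core for as long as the app runs.
        free = self.total_cores - self.receivers
        if free < 1:
            raise ValueError("no cores left to process the received data")
        return free


setup = ClusterSetup(nodes=3, cores_per_node=1, executor_memory_mb=512, receivers=1)
print(setup.total_cores, setup.total_executor_memory_mb, setup.processing_cores)
```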
Please help me solve this issue.