I tried to run KafkaWordCount in a Spark standalone cluster. In this application, the checkpoint directory was set as follows:
```scala
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
```

After submitting my application to the cluster, I could see the correct counting results on the console, but the running application kept complaining:

```
14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-00000-attempt-171 (Permission denied)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
    at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
    at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
    at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
    at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
    at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
```

On the node where I submitted the application, the checkpoint directory (/usr/games/SparkStreaming/checkpoint) was created and some files were written into it, but no such directory existed on the other nodes of the Spark cluster. I guessed this was because the processes on the other nodes didn't have the privileges needed to create the checkpoint directory, so I created that directory on each node manually and changed its mode to 777, which lets any user write to it. The Spark Streaming application still kept throwing the same exception.

So what is the real reason? Thanks. More details on what I tried are below.
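For reference, the rest of the job is wired up roughly like the stock KafkaWordCount example that ships with Spark; the Kafka connection settings below are placeholders, not my real ones. As I understand it, the `reduceByKeyAndWindow` variant with an inverse reduce function is what requires checkpointing to be enabled, which is why the job writes RDD checkpoints at all:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // zkHost:2181, my-group, and my-topic are placeholders for my actual settings
    val lines = KafkaUtils.createStream(ssc, "zkHost:2181", "my-group", Map("my-topic" -> 1)).map(_._2)
    val words = lines.flatMap(_.split(" "))

    // Windowed count with an inverse reduce function; this operator
    // requires ssc.checkpoint(...) to be set
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```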
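While digging, I also noticed that the checkpoint path I pass is relative. As far as I can tell, Hadoop's local filesystem resolves a relative path against the JVM's current working directory, so the same string "checkpoint" may point to a different absolute location on every node. Here is a small standalone sketch I used to check where the path resolves (the object name is just made up for this check):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object WhereIsCheckpoint {
  def main(args: Array[String]): Unit = {
    // With no fs.defaultFS configured, this returns the local filesystem,
    // which resolves relative paths against the JVM's working directory
    val fs = FileSystem.get(new Configuration())
    println(fs.makeQualified(new Path("checkpoint")))
  }
}
```

On the submitting node this prints a path under /usr/games/SparkStreaming for me, which matches where the directory actually appeared.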
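To test my theory about privileges, here is a probe that can be run from spark-shell against the cluster; it reports the hostname, OS user, and working directory of each executor. This is only a diagnostic I wrote, not part of the application:

```scala
// Run inside spark-shell, where "sc" is the provided SparkContext.
// Spreads tasks across executors and collects one record per distinct
// (host, user, working directory) combination.
sc.parallelize(1 to 100, 8)
  .map { _ =>
    (java.net.InetAddress.getLocalHost.getHostName,
     System.getProperty("user.name"),
     System.getProperty("user.dir"))
  }
  .distinct()
  .collect()
  .foreach(println)
```

If the user or working directory printed on the worker nodes differs from the one that owns /usr/games/SparkStreaming, that would at least be consistent with the Permission denied error, though it wouldn't yet explain why mode 777 didn't help.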