I found the answer. The checkpoint directory must live on a fault-tolerant file system such as HDFS, so it should be set to an HDFS path, not a local file system path.
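For example, a minimal sketch of the fix (the namenode host/port and the directory below are hypothetical placeholders, not values from the original setup):

    // Minimal sketch: point the checkpoint at HDFS instead of a local path.
    // "namenode:8020" and "/user/spark/checkpoint" are assumed placeholders.
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoint")

That way every executor writes its checkpoint files to the same shared, fault-tolerant location instead of to its own local disk, which is what caused the missing directories and permission errors below.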
2014-09-03 10:28 GMT+08:00 Tao Xiao <xiaotao.cs....@gmail.com>:
> I tried to run KafkaWordCount in a Spark standalone cluster. In this
> application, the checkpoint directory was set as follows:
>
>     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>     ssc.checkpoint("checkpoint")
>
> After submitting my application to the cluster, I could see the correct
> counting results on the console, but the running application kept
> complaining with the following:
>
>     14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
>     java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-00000-attempt-171 (Permission denied)
>         at java.io.FileOutputStream.open(Native Method)
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
>         at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
>         at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
>         at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
>         at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
>         at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
>         at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
>         at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
>         at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
>         at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
>         at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>         at java.lang.Thread.run(Thread.java:662)
>
> On the node where I submitted the application, the checkpoint directory
> (/usr/games/SparkStreaming/checkpoint) was created and some files were
> written there, but no such directory existed on the other nodes of the
> Spark cluster.
>
> I guess that was because processes on the other nodes of the cluster
> didn't have the appropriate privileges to create the checkpoint directory.
> So I created that directory on each node manually and changed its mode to
> 777, which means any user can write to it.
> But the Spark Streaming application still kept throwing that exception.
>
> So what is the real reason? Thanks.