I found the answer. The checkpoint's file system should be a fault-tolerant
file system that every node can reach, such as HDFS, so we should set the
checkpoint directory to an HDFS path, not a local file system path.
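For example, the snippet from the original mail could be changed along these
lines (a minimal sketch; the NameNode host, port, and directory here are
placeholders, substitute your own cluster's values):

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Checkpoint to a path that every node can reach, e.g. on HDFS,
    // instead of a directory on each machine's local disk.
    ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoint/KafkaWordCount")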


2014-09-03 10:28 GMT+08:00 Tao Xiao <xiaotao.cs....@gmail.com>:

> I tried to run KafkaWordCount in a Spark standalone cluster. In this
> application, the checkpoint directory was set as follows:
>
>     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>     ssc.checkpoint("checkpoint")
>
>
> After submitting my application to the cluster, I could see the correct
> counting results on the console, but the running application kept
> throwing the following exception:
>
> 14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
> java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-00000-attempt-171 (Permission denied)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
>   at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
>   at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
>   at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
>   at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
>   at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
>   at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
>   at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
>   at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
>   at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
>   at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>   at org.apache.spark.scheduler.Task.run(Task.scala:53)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:396)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>   at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>   at java.lang.Thread.run(Thread.java:662)
>
>
> On the node where I submitted the application, the checkpoint directory
> (/usr/games/SparkStreaming/checkpoint) was created and some files were
> written there, but no such directory existed on the other nodes of the
> Spark cluster.
>
> I guessed that was because processes on the other nodes of the cluster
> didn't have the appropriate privileges to create the checkpoint directory,
> so I created that directory on each node manually and changed its mode to
> 777, which means any user can write to it. But the Spark Streaming
> application still kept throwing that exception.
>
> So what is the real reason? Thanks.
>
