I tried to run KafkaWordCount on a Spark standalone cluster. In this
application, the checkpoint directory was set as follows:

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc =  new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")


After submitting the application to the cluster, I could see the correct
counts on the console, but the running application kept logging the
following warning:

14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-00000-attempt-171 (Permission denied)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
  at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
  at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
  at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
  at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
  at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
  at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
  at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
  at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
  at org.apache.spark.scheduler.Task.run(Task.scala:53)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
  at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
  at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:396)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
  at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
  at java.lang.Thread.run(Thread.java:662)


On the node where I submitted the application, the checkpoint directory
(/usr/games/SparkStreaming/checkpoint) was created and some files were
written there, but no such directory existed on the other nodes of the
Spark cluster.

I guessed this was because the processes on the other nodes didn't have
sufficient privileges to create the checkpoint directory, so I created
it manually on each node and changed its mode to 777, so that any user
can write to it. But the Spark Streaming application still kept throwing
the same exception.
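
A possibly relevant detail: the path "checkpoint" carries no filesystem
scheme, and the RawLocalFileSystem frames in the trace show the write going
to the local disk of whichever node runs the task. The following is only a
minimal sketch for checking that, not a confirmed diagnosis. The object
CheckpointDirCheck, the helper hasNonLocalScheme, and the hdfs:// URI are
hypothetical names introduced for illustration.

```scala
import java.net.URI

// Hypothetical helper: inspects only the URI scheme of a checkpoint path.
// A path without a scheme is resolved against the default filesystem,
// which in the trace above is the local one (RawLocalFileSystem).
object CheckpointDirCheck {

  // True when the path explicitly names a filesystem other than file://.
  def hasNonLocalScheme(dir: String): Boolean =
    Option(new URI(dir).getScheme).exists(_.toLowerCase != "file")

  def main(args: Array[String]): Unit = {
    // The value passed to ssc.checkpoint(...) in the application above.
    val current = "checkpoint"
    // Assumed example of a location visible to every node; host, port
    // and path are placeholders, not values from this cluster.
    val candidate = "hdfs://namenode:8020/user/spark/checkpoint"

    for (dir <- Seq(current, candidate))
      println(s"$dir: non-local scheme = ${hasNonLocalScheme(dir)}")
  }
}
```

If the check reports no non-local scheme, one experiment would be to pass
a fully qualified URI on storage shared by all nodes to ssc.checkpoint(...)
and see whether the exception persists.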

So what is the real reason?  Thanks.
