Hi all,

I wrote a Spark program that runs on YARN. With small input files the
program runs fine, but the job fails when the input size is more than 40 GB.

The error log:
java.io.FileNotFoundException (java.io.FileNotFoundException:
/home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890
(Too many open files))
java.io.FileOutputStream.openAppend(Native Method)
java.io.FileOutputStream.<init>(FileOutputStream.java:192)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
org.apache.spark.scheduler.Task.run(Task.scala:53)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
java.lang.Thread.run(Thread.java:662)


My code:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicit conversions for reduceByKey

object Test {

  def main(args: Array[String]) {
    // args(0) is the master, e.g. yarn-standalone
    val sc = new SparkContext(args(0), "Test",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    // Two tab-separated inputs, each read as (key, value) pairs
    val mg = sc.textFile("/user/.../part-*")
    val mct = sc.textFile("/user/.../part-*")

    val pair1 = mg.map {
      s =>
        val cols = s.split("\t")
        (cols(0), cols(1))
    }
    val pair2 = mct.map {
      s =>
        val cols = s.split("\t")
        (cols(0), cols(1))
    }

    // Union the two pair RDDs and concatenate values per key (shuffle step)
    val merge = pair1.union(pair2)
    val result = merge.reduceByKey(_ + _)

    // Remove any previous output before writing
    val outputPath = new Path("/user/xxx/temp/spark-output")
    outputPath.getFileSystem(new Configuration()).delete(outputPath, true)
    result.saveAsTextFile(outputPath.toString)

    System.exit(0)
  }

}
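
I wonder whether the problem comes from the number of shuffle files that
reduceByKey opens over the union of the two inputs. If so, one thing I could
try is passing an explicit partition count to reduceByKey, so each map task
writes fewer shuffle files. A minimal sketch, reusing pair1 and pair2 from the
program above; the value 200 is only an illustration, not a tuned number, and
I have not verified this actually avoids the error:

    // Cap the number of reduce partitions so each map task opens fewer
    // shuffle files at once. 200 is an arbitrary illustrative value.
    val merged = pair1.union(pair2)
    val reduced = merged.reduceByKey(_ + _, 200)
    reduced.saveAsTextFile("/user/xxx/temp/spark-output")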

My Spark version is 0.9, and I submit the job with this command:

/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args
yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g
--worker-cores 2
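
I also noticed the spark.shuffle.consolidateFiles option in the 0.9
configuration docs. I have not tried it yet and am not sure it is the right
knob for this error, but this is roughly how I would set it through the
SparkConf API (TestWithConf is just an illustrative name, and whether the
setting fixes the "Too many open files" failure is unverified):

    import org.apache.spark.{SparkConf, SparkContext}

    object TestWithConf {
      def main(args: Array[String]) {
        // Sketch only: same context as above, but built through SparkConf
        // so shuffle settings can be set in code.
        val conf = new SparkConf()
          .setMaster(args(0))  // e.g. yarn-standalone, as in my command
          .setAppName("Test")
          .setJars(SparkContext.jarOfClass(this.getClass))
          .set("spark.shuffle.consolidateFiles", "true")
        val sc = new SparkContext(conf)
        // ... rest of the job unchanged ...
      }
    }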