I am still facing this issue. Executor dies due to:

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
        at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
        at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
        at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
        ...
Caused by: java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:149)
Spark automatically launched new executors and the whole job completed fine. Does anyone have a clue what's going on? The Spark job reads Avro files from a directory, does some basic map/filter, then repartitions to 1 and writes the result to HDFS (a rough sketch of the job is at the bottom of this message). I use Spark 1.3 with spark-avro 1.0.0. The error only happens when running on the whole dataset; when running on 1/3 of the files, the same job completes without error.

On Thu, Oct 1, 2015 at 2:41 PM, Lan Jiang <ljia...@gmail.com> wrote:
> Hi, there
>
> Here is the problem I ran into when executing a Spark job (Spark 1.3). The
> Spark job loads a bunch of Avro files using the Spark SQL spark-avro 1.0.0
> library. Then it does some filter/map transformations, repartitions to 1
> partition, and writes to HDFS. It creates 2 stages. The total HDFS block
> count is around 12000, so it creates 12000 partitions and thus 12000 tasks
> for the first stage. I have a total of 9 executors launched, with 5 threads
> each. The job ran fine until the very end. When it reached 19980/20000
> tasks succeeded, it suddenly failed the last 20 tasks and I lost 2
> executors. Spark did launch 2 new executors and eventually finished the job
> by reprocessing the 20 tasks.
>
> I only ran into this issue when I run the Spark application on the full
> dataset. When I run on 1/3 of the dataset, everything finishes fine
> without error.
>
> Question 1: What is the root cause of this issue? It is similar to
> http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
> and https://issues.apache.org/jira/browse/SPARK-3052, but they say the
> issue has been fixed since 1.2.
>
> Question 2: I am a little surprised that after the 2 new executors were
> launched, replacing the two failed executors, they simply reprocessed the
> failed 20 tasks/partitions. What about the results for the other partitions
> processed by the 2 failed executors before? I assume the results of those
> partitions are stored on local disk and thus do not need to be recomputed
> by the new executors? When is that data stored locally? Is this
> configurable? This question is for my own understanding of the Spark
> framework.
>
> The exception causing the executor failure is below:
>
> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
>         at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>         at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>         at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>         at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Filesystem closed
>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
>         at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
>         at java.io.DataInputStream.read(DataInputStream.java:149)
>         at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
>         at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
>         at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
>         at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
>         at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:264)
>
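For reference, the job boils down to roughly the sketch below. This is a stripped-down stand-in, not the actual code: the paths, column names and the Parquet output are placeholders, and I am assuming the sqlContext.avroFile(...) reader that spark-avro provides.

import com.databricks.spark.avro._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object AvroRepartitionJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("avro-repartition"))
    val sqlContext = new SQLContext(sc)

    // Read the whole input directory; each HDFS block of the Avro files
    // becomes one partition, hence one task in the first stage.
    val input = sqlContext.avroFile("hdfs:///data/input/*.avro")  // placeholder path

    // Basic filter/map, then collapse everything into a single partition.
    val result = input
      .filter(input("status") === "ACTIVE")  // placeholder predicate
      .select("id", "payload")               // placeholder projection
      .repartition(1)

    // Placeholder output format -- the real job just writes the result to HDFS.
    result.saveAsParquetFile("hdfs:///data/output")

    sc.stop()
  }
}

The repartition(1) is the only shuffle in the job, which is consistent with the failed tasks being shuffle map tasks (the ShuffleMapTask/SortShuffleWriter frames in the stack trace above).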