Hi, I am using Spark 1.3.1 to read a directory of about 2000 Avro files. The Avro files come from a third party, and a few of them are corrupted.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val path = "{myDirectory of avro files}"
val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val data = sqlContext.avroFile(path)
data.select(.....)
When I run the above code, I get the following exception.
org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222) ~[classes/:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314) ~[classes/:1.7.7]
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209) ~[classes/:1.7.7]
    ... 25 common frames omitted
Is there an easy way to skip a corrupted Avro file without reading the files one by one using sqlContext.avroFile(file)? At present, my solution (a hack) is to use my own version of org.apache.avro.file.DataFileStream whose hasNext method returns false (to signal the end of the file) when "java.io.IOException: Invalid sync!" is thrown. Please see line 210 in
https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
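For context, this is roughly the one-by-one fallback I am trying to avoid: validate each file eagerly with Avro's DataFileReader, then load only the files that pass. It is a sketch only; it assumes the files sit on a local filesystem, reuses the sqlContext and the spark-avro import from the snippet above, and ends up reading every good file twice.

import java.io.File
import scala.util.Try
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Scan one local Avro file end to end; iterating every record is what makes a
// corrupted block surface as "Invalid sync!" here instead of later inside a Spark task.
def isReadable(file: File): Boolean = Try {
  val reader = DataFileReader.openReader(file, new GenericDatumReader[GenericRecord]())
  try { while (reader.hasNext) reader.next() } finally { reader.close() }
}.isSuccess

val avroDir = new File("{myDirectory of avro files}")
val goodFiles = avroDir.listFiles().filter(f => f.getName.endsWith(".avro") && isReadable(f))

// Load only the files that passed validation and union them into a single DataFrame.
val data = goodFiles.map(f => sqlContext.avroFile(f.getAbsolutePath)).reduce(_ unionAll _)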
Thanks in advance for any assistance!

Shing