Thanks for the info!
Shing
On Tuesday, 5 May 2015, 15:11, Imran Rashid <[email protected]> wrote:
You might be interested in https://issues.apache.org/jira/browse/SPARK-6593
and the discussion around the PRs.
This is probably more complicated than what you are looking for, but you could
copy the code for HadoopReliableRDD in the PR into your own code and use it,
without having to wait for the issue to get resolved.
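In the meantime, a lighter-weight workaround (not the HadoopReliableRDD approach from the PR; the `load` function below is a stand-in, so this is only a sketch) is to attempt each file individually and drop the ones whose read fails:

```scala
import scala.util.Try

// Sketch of a per-file "skip on failure" pattern. `load` stands in for
// whatever reads one file (e.g. sqlContext.avroFile(path) in the real job).
// Files whose load throws (such as "Invalid sync!") are silently dropped.
def loadOrSkip[A](paths: Seq[String])(load: String => A): Seq[A] =
  paths.flatMap(p => Try(load(p)).toOption)

// Illustration with a fake loader that rejects "corrupt" paths.
val good = loadOrSkip(Seq("a.avro", "bad.avro", "c.avro")) { p =>
  if (p.startsWith("bad")) throw new java.io.IOException("Invalid sync!")
  p
}
// good == Seq("a.avro", "c.avro")
```

Note this only skips files that fail eagerly at load time; a file that is readable at open but corrupt mid-stream still needs handling at read time.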
On Sun, May 3, 2015 at 12:57 PM, Shing Hing Man <[email protected]>
wrote:
Hi, I am using Spark 1.3.1 to read a directory of about 2000 avro files. The
avro files are from a third party and a few of them are corrupted.
val path = "{myDirectory of avro files}"
val sparkConf = new SparkConf().setAppName("avroDemo").setMaster("local")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val data = sqlContext.avroFile(path)
data.select(.....)
When I run the above code, I get the following exception.
org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222) ~[classes/:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library.jar:na]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126) ~[spark-sql_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.1.jar:1.3.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: Invalid sync!
    at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314) ~[classes/:1.7.7]
    at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209) ~[classes/:1.7.7]
    ... 25 common frames omitted
Is there an easy way to skip a corrupted avro file, without reading the files
one by one using sqlContext.avroFile(file)? At present, my solution (a hack) is
to use my own version of org.apache.avro.file.DataFileStream whose hasNext
method returns false (to signal end of file) when "java.io.IOException:
Invalid sync!" is thrown. Please see line 210 in
https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
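The same "treat a mid-stream IOException as end-of-file" idea can be expressed without patching Avro, by wrapping the record iterator. This is only a sketch (`TolerantIterator` and the plain Scala `Iterator` are stand-ins for the Avro DataFileStream):

```scala
import java.io.IOException

// Wraps an iterator so that an IOException from hasNext (e.g. "Invalid
// sync!" on a corrupt Avro block) is treated as end-of-stream instead of
// propagating and killing the task.
class TolerantIterator[A](underlying: Iterator[A]) extends Iterator[A] {
  private var finished = false
  def hasNext: Boolean =
    !finished && (try underlying.hasNext catch {
      case _: IOException => finished = true; false // corrupt block: stop early
    })
  def next(): A = underlying.next()
}

// Illustration: a source that yields 1, 2 and then fails like a corrupt file.
val base = new Iterator[Int] {
  private var n = 0
  def hasNext: Boolean =
    if (n >= 2) throw new IOException("Invalid sync!") else true
  def next(): Int = { n += 1; n }
}
val collected = new TolerantIterator(base).toList
// collected == List(1, 2)
```

The trade-off is the same as with the patched DataFileStream: records after the first corrupt block in a file are silently dropped.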
Thanks in advance for any assistance!
Shing