Andre Bois-Crettez <andre.b...@kelkoo.com> wrote:
>We never saw your exception when reading bzip2 files with Spark.
>
>But when we mistakenly compiled Spark against an older version of Hadoop (the
>default in Spark), we ended up reading bzip2 files sequentially, without taking
>advantage of block splits to work in parallel.
>Once we compiled Spark with SPARK_HADOOP_VERSION=2.2.0, files were read in
>parallel, as expected with a recent Hadoop.
>
>http://spark.apache.org/docs/0.9.1/#a-note-about-hadoop-versions
>
>Make sure Spark is compiled against Hadoop v2.
>
>André
>
>On 2014-05-13 18:08, Xiangrui Meng wrote:
>> Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>> the problem you described, but it does contain several fixes to the bzip2
>> format. -Xiangrui
>>
>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <and...@andrewash.com> wrote:
>>> Hi all,
>>>
>>> Is anyone reading and writing .bz2 files stored in HDFS from Spark with
>>> success?
>>>
>>> I'm seeing the following results on a recent commit (756c96 from 24 hours
>>> ago) and CDH 4.4.0:
>>>
>>> Works:         val r = sc.textFile("/user/aa/myfile.bz2").count
>>> Doesn't work:  val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "| ").count
>>>
>>> Specifically, I'm getting an exception from inside the bzip2 libraries (see
>>> the stack traces below), which is unusual because I can read the same file
>>> without issue using the same libraries via Pig. The file was originally
>>> created from Pig as well.
>>>
>>> Digging a little deeper, I found this line in the .bz2 decompressor's
>>> javadoc for CBZip2InputStream:
>>>
>>> "Instances of this class are not threadsafe." [source]
>>>
>>> My current working theory is that Spark runs at a much higher level of
>>> parallelism than Pig/Hadoop does, so I hit these wild IndexOutOfBounds
>>> exceptions far more often (to the point that I can't finish a run over a
>>> small 2M-row file), whereas other tools hardly hit them at all.
>>>
>>> The only other reference to the issue I could find was on presto-users,
>>> but the recommendation there to move from .bz2 to .lzo doesn't help if I
>>> actually want the higher compression ratios of .bz2.
>>>
>>> Would love to hear whether I have some kind of configuration issue, whether
>>> there's a bug in the .bz2 support that's fixed in later versions of CDH, or
>>> any other thoughts on the issue.
>>>
>>> Thanks!
>>> Andrew
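One way to probe the thread-safety theory above, purely as a diagnostic sketch rather than a fix, is to collapse the read into a single task so that only one CBZip2InputStream is active in the JVM at a time. The path and the map closure are taken from the example above; using coalesce(1) to serialise the read is an assumption, not something verified in this thread:

    // Diagnostic sketch (untested): coalesce without a shuffle makes a single
    // task read all splits one after another, so the bzip2 blocks are
    // decompressed by one thread instead of many concurrent ones.
    val lines = sc.textFile("/user/aa/myfile.bz2")
    val n = lines.coalesce(1).map((s: String) => s + "| ").count()

If the count succeeds with coalesce(1) but the parallel version still throws ArrayIndexOutOfBoundsException, that points at concurrent use of the decompressor rather than at a corrupt file.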
>>>
>>>
>>> Below are examples of some exceptions I'm getting:
>>>
>>> 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
>>> java.lang.ArrayIndexOutOfBoundsException: 65535
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>         at java.io.InputStream.read(InputStream.java:101)
>>>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>
>>>
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: 900000
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>         at java.io.InputStream.read(InputStream.java:101)
>>>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>>
>>>
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: -921878509
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
>>>         at java.io.InputStream.read(InputStream.java:101)
>>>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>>         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>>         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>>         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>>         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>>
>>>
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: -1321104434
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>         at java.io.InputStream.read(InputStream.java:101)
>>>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>
>
>--
>André Bois-Crettez
>
>Software Architect
>Big Data Developer
>http://www.kelkoo.com/
>
>
>Kelkoo SAS
>Société par Actions Simplifiée
>Au capital de € 4.168.964,30
>Siège social : 8, rue du Sentier 75002 Paris
>425 093 069 RCS Paris
>
>This message and any attachments are confidential and intended solely for their
>addressees. If you are not the intended recipient, please delete this message
>and notify the sender.
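For completeness, a quick way to confirm which Hadoop version a given Spark build is actually linked against, before rebuilding with SPARK_HADOOP_VERSION=2.2.0 as suggested above, is to ask the Hadoop client library on the classpath from spark-shell. This is only a sanity-check sketch; org.apache.hadoop.util.VersionInfo is the standard Hadoop utility for reporting the version, and the interpretation follows the notes above (with Hadoop 1.x on the classpath, bzip2 files are read sequentially and the more recent bzip2 fixes are missing):

    // Sanity check from spark-shell: print the Hadoop version Spark runs with.
    import org.apache.hadoop.util.VersionInfo
    println("Hadoop version on classpath: " + VersionInfo.getVersion)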