Hi Andrew, I submitted a patch and verified it solves the problem. You can download the patch from https://issues.apache.org/jira/browse/HADOOP-10614 .
Best,
Xiangrui

On Fri, May 16, 2014 at 6:48 PM, Xiangrui Meng <men...@gmail.com> wrote:
> Hi Andrew,
>
> This is the JIRA I created:
> https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully
> someone wants to work on it.
>
> Best,
> Xiangrui
>
> On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng <men...@gmail.com> wrote:
>> Hi Andrew,
>>
>> I could reproduce the bug with Hadoop 2.2.0. Some older versions of
>> Hadoop do not support splittable compression, so you ended up with
>> sequential reads. It is easy to reproduce the bug with the following
>> setup:
>>
>> 1) Workers are configured with multiple cores.
>> 2) BZip2 files are big enough, or minPartitions is large enough when
>> you load the file via sc.textFile(), so that one worker has more than
>> one task.
>>
>> Best,
>> Xiangrui
>>
>> On Fri, May 16, 2014 at 4:06 PM, Andrew Ash <and...@andrewash.com> wrote:
>>> Hi Xiangrui,
>>>
>>> // FYI I'm getting your emails late due to the Apache mailing list outage
>>>
>>> I'm using CDH4.4.0, which I think uses the MapReduce v2 API. The .jars are
>>> named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
>>>
>>> I'm also glad you were able to reproduce! Please paste a link to the Hadoop
>>> bug you file so I can follow along.
>>>
>>> Thanks!
>>> Andrew
>>>
>>>
>>> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng <men...@gmail.com> wrote:
>>>>
>>>> Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>>>> the problem you described, but it does contain several fixes to the bzip2
>>>> format. -Xiangrui
>>>>
>>>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <and...@andrewash.com> wrote:
>>>> > Hi all,
>>>> >
>>>> > Is anyone reading and writing to .bz2 files stored in HDFS from Spark
>>>> > with success?
>>>> >
>>>> > I'm finding the following results on a recent commit (756c96 from 24hr
>>>> > ago) and CDH 4.4.0:
>>>> >
>>>> > Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>>>> > Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String)
>>>> > => s+"| " ).count
>>>> >
>>>> > Specifically, I'm getting an exception coming out of the bzip2 libraries
>>>> > (see stacktraces below), which is unusual because I'm able to read from
>>>> > that file without an issue using the same libraries via Pig. It was
>>>> > originally created from Pig as well.
>>>> >
>>>> > Digging a little deeper I found this line in the .bz2 decompressor's
>>>> > javadoc for CBZip2InputStream:
>>>> >
>>>> > "Instances of this class are not threadsafe." [source]
>>>> >
>>>> > My current working theory is that Spark has a much higher level of
>>>> > parallelism than Pig/Hadoop does, and thus I get these wild
>>>> > IndexOutOfBounds exceptions much more frequently (as in can't finish a
>>>> > run over a little 2M row file) vs hardly at all in other libraries.
>>>> >
>>>> > The only other reference I could find to the issue was in presto-users,
>>>> > but the recommendation to leave .bz2 for .lzo doesn't help if I actually
>>>> > do want the higher compression levels of .bz2.
>>>> >
>>>> > Would love to hear if I have some kind of configuration issue or if
>>>> > there's a bug in .bz2 handling that's fixed in later versions of CDH, or
>>>> > generally any other thoughts on the issue.
>>>> >
>>>> > Thanks!
>>>> > Andrew
>>>> >
>>>> >
>>>> > Below are examples of some exceptions I'm getting:
>>>> >
>>>> > 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>>>> > java.lang.ArrayIndexOutOfBoundsException
>>>> > java.lang.ArrayIndexOutOfBoundsException: 65535
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>> >     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>> >     at java.io.InputStream.read(InputStream.java:101)
>>>> >     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>> >     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>> >
>>>> >
>>>> > java.lang.ArrayIndexOutOfBoundsException: 900000
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>> >     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>> >     at java.io.InputStream.read(InputStream.java:101)
>>>> >     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>> >     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>> >     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>> >     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>> >     at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>>> >
>>>> >
>>>> > java.lang.ArrayIndexOutOfBoundsException: -921878509
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>> >     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
>>>> >     at java.io.InputStream.read(InputStream.java:101)
>>>> >     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>> >     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>> >     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>> >     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>> >     at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>>> >     at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>>> >     at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>>> >     at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>>> >     at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>>> >
>>>> >
>>>> > java.lang.ArrayIndexOutOfBoundsException: -1321104434
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>> >     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>> >     at java.io.InputStream.read(InputStream.java:101)
>>>> >     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>> >     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>> >     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>> >     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>> >     at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
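
[Editor's note] For readers who want to try the reproduction recipe Xiangrui describes above (multi-core workers, a splittable .bz2 file, and more than one task per worker), a minimal spark-shell sketch might look like the following. The file path is the one from Andrew's example; the partition count of 16 is an illustrative assumption, not a value taken from the thread.

    // Hypothetical reproduction sketch for the spark-shell (Spark 1.0-era API),
    // assuming a multi-core worker and a .bz2 file large enough to split.
    // minPartitions = 16 is an assumed value; any count high enough to give
    // each worker more than one concurrent task should do.
    val lines = sc.textFile("/user/aa/myfile.bz2", minPartitions = 16)

    // Forcing every task to pull records through the (non-threadsafe)
    // CBZip2InputStream decompressor is what produced the
    // ArrayIndexOutOfBoundsExceptions shown in the stack traces above.
    val n = lines.map((s: String) => s + "| ").count()
    println(n)

With the HADOOP-10614 patch applied, the same job should run without the decompressor exceptions.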