Hi Xiangrui, many thanks to you and Sandy for fixing this issue!
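For anyone who lands on this thread later, here's a rough sketch of the
reproduction Xiangrui describes below (the path and partition count are just
placeholders; the point is a splittable .bz2 that is large enough, or split
finely enough, that a multi-core worker runs several tasks concurrently):

    // Spark shell (Scala). Illustrative only; /user/aa/myfile.bz2 and the
    // minPartitions value are placeholders, not anything special.
    val lines = sc.textFile("/user/aa/myfile.bz2", 64)  // many splits per worker

    // A bare count tended to succeed:
    lines.count

    // ...but adding a map before the count triggered the
    // ArrayIndexOutOfBoundsException from CBZip2InputStream:
    lines.map((s: String) => s + "| ").count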
On Fri, May 16, 2014 at 10:23 PM, Xiangrui Meng <[email protected]> wrote:
> Hi Andrew,
>
> I submitted a patch and verified it solves the problem. You can
> download the patch from
> https://issues.apache.org/jira/browse/HADOOP-10614 .
>
> Best,
> Xiangrui
>
> On Fri, May 16, 2014 at 6:48 PM, Xiangrui Meng <[email protected]> wrote:
> > Hi Andrew,
> >
> > This is the JIRA I created:
> > https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully
> > someone wants to work on it.
> >
> > Best,
> > Xiangrui
> >
> > On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng <[email protected]> wrote:
> >> Hi Andrew,
> >>
> >> I could reproduce the bug with Hadoop 2.2.0. Some older versions of
> >> Hadoop do not support splittable compression, so you ended up with
> >> sequential reads. It is easy to reproduce the bug with the following
> >> setup:
> >>
> >> 1) Workers are configured with multiple cores.
> >> 2) BZip2 files are big enough, or minPartitions is large enough when
> >> you load the file via sc.textFile(), so that one worker has more than
> >> one task.
> >>
> >> Best,
> >> Xiangrui
> >>
> >> On Fri, May 16, 2014 at 4:06 PM, Andrew Ash <[email protected]> wrote:
> >>> Hi Xiangrui,
> >>>
> >>> // FYI I'm getting your emails late due to the Apache mailing list outage
> >>>
> >>> I'm using CDH4.4.0, which I think uses the MapReduce v2 API. The .jars
> >>> are named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
> >>>
> >>> I'm also glad you were able to reproduce! Please paste a link to the
> >>> Hadoop bug you file so I can follow along.
> >>>
> >>> Thanks!
> >>> Andrew
> >>>
> >>> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng <[email protected]> wrote:
> >>>>
> >>>> Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes
> >>>> the problem you described, but it does contain several fixes to the
> >>>> bzip2 format. -Xiangrui
> >>>>
> >>>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <[email protected]> wrote:
> >>>> > Hi all,
> >>>> >
> >>>> > Is anyone reading and writing to .bz2 files stored in HDFS from Spark
> >>>> > with success?
> >>>> >
> >>>> > I'm finding the following results on a recent commit (756c96 from
> >>>> > 24hr ago) and CDH 4.4.0:
> >>>> >
> >>>> > Works: val r = sc.textFile("/user/aa/myfile.bz2").count
> >>>> > Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) => s+"| " ).count
> >>>> >
> >>>> > Specifically, I'm getting an exception coming out of the bzip2
> >>>> > libraries (see stacktraces below), which is unusual because I'm able
> >>>> > to read from that file without an issue using the same libraries via
> >>>> > Pig. It was originally created from Pig as well.
> >>>> >
> >>>> > Digging a little deeper I found this line in the .bz2 decompressor's
> >>>> > javadoc for CBZip2InputStream:
> >>>> >
> >>>> > "Instances of this class are not threadsafe." [source]
> >>>> >
> >>>> > My current working theory is that Spark has a much higher level of
> >>>> > parallelism than Pig/Hadoop does, and thus I get these wild
> >>>> > IndexOutOfBounds exceptions much more frequently (as in, can't finish
> >>>> > a run over a little 2M-row file) vs hardly at all in other libraries.
> >>>> >
> >>>> > The only other reference I could find to the issue was in
> >>>> > presto-users, but the recommendation to leave .bz2 for .lzo doesn't
> >>>> > help if I actually do want the higher compression levels of .bz2.
> >>>> >
> >>>> > Would love to hear if I have some kind of configuration issue or if
> >>>> > there's a bug in .bz2 that's fixed in later versions of CDH, or
> >>>> > generally any other thoughts on the issue.
> >>>> >
> >>>> > Thanks!
> >>>> > Andrew
> >>>> >
> >>>> > Below are examples of some exceptions I'm getting:
> >>>> >
> >>>> > 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
> >>>> > java.lang.ArrayIndexOutOfBoundsException
> >>>> > java.lang.ArrayIndexOutOfBoundsException: 65535
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> >>>> >         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
> >>>> >         at java.io.InputStream.read(InputStream.java:101)
> >>>> >         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> >>>> >         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> >>>> >
> >>>> > java.lang.ArrayIndexOutOfBoundsException: 900000
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> >>>> >         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
> >>>> >         at java.io.InputStream.read(InputStream.java:101)
> >>>> >         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> >>>> >         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
> >>>> >         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>>> >         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>>> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>>> >         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
> >>>> >
> >>>> > java.lang.ArrayIndexOutOfBoundsException: -921878509
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> >>>> >         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
> >>>> >         at java.io.InputStream.read(InputStream.java:101)
> >>>> >         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> >>>> >         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
> >>>> >         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>>> >         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>>> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>>> >         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
> >>>> >         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
> >>>> >         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
> >>>> >         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
> >>>> >         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
> >>>> >
> >>>> > java.lang.ArrayIndexOutOfBoundsException: -1321104434
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> >>>> >         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
> >>>> >         at java.io.InputStream.read(InputStream.java:101)
> >>>> >         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> >>>> >         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
> >>>> >         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>>> >         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>>> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>>> >         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
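
Side note for the archives: with a Hadoop build that includes the
HADOOP-10614 patch on the classpath, the same map-then-count that fails in
the traces above should run to completion. A quick sanity check, again with
an illustrative path, might look like:

    // Previously threw ArrayIndexOutOfBoundsException out of CBZip2InputStream;
    // with the patched bzip2 codec this should simply return the row count.
    sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "| ").count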
