Sebastian Neef created FLINK-3568:
-------------------------------------

             Summary: Hadoop's Bzip decompression is not thread safe
                 Key: FLINK-3568
                 URL: https://issues.apache.org/jira/browse/FLINK-3568
             Project: Flink
          Issue Type: Bug
          Components: Hadoop Compatibility
    Affects Versions: 0.10.1
            Reporter: Sebastian Neef
Hi,

first of all, this is my first time filing a bug report for Apache Flink. If you need any additional information or anything else, please let me know.

h1. Background

I was trying to process [Wikipedia's XML dumps|https://dumps.wikimedia.org/enwiki/20160204/] with Apache Flink. To save disk space, I decided to use the bzip2-compressed versions. Apache Flink is compatible with [Hadoop's InputFormats|https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html], and Hadoop's TextInputFormat [supports compressed files|https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.html#line.5]. [Bzip2 files are splittable|http://comphadoop.weebly.com/index.html] and thus well suited for parallel processing.

h1. Problem

I started to test the decompression with a simple job based on the Apache Flink Quickstart code:

{code}
public static void main(String[] args) throws Exception {
    if (args.length != 2) {
        System.err.println("USAGE: Job <wikipediadump.xml.bz2> <output.txt>");
        return;
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read the bzip2-compressed dump via Hadoop's TextInputFormat
    DataSet<Tuple2<LongWritable, Text>> input =
        env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0]);

    input.writeAsText(args[1]);

    // execute program
    env.execute("Bzip compression test");
}
{code}

When starting the job, I get the following exception:

{noformat}
02/29/2016 11:59:50	CHAIN DataSource (at createInput(ExecutionEnvironment.java:508) (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> Map (Map at main(Job.java:67))(1/2) switched to FAILED
java.lang.ArrayIndexOutOfBoundsException: 18002
	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:730)
	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:801)
	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:399)
	at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:483)
	at java.io.InputStream.read(InputStream.java:101)
	at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
	at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
	at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.fetchNext(HadoopInputFormatBase.java:185)
	at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.reachedEnd(HadoopInputFormatBase.java:179)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:166)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

This does not happen with "-p 1", but it does with any parallelism greater than 1.

h1. Research

Googling the error message leads to some Spark/Hadoop mailing lists, and it looks like the "compress.bzip2.CBZip2InputStream" class in use is not thread-safe:

- [Link one|https://issues.apache.org/jira/browse/HADOOP-10614]
- [Link two|http://stackoverflow.com/questions/5159602/processing-a-bzip-string-file-in-scala]
- [Link three|http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ArrayIndexOutOfBoundsException-using-sc-textFile-on-BZ2-compressed-files-td22905.html]
- [Link four|https://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c1402318634.7682.yahoomail...@web190401.mail.sg3.yahoo.com%3E]

Link one is especially interesting, because the Hadoop project resolved the issue there:

{quote}
Hadoop uses CBZip2InputStream to decode bzip2 files. However, the implementation is not threadsafe. This is not a really problem for Hadoop MapReduce because Hadoop runs each task in a separate JVM. But for other libraries that utilize multithreading and use Hadoop's InputFormat, e.g., Spark, it will cause exceptions like the following:
{quote}

My guess is that Apache Flink needs to update/patch the CBZip2InputStream class to resolve the problem?

All the best,
Sebastian



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
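To make the failure mode from the quote above concrete: Hadoop MapReduce isolates each task in its own JVM, while Flink runs parallel subtasks as threads inside a single TaskManager JVM, so any decompressor state that is not confined to one thread can get corrupted. The sketch below is only an illustration of that pattern — the {{Decoder}} class is a hypothetical stand-in, not Hadoop's actual CBZip2InputStream — showing the safe "one instance per task" discipline:

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadSafetyDemo {

    // Hypothetical stand-in for a stateful decompressor such as
    // CBZip2InputStream: it keeps a mutable per-stream cursor and is
    // NOT safe to share between threads.
    static class Decoder {
        private int pos = 0;
        int decodeNext() { return pos++; }
    }

    public static void main(String[] args) throws Exception {
        final int tasks = 4;
        final int callsPerTask = 10_000;
        ExecutorService pool = Executors.newFixedThreadPool(tasks);

        // Safe pattern: every task creates its own Decoder, mirroring the
        // per-JVM isolation that Hadoop MapReduce provides implicitly.
        List<Future<Integer>> results = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            results.add(pool.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    Decoder d = new Decoder(); // private instance, never shared
                    int last = -1;
                    for (int i = 0; i < callsPerTask; i++) {
                        last = d.decodeNext();
                    }
                    return last;
                }
            }));
        }

        // Each task must have seen its own uncorrupted sequence 0..callsPerTask-1.
        for (Future<Integer> f : results) {
            if (f.get() != callsPerTask - 1) {
                throw new AssertionError("decoder state was corrupted");
            }
        }
        pool.shutdown();
        System.out.println("per-task decoder instances: OK");
    }
}
{code}

If a single {{Decoder}} instance were shared by all tasks instead, the threads would interleave updates to its internal cursor without synchronization — the same class of corruption that surfaces as the {{ArrayIndexOutOfBoundsException}} in the stack trace above.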