I forgot to add parseQuotedStrings('"'). After adding it, I'm getting the same exception with the second code too.
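To be concrete, by "the second code" I mean the readCsvFile version quoted below; with the quote parsing added it looks like this (the parseQuotedStrings call is the only change to the snippet from my earlier mail):

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
        .ignoreFirstLine()
        .parseQuotedStrings('"')   // the call that was missing
        .fieldDelimiter(",")
        .includeFields("101")
        .ignoreInvalidLines()
        .types(String.class, String.class);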
2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi Fabian,

I tried to debug the code, and it turns out a line in my CSV data is causing the ArrayIndexOutOfBoundsException. Here is the exception stack trace:

java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
    at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
    at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV. If I use the following code instead, I get an incorrect result: (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31 12:08:11.223, 0000000).

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .includeFields("101")
        .ignoreInvalidLines()
        .types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Is it a bug in Flink, or is my data not compliant with the CSV standard?

Thanks,
Yassine

2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

Hi Yassine,

I ran your code without problems and got the correct result.
Can you provide the stack trace of the exception?

Thanks, Fabian

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Thank you Fabian and Stephan for the suggestions.
I couldn't override readLine() because it's final, so I went with Fabian's solution, but I'm struggling with CSV field masks. Any help is appreciated.
I created an input format which is basically TupleCsvInputFormat, for which I overrode the nextRecord() method to catch the exceptions. But I get a java.lang.ArrayIndexOutOfBoundsException when I add a boolean[]{true, false, true} field mask. If I add an int[]{1, 0, 1} field mask, the job succeeds but outputs the first and second columns.
Here is my code:

TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat<Tuple2<String, String>> myInputFormat = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormat.enableQuotedStringParsing('"');
myInputFormat.setSkipFirstLineAsHeader(true);
myInputFormat.setLenient(true);

DataSet<Tuple2<String, String>> test = env.createInput(myInputFormat, typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the custom input format:

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
    private static final long serialVersionUID = 1L;
    private TupleSerializerBase<OUT> tupleSerializer;

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }
    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }

    private void configure(String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }

    @Override
    public OUT nextRecord(OUT record) {
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }
}

Thanks,
Yassine

2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:

How about just overriding the readLine() method to call super.readLine() and catching EOF exceptions?

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the CsvInputFormat and forwards all calls to the wrapped CsvIF. The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut but would need to create an instance of your own InputFormat and add it with env.readFile(yourIF).

Hope this helps,
Fabian
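A minimal sketch of that idea (the class name and details here are hypothetical and untested; it subclasses TupleCsvInputFormat and swallows the EOFException rather than wrapping a CsvInputFormat, assuming TupleCsvInputFormat can be extended this way, but the catch-and-ignore effect is the same):

import java.io.EOFException;
import java.io.IOException;

import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class LenientTupleCsvInputFormat<OUT> extends TupleCsvInputFormat<OUT> {

    private static final long serialVersionUID = 1L;

    // set when the current split hits an unexpected end of stream (e.g. a truncated GZip file)
    private boolean splitBroken;

    public LenientTupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        super(filePath, tupleTypeInfo);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        splitBroken = false;  // reset for every new split
        super.open(split);
    }

    @Override
    public OUT nextRecord(OUT reuse) throws IOException {
        try {
            return super.nextRecord(reuse);
        } catch (EOFException e) {
            // corrupted / truncated file: give up on the rest of this split instead of failing the job
            splitBroken = true;
            return null;
        }
    }

    @Override
    public boolean reachedEnd() {
        // report a broken split as finished so the data source task moves on to the next one
        return splitBroken || super.reachedEnd();
    }
}

As described above, such a format would then be registered with env.readFile() / env.createInput() instead of the env.readCsvFile() shortcut.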
2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi all,

I am reading a large number of GZip-compressed CSV files, nested in an HDFS directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
        .ignoreFirstLine()
        .fieldDelimiter("|")
        .includeFields("011000")
        .types(String.class, Long.class)
        .withParameters(parameters);

My job is failing with the following exception:

2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.

java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(Unknown Source)
    at java.util.zip.InflaterInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.read(Unknown Source)
    at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
    at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Unknown Source)

I think it is due to some improperly compressed files. How can I handle and ignore such exceptions? Thanks.

Best,
Yassine