I posted a workaround for that at https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java
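A minimal sketch of the line-by-line MapFunction workaround Fabian suggests below (the splitQuoted helper is hand-rolled for illustration, not a Flink API, and the path is the one from the thread):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    import java.util.ArrayList;
    import java.util.List;

    public class EscapedQuotesWorkaround {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, String>> parsed = env
                    .readTextFile("hdfs:///shared/file.csv")
                    .map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(String line) {
                            // the header line would still need to be filtered out upstream
                            List<String> f = splitQuoted(line, ',');
                            // keep columns 0 and 2, the equivalent of includeFields("101")
                            return new Tuple2<>(f.get(0), f.get(2));
                        }
                    });

            parsed.print();
        }

        // Hand-rolled RFC-4180-style splitter: inside a quoted field, ""
        // is an escaped quote rather than the end of the field.
        static List<String> splitQuoted(String line, char delimiter) {
            List<String> fields = new ArrayList<>();
            StringBuilder current = new StringBuilder();
            boolean inQuotes = false;
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (c == '"') {
                    if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"');   // escaped quote, keep one " and skip the other
                        i++;
                    } else {
                        inQuotes = !inQuotes;  // quoted section starts or ends
                    }
                } else if (c == delimiter && !inQuotes) {
                    fields.add(current.toString());
                    current.setLength(0);
                } else {
                    current.append(c);
                }
            }
            fields.add(current.toString());
            return fields;
        }
    }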
On 11 Oct 2016 1:57 p.m., "Fabian Hueske" <fhue...@gmail.com> wrote:

Hi,

Flink's String parser does not support escaped quotes. Your data contains
a doubled double quote (""). The parser identifies this as the end of the
string field.
As a workaround, you can read the file as a regular text file, line by
line, and do the parsing in a MapFunction.

Best, Fabian

2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

I forgot to add parseQuotedStrings('"'). After adding it, I'm getting the
same exception with the second code too.

2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi Fabian,

I tried to debug the code, and it turns out a line in my CSV data is
causing the ArrayIndexOutOfBoundsException. Here is the exception
stacktrace:

    java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
        at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
        at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
        at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
        at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
        at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
        at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

    timestamp,url,id
    2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV.
If I use the following code instead, I get an incorrect result:
(2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of
(2016-08-31 12:08:11.223, 0000000).

    DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
            .ignoreFirstLine()
            .fieldDelimiter(",")
            .includeFields("101")
            .ignoreInvalidLines()
            .types(String.class, String.class);
    withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt",
            FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Is this a bug in Flink, or is my data not compliant with the CSV standard?

Thanks,
Yassine
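For reference, the sample line is valid per RFC 4180, which escapes a quote inside a quoted field by doubling it. A quick way to confirm, assuming Apache Commons CSV is on the classpath (an illustration only, not part of the thread's Flink job):

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVRecord;

    import java.io.StringReader;

    public class Rfc4180Check {
        public static void main(String[] args) throws Exception {
            String line = "2016-08-31 12:08:11.223,"
                    + "\"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc,"
                    + " ce)_13h00 /\"\"=/-3h00 %=) 1\",0000000";
            // RFC 4180 treats "" inside a quoted field as a literal quote character.
            for (CSVRecord record : CSVFormat.RFC4180.parse(new StringReader(line))) {
                System.out.println(record.get(0)); // 2016-08-31 12:08:11.223
                System.out.println(record.get(2)); // 0000000
            }
        }
    }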
2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

Hi Yassine,

I ran your code without problems and got the correct result.
Can you provide the stacktrace of the exception?

Thanks, Fabian

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Thank you Fabian and Stephan for the suggestions.
I couldn't override readLine() because it's final, so I went with
Fabian's solution, but I'm struggling with CSV field masks. Any help is
appreciated.
I created an input format which is basically TupleCsvInputFormat, for
which I overrode the nextRecord() method to catch the exceptions. But I'm
getting a *java.lang.ArrayIndexOutOfBoundsException* when I add a
boolean[]{true, false, true} field mask. If I add an int[]{1, 0, 1} field
mask, the job succeeds but outputs the first and second columns. Here is
my code:

    TupleTypeInfo<Tuple2<String, String>> typeInfo =
            TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
    Path histPath = new Path("hdfs:///shared/file.csv");

    CsvInputFormat<Tuple2<String, String>> myInputFormat =
            new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
    myInputFormat.enableQuotedStringParsing('"');
    myInputFormat.setSkipFirstLineAsHeader(true);
    myInputFormat.setLenient(true);

    DataSet<Tuple2<String, String>> test =
            env.createInput(myInputFormat, typeInfo).withParameters(parameters);
    test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the custom input format:

    public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
        private static final long serialVersionUID = 1L;

        private TupleSerializerBase<OUT> tupleSerializer;

        public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
            this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
        }

        public MyCsvInputFormat(Path filePath, String lineDelimiter,
                String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
            this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
                    createDefaultMask(tupleTypeInfo.getArity()));
        }

        public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
                int[] includedFieldsMask) {
            this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
                    tupleTypeInfo, includedFieldsMask);
        }

        public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
            super(filePath);
            boolean[] mask = (includedFieldsMask == null)
                    ? createDefaultMask(tupleTypeInfo.getArity())
                    : toBooleanMask(includedFieldsMask);
            configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
        }

        public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
                boolean[] includedFieldsMask) {
            this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
                    tupleTypeInfo, includedFieldsMask);
        }

        public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
            super(filePath);
            configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
        }

        private void configure(String lineDelimiter, String fieldDelimiter,
                TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
            if (tupleTypeInfo.getArity() == 0) {
                throw new IllegalArgumentException("Tuple size must be greater than 0.");
            }
            if (includedFieldsMask == null) {
                includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
            }
            tupleSerializer = (TupleSerializerBase<OUT>)
                    tupleTypeInfo.createSerializer(new ExecutionConfig());
            setDelimiter(lineDelimiter);
            setFieldDelimiter(fieldDelimiter);
            Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
            for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
                classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
            }
            setFieldsGeneric(includedFieldsMask, classes);
        }

        @Override
        public OUT fillRecord(OUT reuse, Object[] parsedValues) {
            return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
        }

        @Override
        public OUT nextRecord(OUT record) {
            OUT returnRecord = null;
            do {
                try {
                    returnRecord = super.nextRecord(record);
                } catch (IOException e) {
                    // log and try the next record; note this may retry in a
                    // tight loop if the same exception recurs before reachedEnd()
                    e.printStackTrace();
                }
            } while (returnRecord == null && !reachedEnd());
            return returnRecord;
        }
    }

Thanks,
Yassine
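A likely explanation for the int[]{1, 0, 1} behaviour: judging from GenericCsvInputFormat.toBooleanMask, the int[] overload is read as a list of included field positions, not as a 0/1 mask, so {1, 0, 1} marks positions 0 and 1 as included, which matches the first-and-second-column output. A small self-contained demo of that reading (the toBooleanMask below mirrors what the Flink method appears to do; it is not the Flink code itself):

    import java.util.Arrays;

    public class MaskDemo {
        // Mirrors what GenericCsvInputFormat.toBooleanMask appears to do:
        // the int[] entries are included field POSITIONS, not a 0/1 mask.
        static boolean[] toBooleanMask(int[] positions) {
            int max = 0;
            for (int i : positions) {
                max = Math.max(i, max);
            }
            boolean[] mask = new boolean[max + 1];
            for (int i : positions) {
                mask[i] = true;
            }
            return mask;
        }

        public static void main(String[] args) {
            System.out.println(Arrays.toString(toBooleanMask(new int[]{1, 0, 1})));
            // -> [true, true]        : columns 0 and 1, the observed output
            System.out.println(Arrays.toString(toBooleanMask(new int[]{0, 2})));
            // -> [true, false, true] : the intended equivalent of "101"
        }
    }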
2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:

How about just overriding the "readLine()" method to call
"super.readLine()" and catching EOF exceptions?

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the
CsvInputFormat and forwards all calls to the wrapped CsvIF.
The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile()
shortcut, but would need to create an instance of your own InputFormat
and add it with env.readFile(yourIF).

Hope this helps,
Fabian
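A minimal sketch of that idea against the Flink 1.x DataSet API; instead of a full delegating wrapper it takes the shortcut of subclassing TupleCsvInputFormat, swallowing the EOFException that a truncated gzip stream produces and treating the rest of the split as exhausted:

    import org.apache.flink.api.java.io.TupleCsvInputFormat;
    import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
    import org.apache.flink.core.fs.Path;

    import java.io.EOFException;
    import java.io.IOException;

    public class LenientGzipCsvInputFormat<OUT> extends TupleCsvInputFormat<OUT> {
        private static final long serialVersionUID = 1L;

        private boolean splitBroken = false;

        public LenientGzipCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> typeInfo) {
            super(filePath, typeInfo);
        }

        @Override
        public OUT nextRecord(OUT reuse) throws IOException {
            try {
                return super.nextRecord(reuse);
            } catch (EOFException e) {
                // improperly compressed file: give up on the rest of this split
                splitBroken = true;
                return null;
            }
        }

        @Override
        public boolean reachedEnd() {
            return splitBroken || super.reachedEnd();
        }
    }

It would then be plugged in with env.createInput(new LenientGzipCsvInputFormat<>(histPath, typeInfo), typeInfo), much like MyCsvInputFormat above.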
2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi all,

I am reading a large number of GZip-compressed CSV files, nested in an
HDFS directory:

    Configuration parameters = new Configuration();
    parameters.setBoolean("recursive.file.enumeration", true);
    DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
            .ignoreFirstLine()
            .fieldDelimiter("|")
            .includeFields("011000")
            .types(String.class, Long.class)
            .withParameters(parameters);

My job is failing with the following exception:

    2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
    java.io.EOFException: Unexpected end of ZLIB input stream
        at java.util.zip.InflaterInputStream.fill(Unknown Source)
        at java.util.zip.InflaterInputStream.read(Unknown Source)
        at java.util.zip.GZIPInputStream.read(Unknown Source)
        at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
        at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
        at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
        at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
        at java.lang.Thread.run(Unknown Source)

I think it is due to some improperly compressed files. How can I handle
and ignore such exceptions? Thanks.

Best,
Yassine
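If it helps to identify the improperly compressed files up front, here is a standalone sketch using only the JDK. It assumes locally accessible copies of the files; scanning HDFS directly would go through the Hadoop FileSystem API instead:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPInputStream;

    public class GzipScan {
        public static void main(String[] args) throws IOException {
            // Fully decompress each file passed on the command line and
            // report the ones that end in an unexpected EOF (truncated gzip).
            byte[] buf = new byte[8192];
            for (String path : args) {
                try (InputStream in = new GZIPInputStream(new FileInputStream(path))) {
                    while (in.read(buf) != -1) {
                        // just drain the stream
                    }
                } catch (IOException e) {
                    System.err.println(path + " is corrupted: " + e.getMessage());
                }
            }
        }
    }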