I forgot to add parseQuotedStrings('"'). After adding it, I get the same
exception with the second snippet too.
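
The incorrect tuple quoted below is what a quote-unaware split on "," produces
for that row: the comma inside the quoted URL field starts a new column, so the
mask "101" selects the tail of the URL instead of the id. A minimal plain-Java
sketch of the difference follows. It is not Flink's parser; the class
QuoteAwareSplit, its split() helper, and the shortened example.org URL are made
up for illustration, and it assumes RFC 4180-style quoting where "" inside a
quoted field stands for one literal quote.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: splits one CSV line on commas
// while honoring double-quoted fields and "" escapes (RFC 4180 style).
class QuoteAwareSplit {

    static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    // "" inside a quoted field is one literal quote character
                    current.append('"');
                    i++;
                } else {
                    // opening or closing quote of a field
                    inQuotes = !inQuotes;
                }
            } else if (c == ',' && !inQuotes) {
                // an unquoted comma ends the current field
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) {
        // Simplified stand-in for the sample row; the URL is shortened on purpose.
        String line = "2016-08-31 12:08:11.223,"
                + "\"https://example.org/a.json, ce)_13h00 /\"\"=/-3h00 %=) 1\",0000000";

        // Quote-unaware split: the comma inside the quoted field adds a column.
        String[] naive = line.split(",");
        System.out.println("naive column count: " + naive.length);
        System.out.println("naive columns 1 and 3: " + naive[0] + " |" + naive[2]);

        // Quote-aware split: the quoted field stays in one piece.
        List<String> fields = split(line);
        System.out.println("quote-aware column count: " + fields.size());
        System.out.println("quote-aware columns 1 and 3: " + fields.get(0) + " | " + fields.get(2));
    }
}
```

With quoting honored, the row has three columns and the third one is 0000000,
which is the result I expected from the mask.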

2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Fabian,
>
> I tried to debug the code, and it turns out a line in my csv data is
> causing the ArrayIndexOutOfBoundsException, here is the exception
> stacktrace:
>
> java.lang.ArrayIndexOutOfBoundsException: -1
> at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
> at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
> at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
> at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
> at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
> at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
> at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
> at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> And here is a sample CSV:
>
> timestamp,url,id
> 2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/
> infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000
>
> Using my code, I get the exception above when parsing the sample CSV.
> If I use the following code instead, I get an incorrect result: (2016-08-31
> 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31
> 12:08:11.223, 0000000)
>
> DataSet<Tuple2<String, String>> withReadCSV = 
> env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
>         .ignoreFirstLine()
>         .fieldDelimiter(",")
>         .includeFields("101")
>         .ignoreInvalidLines()
>         .types(String.class, String.class);
> withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", 
> FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>
>
> Is it a bug in Flink or is my data not compliant with the csv standards?
>
> Thanks,
> Yassine
>
>
> 2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Yassine,
>>
>> I ran your code without problems and got the correct result.
>> Can you provide the Stacktrace of the Exception?
>>
>> Thanks, Fabian
>>
>> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> Thank you Fabian and Stephan for the suggestions.
>>> I couldn't override "readLine()" because it's final, so I went with
>>> Fabian's solution, but I'm struggling with CSV field masks. Any help is
>>> appreciated.
>>> I created an input format which is basically TupleCsvInputFormat, for
>>> which I overrode the nextRecord() method to catch the exceptions. But I'm
>>> getting a *java.lang.ArrayIndexOutOfBoundsException* when I add a
>>> boolean[]{true, false, true} field mask. If I add an int[]{1,0,1} field
>>> mask, the job succeeds but outputs the first and second columns. Here is my
>>> code:
>>>
>>> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>>> TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
>>> Path histPath = new Path("hdfs:///shared/file.csv");
>>>
>>> CsvInputFormat <Tuple2<String, String>> myInputFormt = new
>>> MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
>>> myInputFormt.enableQuotedStringParsing('"');
>>> myInputFormt.setSkipFirstLineAsHeader(true);
>>> myInputFormt.setLenient(true);
>>>
>>> DataSet<Tuple2<String, String>> test = env.createInput(myInputFormt,
>>> typeInfo).withParameters(parameters);
>>> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>>>
>>> and here is the custom input format:
>>>
>>> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
>>>     private static final long serialVersionUID = 1L;
>>>     private TupleSerializerBase<OUT> tupleSerializer;
>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>> tupleTypeInfo) {
>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>> tupleTypeInfo);
>>>     }
>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>>>         this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
>>> createDefaultMask(tupleTypeInfo.getArity()));
>>>     }
>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>> tupleTypeInfo, int[] includedFieldsMask) {
>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>> tupleTypeInfo, includedFieldsMask);
>>>     }
>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[]
>>> includedFieldsMask) {
>>>         super(filePath);
>>>         boolean[] mask = (includedFieldsMask == null)
>>>                 ? createDefaultMask(tupleTypeInfo.getArity())
>>>                 : toBooleanMask(includedFieldsMask);
>>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
>>>     }
>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>> tupleTypeInfo, boolean[] includedFieldsMask) {
>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>> tupleTypeInfo, includedFieldsMask);
>>>     }
>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[]
>>> includedFieldsMask) {
>>>         super(filePath);
>>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo,
>>> includedFieldsMask);
>>>     }
>>>     private void configure(String lineDelimiter, String fieldDelimiter,
>>>                            TupleTypeInfoBase<OUT> tupleTypeInfo,
>>> boolean[] includedFieldsMask) {
>>>         if (tupleTypeInfo.getArity() == 0) {
>>>             throw new IllegalArgumentException(
>>>                 "Tuple size must be greater than 0.");
>>>         }
>>>         if (includedFieldsMask == null) {
>>>             includedFieldsMask =
>>>                 createDefaultMask(tupleTypeInfo.getArity());
>>>         }
>>>         tupleSerializer = (TupleSerializerBase<OUT>)
>>> tupleTypeInfo.createSerializer(new ExecutionConfig());
>>>         setDelimiter(lineDelimiter);
>>>         setFieldDelimiter(fieldDelimiter);
>>>         Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
>>>         for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
>>>             classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
>>>         }
>>>         setFieldsGeneric(includedFieldsMask, classes);
>>>     }
>>>     @Override
>>>     public OUT fillRecord(OUT reuse, Object[] parsedValues) {
>>>         return tupleSerializer.createOrReuseInstance(parsedValues,
>>> reuse);
>>>     }
>>>
>>>     @Override
>>>     public OUT nextRecord(OUT record) {
>>>         OUT returnRecord = null;
>>>         do {
>>>             try {
>>>                 returnRecord = super.nextRecord(record);
>>>             } catch (IOException e) {
>>>                 e.printStackTrace();
>>>             }
>>>         } while (returnRecord == null && !reachedEnd());
>>>         return returnRecord;
>>>     }
>>> }
>>>
>>> Thanks,
>>> Yassine
>>>
>>>
>>>
>>>
>>>
>>> 2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>>
>>>> How about just overriding the "readLine()" method to call
>>>> "super.readLine()" and catching EOF exceptions?
>>>>
>>>> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Yassine,
>>>>>
>>>>> AFAIK, there is no built-in way to ignore corrupted compressed files.
>>>>> You could try to implement a FileInputFormat that wraps the
>>>>> CsvInputFormat and forwards all calls to the wrapped CsvIF.
>>>>> The wrapper would also catch and ignore the EOFException.
>>>>>
>>>>> If you do that, you would not be able to use the env.readCsvFile()
>>>>> shortcut but would need to create an instance of your own InputFormat and
>>>>> add it with
>>>>> env.readFile(yourIF).
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <
>>>>> y.marzou...@mindlytix.com>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I am reading a large number of GZip compressed csv files, nested in a
>>>>>> HDFS directory:
>>>>>>
>>>>>> Configuration parameters = new Configuration();
>>>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>>>> DataSet<Tuple2<String, Long>> hist =
>>>>>>         env.readCsvFile("hdfs:///shared/logs/")
>>>>>>                 .ignoreFirstLine()
>>>>>>                 .fieldDelimiter("|")
>>>>>>                 .includeFields("011000")
>>>>>>                 .types(String.class, Long.class)
>>>>>>                 .withParameters(parameters);
>>>>>>
>>>>>> My job is failing with the following exception:
>>>>>>
>>>>>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>>>>  at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>>>>  at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>>>>  at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>>>>  at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>>>>  at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>>>>  at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>>>>  at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>>>>  at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>>>  at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>>>  at java.lang.Thread.run(Unknown Source)
>>>>>>
>>>>>> I think it is due to some improperly compressed files. How can I handle
>>>>>> and ignore such exceptions? Thanks.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Yassine
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
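
The idea suggested by Fabian and Stephan in the quoted messages, catching the
EOFException and treating it as end of input, can be tried outside Flink first.
The sketch below is plain Java using only java.util.zip; LenientGzipLines,
readLines() and gzip() are hypothetical names, and it assumes line-delimited
UTF-8 text. It keeps every complete line decoded before the stream breaks and
drops a trailing partial line.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of Flink: reads lines from GZIP data and
// treats a premature end of the compressed stream as end of input.
class LenientGzipLines {

    static List<String> readLines(byte[] gzipBytes) {
        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
        boolean truncated = false;
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipBytes))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                decoded.write(buf, 0, n);
            }
        } catch (EOFException e) {
            // Truncated file: keep whatever was decoded before the failure.
            truncated = true;
        } catch (IOException e) {
            // Other I/O problems are not swallowed in this sketch.
            throw new UncheckedIOException(e);
        }

        String text = new String(decoded.toByteArray(), StandardCharsets.UTF_8);
        List<String> lines = new ArrayList<>();
        int start = 0;
        int newline;
        while ((newline = text.indexOf('\n', start)) != -1) {
            lines.add(text.substring(start, newline));
            start = newline + 1;
        }
        // An unterminated last line is only trusted if the stream ended cleanly.
        if (start < text.length() && !truncated) {
            lines.add(text.substring(start));
        }
        return lines;
    }

    // Test helper: compresses text into an in-memory GZIP byte array.
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] full = gzip("a|1\nb|2\nc|3\n");
        // Simulate a damaged file by cutting off the 8-byte GZIP trailer.
        byte[] damaged = Arrays.copyOf(full, full.length - 8);
        System.out.println(readLines(damaged));
    }
}
```

Only EOFException is swallowed here, so other I/O errors still surface. Whether
silently skipping the rest of a damaged file is acceptable depends on the job.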
