Thank you Fabian and Stephan for the suggestions.
I couldn't override "readLine()" because it's final, so I went with Fabian's
solution, but I'm struggling with CSV field masks. Any help is appreciated.
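To make my expectation explicit (this is only my assumption about the field mask
semantics, so it may well be where I'm going wrong), the mapping I'm after is the
following, using hypothetical column values a, b and c:

// input line:                a,b,c
// mask {true, false, true}:  keep the 1st and 3rd columns
// expected record:           Tuple2<String, String> ("a", "c")
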
I created an input format that is basically TupleCsvInputFormat, with the
nextRecord() method overridden to catch the exceptions. However, I get a
java.lang.ArrayIndexOutOfBoundsException when I add a boolean[]{true, false, true}
field mask. If I add an int[]{1, 0, 1} field mask instead, the job succeeds but
outputs the first and second columns. Here is my code:

TupleTypeInfo<Tuple2<String, String>> typeInfo =
        TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat<Tuple2<String, String>> myInputFormat =
        new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormat.enableQuotedStringParsing('"');
myInputFormat.setSkipFirstLineAsHeader(true);
myInputFormat.setLenient(true);

// "parameters" is the Configuration with "recursive.file.enumeration" = true from my first message
DataSet<Tuple2<String, String>> test =
        env.createInput(myInputFormat, typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
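
For completeness, the int[] mask variant that runs (but gives the wrong columns) is
constructed the same way, just with the mask argument swapped (the variable name
below is only for illustration):

CsvInputFormat<Tuple2<String, String>> intMaskFormat =
        new MyCsvInputFormat<>(histPath, typeInfo, new int[]{1, 0, 1});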

and here is the custom input format:

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

    private static final long serialVersionUID = 1L;

    private TupleSerializerBase<OUT> tupleSerializer;

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
                createDefaultMask(tupleTypeInfo.getArity()));
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
                            int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
                tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
                            boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
                tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }

    private void configure(String lineDelimiter, String fieldDelimiter,
                           TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }

    // Same as the parent's nextRecord(), except that IOExceptions (e.g. the EOFException
    // thrown for a truncated gzip file) are caught and logged, and reading continues
    // until a record is returned or the end of the split is reached.
    @Override
    public OUT nextRecord(OUT record) {
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }
}

Thanks,
Yassine





2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:

> How about just overriding the "readLine()" method to call
> "super.readLine()" and catching EOF exceptions?
>
> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Yassine,
>>
>> AFAIK, there is no built-in way to ignore corrupted compressed files.
>> You could try to implement a FileInputFormat that wraps the
>> CsvInputFormat and forwards all calls to the wrapped CsvIF.
>> The wrapper would also catch and ignore the EOFException.
>>
>> If you do that, you would not be able to use the env.readCsvFile()
>> shortcut but would need to create an instance of your own InputFormat and
>> add it with
>> env.readFile(yourIF).
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> Hi all,
>>>
>>> I am reading a large number of GZip compressed csv files, nested in a
>>> HDFS directory:
>>>
>>> Configuration parameters = new Configuration();
>>> parameters.setBoolean("recursive.file.enumeration", true);
>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>>                 .ignoreFirstLine()
>>>                 .fieldDelimiter("|")
>>>                 .includeFields("011000")
>>>                 .types(String.class, Long.class)
>>>                 .withParameters(parameters);
>>>
>>> My job is failing with the following exception:
>>>
>>> 2016-10-04 17:19:59,933 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                - Status of 
>>> job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>
>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>
>>>     at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>
>>>     at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>
>>>     at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>
>>>     at 
>>> org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>
>>>     at 
>>> org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>
>>>     at 
>>> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>
>>>     at 
>>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>
>>>     at 
>>> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>
>>>     at 
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>
>>>     at java.lang.Thread.run(Unknown Source)
>>>
>>> I think it is due to some improperly compressed files. How can I handle and
>>> ignore such exceptions? Thanks.
>>>
>>>
>>> Best,
>>> Yassine
>>>
>>>
>>
>
