Thank you Fabian and Stephan for the suggestions. I couldn't override "readLine()" because it's final, so I went with Fabian's solution, but I'm now struggling with the CSV field masks; any help is appreciated. I created an input format that is basically TupleCsvInputFormat with the nextRecord() method overridden to catch the exceptions. However, I get a *java.lang.ArrayIndexOutOfBoundsException* when I pass a boolean[]{true, false, true} field mask. If I pass an int[]{1, 0, 1} field mask instead, the job succeeds but outputs the first and second columns rather than the first and third.
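To make that concrete, the only difference between the two runs is the mask argument given to the input format (everything else, shown in the full code below, is identical):

// fails at runtime with java.lang.ArrayIndexOutOfBoundsException
new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});

// runs, but the output contains the first and second columns instead of the first and third
new MyCsvInputFormat<>(histPath, typeInfo, new int[]{1, 0, 1});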
Here is the complete code:

TupleTypeInfo<Tuple2<String, String>> typeInfo =
        TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat<Tuple2<String, String>> myInputFormt =
        new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormt.enableQuotedStringParsing('"');
myInputFormt.setSkipFirstLineAsHeader(true);
myInputFormt.setLenient(true);

DataSet<Tuple2<String, String>> test =
        env.createInput(myInputFormt, typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the custom input format:

// Essentially a copy of TupleCsvInputFormat, with nextRecord() overridden so that
// records failing with an IOException are skipped instead of failing the job.
public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

    private static final long serialVersionUID = 1L;

    private TupleSerializerBase<OUT> tupleSerializer;

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
                createDefaultMask(tupleTypeInfo.getArity()));
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        // convert the int[] mask into a boolean[] mask before configuring
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }

    private void configure(String lineDelimiter, String fieldDelimiter,
                           TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }

    @Override
    public OUT nextRecord(OUT record) {
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                // e.g. "Unexpected end of ZLIB input stream" from a truncated gzip file:
                // log it and keep reading instead of failing the task
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }
}
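For comparison, and just to make the intended column selection explicit (I haven't actually run this variant, and it of course wouldn't skip the corrupt files), the same selection with the built-in reader would be:

// env is the same ExecutionEnvironment as above; "viaBuiltin" is only illustrative
DataSet<Tuple2<String, String>> viaBuiltin = env
        .readCsvFile("hdfs:///shared/file.csv")
        .ignoreFirstLine()
        .parseQuotedStrings('"')
        .includeFields("101")   // first and third columns
        .types(String.class, String.class);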
Thanks,
Yassine

2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:

> How about just overriding the "readLine()" method to call
> "super.readLine()" and catching EOF exceptions?
>
> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Yassine,
>>
>> AFAIK, there is no built-in way to ignore corrupted compressed files.
>> You could try to implement a FileInputFormat that wraps the
>> CsvInputFormat and forwards all calls to the wrapped CsvIF.
>> The wrapper would also catch and ignore the EOFException.
>>
>> If you do that, you would not be able to use the env.readCsvFile()
>> shortcut but would need to create an instance of your own InputFormat
>> and add it with env.readFile(yourIF).
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> Hi all,
>>>
>>> I am reading a large number of GZip compressed csv files, nested in a
>>> HDFS directory:
>>>
>>> Configuration parameters = new Configuration();
>>> parameters.setBoolean("recursive.file.enumeration", true);
>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>>         .ignoreFirstLine()
>>>         .fieldDelimiter("|")
>>>         .includeFields("011000")
>>>         .types(String.class, Long.class)
>>>         .withParameters(parameters);
>>>
>>> My job is failing with the following exception:
>>>
>>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>>     - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>
>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>     at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>     at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>     at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>     at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>     at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>     at java.lang.Thread.run(Unknown Source)
>>>
>>> I think it is due to some improperly compressed files. How can I handle
>>> and ignore such exceptions? Thanks.
>>>
>>> Best,
>>> Yassine
>>>
>>
>