Hi Yassine, AFAIK, there is no built-in way to ignore corrupted compressed files. You could try to implement a FileInputFormat that wraps the CsvInputFormat and forwards all calls to the wrapped CsvIF. The wrapper would also catch and ignore the EOFException.
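A rough, untested sketch of such a wrapper, assuming the Flink 1.1-era DataSet API. The class name IgnoreEofInputFormat is made up for illustration. It extends FileInputFormat so it can be passed to env.readFile(). It implements ResultTypeQueryable because Flink cannot infer the produced type of a generic wrapper. It treats an EOFException as the end of the current split. Returning null from nextRecord() relies on the data source task skipping null records, which DelimitedInputFormat itself depends on when it reaches the end of a split.

```java
import java.io.EOFException;
import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;

/**
 * Hypothetical wrapper: forwards all calls to a wrapped FileInputFormat and
 * treats an EOFException (e.g. from a truncated GZip file) as end of split.
 */
public class IgnoreEofInputFormat<T> extends FileInputFormat<T> implements ResultTypeQueryable<T> {

    private final FileInputFormat<T> wrapped;
    private final TypeInformation<T> typeInfo;
    // Set once the wrapped format hit a premature end of input in the current split.
    private transient boolean corruptedSplit;

    public IgnoreEofInputFormat(FileInputFormat<T> wrapped, TypeInformation<T> typeInfo) {
        this.wrapped = wrapped;
        this.typeInfo = typeInfo;
    }

    @Override
    public void configure(Configuration parameters) {
        super.configure(parameters);
        // Forward settings such as "recursive.file.enumeration" to the wrapped format.
        wrapped.configure(parameters);
    }

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        // The wrapped format enumerates the files, using its own path and settings.
        return wrapped.createInputSplits(minNumSplits);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        corruptedSplit = false;
        try {
            wrapped.open(split);
        } catch (EOFException e) {
            // Header or first buffer could not be read: skip the whole split.
            corruptedSplit = true;
        }
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return corruptedSplit || wrapped.reachedEnd();
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        try {
            return wrapped.nextRecord(reuse);
        } catch (EOFException e) {
            // Truncated stream: stop reading this split; emit nothing for this call.
            corruptedSplit = true;
            return null;
        }
    }

    @Override
    public void close() throws IOException {
        wrapped.close();
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return typeInfo;
    }
}

// Example usage (assumes `env` and `parameters` from the question, plus imports for
// Path, TupleCsvInputFormat, TupleTypeInfo, Tuple2 and DataSet):
//
//   String dir = "hdfs:///shared/logs/";
//   TupleTypeInfo<Tuple2<String, Long>> typeInfo =
//       TupleTypeInfo.getBasicTupleTypeInfo(String.class, Long.class);
//   TupleCsvInputFormat<Tuple2<String, Long>> csvIF = new TupleCsvInputFormat<>(
//       new Path(dir), "\n", "|", typeInfo,
//       new boolean[] {false, true, true, false, false, false}); // includeFields("011000")
//   csvIF.setSkipFirstLineAsHeader(true);                        // ignoreFirstLine()
//   DataSet<Tuple2<String, Long>> hist =
//       env.readFile(new IgnoreEofInputFormat<>(csvIF, typeInfo), dir)
//          .withParameters(parameters);
```

With this approach, records decoded before the truncation point of a corrupted file are still emitted, and the rest of that file is skipped. Catching the exception in open() as well should cover files whose GZip header cannot be read at all. The wrapped CsvInputFormat keeps its own file path, so it should be the same path you pass to env.readFile().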
If you do that, you would not be able to use the env.readCsvFile() shortcut, but would need to create an instance of your own InputFormat and add it with env.readFile(yourIF, path).

Hope this helps,
Fabian

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi all,
>
> I am reading a large number of GZip-compressed CSV files, nested in an HDFS
> directory:
>
> Configuration parameters = new Configuration();
> parameters.setBoolean("recursive.file.enumeration", true);
> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>         .ignoreFirstLine()
>         .fieldDelimiter("|")
>         .includeFields("011000")
>         .types(String.class, Long.class)
>         .withParameters(parameters);
>
> My job is failing with the following exception:
>
> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager
> - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>
> java.io.EOFException: Unexpected end of ZLIB input stream
>     at java.util.zip.InflaterInputStream.fill(Unknown Source)
>     at java.util.zip.InflaterInputStream.read(Unknown Source)
>     at java.util.zip.GZIPInputStream.read(Unknown Source)
>     at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>     at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>     at java.lang.Thread.run(Unknown Source)
>
> I think it is due to some improperly compressed files. How can I handle and
> ignore such exceptions?
>
> Thanks.
>
> Best,
> Yassine