I recently had an incident reported to me where somebody was analysing a 
directory of gzipped log files and was struggling to load them into Spark 
because one of the files was corrupted. Calling 
sc.textFile('hdfs:///logs/*.gz') caused an IOException on the executor that 
was reading that file, and once the retry count was exceeded the entire job 
was cancelled, with no way of catching and recovering from the error. While 
normally I think it is entirely appropriate to stop execution if something is 
wrong with your input, sometimes it is useful to analyse what you can get (as 
long as you are aware that input has been skipped) and treat corrupt files as 
acceptable losses.

To cater for this particular case I've added SPARK-6593 (PR at 
https://github.com/apache/spark/pull/5250), which adds an option 
(spark.hadoop.ignoreInputErrors) to log exceptions raised by the Hadoop 
InputFormat but continue on with the next task.

Ideally in this case you would want to report the corrupt file paths back to 
the master so they could be dealt with in a particular way (e.g. moved to a 
separate directory), but that would require a public API change/addition. I was 
pondering an addition to Spark's Hadoop API that could report processing 
status back to the master via an optional accumulator that collects 
filepath/Option(exception message) tuples, so the user has some idea of which 
files are being processed and which files are being skipped.
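
To make the shape of that report concrete, here is a minimal sketch in plain Python (no Spark involved) of the filepath/Option(exception message) tuples such an accumulator might collect. The names read_gzip_lines and load_with_status are made up for illustration and are not part of any Spark API, and None stands in for an empty Option. It assumes each file arrives as a (path, bytes) pair and is small enough to decompress in memory.

```python
import gzip
import zlib


def read_gzip_lines(data):
    """Decompress one gzipped blob and split it into text lines."""
    return gzip.decompress(data).decode("utf-8", errors="replace").splitlines()


def load_with_status(files):
    """Read every (path, bytes) pair, skipping files that fail to decompress.

    Returns (lines, status), where status is a list of
    (path, error_message_or_None) tuples, one per input file.
    Hypothetical helper for illustration only, not a Spark API.
    """
    lines = []
    status = []
    for path, data in files:
        try:
            file_lines = read_gzip_lines(data)
        except (OSError, EOFError, zlib.error) as exc:
            # Corrupt or truncated file: record why it was skipped and move on.
            status.append((path, str(exc)))
            continue
        lines.extend(file_lines)
        status.append((path, None))
    return lines, status
```

The status list is what would let the user find the skipped paths afterwards and, say, move them to a separate directory. Inside Spark the same bookkeeping would have to travel back from the executors, which is where the optional accumulator comes in.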

Regards,
Dale.
