Hey Sparkers,
I would like to use Spark Streaming in production to observe a directory
and process files that are put inside it. The problem is that some of those
files can be broken leading to a IOException from the input reader. This
should be fine for the framework I think: the exception should be caught by
Spark Streaming and logged somewhere and the file causing the problem
should be skipped. Instead, when the exception is thrown the job is aborted
with error and no other files are processed. Ideally I would like to have
my Spark Streaming job to run forever and if something is not readable, to
just log it but stay alive. How can I achieve this?

The stack of the errors that kill my job is similar to

15/08/09 23:42:27 ERROR o.a.s.e.Executor Exception in task 823.0 in
stage 0.0 (TID 823)
java.io.IOException: unexpected end of stream
        at 
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:971)
        at 
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:506)
        at 
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:335)
        at 
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:425)
        at 
org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:485)
        at java.io.InputStream.read(InputStream.java:101)
        at 
org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
        at 
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at 
org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
        at 
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:185)
        at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at 
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
        at 
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


Thank you,
Mario

Reply via email to