You can do something like this:

val fStream = ssc.textFileStream("/sigmoid/data/")
  .map(x => {
    try {
      // Move all of your transformations inside the try..catch so a
      // broken record is logged and skipped instead of failing the task.
      x  // replace with your transformation of x
    } catch {
      case e: Exception =>
        logError("Whoops!! ", e)
        null
    }
  })

Thanks
Best Regards

On Mon, Aug 10, 2015 at 7:44 PM, Mario Pastorelli <
mario.pastore...@teralytics.ch> wrote:

> Hey Sparkers,
> I would like to use Spark Streaming in production to observe a directory
> and process files that are put inside it. The problem is that some of
> those files can be broken, leading to an IOException from the input
> reader. I think this should be fine for the framework: the exception
> should be caught by Spark Streaming, logged somewhere, and the file
> causing the problem should be skipped. Instead, when the exception is
> thrown, the job is aborted with an error and no other files are
> processed. Ideally, I would like my Spark Streaming job to run forever
> and, if something is not readable, to just log it and stay alive. How can
> I achieve this?
>
> The stack trace of the errors that kill my job is similar to:
>
> 15/08/09 23:42:27 ERROR o.a.s.e.Executor Exception in task 823.0 in stage 0.0 (TID 823)
> java.io.IOException: unexpected end of stream
>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:971)
>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:506)
>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:335)
>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:425)
>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:485)
>         at java.io.InputStream.read(InputStream.java:101)
>         at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>         at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
>         at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:185)
>         at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
>         at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> Thank you,
> Mario
>
