What is the best way to fail the application when a job gets aborted?
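
One approach I am considering (just a sketch, assuming a SparkListener-based
check; the listener class and the exit logic are illustrative, not something
prescribed by Spark):

    import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}
    import org.apache.spark.streaming.StreamingContext

    // Illustrative only: stop the application as soon as any job ends unsuccessfully.
    class FailFastListener(ssc: StreamingContext) extends SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        if (jobEnd.jobResult != JobSucceeded) {
          // Stop from a separate thread so the listener bus is not blocked.
          new Thread("fail-fast-stop") {
            override def run(): Unit = {
              ssc.stop(stopSparkContext = true, stopGracefully = false)
              sys.exit(1)
            }
          }.start()
        }
      }
    }

    // ssc.sparkContext.addSparkListener(new FailFastListener(ssc))

The separate thread is there because stopping the context from inside the
listener callback itself could block the listener bus.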

On Wed, Oct 14, 2015 at 1:27 PM, Tathagata Das <t...@databricks.com> wrote:

> When a job gets aborted, it means that the internal tasks were retried a
> number of times before the system gave up. You can control the number of
> retries (see Spark's configuration page). By default the job does not get
> resubmitted.
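>
> For reference, the retry setting in question is spark.task.maxFailures
> (default 4). A minimal sketch, assuming it is set on the conf before the
> context is created (the app name is just a placeholder):
>
>     import org.apache.spark.SparkConf
>
>     // Allow more task attempts before a stage (and hence its job) is aborted.
>     val conf = new SparkConf()
>       .setAppName("my-streaming-app")
>       .set("spark.task.maxFailures", "8")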
>
> You could try getting the logs of the failed executor to see what caused
> the failure. It could be a memory limit issue, with YARN killing the executor.
>
>
>
> On Wed, Oct 14, 2015 at 11:05 AM, Spark Newbie <sparknewbie1...@gmail.com>
> wrote:
>
>> > Is it slowing things down or blocking progress?
>> I didn't see any slowing of processing, but I do see jobs aborted
>> consecutively for a period of 18 batches (5-minute batch intervals), so I
>> am worried about what happened to the records that those jobs were
>> processing.
>> Also, one more thing to mention: StreamingListenerBatchCompleted.numRecords
>> reports all received records as processed even when the batch/job failed,
>> and the processing time shows the same value as it would for a successful
>> batch. It looks like numRecords is simply the number of input records for
>> the batch, regardless of whether they were successfully processed or not.
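>>
>> (For context, a minimal sketch of how I am reading those values; the
>> listener class name is mine, not something from the API:)
>>
>>     import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
>>
>>     class BatchStatsListener extends StreamingListener {
>>       override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
>>         val info = batchCompleted.batchInfo
>>         // numRecords counts the records that entered the batch; it does not
>>         // distinguish successfully processed records from failed ones.
>>         println(s"batch=${info.batchTime} records=${info.numRecords} " +
>>           s"processingDelayMs=${info.processingDelay.getOrElse(-1L)}")
>>       }
>>     }
>>
>>     // ssc.addStreamingListener(new BatchStatsListener)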
>>
>> On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie <sparknewbie1...@gmail.com
>> > wrote:
>>
>>> I ran 2 different Spark 1.5 clusters that have been running for more
>>> than a day now. I do see jobs getting aborted because task retries max
>>> out (default 4) due to ConnectionException. It seems like the executors
>>> die and get restarted, and I was unable to find the root cause (with the
>>> same app code and conf on Spark 1.4.1 I don't see ConnectionException).
>>>
>>> Another question related to this: what happens to the Kinesis records
>>> received when a job gets aborted? In Spark 1.5 with kinesis-asl 1.5 (which
>>> I am using), does the job get resubmitted with the same received records?
>>> Or does the kinesis-asl library fetch those records again based on the
>>> sequence numbers it tracks? It would be good for me to understand the story
>>> around lossless processing of Kinesis records in Spark 1.5 + kinesis-asl 1.5
>>> when jobs are aborted. Any pointers or a quick explanation would be very
>>> helpful.
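>>>
>>> (If I understand the docs correctly, the recommended setup for zero data
>>> loss with receiver-based sources is checkpointing plus the receiver
>>> write-ahead log. A minimal sketch of what I have; the checkpoint path is
>>> a placeholder:)
>>>
>>>     import org.apache.spark.SparkConf
>>>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>
>>>     val conf = new SparkConf()
>>>       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
>>>     val ssc = new StreamingContext(conf, Seconds(300))  // 5-minute batches
>>>     // Metadata checkpoints and the receiver WAL data live under this directory.
>>>     ssc.checkpoint("hdfs:///checkpoints/my-kinesis-app")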
>>>
>>>
>>> On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Is this happening too often? Is it slowing things down or blocking
>>>> progress? Failures once in a while are part of the norm, and the system
>>>> should take care of itself.
>>>>
>>>> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie <
>>>> sparknewbie1...@gmail.com> wrote:
>>>>
>>>>> Hi Spark users,
>>>>>
>>>>> I'm seeing the below exception in my Spark Streaming application. It
>>>>> happens in the first stage, where the Kinesis receivers receive records
>>>>> and a flatMap operation is applied to the unioned DStream. A coalesce
>>>>> step also happens as part of that stage to optimize performance.
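>>>>>
>>>>> (Roughly, the relevant part of the app looks like this; kinesisStreams,
>>>>> parseRecord, and the partition count are placeholders for the real names
>>>>> and values, and ssc is the StreamingContext:)
>>>>>
>>>>>     // kinesisStreams: one DStream[Array[Byte]] per Kinesis receiver,
>>>>>     // created with KinesisUtils.createStream (arguments omitted here)
>>>>>     val unioned = ssc.union(kinesisStreams)
>>>>>     val flattened = unioned.flatMap(parseRecord)            // the flatMap in this stage
>>>>>     val coalesced = flattened.transform(_.coalesce(16))     // 16 is a placeholder partition count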
>>>>>
>>>>> This is happening on my Spark 1.5 cluster using kinesis-asl 1.5. When I
>>>>> look at the executor logs I do not see any exceptions indicating the root
>>>>> cause of why there is no connectivity to xxx.xx.xx.xxx:36684 or when that
>>>>> service went down.
>>>>>
>>>>> Any help debugging this problem would be much appreciated.
>>>>>
>>>>> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while
>>>>> beginning fetch of 1 outstanding blocks
>>>>> java.io.IOException: Failed to connect to
>>>>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>>>>         at
>>>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>>>>         at
>>>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>>>>         at
>>>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>>>>         at
>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>>>>         at
>>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>>>>         at
>>>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>>>>>         at
>>>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
>>>>>         at
>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>         at
>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579)
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:623)
>>>>>         at
>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>>>         at
>>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
>>>>>         at
>>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
>>>>>         at
>>>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>>>         at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>         at
>>>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>>>         at
>>>>> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>         at
>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>         at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>>>         at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.net.ConnectException: Connection refused:
>>>>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
>>>>>
>>>>> Thanks,
>>>>> Bharath
>>>>>
>>>>>
>>>>
>>>
>>
>
