What is the best way to fail the application when job gets aborted? On Wed, Oct 14, 2015 at 1:27 PM, Tathagata Das <t...@databricks.com> wrote:
> When a job gets aborted, it means that the internal tasks were retried a > number of times before the system gave up. You can control the number > retries (see Spark's configuration page). The job by default does not get > resubmitted. > > You could try getting the logs of the failed executor, to see what caused > the failure. Could be a memory limit issue, and YARN killing it somehow. > > > > On Wed, Oct 14, 2015 at 11:05 AM, Spark Newbie <sparknewbie1...@gmail.com> > wrote: > >> Is it slowing things down or blocking progress. >> >> I didn't see slowing of processing, but I do see jobs aborted >> consecutively for a period of 18 batches (5 minute batch intervals). So I >> am worried about what happened to the records that these jobs were >> processing. >> Also, one more thing to mention is that the >> StreamingListenerBatchCompleted.numRecords information shows all >> received records as processed even if the batch/job failed. The processing >> time as well shows as the same time it takes for a successful batch. >> It seems like it is the numRecords which was the input to the batch >> regardless of whether they were successfully processed or not. >> >> On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie <sparknewbie1...@gmail.com >> > wrote: >> >>> I ran 2 different spark 1.5 clusters that have been running for more >>> than a day now. I do see jobs getting aborted due to task retry's maxing >>> out (default 4) due to ConnectionException. It seems like the executors die >>> and get restarted and I was unable to find the root cause (same app code >>> and conf used on spark 1.4.1 I don't see ConnectionException). >>> >>> Another question related to this, what happens to the kinesis records >>> received when Job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I >>> am using) does the job gets resubmitted with the same received records? Or >>> does the kinesis-asl library get those records again based on sequence >>> numbers it tracks? It would good for me to understand the story around >>> lossless processing of kinesis records in Spark-1.5 + kinesis-asl-1.5 when >>> jobs are aborted. Any pointers or quick explanation would be very helpful. >>> >>> >>> On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das <t...@databricks.com> >>> wrote: >>> >>>> Is this happening too often? Is it slowing things down or blocking >>>> progress. Failures once in a while is part of the norm, and the system >>>> should take care of itself. >>>> >>>> On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie < >>>> sparknewbie1...@gmail.com> wrote: >>>> >>>>> Hi Spark users, >>>>> >>>>> I'm seeing the below exception in my spark streaming application. It >>>>> happens in the first stage where the kinesis receivers receive records and >>>>> perform a flatMap operation on the unioned Dstream. A coalesce step also >>>>> happens as a part of that stage for optimizing the performance. >>>>> >>>>> This is happening on my spark 1.5 instance using kinesis-asl-1.5. When >>>>> I look at the executor logs I do not see any exceptions indicating the >>>>> root >>>>> cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when did >>>>> that service go down. >>>>> >>>>> Any help debugging this problem will be helpful. >>>>> >>>>> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while >>>>> beginning fetch of 1 outstanding blocks >>>>> java.io.IOException: Failed to connect to >>>>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684 >>>>> at >>>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193) >>>>> at >>>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) >>>>> at >>>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88) >>>>> at >>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) >>>>> at >>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) >>>>> at >>>>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97) >>>>> at >>>>> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89) >>>>> at >>>>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595) >>>>> at >>>>> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593) >>>>> at >>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>> at >>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>> at >>>>> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593) >>>>> at >>>>> org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:579) >>>>> at >>>>> org.apache.spark.storage.BlockManager.get(BlockManager.scala:623) >>>>> at >>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) >>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) >>>>> at >>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139) >>>>> at >>>>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135) >>>>> at >>>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) >>>>> at scala.collection.immutable.List.foreach(List.scala:318) >>>>> at >>>>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) >>>>> at >>>>> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135) >>>>> at >>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >>>>> at >>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >>>>> at >>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >>>>> at >>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >>>>> at >>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >>>>> at >>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) >>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) >>>>> at >>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >>>>> at >>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >>>>> at >>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >>>>> at >>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) >>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >>>>> at >>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) >>>>> at >>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) >>>>> at org.apache.spark.scheduler.Task.run(Task.scala:88) >>>>> at >>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>>>> at >>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >>>>> at >>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> Caused by: java.net.ConnectException: Connection refused: >>>>> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684 >>>>> >>>>> Thanks, >>>>> Bharath >>>>> >>>>> >>>> >>> >> >