Seems to be related to this JIRA: https://issues.apache.org/jira/browse/SPARK-3612 ?
On Tue, Jun 9, 2015 at 7:39 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:

> Hi Snehal,
>
> Are you running the latest kafka consumer from github/spark-packages? If
> not, can you take the latest changes? This low-level receiver will keep
> trying if the underlying BlockManager gives an error. Do you see those
> retry cycles in the log? If yes, there is an issue writing blocks to the
> BlockManager and Spark is not able to recover from that failure, but the
> Receiver keeps trying.
>
> Which version of Spark are you using?
>
> Dibyendu
>
> On Jun 9, 2015 5:14 AM, "Snehal Nagmote" <nagmote.sne...@gmail.com> wrote:
>
>> All,
>>
>> I am using the Kafka Spark Consumer
>> https://github.com/dibbhatt/kafka-spark-consumer in a Spark Streaming
>> job.
>>
>> After the streaming job runs for a few hours, all executors exit, yet the
>> Spark UI still shows the application status as running.
>>
>> Does anyone know the cause of this exception and how to fix it?
>>
>> WARN [sparkDriver-akka.actor.default-dispatcher-17:Logging$class@71] -
>> Error reported by receiver for stream 7: Error While Store for Partition
>> Partition{host=dal-kafka-broker01.bfd.walmart.com:9092, partition=27} -
>> org.apache.spark.SparkException: Error sending message [message =
>>   UpdateBlockInfo(BlockManagerId(2, dfw-searcher.com, 33621),
>>   input-7-1433793457165, StorageLevel(false, true, false, false, 1), 10492, 0, 0)]
>>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>   at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>>   at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>>   at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
>>   at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
>>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:812)
>>   at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
>>   at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:71)
>>   at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:161)
>>   at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushIterator(ReceiverSupervisorImpl.scala:136)
>>   at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:152)
>>   at consumer.kafka.PartitionManager.next(PartitionManager.java:215)
>>   at consumer.kafka.KafkaConsumer.createStream(KafkaConsumer.java:75)
>>   at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:108)
>>   at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>   at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>   at scala.concurrent.Await$.result(package.scala:107)
>>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>   ... 14 more
>>
>> WARN [sparkDriver-akka.actor.default-dispatcher-30:Logging$class@92] -
>> Error sending message [message =
>>   UpdateBlockInfo(BlockManagerId(<driver>, dfw-searcher.com, 57286),
>>   broadcast_10665_piece0, StorageLevel(false, false, false, false, 1), 0, 0, 0)] in 2 attempts
>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>   at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
>>   at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
>>   at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
>>   at scala.concurrent.Await$.result(package.scala:107)
>>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>   at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>>   at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>>   at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:384)
>>   at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:360)
>>   at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1104)
>>   at org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081)
>>   at org.apache.spark.storage.BlockManager$$anonfun$removeBroadcast$2.apply(BlockManager.scala:1081)
>>   at scala.collection.immutable.Set$Set2.foreach(Set.scala:94)
>>   at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1081)
>>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcI$sp(BlockManagerSlaveActor.scala:63)
>>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply(BlockManagerSlaveActor.scala:63)
>>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply(BlockManagerSlaveActor.scala:63)
>>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thanks,
>>
>> Snehal
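Both traces bottom out in `AkkaUtils.askWithReply` with a `TimeoutException` after the default 30 seconds, i.e. the executor's BlockManager cannot get a timely reply from the driver. One commonly suggested mitigation on Spark 1.x is to raise the Akka ask/lookup timeouts at submission time. The sketch below is only illustrative of that idea, not a confirmed fix for this thread; the timeout values are arbitrary and the main class and jar names are hypothetical placeholders:

```shell
# Sketch: raise the Akka ask/lookup timeouts (Spark 1.x default is 30s)
# before resubmitting the streaming job. Class/jar names are placeholders.
spark-submit \
  --conf spark.akka.askTimeout=120 \
  --conf spark.akka.lookupTimeout=120 \
  --class com.example.KafkaStreamingJob \
  kafka-streaming-job.jar
```

This only buys headroom; if the driver is stalled (e.g. long GC pauses while cleaning broadcast blocks, as the second trace suggests), the underlying cause still needs to be addressed.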