Hi,

To test the resiliency of Kafka Spark Streaming, I killed the worker that was reading from the Kafka topic and noticed that the driver is unable to replace it: the job turns into a rogue job that keeps running but processes nothing from that point on.
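For context, here is a minimal sketch of the kind of receiver-based job I am running (the ZooKeeper address, consumer group id, and topic name below are placeholders, not our actual values):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaResiliencyTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("KafkaResiliencyTest")
        val ssc = new StreamingContext(conf, Seconds(2))

        // Receiver-based Kafka stream; the receiver runs on one worker,
        // which is the worker I kill during the test.
        val stream = KafkaUtils.createStream(
          ssc,
          "zk1:2181",           // ZooKeeper quorum (placeholder)
          "resiliency-test",    // consumer group id (placeholder)
          Map("events" -> 1),   // topic -> number of receiver threads (placeholder)
          StorageLevel.MEMORY_AND_DISK_SER_2)

        // Trivial downstream work, just enough to keep batches flowing.
        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }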
Is this a known issue? Are there any workarounds?

Here is the exception I see on the driver:

2014-08-21 03:43:22,163 [spark-akka.actor.default-dispatcher-16] WARN org.apache.spark.streaming.scheduler.ReceiverTracker - Error reported by receiver for stream 0: Error in block pushing thread - org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = org.apache.spark.storage.BlockManagerMessages$UpdateBlockInfo@5fc95796]
	at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:251)
	at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:68)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:283)
	at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:259)
	at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:866)
	at org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:267)
	at org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:256)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:256)
	at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:179)
	at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:80)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
	at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
	at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:113)
	at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:96)
	at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:139)
	at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:112)
	at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:57)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:107)
	at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:237)
	... 18 more

Thanks,
Bharat