Hi,

To test the resiliency of Spark Streaming with Kafka, I killed the worker
reading from the Kafka topic and noticed that the driver is unable to
replace the worker; from that point on the job becomes a rogue job that
keeps running but does nothing.

Is this a known issue?  Are there any workarounds?
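
One workaround I am considering (a sketch only, untested; the checkpoint
path, ZooKeeper host, group id, and topic name below are placeholders) is
to enable checkpointing and recreate the StreamingContext via
StreamingContext.getOrCreate, so that a restarted driver can recover from
the checkpoint instead of leaving a dead receiver behind:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaResiliencyTest {
      // Placeholder checkpoint directory -- adjust for your cluster.
      val checkpointDir = "hdfs:///checkpoints/kafka-test"

      // Build a fresh context; only called when no checkpoint exists.
      def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("kafka-resiliency-test")
        val ssc = new StreamingContext(conf, Seconds(10))
        ssc.checkpoint(checkpointDir)
        // Receiver-based Kafka stream; replicated storage level so blocks
        // survive the loss of a single worker.
        val stream = KafkaUtils.createStream(
          ssc, "zk-host:2181", "test-group", Map("my-topic" -> 1),
          StorageLevel.MEMORY_AND_DISK_SER_2)
        stream.count().print()
        ssc
      }

      def main(args: Array[String]): Unit = {
        // Recover from the checkpoint if present, else create a new context.
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

I don't know yet whether this actually covers the killed-receiver case,
so any confirmation would be appreciated.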

Here is the exception that I see on the driver:

2014-08-21 03:43:22,163 [spark-akka.actor.default-dispatcher-16] WARN org.apache.spark.streaming.scheduler.ReceiverTracker - Error reported by receiver for stream 0: Error in block pushing thread - org.apache.spark.SparkException: Error sending message to BlockManagerMaster [message = org.apache.spark.storage.BlockManagerMessages$UpdateBlockInfo@5fc95796]
        at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:251)
        at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:68)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:283)
        at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:259)
        at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:866)
        at org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:267)
        at org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:256)
        at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:179)
        at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:80)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:663)
        at org.apache.spark.storage.BlockManager.put(BlockManager.scala:574)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:113)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:96)
        at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:139)
        at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:112)
        at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:57)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:237)
        ... 18 more
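
For what it's worth, the 30-second timeout in the "Caused by" above looks
like it could be the default Akka ask timeout on the BlockManagerMaster
round-trip. If the root cause turns out to be slow block reporting rather
than the killed receiver itself, one thing I may try is raising that
timeout (my assumption is that spark.akka.askTimeout, in seconds, is the
relevant setting):

    # spark-defaults.conf -- assumption: the 30s future timeout is
    # governed by the Akka ask timeout (default 30)
    spark.akka.askTimeout  120

I realize this would only mask the symptom if the receiver is genuinely
gone, so I'd still like to understand why it is not relaunched.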

Thanks,
Bharat
