I have a standalone, local Spark Streaming process that reads its input
using FlumeUtils. Our longest window size is 6 hours. After about a day and
a half of running without any issues, we start seeing timeout errors while
cleaning up input blocks. This seems to cause reading from Flume to cease.


ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000
org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(<driver>, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)]
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
    at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
    at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
    at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
    at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
    at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
    at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
    at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
    at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
    ... 17 more

A similar query was posted here:
http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html
but I did not find any resolution to the issue there.


Thanks in advance,
NB
