Hi TD,

Thanks for the response. Your mention of GC got me thinking.

Given that we are running in local mode (everything in a single JVM) for
now, does setting "spark.executor.extraJavaOptions" to
"-XX:+UseConcMarkSweepGC" in the SparkConf object take effect at all before
we use it to create the StreamingContext? I ask because that is what we are
doing right now. If it does not, then we may never have been running with
the Concurrent Mark Sweep collector at all. Is CMS recommended instead of
forcing GC periodically?
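
As a sanity check on our side, I was planning to list the garbage
collector MXBeans from inside the running driver JVM; if CMS actually took
effect, a "ConcurrentMarkSweep" bean should show up. A minimal standalone
sketch (not our actual code):

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcCheck {
    public static void main(String[] args) {
        // Print the collectors this JVM is actually running with. If CMS took
        // effect, the names include "ParNew" and "ConcurrentMarkSweep"; with
        // the default parallel collector you see "PS Scavenge" and
        // "PS MarkSweep" instead.
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName());
        }
    }
}
```

Running the same loop from the driver (e.g. right after creating the
StreamingContext) would tell us definitively whether the flag applied.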

Thanks
NB
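
P.S. To be concrete, by "forcing GC periodically" I mean something along
these lines on the driver. This is only a sketch; the 10-minute interval is
an arbitrary choice, not something we have tuned:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicGc {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Request a full GC every 10 minutes. System.gc() is only a hint to
        // the JVM, but with default settings it normally triggers a full
        // collection, which is what lets Spark's cleanup of out-of-scope RDD
        // objects (and their blocks) actually run.
        scheduler.scheduleAtFixedRate(System::gc, 10, 10, TimeUnit.MINUTES);
    }
}
```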


On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das <t...@databricks.com> wrote:

> There are a couple of options. Increase timeout (see Spark configuration).
>
> Also see past mails in the mailing list.
>
> Another option you may try (I have a gut feeling it may work, but I am not
> sure) is calling GC on the driver periodically. The cleanup of old blocks
> is tied to the GCing of RDD objects, and regular GC calls may keep cleanup
> happening steadily rather than in unpredictable bursts of GC activity.
>
> Let us know how it works out.
>
> TD
>
> On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal <nb.nos...@gmail.com> wrote:
>
>> I have a standalone, local-mode Spark Streaming process that reads its
>> input using FlumeUtils. Our longest window size is 6 hours. After about a
>> day and a half of running without any issues, we start seeing Timeout
>> errors while cleaning up input blocks, which seems to cause reading from
>> Flume to cease.
>>
>>
>> ERROR sparkDriver-akka.actor.default-dispatcher-78
>> BlockManagerSlaveActor.logError - Error in removing block
>> input-0-1428182594000
>> org.apache.spark.SparkException: Error sending message [message =
>> UpdateBlockInfo(BlockManagerId(<driver>, localhost,
>> 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
>> 1),0,0,0)]
>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
>> at
>> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>> at
>> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>> at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
>> at
>> org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
>> at
>> org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
>> at
>> org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
>> at
>> org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
>> at
>> org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
>> at
>> org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [30 seconds]
>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>> at
>> akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
>> at
>> akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
>> at scala.concurrent.Await$.result(package.scala:107)
>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
>> ... 17 more
>>
>> A similar query was posted here
>> http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html
>> but I did not find any resolution to that issue there.
>>
>>
>> Thanks in advance,
>> NB
>>
>>
>
