Hello Todd,

Thank you for your suggestion! I first tried increasing the driver
memory to 2 GB and it worked without any problems, but I will also test with
the parameters and values you've shared.
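For reference, here is a sketch of how the two fixes discussed in this thread
could be combined in one place (the application name and batch interval below
are placeholders; the Akka values are the ones you suggested, and the property
names are from Spark 1.2):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Akka-related settings suggested by Todd:
// frame size in MB, timeouts in seconds.
val conf = new SparkConf()
  .setAppName("solr-indexing-stream") // placeholder name
  .set("spark.akka.frameSize", "500")
  .set("spark.akka.askTimeout", "30")
  .set("spark.core.connection.ack.wait.timeout", "600")

// Placeholder batch interval; ours is application-specific.
val ssc = new StreamingContext(conf, Seconds(10))
```

Note that on YARN the driver memory cannot be raised from inside the
application itself; it has to be passed at launch time, e.g.
`spark-submit --master yarn-cluster --driver-memory 2G ...`, which is how
I applied the 2 GB that worked for me.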

Kind regards,
Emre Sevinç
http://www.bigindustries.be/


On Fri, Feb 20, 2015 at 3:25 PM, Todd Nist <tsind...@gmail.com> wrote:

> Hi Emre,
>
> Have you tried adjusting these:
>
> .set("spark.akka.frameSize", "500").set("spark.akka.askTimeout", 
> "30").set("spark.core.connection.ack.wait.timeout", "600")
>
> -Todd
>
> On Fri, Feb 20, 2015 at 8:14 AM, Emre Sevinc <emre.sev...@gmail.com>
> wrote:
>
>> Hello,
>>
>> We are building a Spark Streaming application that listens to a directory
>> on HDFS, and uses the SolrJ library to send newly detected files to a Solr
>> server. When we put 10,000 files into the directory it is listening to, it
>> starts to process them by sending the files to our Solr server, but after
>> a few thousand files the Spark Streaming application dies.
>>
>> Before the application dies, it logs some TimeoutException errors
>> related to Akka, such as:
>>
>>   util.AkkaUtils: Error sending message in 1 attempts
>>   java.util.concurrent.TimeoutException: Futures timed out after [30
>> seconds]
>>   akka.pattern.AskTimeoutException: Timed out
>>
>> Any ideas on how to deal with this? Should we add/change/tweak some Spark
>> configuration parameters?
>>
>> We're using Spark 1.2.0 on a YARN cluster, and we're giving 4 cores and
>> 2 GB of memory to the application when invoking it via the spark-submit
>> command.
>>
>> Below you can read the last few lines of the log file, showing what our
>> Spark Streaming application logged just before it died:
>>
>>
>> 15/02/20 13:28:25 INFO rdd.NewHadoopRDD: Input split:
>> hdfs://node01.demo.hadoop:8020/user/bjorn/spark-belga/dbpedia-translator-out/2fdf95f1-67d6-40b7-9345-fe129e38a2d9.json:0+2620
>> 15/02/20 13:28:25 INFO broadcast.TorrentBroadcast: Started reading
>> broadcast variable 3004
>> 15/02/20 13:28:32 INFO storage.MemoryStore: ensureFreeSpace(20996) called
>> with curMem=31171148, maxMem=1111794647
>> 15/02/20 13:28:32 INFO storage.MemoryStore: Block broadcast_3004_piece0
>> stored as bytes in memory (estimated size 20.5 KB, free 1030.5 MB)
>> 15/02/20 13:28:33 INFO storage.BlockManagerMaster: Updated info of block
>> broadcast_3004_piece0
>> 15/02/20 13:28:33 INFO broadcast.TorrentBroadcast: Reading broadcast
>> variable 3004 took 7897 ms
>> 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(347363)
>> called with curMem=31192144, maxMem=1111794647
>> 15/02/20 13:28:33 INFO storage.MemoryStore: Block broadcast_3004 stored
>> as values in memory (estimated size 339.2 KB, free 1030.2 MB)
>> 15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(2627) called
>> with curMem=31539507, maxMem=1111794647
>> 15/02/20 13:28:33 INFO storage.MemoryStore: Block rdd_3659_3 stored as
>> bytes in memory (estimated size 2.6 KB, free 1030.2 MB)
>> 15/02/20 13:28:34 INFO storage.BlockManagerMaster: Updated info of block
>> rdd_3659_3
>> 15/02/20 13:28:34 INFO impl.HttpClientUtil: Creating new http client,
>> config:maxConnections=128&maxConnectionsPerHost=32&followRedirects=false
>> 15/02/20 13:28:36 INFO storage.MemoryStore: ensureFreeSpace(5) called
>> with curMem=31542134, maxMem=1111794647
>> 15/02/20 13:28:36 INFO storage.MemoryStore: Block rdd_3660_3 stored as
>> bytes in memory (estimated size 5.0 B, free 1030.2 MB)
>> 15/02/20 13:28:40 INFO storage.BlockManagerMaster: Updated info of block
>> rdd_3660_3
>> 15/02/20 13:28:40 INFO executor.Executor: Finished task 3.0 in stage
>> 245.0 (TID 3455). 2516 bytes result sent to driver
>> 15/02/20 13:29:07 WARN util.AkkaUtils: Error sending message in 1 attempts
>> java.util.concurrent.TimeoutException: Futures timed out after [30
>> seconds]
>>     at
>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>     at
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>     at
>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>     at scala.concurrent.Await$.result(package.scala:107)
>>     at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
>>     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
>> 15/02/20 13:29:08 ERROR executor.CoarseGrainedExecutorBackend: Driver
>> Disassociated [akka.tcp://sparkexecu...@node08.demo.hadoop:48042] ->
>> [akka.tcp://sparkdri...@node07.demo.hadoop:56535] disassociated!
>> Shutting down.
>>
>> LogType: stdout
>> LogLength: 0
>> Log Contents:
>>
>>
>>
>> Container: container_1422006251277_0837_01_000004 on
>> node08.demo.hadoop_8041
>>
>> ==============================================================================
>> LogType: stderr
>> LogLength: 2952
>> Log Contents:
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/mnt/disk1/cm/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/mnt/disk3/yarn/nm/usercache/bjorn/filecache/354/bigcontent-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/02/20 13:29:26 INFO executor.CoarseGrainedExecutorBackend: Registered
>> signal handlers for [TERM, HUP, INT]
>> 15/02/20 13:29:27 INFO spark.SecurityManager: Changing view acls to:
>> yarn,bjorn
>> 15/02/20 13:29:27 INFO spark.SecurityManager: Changing modify acls to:
>> yarn,bjorn
>> 15/02/20 13:29:27 INFO spark.SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set(yarn, bjorn); users with modify permissions: Set(yarn, bjorn)
>> 15/02/20 13:29:28 INFO slf4j.Slf4jLogger: Slf4jLogger started
>> 15/02/20 13:29:28 INFO Remoting: Starting remoting
>> 15/02/20 13:29:29 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://driverpropsfetc...@node08.demo.hadoop:60995]
>> 15/02/20 13:29:29 INFO Remoting: Remoting now listens on addresses:
>> [akka.tcp://driverpropsfetc...@node08.demo.hadoop:60995]
>> 15/02/20 13:29:29 INFO util.Utils: Successfully started service
>> 'driverPropsFetcher' on port 60995.
>> 15/02/20 13:29:59 WARN security.UserGroupInformation:
>> PriviledgedActionException as:bjorn (auth:SIMPLE)
>> cause:akka.pattern.AskTimeoutException: Timed out
>> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>>     at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1655)
>>     at
>> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
>>     at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
>>     at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:161)
>>     at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
>> Caused by: akka.pattern.AskTimeoutException: Timed out
>>     at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>>     at akka.actor.Scheduler$$anon$11.run(Scheduler.scala:118)
>>     at
>> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>>     at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>>     at
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:455)
>>     at
>> akka.actor.LightArrayRevolverScheduler$$anon$12.executeBucket$1(Scheduler.scala:407)
>>     at
>> akka.actor.LightArrayRevolverScheduler$$anon$12.nextTick(Scheduler.scala:411)
>>     at
>> akka.actor.LightArrayRevolverScheduler$$anon$12.run(Scheduler.scala:363)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> Kind regards,
>>
>> Emre Sevinç
>> http://www.bigindustries.be/
>>
>>
>


-- 
Emre Sevinc
