Anyone?
On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra <
[email protected]> wrote:
> Hi Olivier,
>
> *The update function is as below:*
>
> *val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {*
> * val previousCount = state.getOrElse((0L, 0L))._2*
> * var startValue: IConcurrentUsers = ConcurrentViewers(0)*
> * var currentCount = 0L*
> * val lastIndexOfConcurrentUsers = values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])*
> * val subList = values.slice(0, lastIndexOfConcurrentUsers)*
> * val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount*
> * val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count*
>
> * if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {*
> * logger.error(*
> * s"Count using state updation $currentCountFromSubList, " +*
> * s"ConcurrentUsers count $lastConcurrentViewersCount" +*
> * s" resetting to $lastConcurrentViewersCount"*
> * )*
> * currentCount = lastConcurrentViewersCount*
> * }*
> * val remainingValuesList = values.diff(subList)*
> * startValue = ConcurrentViewers(currentCount)*
> * currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count*
>
> * if (currentCount < 0) {*
> * logger.error(*
> * s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"*
> * )*
> * currentCount = 0*
> * }*
> * // to stop pushing subsequent 0 after receiving first 0*
> * if (currentCount == 0 && previousCount == 0) None*
> * else Some(previousCount, currentCount)*
> * }*
>
> *trait IConcurrentUsers {*
> * val count: Long*
> * def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)*
> *}*
>
> *object IConcurrentUsers {*
> * def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {*
> * case (_, _: ConcurrentViewers) => *
> * ConcurrentViewers(b.count)*
> * case (_: ConcurrentViewers, _: IncrementConcurrentViewers) => *
> * ConcurrentViewers(a.count + b.count)*
> * case (_: ConcurrentViewers, _: DecrementConcurrentViewers) => *
> * ConcurrentViewers(a.count - b.count)*
> * }*
> *}*
>
> *case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers*
> *case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers*
> *case class ConcurrentViewers(count: Long) extends IConcurrentUsers*
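
A note on the update function quoted above: `values.lastIndexWhere(...)` returns -1 when a batch contains no ConcurrentViewers message, and `values(-1)` then throws an IndexOutOfBoundsException. Below is a minimal sketch of one way to fold a batch without indexing into it. It is not the code from the application. It assumes that a ConcurrentViewers message is an absolute snapshot that overrides the running count, that a batch without a snapshot should continue from the previous count, and it leaves out the mismatch logging. The names `UpdateSketch` and `applyEvents` are hypothetical, and the trait is declared `sealed` here only so the compiler can check that the match is exhaustive.

```scala
// Hypothetical, self-contained restatement of the message types from the thread.
sealed trait IConcurrentUsers { def count: Long }
case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers

object UpdateSketch {
  // Fold one batch onto a starting count (assumed semantics, see lead-in):
  // a snapshot replaces the running count, increments and decrements adjust it.
  def applyEvents(start: Long, events: Seq[IConcurrentUsers]): Long =
    events.foldLeft(start) {
      case (_, ConcurrentViewers(c))            => c
      case (acc, IncrementConcurrentViewers(c)) => acc + c
      case (acc, DecrementConcurrentViewers(c)) => acc - c
    }

  // Same state shape as in the thread: (previousCount, currentCount).
  def updateFunc(values: Seq[IConcurrentUsers],
                 state: Option[(Long, Long)]): Option[(Long, Long)] = {
    val previousCount = state.map(_._2).getOrElse(0L)
    // Clamp negative results to 0, as the original function does.
    val currentCount = math.max(0L, applyEvents(previousCount, values))
    // Drop the key once the count stays at 0, as the original function does.
    if (currentCount == 0 && previousCount == 0) None
    else Some((previousCount, currentCount))
  }
}
```

With this shape an empty batch or a batch holding only increments and decrements is handled by the same fold, so no index lookup is needed. This does not by itself explain the OutOfMemoryError in the trace below.
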
>
>
> *Also, the error stack trace copied from the executor logs is:*
>
> *java.lang.OutOfMemoryError: Java heap space*
> * at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)*
> * at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)*
> * at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)*
> * at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)*
> * at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)*
> * at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)*
> * at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)*
> * at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
> * at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)*
> * at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
> * at java.lang.reflect.Method.invoke(Method.java:601)*
> * at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)*
> * at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)*
> * at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)*
> * at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)*
> * at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)*
> * at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)*
> * at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)*
> * at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)*
> * at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)*
> * at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)*
> * at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)*
> * at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
> * at java.lang.reflect.Method.invoke(Method.java:601)*
> * at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)*
> * at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)*
> * at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)*
> * at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)*
> * at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)*
> * at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)*
> * at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)*
> * at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)*
> *15/04/21 15:51:23 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]*
>
>
>
> On Wed, Apr 22, 2015 at 1:32 AM, Olivier Girardot <[email protected]>
> wrote:
>
>> Hi Sourav,
>> Can you post your updateFunc as well, please?
>>
>> Regards,
>>
>> Olivier.
>>
>> On Tue, Apr 21, 2015 at 12:48, Sourav Chandra <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> We are building a Spark Streaming application which reads from Kafka,
>>> applies updateStateByKey based on the received message type, and finally
>>> stores the result in Redis.
>>>
>>> After running for a few seconds, the executor process gets killed with an
>>> OutOfMemoryError.
>>>
>>> The code snippet is below:
>>>
>>>
>>> *NoOfReceiverInstances = 1*
>>>
>>> *val kafkaStreams = (1 to NoOfReceiverInstances).map(*
>>> * _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)*
>>> *)*
>>> *val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}*
>>>
>>>
>>> *ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)..updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))*
>>>
>>>
>>>
>>> *object RedisHelper {*
>>> * private val client = scredis.Redis(*
>>> * ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)*
>>> * )*
>>>
>>> * def update(itr: Iterator[(String, (Long, Long))]) {*
>>> * // redis save operation*
>>> * }*
>>>
>>> *}*
>>>
>>>
>>> *Below is the spark configuration:*
>>>
>>>
>>> * spark.app.name = "XXXXXXX"*
>>> * spark.jars = "xxxx.jar"*
>>> * spark.home = "/spark-1.1.1-bin-hadoop2.4"*
>>> * spark.executor.memory = 1g*
>>> * spark.streaming.concurrentJobs = 1000*
>>> * spark.logConf = true*
>>> * spark.cleaner.ttl = 3600 //in seconds*
>>> * spark.default.parallelism = 12*
>>> * spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"*
>>> * spark.executor.logs.rolling.strategy = "size"*
>>> * spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB*
>>> * spark.executor.logs.rolling.maxRetainedFiles = 10*
>>> * spark.serializer = "org.apache.spark.serializer.KryoSerializer"*
>>> * spark.kryo.registrator = "xxx.NoOpKryoRegistrator"*
>>>
>>>
>>> Other configurations are below:
>>>
>>> *streaming {*
>>> * // All streaming context related configs should come here*
>>> * batch-duration = "1 second"*
>>> * checkpoint-directory = "/tmp"*
>>> * checkpoint-duration = "10 seconds"*
>>> * slide-duration = "1 second"*
>>> * window-duration = "1 second"*
>>> * partitions-for-shuffle-task = 32*
>>> * }*
>>> * kafka {*
>>> * no-of-receivers = 1*
>>> * zookeeper-quorum = "xxxx:2181"*
>>> * consumer-group = "xxxxx"*
>>> * topic = "xxxxx:2"*
>>> * }*
>>>
>>> We tried different combinations, such as:
>>> - Spark 1.1.0 and 1.1.1
>>> - increasing executor memory
>>> - changing the serialization strategy (switching between Kryo and
>>> plain Java serialization)
>>> - changing the broadcast strategy (switching between HTTP and torrent
>>> broadcast)
>>>
>>>
>>> Can anyone give any insight into what we are missing here? How can we fix
>>> this?
>>>
>>> Due to an Akka version mismatch with some other libraries, we cannot upgrade
>>> the Spark version.
>>>
>>> Thanks,
>>> --
>>>
>>> Sourav Chandra
>>>
>>> Senior Software Engineer
>>>
>>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>>
>>> [email protected]
>>>
>>> o: +91 80 4121 8723
>>>
>>> m: +91 988 699 3746
>>>
>>> skype: sourav.chandra
>>>
>>> Livestream
>>>
>>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
>>> Block, Koramangala Industrial Area,
>>>
>>> Bangalore 560034
>>>
>>> www.livestream.com
>>>
>>
>
>