Could you try it out with Spark 1.4 RC3? Also pinging the Cloudera folks, who may be aware of something.
BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script), say every hour, take histogram snapshots and also memory dumps. Then compare two histograms or dumps taken a few hours apart (the more the better). Diffing histograms is easy. Diffing two dumps can be done in JVisualVM, which shows the objects that were added in the later dump. That makes it easy to debug what is not getting cleaned up.

TD

On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:

> Hi,
>
> Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:
>
>  num     #instances         #bytes  class name
> ----------------------------------------------
>    1:         40802      145083848  [B
>    2:         99264       12716112  <methodKlass>
>    3:         99264       12291480  <constMethodKlass>
>    4:          8472        9144816  <constantPoolKlass>
>    5:          8472        7625192  <instanceKlassKlass>
>    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
>    7:          7045        4804832  <constantPoolCacheKlass>
>    8:        139168        4453376  java.util.HashMap$Entry
>    9:          9427        3542512  <methodDataKlass>
>   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>   11:        135491        3251784  java.lang.Long
>   12:         26192        2765496  [C
>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>   14:          8997        1061936  java.lang.Class
>   15:         16022         851384  [[I
>   16:         16447         789456  java.util.zip.Inflater
>   17:         13855         723376  [S
>   18:         17282         691280  java.lang.ref.Finalizer
>   19:         25725         617400  java.lang.String
>   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
>   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
>   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>   25:         16447         394728  java.util.zip.ZStreamRef
>   26:           565         370080  [I
>   27:           508         272288  <objArrayKlassKlass>
>   28:         16233         259728  java.lang.Object
>   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>   30:          2524         192312  [Ljava.lang.Object;
>
> But as I mentioned above, the heap memory seems OK; the extra memory is
> consumed by some off-heap data. I can't find a way to figure out what is in
> there.
>
> Besides, I did some extra experiments, i.e. ran the same program in
> different environments to test whether it has the off-heap memory issue:
>
> spark1.0 + standalone = no
> spark1.0 + yarn = no
> spark1.3 + standalone = no
> spark1.3 + yarn = yes
>
> I'm using CDH5.1, so the spark1.0 is provided by cdh, and
> spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
>
> I could use spark1.0 + yarn, but I can't find a way to handle the logs
> (level and rolling), so it would fill up the hard drive.
>
> Currently I'll stick to spark1.0 + standalone, until our ops team decides
> to upgrade cdh.
>
>
>
> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> While the job is running, is it possible for you to log into the YARN node
>> and get histograms of live objects using "jmap -histo:live"? That may
>> reveal something.
>>
>>
>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Unfortunately, they're still growing, both driver and executors.
>>>
>>> When I run the same job in local mode, everything is fine.
>>>
>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Can you replace your counting part with this?
>>>>
>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>>
>>>>
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I wrote a simple test job that only does very basic operations, for
>>>>> example:
>>>>>
>>>>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>   Map(topic -> 1)).map(_._2)
>>>>> val logs = lines.flatMap { line =>
>>>>>   try {
>>>>>     Some(parse(line).extract[Impression])
>>>>>   } catch {
>>>>>     case _: Exception => None
>>>>>   }
>>>>> }
>>>>>
>>>>> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>   rdd.foreachPartition { iter =>
>>>>>     iter.foreach(count => logger.info(count.toString))
>>>>>   }
>>>>> }
>>>>>
>>>>> It receives messages from Kafka, parses the JSON, filters and counts the
>>>>> records, and then prints the count to the logs.
>>>>>
>>>>> Thanks.
>>>>>
>>>>>
>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Zhang,
>>>>>>
>>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>>> inside the code to fill up memory.
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Yes, I'm using createStream, but the storageLevel param is by
>>>>>>> default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also
>>>>>>> growing. I don't think Kafka messages will be cached in the driver.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <
>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> Are you using the createStream or createDirectStream api? If it's
>>>>>>>> the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it
>>>>>>>> might slow things down though). Another way would be to try the latter
>>>>>>>> one.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Akhil,
>>>>>>>>>
>>>>>>>>> Thanks for your reply.
>>>>>>>>> According to the Streaming tab of the Web UI,
>>>>>>>>> the Processing Time is around 400ms, and there's no Scheduling Delay,
>>>>>>>>> so I suppose it's not the Kafka messages that eat up the off-heap
>>>>>>>>> memory. Or maybe it is, but how to tell?
>>>>>>>>>
>>>>>>>>> I googled how to check the off-heap memory usage; there's a
>>>>>>>>> tool called pmap, but I don't know how to interpret the results.
>>>>>>>>>
>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>
>>>>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit
>>>>>>>>>> then you can see all JVM params. Are you using the high-level consumer
>>>>>>>>>> (receiver based) for receiving data from Kafka? In that case, if your
>>>>>>>>>> throughput is high and the processing delay exceeds the batch interval,
>>>>>>>>>> you will hit these memory issues, because data keeps being received
>>>>>>>>>> and is dumped to memory. You can set StorageLevel to MEMORY_AND_DISK
>>>>>>>>>> (but it slows things down). Another alternative would be to use the
>>>>>>>>>> low-level kafka consumer <https://github.com/dibbhatt/kafka-spark-consumer>
>>>>>>>>>> or to use the non-receiver based directStream
>>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>>> that comes with Spark.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode.
>>>>>>>>>>> I found that YARN is killing the driver and executor processes
>>>>>>>>>>> because of excessive memory use. Here's what I tried:
>>>>>>>>>>>
>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s),
>>>>>>>>>>> so the extra memory is not used by heap.
>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is 384),
>>>>>>>>>>> but the memory just keeps growing and then hits the limit.
>>>>>>>>>>> 3. This problem does not show up in low-throughput jobs, nor in
>>>>>>>>>>> standalone mode.
>>>>>>>>>>> 4. The test job just receives messages from Kafka, with batch
>>>>>>>>>>> interval of 1, does some filtering and aggregation, and then prints to
>>>>>>>>>>> executor logs. So it's not some 3rd party library that causes the
>>>>>>>>>>> 'leak'.
>>>>>>>>>>>
>>>>>>>>>>> Spark 1.3 is built by myself, with correct hadoop versions.
>>>>>>>>>>>
>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Jerry
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Jerry
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jerry
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Jerry
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Jerry
>>>
>>
>
>
>
> --
> Jerry
>
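
For reference, the histogram-diffing step TD describes at the top of this thread could look roughly like the sketch below. It assumes two saved "jmap -histo:live" outputs in plain-text files with one class per row, as jmap prints them before mail wrapping. The function name histo_diff, the file names, and the DRIVER_PID variable are hypothetical and not from the thread; only jmap itself is.

```shell
#!/bin/sh
# histo_diff OLD NEW  (hypothetical helper, not part of Spark or the JDK)
# Prints "<byte growth> <class name>" for every class whose total bytes
# grew between two saved `jmap -histo:live` snapshots, largest first.
histo_diff() {
  awk '
    # Data rows start with a rank such as "12:". This skips the header,
    # the separator line, and the trailing "Total" row.
    $1 ~ /^[0-9]+:$/ {
      if (FILENAME == ARGV[1]) prev[$4] = $3   # bytes in the older snapshot
      else                     cur[$4]  = $3   # bytes in the newer snapshot
    }
    END {
      for (c in cur) {
        delta = cur[c] - prev[c]               # a class missing from OLD counts as 0
        if (delta > 0) printf "%.0f %s\n", delta, c
      }
    }
  ' "$1" "$2" | sort -rn
}

# Possible usage (assumed file names and PID variable):
#   jmap -histo:live "$DRIVER_PID" > histo-0h.txt
#   ... a few hours later ...
#   jmap -histo:live "$DRIVER_PID" > histo-3h.txt
#   histo_diff histo-0h.txt histo-3h.txt | head -n 20
```

Classes that keep climbing across several such diffs are the candidates for "what is not getting cleaned". Note that this only covers on-heap objects; it would not by itself explain off-heap growth of the kind reported in this thread.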