How much data do you have in Kafka? How is your max spout pending set? If you have a high max spout pending (or if you emit unanchored tuples) you could be using up a lot of memory.

On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:
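Nathan's point about max spout pending can be made concrete with a toy model. The sketch below is not the Storm API; it is a plain-Java illustration (all names made up) of the mechanism: the spout must take a permit per un-acked tuple and blocks once the window is full, so the number of tuples buffered in the worker can never exceed the pending limit, no matter how far the bolts fall behind.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of topology.max.spout.pending (NOT the Storm API).
// The "spout" loop acquires a permit before emitting; the "bolt"
// releases it on ack, bounding the in-flight tuple count.
public class MaxPendingDemo {
    static final int MAX_PENDING = 4;   // hypothetical small window

    public static void main(String[] args) throws Exception {
        Semaphore pending = new Semaphore(MAX_PENDING);
        ExecutorService bolt = Executors.newSingleThreadExecutor();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < 200; i++) {        // the "spout" loop
            pending.acquire();                 // blocks once MAX_PENDING tuples are un-acked
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            bolt.submit(() -> {                // the "bolt" acks, freeing a slot
                inFlight.decrementAndGet();
                pending.release();
            });
        }
        bolt.shutdown();
        bolt.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("peak un-acked tuples: " + peak.get());
    }
}
```

With a large (or absent) pending limit, nothing bounds the backlog, which is how a fast spout and a slow database writer fill the heap.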
Hi, Nathan

I have met a strange issue: when I set spoutConf.forceFromStart=true, the topology quickly runs into the GC overhead limit, even though I already increased the heap size, but if I remove this setting it works fine. I was thinking that maybe the KafkaSpout consumes data much faster than the data is written into PostgresDB, so the backlog quickly takes up the memory and causes a heap overflow. But I did the same test on my DEV cluster and it works fine, even with spoutConf.forceFromStart=true. I checked the Storm config for DEV and production; they are exactly the same.

Any hints?

thanks

AL

On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:

I don't see anything glaring. I would try increasing the heap size. It could be that you're right at the threshold of what you've allocated and you just need more memory.

On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:

Hi, All,

I have roughly located where the problem comes from. In my run command I specify the clientid of the TridentKafkaConfig; if I keep the clientid as the one I used before, it causes the GC error, otherwise I am completely OK. Here is the code:

    if (parameters.containsKey("clientid")) {
        logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
        spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"),
                parameters.get("clientid"));
    }

Any idea about this error?

Thanks

AL

On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:

Sorry, continuing the last thread:

2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable.
We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
java.lang.RuntimeException: ("Async loop died!")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703

I suspect this is caused by my EventUpdater, which writes data in batch:

    static class EventUpdater implements ReducerAggregator<List<String>> {

        @Override
        public List<String> init() {
            return null;
        }

        @Override
        public List<String> reduce(List<String> curr, TridentTuple tuple) {
            List<String> updated = null;

            if (curr == null) {
                String event = (String) tuple.getValue(1);
                System.out.println("===:" + event + ":");
                updated = Lists.newArrayList(event);
            } else {
                // note: the incoming tuple is ignored on this branch;
                // only the first event ever reaches the aggregate
                System.out.println("===+" + tuple + ":");
                updated = curr;
            }
            return updated;
        }
    }

What do you think?

Thanks

On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:

Thank you very much for the reply; here is the error I saw in the production server's worker-6703.log,

On Thu, Mar 5, 2015 at 11:31 AM, Nathan
Leung <[email protected]> >>>>> wrote: >>>>> >>>>>> Yeah, then in this case maybe you can install JDK / Yourkit in the >>>>>> remote machines and run the tools over X or something. I'm assuming this >>>>>> is a development cluster (not live / production) and that installing >>>>>> debugging tools and running remote UIs etc is not a problem. :) >>>>>> >>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> Nathan I think that if he wants to profile a bolt per se that runs >>>>>>> in a worker that resides in a different cluster node than the one the >>>>>>> profiling tool runs he won't be able to attach the process since it >>>>>>> resides >>>>>>> in a different physical machine, me thinks (well, now that I think of it >>>>>>> better it can be done... via remote debugging but that's just a pain in >>>>>>> the >>>>>>> ***). >>>>>>> >>>>>>> Regards, >>>>>>> >>>>>>> A. >>>>>>> >>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>>> You don't need to change your code. As Andrew mentioned you can get >>>>>>>> a lot of mileage by profiling your logic in a standalone program. For >>>>>>>> jvisualvm, you can just run your program (a loop that runs for a long >>>>>>>> time >>>>>>>> is best) then attach to the running process with jvisualvm. It's >>>>>>>> pretty >>>>>>>> straightforward to use and you can also find good guides with a Google >>>>>>>> search. >>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> >>>>>>>>> Well... detecting memory leaks in Java is a bit tricky as Java >>>>>>>>> does a lot for you. Generally though, as long as you avoid using "new" >>>>>>>>> operator and close any resources that you do not use you should be >>>>>>>>> fine... >>>>>>>>> but a Profiler such as the ones mentioned by Nathan will tell you the >>>>>>>>> whole >>>>>>>>> truth. 
YourKit is awesome and has a free trial, go ahead and test >>>>>>>>> drive it. >>>>>>>>> I am pretty sure that you need a working jar (or compilable code that >>>>>>>>> has a >>>>>>>>> main function in it) in order to profile it, although if you want to >>>>>>>>> profile your bolts and spouts is a bit tricker. Hopefully your >>>>>>>>> algorithm >>>>>>>>> (or portions of it) can be put in a sample test program that is able >>>>>>>>> to be >>>>>>>>> executed locally for you to profile it. >>>>>>>>> >>>>>>>>> Hope this helped. Regards, >>>>>>>>> >>>>>>>>> A. >>>>>>>>> >>>>>>>>> >>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor < >>>>>>>>>> [email protected]> wrote: >>>>>>>>>> >>>>>>>>>>> Unfortunately that is not fixed, it depends on the computations >>>>>>>>>>> and data-structures you have; in my case for example I use more >>>>>>>>>>> than 2GB >>>>>>>>>>> since I need to keep a large matrix in memory... having said that, >>>>>>>>>>> in most >>>>>>>>>>> cases it should be relatively easy to estimate how much memory you >>>>>>>>>>> are >>>>>>>>>>> going to need and use that... or if that's not possible you can just >>>>>>>>>>> increase it and try the "set and see" approach. Check for memory >>>>>>>>>>> leaks as >>>>>>>>>>> well... (unclosed resources and so on...!) >>>>>>>>>>> >>>>>>>>>>> Regards. >>>>>>>>>>> >>>>>>>>>>> A. >>>>>>>>>>> >>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Thanks, Nathan. How much is should be in general? >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap. It's >>>>>>>>>>>>> quite possible that this is not enough. Try increasing Xmx i >>>>>>>>>>>>> worker.childopts. 
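Besides heap size, the EventUpdater posted earlier in the thread can be sanity-checked outside Storm, in the standalone style Nathan suggests. Below is a self-contained re-creation of its reduce with the Storm types stripped out: a plain String stands in for the TridentTuple, and the JDK ArrayList replaces Guava's Lists.newArrayList. It shows that the aggregator keeps only the first event per group and silently drops every later one, so the aggregate itself stays tiny and is unlikely to be the source of the memory growth.

```java
import java.util.ArrayList;
import java.util.List;

// Storm-free re-creation of EventUpdater.reduce: a String stands in for
// the TridentTuple, and plain ArrayList replaces Guava's Lists.
public class EventUpdaterCheck {
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<>();
            updated.add(event);           // the first event for this group is kept
            return updated;
        }
        return curr;                      // every later event is dropped
    }

    public static void main(String[] args) {
        List<String> state = reduce(null, "login");
        state = reduce(state, "click");
        state = reduce(state, "logout");
        // Only the first event survives the aggregation.
        System.out.println(state);        // prints [login]
    }
}
```

Whether dropping all but the first event is the intended aggregation is a separate question from the OOM, but it is worth noticing while the code is under the profiler anyway.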
On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:

Hi, All

I have been running a Trident topology on a production server; the code is like this:

    topology.newStream("spoutInit", kafkaSpout)
            .each(new Fields("str"),
                    new JsonObjectParse(),
                    new Fields("eventType", "event"))
            .parallelismHint(pHint)
            .groupBy(new Fields("event"))
            .persistentAggregate(PostgresqlState.newFactory(config),
                    new Fields("eventType"),
                    new EventUpdater(),
                    new Fields("eventWord"));

    Config conf = new Config();
    conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

Basically it does simple things: get data from Kafka, parse it into different fields, and write it into PostgresDB. But in the Storm UI I did see this error: "java.lang.OutOfMemoryError: GC overhead limit exceeded". It always happens on the same worker of each node, 6703. I understand this happens because, by default, the JVM is configured to throw this error if you are spending more than 98% of the total time in GC and less than 2% of the heap is recovered after the GC.

I am not sure what the exact cause of the memory leak is; is it OK to simply increase the heap?
Here is my storm.yaml:

    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703

    nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
    ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
    supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
    worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"

Has anyone seen similar issues, and what would be the best way to overcome them?

thanks in advance

AL
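Putting the thread's advice together, one hedged revision of the worker.childopts above raises -Xmx and turns on GC logging so the next OOM leaves evidence. The 2048m figure is an arbitrary starting point, not a recommendation; the GC flags are standard HotSpot options for the JDK 7 used here, and the %ID% placeholder (which Storm substitutes with the worker port) should be double-checked against your Storm version:

```yaml
# Sketch only: larger worker heap plus GC logging for diagnosis.
# 2048m is a guess to start from; tune against your actual state size.
worker.childopts: "-Xmx2048m -Djava.net.preferIPv4Stack=true -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/var/log/storm/gc-worker-%ID%.log"
```

If the GC log shows the old generation filling steadily and never shrinking, more heap only delays the error; if it shows a one-time spike at topology start, a bigger heap (or a smaller replay backlog) is the right lever.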

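One last note on the clientid/forceFromStart behavior reported at the top of the thread. The snippet below is an illustrative plain-Java model, not the storm-kafka source, of how a spout might choose its starting offset given a client id: under this assumed semantics, forceFromStart ignores whatever offset was committed under the client id and replays the whole topic, which would produce exactly the reported pattern of a sudden multi-million-message backlog on the production topic while the small DEV topic stays harmless.

```java
// Illustrative model (NOT the storm-kafka source) of start-offset choice
// for a Kafka spout, given a client id's committed offset.
public class OffsetChoice {
    static long startOffset(boolean forceFromStart, Long committed,
                            long earliest, long latest) {
        if (forceFromStart) {
            return earliest;          // replay the entire topic, ignoring stored state
        }
        if (committed != null) {
            return committed;         // resume where this client id left off
        }
        return latest;                // unknown client id: start from the head
    }

    public static void main(String[] args) {
        long earliest = 0L, latest = 5_000_000L;
        Long committed = 4_999_000L;  // offset stored for the old client id

        // With forceFromStart the spout suddenly owes itself ~5M messages.
        System.out.println(startOffset(true, committed, earliest, latest));   // prints 0
        // Without it, the old client id resumes with a tiny backlog.
        System.out.println(startOffset(false, committed, earliest, latest));  // prints 4999000
    }
}
```

Under that model, the "old clientid causes GC errors" symptom is just the replay backlog arriving faster than PostgresDB can drain it, which is also why Nathan's max-spout-pending question at the top of the thread is the first thing to check.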