Hi Nathan, I have run into a strange issue. When I set spoutConf.forceFromStart = true, the topology quickly hits "GC overhead limit exceeded", even after I increase the heap size; if I remove this setting it works fine. My guess is that the kafkaSpout consumes data much faster than the data can be written into Postgres, so the backlog quickly fills the heap and causes the overflow. But the same test on my DEV cluster works fine even with spoutConf.forceFromStart = true, and I checked the Storm configs for DEV and production; they are identical.
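To make my suspicion concrete (the spout pulling data much faster than the Postgres writer drains it, with nothing bounding the in-flight backlog), here is a small JDK-only sketch. This is not Storm code; the class and method names are made up for illustration, and in a real topology the analogous bound would be something like topology.max.spout.pending:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedHandoff {
    // A fast producer (a spout replaying a topic from offset 0) feeding a
    // slow consumer (a DB writer) through a bounded queue. put() blocks when
    // the queue is full, so memory stays capped at `capacity` items instead
    // of growing with the whole replayed backlog.
    public static int transfer(int totalEvents, int capacity) throws InterruptedException {
        BlockingQueue<Integer> inFlight = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < totalEvents; i++) {
                try {
                    inFlight.put(i); // blocks when full: this is the backpressure
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        producer.start();
        int consumed = 0;
        while (consumed < totalEvents) {
            inFlight.take();
            consumed++;
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        // Every event still arrives; heap usage is capped at `capacity` entries.
        System.out.println(transfer(10_000, 128)); // prints 10000
    }
}
```

With forceFromStart = true and no such bound, the replayed backlog behaves like an unbounded queue, which would match the GC-overhead symptom on the cluster that has the most history to replay.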
Any hints? Thanks, AL

On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:

> I don't see anything glaring. I would try increasing the heap size. It
> could be that you're right at the threshold of what you've allocated and
> you just need more memory.
>
> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:
>
>> Hi all,
>>
>> I have roughly located where the problem comes from. In my run command I
>> specify the clientid of the TridentKafkaConfig; if I keep the clientid I
>> used before, it causes the GC error, otherwise I am completely OK.
>> Here is the code:
>>
>> if (parameters.containsKey("clientid")) {
>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>> }
>>
>> Any idea about this error?
>>
>> Thanks,
>> AL
>>
>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:
>>
>>> Sorry, continuing the last thread:
>>>
>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable.
>>> We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>> Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>     at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>>>     ...
>>> 6 common frames omitted
>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
>>> java.lang.RuntimeException: ("Async loop died!")
>>>     at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
>>>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>
>>> I suspect this is caused by my EventUpdater, which writes data in batches:
>>>
>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>
>>>     @Override
>>>     public List<String> init() {
>>>         return null;
>>>     }
>>>
>>>     @Override
>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>         List<String> updated = null;
>>>
>>>         if (curr == null) {
>>>             String event = (String) tuple.getValue(1);
>>>             System.out.println("===:" + event + ":");
>>>             updated = Lists.newArrayList(event);
>>>         } else {
>>>             System.out.println("===+" + tuple + ":");
>>>             updated = curr;
>>>         }
>>>         // System.out.println("(())");
>>>         return updated;
>>>     }
>>> }
>>>
>>> What do you think?
>>>
>>> Thanks
>>>
>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:
>>>
>>>> Thank you very much for the reply; here is the error I saw in the
>>>> production server's worker-6703.log.
>>>>
>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]> wrote:
>>>>
>>>>> Yeah, then in
>>>>> this case maybe you can install the JDK / YourKit on the remote machines
>>>>> and run the tools over X or something. I'm assuming this is a development
>>>>> cluster (not live / production) and that installing debugging tools and
>>>>> running remote UIs etc. is not a problem. :)
>>>>>
>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <[email protected]> wrote:
>>>>>
>>>>>> Nathan, I think that if he wants to profile a bolt that runs in a worker
>>>>>> on a different cluster node than the one the profiling tool runs on, he
>>>>>> won't be able to attach to the process, since it resides on a different
>>>>>> physical machine (well, now that I think of it, it can be done via remote
>>>>>> debugging, but that's just a pain in the ***).
>>>>>>
>>>>>> Regards,
>>>>>> A.
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]> wrote:
>>>>>>
>>>>>>> You don't need to change your code. As Andrew mentioned, you can get a
>>>>>>> lot of mileage by profiling your logic in a standalone program. For
>>>>>>> jvisualvm, you can just run your program (a loop that runs for a long
>>>>>>> time is best) and then attach to the running process with jvisualvm. It's
>>>>>>> pretty straightforward to use, and you can also find good guides with a
>>>>>>> Google search.
>>>>>>>
>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]> wrote:
>>>>>>>
>>>>>>>> Well... detecting memory leaks in Java is a bit tricky, as Java does a
>>>>>>>> lot for you. Generally though, as long as you avoid overusing the "new"
>>>>>>>> operator and close any resources you no longer use, you should be
>>>>>>>> fine... but a profiler such as the ones Nathan mentioned will tell you
>>>>>>>> the whole truth. YourKit is awesome and has a free trial; go ahead and
>>>>>>>> test drive it.
>>>>>>>> I am pretty sure you need a working jar (or compilable code with a
>>>>>>>> main function in it) in order to profile it, although profiling your
>>>>>>>> bolts and spouts is a bit trickier. Hopefully your algorithm (or
>>>>>>>> portions of it) can be put into a sample test program that can be
>>>>>>>> executed locally for you to profile.
>>>>>>>>
>>>>>>>> Hope this helped. Regards,
>>>>>>>> A.
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Unfortunately that is not fixed; it depends on the computations and
>>>>>>>>>> data structures you have. In my case, for example, I use more than
>>>>>>>>>> 2 GB, since I need to keep a large matrix in memory. Having said
>>>>>>>>>> that, in most cases it should be relatively easy to estimate how
>>>>>>>>>> much memory you are going to need and use that... or if that's not
>>>>>>>>>> possible, you can just increase it and try the "set and see"
>>>>>>>>>> approach. Check for memory leaks as well (unclosed resources and
>>>>>>>>>> so on!).
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> A.
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Your worker is allocated a maximum of 768 MB of heap. It's quite
>>>>>>>>>>>> possible that this is not enough. Try increasing -Xmx in
>>>>>>>>>>>> worker.childopts.
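For concreteness, a raised worker heap in storm.yaml might look like the following; 2048m is an arbitrary example value, not a figure recommended anywhere in the thread, and should be tuned to what the supervisor nodes can actually spare per slot:

```yaml
# storm.yaml fragment: each worker JVM gets up to 2 GB of heap
worker.childopts: "-Xmx2048m -Djava.net.preferIPv4Stack=true"
```

A restart of the supervisors (and a topology redeploy) is needed for worker.childopts changes to take effect.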
>>>>>>>>>>>>
>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have been running a Trident topology on a production server;
>>>>>>>>>>>>> the code looks like this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>     .each(new Fields("str"), new JsonObjectParse(), new Fields("eventType", "event"))
>>>>>>>>>>>>>     .parallelismHint(pHint)
>>>>>>>>>>>>>     .groupBy(new Fields("event"))
>>>>>>>>>>>>>     .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"));
>>>>>>>>>>>>>
>>>>>>>>>>>>> Config conf = new Config();
>>>>>>>>>>>>> conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>
>>>>>>>>>>>>> Basically, it does something simple: get data from Kafka, parse
>>>>>>>>>>>>> it into different fields, and write it into Postgres. But in the
>>>>>>>>>>>>> Storm UI I see the error "java.lang.OutOfMemoryError: GC overhead
>>>>>>>>>>>>> limit exceeded". It always happens in the same worker on each
>>>>>>>>>>>>> node, 6703. I understand this is because, by default, the JVM
>>>>>>>>>>>>> throws this error when more than 98% of the total time is spent
>>>>>>>>>>>>> in GC and less than 2% of the heap is recovered afterwards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not sure what the exact cause of the memory leak is. Is it
>>>>>>>>>>>>> OK to simply increase the heap?
>>>>>>>>>>>>> Here is my storm.yaml:
>>>>>>>>>>>>>
>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>     - 6700
>>>>>>>>>>>>>     - 6701
>>>>>>>>>>>>>     - 6702
>>>>>>>>>>>>>     - 6703
>>>>>>>>>>>>>
>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>
>>>>>>>>>>>>> Has anyone seen similar issues, and what would be the best way to
>>>>>>>>>>>>> overcome this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>>>> AL
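One side note on the EventUpdater quoted earlier in the thread: as written, reduce() returns curr unchanged whenever it is non-null, so only the first event in each group survives and every later tuple is silently dropped. If the intent was to accumulate events, the core logic would look like the following plain-Java sketch (the Storm TridentTuple is reduced to a bare String here, and the accumulate-all-events semantics is my assumption about the intent, not something the thread confirms):

```java
import java.util.ArrayList;
import java.util.List;

public class EventReduceSketch {
    // Plain-Java stand-in for ReducerAggregator<List<String>>.reduce():
    // instead of returning `curr` untouched when it is non-null (which
    // drops every event after the first), append the new event to it.
    public static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<>();
            updated.add(event);
            return updated;
        }
        curr.add(event); // accumulate rather than discard
        return curr;
    }

    public static void main(String[] args) {
        List<String> state = null;
        for (String e : new String[] {"login", "click", "purchase"}) {
            state = reduce(state, e);
        }
        System.out.println(state); // prints [login, click, purchase]
    }
}
```

Note that an accumulating reducer also grows its state without bound unless something downstream flushes it, which is worth keeping in mind given the GC symptoms discussed above.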
