Have you profiled your spout / bolt logic as recommended earlier in this thread?
On Mon, Mar 9, 2015 at 9:49 PM, Sa Li <[email protected]> wrote:

You are right, I have already increased the heap in the yaml to 2 GB for each worker, but I still have the issue, so I suspect I may be running into some other cause — receive/send buffer size? In general, before I see the GC overhead error in the Storm UI, I come across other errors in the worker log as well, such as Netty connection and null pointer errors, as I showed in another post.

Thanks

On Mar 9, 2015 5:36 PM, "Nathan Leung" <[email protected]> wrote:

I still think you should try running with a larger heap. :) Max spout pending determines how many tuples can be pending (tuple tree not fully acked) per spout task. If you have many spout tasks per worker, this can be a large amount of memory. It also depends on how big your tuples are.

On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <[email protected]> wrote:

Hi Nathan,

We have played around with max spout pending in dev; if we set it to 10 it is OK, but if we set it to more than 50, the GC overhead error starts to appear. We are ultimately writing tuples into a PostgreSQL DB; the highest write speed into the DB is around 40K records/minute, which is quite slow — maybe that is why tuples accumulate in memory before being dumped into the DB. But I think 10 is too small; does that mean only 10 tuples are allowed in flight?

Thanks,

AL

On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <[email protected]> wrote:

I've not modified Netty, so I can't comment on that. I would set max spout pending; try 1000 at first. This will limit the number of tuples that you can have in flight simultaneously, and therefore limit the amount of memory used by these tuples and their processing.
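For reference, the setting Nathan suggests above goes into storm.yaml (or the topology config); a minimal sketch, where 1000 is just his suggested starting point, not a tuned value:

```yaml
# Cap the number of unacked tuples in flight per spout task.
# 1000 is the starting point suggested above; tune per topology.
topology.max.spout.pending: 1000
```

It can also be set programmatically on the topology's Config object (e.g. via setMaxSpoutPending) before submitting, if you prefer to keep it with the topology code.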
On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <[email protected]> wrote:

Hi Nathan,

The log size of that Kafka topic is 23515541, and each record is about 3K. I checked the yaml file; I don't have max spout pending set, so I assume it is the default: topology.max.spout.pending: null

Should I set it to a certain value? Also, I sometimes see java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR] Error on initialization of server mk-worker. Does this mean I should add

    storm.messaging.netty.server_worker_threads: 1
    storm.messaging.netty.client_worker_threads: 1
    storm.messaging.netty.buffer_size: 5242880 # 5MB buffer
    storm.messaging.netty.max_retries: 30
    storm.messaging.netty.max_wait_ms: 1000
    storm.messaging.netty.min_wait_ms: 100

to the yaml and modify the values?

Thanks

On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <[email protected]> wrote:

How much data do you have in Kafka? How is your max spout pending set? If you have a high max spout pending (or if you emit unanchored tuples), you could be using up a lot of memory.

On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:

Hi Nathan,

I have run into a strange issue: when I set spoutConf.forceFromStart=true, it quickly hits the GC overhead limit even though I already increased the heap size, but if I remove this setting it works fine. I was thinking that maybe the KafkaSpout consumes data much faster than the data can be written into the Postgres DB, so the data quickly fills the memory and causes a heap overflow. But I ran the same test on my DEV cluster and it works fine, even with spoutConf.forceFromStart=true. I checked the Storm config for DEV and production; they are the same.
Any hints?

Thanks,

AL

On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:

I don't see anything glaring. I would try increasing the heap size. It could be that you're right at the threshold of what you've allocated and you just need more memory.

On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:

Hi all,

I have roughly located where the problem comes from. In my run command I specify the clientid of the TridentKafkaConfig; if I keep the clientid as the one I used before, it causes the GC error, otherwise I am completely OK. Here is the code:

    if (parameters.containsKey("clientid")) {
        logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
        spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));

Any idea about this error?

Thanks,

AL

On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:

Sorry, continuing the last thread:

    2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
    java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
    Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
    2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
    java.lang.RuntimeException: ("Async loop died!")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
    2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
    2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703

I suspect this is caused by my EventUpdater, which writes data in batches:

    static class EventUpdater implements ReducerAggregator<List<String>> {

        @Override
        public List<String> init() {
            return null;
        }

        @Override
        public List<String> reduce(List<String> curr, TridentTuple tuple) {
            List<String> updated = null;

            if (curr == null) {
                String event = (String) tuple.getValue(1);
                System.out.println("===:" + event + ":");
                updated = Lists.newArrayList(event);
            } else {
                System.out.println("===+" + tuple + ":");
                updated = curr;
            }
            // System.out.println("(())");
            return updated;
        }
    }

What do you think?

Thanks

On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:

Thank you very much for the reply; here is the error I saw in the production server's worker-6703.log.

On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]> wrote:

Yeah, in this case maybe you can install the JDK / YourKit on the remote machines and run the tools over X or something. I'm assuming this is a development cluster (not live / production) and that installing debugging tools and running remote UIs etc. is not a problem. :)

On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <[email protected]> wrote:

Nathan, I think that if he wants to profile a bolt per se that runs in a worker residing on a different cluster node than the one the profiling tool runs on, he won't be able to attach to the process, since it resides on a different physical machine, methinks (well, now that I think of it, it can be done via remote debugging, but that's just a pain in the ***).

Regards,

A.

On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]> wrote:

You don't need to change your code. As Andrew mentioned, you can get a lot of mileage by profiling your logic in a standalone program.
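Following that advice, the reduce logic from the EventUpdater quoted earlier in the thread can be pulled into a standalone class for local testing and profiling. A minimal sketch, with hypothetical class and method names and Storm types stripped out; note that the original's else branch returns curr unchanged, so only the first event per group is ever kept — if the intent is to accumulate events, the else branch likely needs to add the new event, as done here:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the EventUpdater reduce logic, decoupled from
// Storm/Trident so it can be run and profiled locally. Names are
// hypothetical; the accumulation in the else branch is an assumed fix
// for the original code, which silently dropped subsequent events.
public class EventReduceSketch {
    public static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            // First event for this group: start a new list.
            List<String> updated = new ArrayList<>();
            updated.add(event);
            return updated;
        }
        // Accumulate instead of returning curr unchanged.
        curr.add(event);
        return curr;
    }
}
```

Wrapping this in a loop over a few million synthetic events and attaching jvisualvm or YourKit would show whether the per-batch lists themselves are a significant source of memory pressure.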
For jvisualvm, you can just run your program (a loop that runs for a long time is best), then attach to the running process with jvisualvm. It's pretty straightforward to use, and you can also find good guides with a Google search.

On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]> wrote:

Well... detecting memory leaks in Java is a bit tricky, as Java does a lot for you. Generally though, as long as you avoid unnecessary use of the "new" operator and close any resources that you no longer use, you should be fine... but a profiler such as the ones mentioned by Nathan will tell you the whole truth. YourKit is awesome and has a free trial; go ahead and test drive it. I am pretty sure that you need a working jar (or compilable code that has a main function in it) in order to profile it, although profiling your bolts and spouts is a bit trickier. Hopefully your algorithm (or portions of it) can be put into a sample test program that can be executed locally for you to profile.

Hope this helped. Regards,

A.
On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> wrote:

On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <[email protected]> wrote:

Unfortunately that is not fixed; it depends on the computations and data structures you have. In my case, for example, I use more than 2 GB since I need to keep a large matrix in memory... Having said that, in most cases it should be relatively easy to estimate how much memory you are going to need and use that... or, if that's not possible, you can just increase it and try the "set and see" approach. Check for memory leaks as well (unclosed resources and so on!).

Regards,

A.

On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]> wrote:

Thanks, Nathan. How much should it be in general?

On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <[email protected]> wrote:

Your worker is allocated a maximum of 768 MB of heap. It's quite possible that this is not enough. Try increasing Xmx in worker.childopts.
On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:

Hi all,

I have been running a Trident topology on a production server; the code is like this:

    topology.newStream("spoutInit", kafkaSpout)
            .each(new Fields("str"),
                  new JsonObjectParse(),
                  new Fields("eventType", "event"))
            .parallelismHint(pHint)
            .groupBy(new Fields("event"))
            .persistentAggregate(PostgresqlState.newFactory(config),
                    new Fields("eventType"), new EventUpdater(), new Fields("eventWord"));

    Config conf = new Config();
    conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

Basically, it does something simple: get data from Kafka, parse it into different fields, and write it into the Postgres DB. But in the Storm UI I see the error "java.lang.OutOfMemoryError: GC overhead limit exceeded". It always happens on the same worker port on each node: 6703. I understand this occurs because, by default, the JVM throws this error if you are spending more than 98% of the total time in GC and less than 2% of the heap is recovered by each GC.

I am not sure what the exact cause of the memory leak is; is it OK to simply increase the heap?
Here is my storm.yaml:

    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703

    nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"

    ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"

    supervisor.childopts: "-Djava.net.preferIPv4Stack=true"

    worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"

Has anyone had similar issues, and what would be the best way to overcome them?

Thanks in advance,

AL
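Nathan's suggestion of raising Xmx in worker.childopts would change that last line of the storm.yaml along these lines; 2048m is an assumed starting point for illustration, not a value recommended anywhere in this thread, and would need to fit within the machine's RAM given four worker slots per node:

```yaml
# Sketch: raise the worker heap from 768m. 2048m is an assumed
# starting value; tune it, remembering each of the 4 slots may
# run a worker with this heap simultaneously.
worker.childopts: "-Xmx2048m -Djava.net.preferIPv4Stack=true"
```

After changing this, restart the supervisors so new workers pick up the setting, and watch whether the GC overhead error merely takes longer to appear (suggesting a leak) or disappears (suggesting the heap was simply undersized).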
