Hi, Nathan

The log size of that Kafka topic is 23515541 and each record is about 3K. I checked the yaml file and I don't have max spout pending set, so I assume it should be the default:

topology.max.spout.pending: null
Should I set it to a certain value? Also, I sometimes see java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR] Error on initialization of server mk-worker. Does this mean I should add

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 # 5MB buffer
storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

to the yaml and modify the values?

thanks

On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <[email protected]> wrote:

> How much data do you have in Kafka? How is your max spout pending set? If
> you have a high max spout pending (or if you emit unanchored tuples) you
> could be using up a lot of memory.
>
> On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:
>
>> Hi, Nathan
>>
>> I have met a strange issue: when I set spoutConf.forceFromStart=true, it
>> quickly runs into the GC overhead limit even though I already increased
>> the heap size, but if I remove this setting it works fine. I was thinking
>> maybe the KafkaSpout consumes data much faster than the data is written
>> into PostgresDB, so the data quickly fills the memory and causes a heap
>> overflow. But I did the same test on my DEV cluster and it works fine,
>> even with spoutConf.forceFromStart=true. I checked the storm config for
>> DEV and production; they are the same.
>>
>> Any hints?
>>
>> thanks
>>
>> AL
>>
>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:
>>
>>> I don't see anything glaring. I would try increasing heap size. It
>>> could be that you're right on the threshold of what you've allocated and
>>> you just need more memory.
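[A quick back-of-envelope check on the numbers quoted above. This assumes the 23,515,541 log-size figure is a record count and ~3 KB per record, both taken from the thread; the sketch itself is only illustrative arithmetic, not Storm code.]

```java
// Rough memory math for a forceFromStart=true replay: the whole topic is
// re-read from offset 0, so with no max spout pending the in-flight data
// can dwarf the worker heap.
public class MemoryEstimate {
    static long totalBytes(long records, long bytesPerRecord) {
        return records * bytesPerRecord;
    }

    public static void main(String[] args) {
        long topicBytes = totalBytes(23_515_541L, 3_000L); // ~3 KB per record
        long workerHeap = 768L * 1024 * 1024;              // -Xmx768m from the yaml below
        System.out.printf("topic ~= %.1f GB, worker heap = %d MB%n",
                topicBytes / 1e9, workerHeap / (1024 * 1024));
        // Roughly 70 GB of replayable data against a 768 MB heap: the spout
        // must be throttled (topology.max.spout.pending) or the heap raised.
    }
}
```

With numbers this lopsided, a bounded max spout pending is usually the first knob to try, since a null (unbounded) setting lets the spout emit as fast as Kafka can serve.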
>>>
>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:
>>>
>>>> Hi, All,
>>>>
>>>> I have roughly located where the problem comes from. In my run command
>>>> I specify the clientid of the TridentKafkaConfig; if I keep the clientid
>>>> as the one I used before, it causes the GC error, otherwise I am
>>>> completely OK. Here is the code:
>>>>
>>>> if (parameters.containsKey("clientid")) {
>>>>     logger.info("topic=>" + parameters.get("clientid") + "/"
>>>>             + parameters.get("topic"));
>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"),
>>>>             parameters.get("clientid"));
>>>>
>>>> Any idea about this error?
>>>>
>>>> Thanks
>>>>
>>>> AL
>>>>
>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:
>>>>
>>>>> Sorry, continuing the last thread:
>>>>>
>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address
>>>>> is not reachable. We will close this client
>>>>> Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>   at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>   at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>> Caused by: java.lang.RuntimeException: Remote address is not
>>>>> reachable.
>>>>> We will close this client
>>>>> Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>   at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>   ... 6 common frames omitted
>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async
>>>>> loop died!")
>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>   at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
>>>>>   at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
>>>>>   at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
>>>>>   at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
>>>>>   at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>   at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker
>>>>> eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty
>>>>> Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>
>>>>> I suspect this is caused by my EventUpdater, which writes data in
>>>>> batches:
>>>>>
>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>
>>>>>     @Override
>>>>>     public List<String> init() {
>>>>>         return null;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>         List<String> updated = null;
>>>>>
>>>>>         if (curr == null) {
>>>>>             String event = (String) tuple.getValue(1);
>>>>>             System.out.println("===:" + event + ":");
>>>>>             updated = Lists.newArrayList(event);
>>>>>         } else {
>>>>>             System.out.println("===+" + tuple + ":");
>>>>>             updated = curr;
>>>>>         }
>>>>>         // System.out.println("(())");
>>>>>         return updated;
>>>>>     }
>>>>> }
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:
>>>>>
>>>>>> Thank you very much for the reply; here is the error I saw in the
>>>>>> production server's worker-6703.log:
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Yeah, then in this case maybe you can install JDK / YourKit on the
>>>>>>> remote machines and run the tools over X or something. I'm assuming
>>>>>>> this is a development cluster (not live / production) and that
>>>>>>> installing debugging tools and running remote UIs etc. is not a
>>>>>>> problem. :)
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Nathan, I think that if he wants to profile a bolt per se that runs
>>>>>>>> in a worker residing on a different cluster node than the one the
>>>>>>>> profiling tool runs on, he won't be able to attach to the process,
>>>>>>>> since it resides on a different physical machine, methinks (well,
>>>>>>>> now that I think about it, it can be done... via remote debugging,
>>>>>>>> but that's just a pain in the ***).
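[An aside on the EventUpdater quoted above: its reduce() returns curr unchanged once it is non-null, so every tuple after the first in a batch is effectively dropped. If the intent is to accumulate events, the fix might look like the following Storm-free sketch; the TridentTuple is replaced here by a plain String event purely for illustration, and the event names are made up.]

```java
import java.util.ArrayList;
import java.util.List;

public class EventReducer {
    // Same shape as ReducerAggregator.reduce in the thread, but appends
    // every incoming event instead of returning the current list untouched.
    static List<String> reduce(List<String> curr, String event) {
        List<String> updated = (curr == null) ? new ArrayList<String>() : curr;
        updated.add(event);
        return updated;
    }

    public static void main(String[] args) {
        List<String> state = null; // mirrors init() returning null
        for (String e : new String[] {"login", "click", "logout"}) {
            state = reduce(state, e);
        }
        System.out.println(state); // all three events survive the fold
    }
}
```

Note that the dropped-tuple behavior would not by itself explain a growing heap; if anything, the accumulating variant retains more state, so the aggregator state size is worth watching either way.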
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> A.
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> You don't need to change your code. As Andrew mentioned, you can
>>>>>>>>> get a lot of mileage by profiling your logic in a standalone
>>>>>>>>> program. For jvisualvm, you can just run your program (a loop that
>>>>>>>>> runs for a long time is best) then attach to the running process
>>>>>>>>> with jvisualvm. It's pretty straightforward to use and you can
>>>>>>>>> also find good guides with a Google search.
>>>>>>>>>
>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Well... detecting memory leaks in Java is a bit tricky, as Java
>>>>>>>>>> does a lot for you. Generally, though, as long as you avoid
>>>>>>>>>> overusing the "new" operator and close any resources that you no
>>>>>>>>>> longer use, you should be fine... but a profiler such as the ones
>>>>>>>>>> mentioned by Nathan will tell you the whole truth. YourKit is
>>>>>>>>>> awesome and has a free trial; go ahead and test drive it. I am
>>>>>>>>>> pretty sure that you need a working jar (or compilable code that
>>>>>>>>>> has a main function in it) in order to profile it, although
>>>>>>>>>> profiling your bolts and spouts is a bit trickier. Hopefully your
>>>>>>>>>> algorithm (or portions of it) can be put in a sample test program
>>>>>>>>>> that can be executed locally for you to profile.
>>>>>>>>>>
>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>
>>>>>>>>>> A.
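[Nathan's "run a long loop and attach jvisualvm" suggestion can be sketched as a throwaway harness like the one below. The class name, duration, and busy-work body are placeholders; in practice you would drop your bolt's logic into the loop.]

```java
public class ProfileHarness {
    // Busy-works for roughly `seconds`, giving jvisualvm time to attach:
    // run `jvisualvm`, pick this process under "Local", open the Sampler
    // or Monitor tab. Replace the loop body with the code you want profiled.
    static long busyWork(long seconds) {
        long deadline = System.nanoTime() + seconds * 1_000_000_000L;
        long acc = 0;
        while (System.nanoTime() < deadline) {
            acc += Long.toHexString(acc ^ System.nanoTime()).hashCode();
        }
        return acc;
    }

    public static void main(String[] args) {
        long seconds = args.length > 0 ? Long.parseLong(args[0]) : 300;
        System.out.println("attach jvisualvm now; result=" + busyWork(seconds));
    }
}
```

The accumulator is returned and printed only so the JIT cannot eliminate the loop as dead code.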
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately there is no fixed number; it depends on the
>>>>>>>>>>>> computations and data structures you have. In my case, for
>>>>>>>>>>>> example, I use more than 2GB since I need to keep a large
>>>>>>>>>>>> matrix in memory... Having said that, in most cases it should
>>>>>>>>>>>> be relatively easy to estimate how much memory you are going
>>>>>>>>>>>> to need and use that... or, if that's not possible, you can
>>>>>>>>>>>> just increase it and try the "set and see" approach. Check for
>>>>>>>>>>>> memory leaks as well... (unclosed resources and so on!)
>>>>>>>>>>>>
>>>>>>>>>>>> Regards.
>>>>>>>>>>>>
>>>>>>>>>>>> A.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap. It's
>>>>>>>>>>>>>> quite possible that this is not enough. Try increasing Xmx in
>>>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have been running a trident topology on a production
>>>>>>>>>>>>>>> server; the code is like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>         .each(new Fields("str"),
>>>>>>>>>>>>>>>               new JsonObjectParse(),
>>>>>>>>>>>>>>>               new Fields("eventType", "event"))
>>>>>>>>>>>>>>>         .parallelismHint(pHint)
>>>>>>>>>>>>>>>         .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>         .persistentAggregate(PostgresqlState.newFactory(config),
>>>>>>>>>>>>>>>               new Fields("eventType"), new EventUpdater(),
>>>>>>>>>>>>>>>               new Fields("eventWord"));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Config conf = new Config();
>>>>>>>>>>>>>>> conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Basically, it does a simple thing: get data from Kafka,
>>>>>>>>>>>>>>> parse it into different fields, and write it into PostgresDB.
>>>>>>>>>>>>>>> But in the storm UI I see this error:
>>>>>>>>>>>>>>> "java.lang.OutOfMemoryError: GC overhead limit exceeded".
>>>>>>>>>>>>>>> It always happens in the same worker on each node - 6703. I
>>>>>>>>>>>>>>> understand this happens because, by default, the JVM is
>>>>>>>>>>>>>>> configured to throw this error if you are spending more than
>>>>>>>>>>>>>>> *98% of the total time in GC and after the GC less than 2%
>>>>>>>>>>>>>>> of the heap is recovered*.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am not sure what the exact cause of the memory leak is; is
>>>>>>>>>>>>>>> it OK to simply increase the heap?
>>>>>>>>>>>>>>> Here is my storm.yaml:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>     - 6700
>>>>>>>>>>>>>>>     - 6701
>>>>>>>>>>>>>>>     - 6702
>>>>>>>>>>>>>>>     - 6703
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Has anyone had similar issues, and what would be the best
>>>>>>>>>>>>>>> way to overcome this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> AL
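[If the first step is simply to test Nathan's heap hypothesis, one possible revision of the worker line in the storm.yaml above is sketched below. The 2048m value is a guess to validate the theory, not a recommendation; the GC-logging flags are standard HotSpot options added so the next OOM leaves evidence, and the log path and Storm's %ID% worker-port substitution should be checked against your own setup.]

```yaml
worker.childopts: "-Xmx2048m -Djava.net.preferIPv4Stack=true -verbose:gc -XX:+PrintGCDetails -Xloggc:/var/log/storm/gc-worker-%ID%.log"
```

If the GC log shows the old generation filling steadily even at the larger heap, that points back at unbounded spout emission or aggregator state rather than an undersized heap.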
