Hi, Nathan

I have met a strange issue, when I set spoutConf.forceFromStart=true, it
will quickly run into GC overhead limit, even I already increase the heap
size, but I if I remove this setting
it will work fine, I was thinking maybe the kafkaSpout consuming data much
faster than the data being written into postgresDB, and data will quick
take the memory and causing heap overflow. But I did the same test on my
DEV cluster, it will working fine, even I set
spoutConf.forceFromStart=true. I check the storm config for DEV and
production, they are all same.

Any hints?

thanks

AL


On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:

> I don't see anything glaring.  I would try increasing heap size.  It could
> be that you're right on the threshold of what you've allocated and you just
> need more memory.
>
> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:
>
>> Hi, All,
>> ,
>> I kind locate where the problem come from, in my running command, I will
>> specify the clientid of TridentKafkaConfig, if I keep the clientid as the
>> one I used before, it will cause GC error, otherwise I am completely OK.
>> Here is the code:
>>
>> if (parameters.containsKey("clientid")) {
>>     logger.info("topic=>" + parameters.get("clientid") + "/" + 
>> parameters.get("topic"));
>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), 
>> parameters.get("clientid"));
>>
>> Any idea about this error?
>>
>>
>> Thanks
>>
>>
>> AL
>>
>>
>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:
>>
>>> Sorry, continue last thread:
>>>
>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address
>>> is not reachable. We will close this client Netty-Client-complicated-laugh/
>>> 10.100.98.103:6703
>>>         at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>> Caused by: java.lang.RuntimeException: Remote address is not reachable.
>>> We will close this client Netty-Client-complicated-laugh/
>>> 10.100.98.103:6703
>>>         at
>>> backtype.storm.messaging.netty.Client.connect(Client.java:171)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at backtype.storm.messaging.netty.Client.send(Client.java:194)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         ... 6 common frames omitted
>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async
>>> loop died!")
>>> java.lang.RuntimeException: ("Async loop died!")
>>>         at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
>>> [storm-core-0.9.3.jar:0.9.3]
>>>         at clojure.lang.RestFn.invoke(RestFn.java:423)
>>> [clojure-1.5.1.jar:na]
>>>         at
>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
>>> [storm-core-0.9.3.jar:0.9.3]
>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
>>> [storm-core-0.9.3.jar:0.9.3]
>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker
>>> eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client
>>> Netty-Client-beloved-judge/10.100.98.104:6703
>>>
>>> I doubt this is caused by my eventUpfater, which write data in batch
>>>
>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>
>>>             @Override
>>>             public List<String> init(){
>>>                      return null;
>>>             }
>>>
>>>             @Override
>>>             public List<String> reduce(List<String> curr, TridentTuple 
>>> tuple) {
>>>                    List<String> updated = null ;
>>>
>>>                    if ( curr == null ) {
>>>                                     String event = (String) 
>>> tuple.getValue(1);
>>>                                     System.out.println("===:" + event + 
>>> ":");
>>>                                     updated = Lists.newArrayList(event);
>>>                    } else {
>>>                                     System.out.println("===+" +  tuple + 
>>> ":");
>>>                                     updated = curr ;
>>>                    }
>>> //              System.out.println("(())");
>>>               return updated ;
>>>             }
>>>         }
>>>
>>> How do you think
>>>
>>> THanks
>>>
>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:
>>>
>>>> Thank you very much for the reply, here is error I saw in production
>>>> server worker-6703.log,
>>>>
>>>>
>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]>
>>>> wrote:
>>>>
>>>>> Yeah, then in this case maybe you can install JDK / Yourkit in the
>>>>> remote machines and run the tools over X or something.  I'm assuming this
>>>>> is a development cluster (not live / production) and that installing
>>>>> debugging tools and running remote UIs etc is not a problem.  :)
>>>>>
>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Nathan I think that if he wants to profile a bolt per se that runs in
>>>>>> a worker that resides in a different cluster node than the one the
>>>>>> profiling tool runs he won't be able to attach the process since it 
>>>>>> resides
>>>>>> in a different physical machine, me thinks (well, now that I think of it
>>>>>> better it can be done... via remote debugging but that's just a pain in 
>>>>>> the
>>>>>> ***).
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> A.
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> You don't need to change your code. As Andrew mentioned you can get
>>>>>>> a lot of mileage by profiling your logic in a standalone program. For
>>>>>>> jvisualvm, you can just run your program (a loop that runs for a long 
>>>>>>> time
>>>>>>> is best) then attach to the running process with jvisualvm.  It's pretty
>>>>>>> straightforward to use and you can also find good guides with a Google
>>>>>>> search.
>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> ​
>>>>>>>> Well...  detecting memory leaks in Java is a bit tricky as Java
>>>>>>>> does a lot for you. Generally though, as long as you avoid using "new"
>>>>>>>> operator and close any resources that you do not use you should be 
>>>>>>>> fine...
>>>>>>>> but a Profiler such as the ones mentioned by Nathan will tell you the 
>>>>>>>> whole
>>>>>>>> truth. YourKit is awesome and has a free trial, go ahead and test 
>>>>>>>> drive it.
>>>>>>>> I am pretty sure that you need a working jar (or compilable code that 
>>>>>>>> has a
>>>>>>>> main function in it) in order to profile it, although if you want to
>>>>>>>> profile your bolts and spouts is a bit tricker. Hopefully your 
>>>>>>>> algorithm
>>>>>>>> (or portions of it) can be put in a sample test program that is able 
>>>>>>>> to be
>>>>>>>> executed locally for you to profile it.
>>>>>>>>
>>>>>>>> Hope this helped. Regards,
>>>>>>>>
>>>>>>>> A.
>>>>>>>> ​
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Unfortunately that is not fixed, it depends on the computations
>>>>>>>>>> and data-structures you have; in my case for example I use more than 
>>>>>>>>>> 2GB
>>>>>>>>>> since I need to keep a large matrix in memory... having said that, 
>>>>>>>>>> in most
>>>>>>>>>> cases it should be relatively easy to estimate how much memory you 
>>>>>>>>>> are
>>>>>>>>>> going to need and use that... or if that's not possible you can just
>>>>>>>>>> increase it and try the "set and see" approach. Check for memory 
>>>>>>>>>> leaks as
>>>>>>>>>> well... (unclosed resources and so on...!)
>>>>>>>>>>
>>>>>>>>>> Regards.
>>>>>>>>>>
>>>>>>>>>> ​A.​
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Nathan. How much is should be in general?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <[email protected]
>>>>>>>>>>> > wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap. It's quite
>>>>>>>>>>>> possible that this is not enough. Try increasing Xmx i 
>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have been running a trident topology on production server,
>>>>>>>>>>>>> code is like this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>>>>                 
>>>>>>>>>>>>> .persistentAggregate(PostgresqlState.newFactory(config), new 
>>>>>>>>>>>>> Fields("eventType"), new EventUpdater(), new Fields("eventWord"))
>>>>>>>>>>>>>         ;
>>>>>>>>>>>>>
>>>>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>>>>         
>>>>>>>>>>>>> conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>
>>>>>>>>>>>>> Basically, it does simple things to get data from kafka, parse to 
>>>>>>>>>>>>> different field and write into postgresDB. But in storm UI, I did 
>>>>>>>>>>>>> see such error, "java.lang.OutOfMemoryError: GC overhead limit 
>>>>>>>>>>>>> exceeded". It all happens in same worker of each node - 6703. I 
>>>>>>>>>>>>> understand this is because by default the JVM is configured to 
>>>>>>>>>>>>> throw this error if you are spending more than *98% of the total 
>>>>>>>>>>>>> time in GC and after the GC less than 2% of the heap is 
>>>>>>>>>>>>> recovered*.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not sure what is exact cause for memory leak, is it OK by 
>>>>>>>>>>>>> simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>
>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>
>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>
>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>
>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>
>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>
>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>
>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>
>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>
>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Anyone has similar issues, and what will be the best way to
>>>>>>>>>>>>> overcome?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>
>>>>>>>>>>>>> AL
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to