I've not modified netty so I can't comment on that.  I would set max spout
pending; try 1000 at first.  This will limit the number of tuples that you
can have in flight simultaneously and therefore limit the amount of memory
used by these tuples and their processing.
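
A back-of-the-envelope sketch of why the cap bounds memory, under some assumptions. The class name InFlightEstimate and the helper inFlightBytes are hypothetical, the 3 KB record size is the figure you quoted, and 500 tuples per batch is a made-up number for illustration only. As far as I know, in a Trident topology the setting counts batches in flight rather than individual tuples, so the batch size matters too. The estimate covers raw payload only; real heap use will be higher because of object overhead and whatever the bolts hold on to.

```java
public class InFlightEstimate {

    // Rough upper bound on raw payload bytes in flight for one spout task:
    // pending units * tuples per unit * bytes per tuple.
    static long inFlightBytes(long maxPending, long tuplesPerPending, long bytesPerTuple) {
        return maxPending * tuplesPerPending * bytesPerTuple;
    }

    public static void main(String[] args) {
        long recordBytes = 3L * 1024; // "each record is about 3K"

        // Case 1: each pending unit is a single tuple.
        long perTuple = inFlightBytes(1000, 1, recordBytes);

        // Case 2: each pending unit is a batch of 500 tuples (assumed size).
        long perBatch = inFlightBytes(1000, 500, recordBytes);

        System.out.println("1000 pending tuples:  " + perTuple + " bytes");
        System.out.println("1000 pending batches: " + perBatch + " bytes");
    }
}
```

The cap itself can be set with topology.max.spout.pending: 1000 in the yaml, or with conf.setMaxSpoutPending(1000) on the topology's Config. If the second case looks closer to your setup, a much lower value than 1000 may be the safer starting point.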

On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <[email protected]> wrote:

> Hi, Nathan
>
> The log size of that Kafka topic is 23515541, and each record is about 3K. I
> checked the yaml file and I don't have max spout pending set, so I assume it
> is at the default: topology.max.spout.pending: null
>
> Should I set it to a certain value? Also, I sometimes see
> java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR]
> Error on initialization of server mk-worker.
> Does this mean I should add
> storm.messaging.netty.server_worker_threads: 1
> storm.messaging.netty.client_worker_threads: 1
> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
> storm.messaging.netty.max_retries: 30
> storm.messaging.netty.max_wait_ms: 1000
> storm.messaging.netty.min_wait_ms: 100
>
> to the yaml and modify the values?
>
>
>
> thanks
>
>
>
> On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <[email protected]> wrote:
>
>> How much data do you have in Kafka? How is your max spout pending set? If
>> you have a high max spout pending (or if you emit unanchored tuples) you
>> could be using up a lot of memory.
>> On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:
>>
>>> Hi, Nathan
>>>
>>> I have hit a strange issue: when I set spoutConf.forceFromStart=true, the
>>> topology quickly runs into the GC overhead limit, even though I already
>>> increased the heap size. If I remove this setting it works fine. I was
>>> thinking that maybe the kafkaSpout consumes data much faster than the data
>>> is written into the postgresDB, so the data quickly fills the memory and
>>> causes a heap overflow. But I ran the same test on my DEV cluster and it
>>> works fine, even with spoutConf.forceFromStart=true. I checked the storm
>>> config for DEV and production, and they are the same.
>>>
>>> Any hints?
>>>
>>> thanks
>>>
>>> AL
>>>
>>>
>>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> I don't see anything glaring.  I would try increasing heap size.  It
>>>> could be that you're right on the threshold of what you've allocated and
>>>> you just need more memory.
>>>>
>>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:
>>>>
>>>>> Hi, All,
>>>>>
>>>>> I have more or less located where the problem comes from. In my run
>>>>> command I specify the clientid of TridentKafkaConfig. If I keep the
>>>>> clientid as the one I used before, it causes the GC error; otherwise I
>>>>> am completely OK. Here is the code:
>>>>>
>>>>> if (parameters.containsKey("clientid")) {
>>>>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>>>> }
>>>>>
>>>>> Any idea about this error?
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> AL
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:
>>>>>
>>>>>> Sorry, continuing my last message:
>>>>>>
>>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>> Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>         at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>         ... 6 common frames omitted
>>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
>>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>>         at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
>>>>>>         at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
>>>>>>         at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
>>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
>>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>>
>>>>>> I suspect this is caused by my EventUpdater, which writes data in batches:
>>>>>>
>>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>>
>>>>>>     @Override
>>>>>>     public List<String> init() {
>>>>>>         return null;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>>         List<String> updated = null;
>>>>>>
>>>>>>         if (curr == null) {
>>>>>>             String event = (String) tuple.getValue(1);
>>>>>>             System.out.println("===:" + event + ":");
>>>>>>             updated = Lists.newArrayList(event);
>>>>>>         } else {
>>>>>>             System.out.println("===+" + tuple + ":");
>>>>>>             updated = curr;
>>>>>>         }
>>>>>>         // System.out.println("(())");
>>>>>>         return updated;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:
>>>>>>
>>>>>>> Thank you very much for the reply. Here is the error I saw in the
>>>>>>> production server's worker-6703.log:
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yeah, then in this case maybe you can install the JDK / YourKit on the
>>>>>>>> remote machines and run the tools over X or something.  I'm assuming
>>>>>>>> this is a development cluster (not live / production) and that
>>>>>>>> installing debugging tools and running remote UIs etc. is not a
>>>>>>>> problem.  :)
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Nathan, I think that if he wants to profile a bolt that runs in a
>>>>>>>>> worker on a different cluster node than the one the profiling tool
>>>>>>>>> runs on, he won't be able to attach to the process, since it resides
>>>>>>>>> on a different physical machine (well, now that I think about it
>>>>>>>>> more, it can be done via remote debugging, but that's just a pain in
>>>>>>>>> the ***).
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> A.
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> You don't need to change your code. As Andrew mentioned, you can
>>>>>>>>>> get a lot of mileage by profiling your logic in a standalone
>>>>>>>>>> program. For jvisualvm, you can just run your program (a loop that
>>>>>>>>>> runs for a long time is best) and then attach to the running
>>>>>>>>>> process with jvisualvm.  It's pretty straightforward to use, and
>>>>>>>>>> you can also find good guides with a Google search.
>>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Well... detecting memory leaks in Java is a bit tricky, as Java
>>>>>>>>>>> does a lot for you. Generally though, as long as you avoid
>>>>>>>>>>> unnecessary use of the "new" operator and close any resources that
>>>>>>>>>>> you no longer use, you should be fine... but a profiler such as
>>>>>>>>>>> the ones mentioned by Nathan will tell you the whole truth.
>>>>>>>>>>> YourKit is awesome and has a free trial, go ahead and test drive
>>>>>>>>>>> it. I am pretty sure that you need a working jar (or compilable
>>>>>>>>>>> code that has a main function in it) in order to profile it,
>>>>>>>>>>> although profiling your bolts and spouts is a bit trickier.
>>>>>>>>>>> Hopefully your algorithm (or portions of it) can be put in a
>>>>>>>>>>> sample test program that can be executed locally for you to
>>>>>>>>>>> profile.
>>>>>>>>>>>
>>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>>
>>>>>>>>>>> A.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Unfortunately that is not fixed; it depends on the computations
>>>>>>>>>>>>> and data structures you have. In my case, for example, I use
>>>>>>>>>>>>> more than 2GB since I need to keep a large matrix in memory...
>>>>>>>>>>>>> Having said that, in most cases it should be relatively easy to
>>>>>>>>>>>>> estimate how much memory you are going to need and use that...
>>>>>>>>>>>>> or if that's not possible you can just increase it and try the
>>>>>>>>>>>>> "set and see" approach. Check for memory leaks as well...
>>>>>>>>>>>>> (unclosed resources and so on...!)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> A.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap. It's
>>>>>>>>>>>>>>> quite possible that this is not enough. Try increasing Xmx in
>>>>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have been running a Trident topology on a production
>>>>>>>>>>>>>>>> server. The code looks like this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>>         .each(new Fields("str"),
>>>>>>>>>>>>>>>>                 new JsonObjectParse(),
>>>>>>>>>>>>>>>>                 new Fields("eventType", "event"))
>>>>>>>>>>>>>>>>         .parallelismHint(pHint)
>>>>>>>>>>>>>>>>         .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>>         .persistentAggregate(PostgresqlState.newFactory(config),
>>>>>>>>>>>>>>>>                 new Fields("eventType"),
>>>>>>>>>>>>>>>>                 new EventUpdater(),
>>>>>>>>>>>>>>>>                 new Fields("eventWord"));
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Config conf = new Config();
>>>>>>>>>>>>>>>> conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Basically, it does simple things: it gets data from Kafka,
>>>>>>>>>>>>>>>> parses it into different fields, and writes it into the
>>>>>>>>>>>>>>>> postgresDB. But in the Storm UI I see the error
>>>>>>>>>>>>>>>> "java.lang.OutOfMemoryError: GC overhead limit exceeded". It
>>>>>>>>>>>>>>>> always happens in the same worker on each node, 6703. I
>>>>>>>>>>>>>>>> understand this is because by default the JVM is configured
>>>>>>>>>>>>>>>> to throw this error if you are spending more than *98% of the
>>>>>>>>>>>>>>>> total time in GC and after the GC less than 2% of the heap is
>>>>>>>>>>>>>>>> recovered*.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am not sure what the exact cause of the memory leak is. Is
>>>>>>>>>>>>>>>> it OK to simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Has anyone had similar issues, and what would be the best way
>>>>>>>>>>>>>>>> to overcome them?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> AL
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
