Hi,

In the meantime I've figured out the issue: the messages in BlockingEnvelopeMap.queue don't get polled as fast as they are queued. As a consequence, the queue object (in BlockingEnvelopeMap) grows until all memory is filled up.

A workaround I implemented in the onEvent(...) method of my consumer is to check the size of the queue and sleep(1) when it grows too much. Perhaps this could be added as a check inside the method BlockingEnvelopeMap.put(...): it is better to delay a bit than to halt/crash the whole consumer because of memory limits.
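
For illustration, a minimal sketch of that kind of size check. It is not the actual consumer code and uses no Samza classes: ThrottledBuffer, highWaterMark and the method names are hypothetical, and a plain LinkedBlockingQueue stands in for the queue held by BlockingEnvelopeMap.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical stand-in for the producer side of a custom consumer.
 * The unbounded queue mimics a buffer that can grow without limit
 * when the reader polls more slowly than the source delivers.
 */
final class ThrottledBuffer<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final int highWaterMark; // assumed threshold, tune to available heap

    ThrottledBuffer(int highWaterMark) {
        this.highWaterMark = highWaterMark;
    }

    /**
     * Called by the source thread for every incoming event.
     * Sleeps in 1 ms steps while the queue is at or above the threshold,
     * then enqueues. Returns false if the thread was interrupted while waiting.
     * With a single source thread the queue never exceeds highWaterMark;
     * with several it may overshoot slightly, since check and put are not atomic.
     */
    boolean onEvent(T event) {
        try {
            while (queue.size() >= highWaterMark) {
                Thread.sleep(1);
            }
            queue.put(event);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    /** Called by the reader; returns null when nothing is buffered. */
    T poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

The effect is that the source thread is slowed down to the pace of the reader instead of the heap filling up. A bounded queue whose put(...) blocks when full would give similar back-pressure without the polling loop.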
Rgds,
Marcelo

From: Yi Pan <nickpa...@gmail.com>
To: dev@samza.apache.org; Marcelo Romaniuc <mroma...@yahoo.com>
Sent: Tuesday, January 19, 2016 5:51 AM
Subject: Re: Custom System Consumer filling up memory

Hi, Marcelo,

Sorry to get back to you late. I remember that Jagadish has some conversation w/ you on the implementation earlier. Did that include some hints to solve this problem as well?

Generally, customized system consumers would need to be responsible for the memory usage in the customized code. We would need much more detailed info to see whether the memory leakage is in your customized SystemConsumer code or is in the base class provided (e.g. BlockingEnvelopeQueue).

If you still need help, please provide code and steps to re-produce the issue, also the heap dump file.

Thanks a lot!

-Yi

On Wed, Dec 30, 2015 at 11:19 AM, Marcelo Romaniuc <mroma...@yahoo.com.invalid> wrote:

> Hi,
>
> I've created a custom System Consumer extending BlockingEnvelopeMap.
> All looks good until I reach about 10m messages processed by the
> StreamTask. At that point I see a lot of GC going on and the heap dump
> shows memory is mostly used by "IncomingMessageEnvelope" and a
> ConcurrentHashMap (probably from BlockingEnvelopMap).
> It seems the messages are hanging around, even after "processed" by the
> StreamTask.
> Do I need to do something to dispose such messages ?
>
> Thanks,
> Marcelo