Hi, Marcelo, The behavior you described sounds a bit weird to me. The SystemConsumer should not be polled if there is no SystemStreamPartition in the set from emptySystemStreamPartitionsBySystem(systemName). This SSP set per system is removed from the poll when there are unprocessed messages pending. Hence, if the MessageChooser does not remove all unprocessed messages for a specific SSP, there will be no more polling via the SystemConsumer to get more messages from the network. Hence, if the process is slow, there would be more unprocessed messages in the SystemConsumers, which would mean that those SystemStreamPartition objects should not be polled via the corresponding SystemConsumer. This logic is built in SystemConsumers code and has been functioning correctly. I would suspect that it might be related to the batch request size you have in your SystemConsumer, which may be too big for the configured container memory size?
-Yi On Sat, Jan 23, 2016 at 6:56 AM, Marcelo Romaniuc < mroma...@yahoo.com.invalid> wrote: > Hi, > > Meantime I've figured out the issue... The messages in > BlockingEvelopeMap.queue dont get polled as fast as they are queued. The > consequence is the queue object (in BlockingEnvelopeMap) grows until all > memory is filled up. A workaround I implemented on onEvent(...) method of > my consumer is to check the size of the queue and sleep(1) in case it grows > too much. Perhaps this could be added as a check inside the method > BlockingEnvelopeMap.put(...) - it is better to delay a bit than halt/crash > the whole consumer due to memory limitation. > > > Rgds, > Marcelo > > > From: Yi Pan <nickpa...@gmail.com> > To: dev@samza.apache.org; Marcelo Romaniuc <mroma...@yahoo.com> > Sent: Tuesday, January 19, 2016 5:51 AM > Subject: Re: Custom System Consumer filling up memory > > Hi, Marcelo, > > Sorry to get back to you late. I remember that Jagadish has some > conversation w/ you on the implementation earlier. Did that include some > hints to solve this problem as well? Generally, customized system consumers > would need to be responsible for the memory usage in the customized code. > We would need much more detailed info to see whether the memory leakage is > in your customized SystemConsumer code or is in the base class provided > (e.g. BlockingEnvelopeQueue). If you still need help, please provide code > and steps to re-produce the issue, also the heap dump file. > > Thanks a lot! > > -Yi > > On Wed, Dec 30, 2015 at 11:19 AM, Marcelo Romaniuc < > mroma...@yahoo.com.invalid> wrote: > > > > > > > Hi, > > > > I've created a custom System Consumer extending BlockingEnvelopeMap. > > All looks good until I reach about 10m messages processed by the > > StreamTask. At that point I see a lot of GC going on and the heap dump > > shows memory is mostly used by "IncomingMessageEnvelope" and a > > ConcurrentHashMap (probably from BlockingEnvelopMap). > > It seems the messages are hanging around, even after "processed" by > the > > StreamTask. > > Do I need to do something to dispose such messages ? > > > > Thanks, > > Marcelo > > > > > > > > > > > >