Hi Kyle,

Are you interested in submitting a pull request?
Ismael

On Wed, Jan 31, 2018 at 3:00 PM, Kyle Tinker <ktin...@workforcesoftware.com>
wrote:

> Ismael,
>
> I have filed https://issues.apache.org/jira/browse/KAFKA-6512 for this
> issue. I could not find a target version field. Let me know if you need
> any additional information. I'm new to this project, so hopefully the
> format is what you were looking for.
>
> - Kyle
>
> -----Original Message-----
> From: Ismael Juma [mailto:isma...@gmail.com]
> Sent: Tuesday, January 30, 2018 9:01 PM
> To: dev <dev@kafka.apache.org>
> Subject: Re: Excessive Memory Usage with Compression enabled and possible
> resolutions
>
> Thanks for the report. I haven't looked at the code, but it seems like we
> would want to do both 1 and 2. Can you please file a JIRA with 1.1.0 as
> the target version?
>
> Ismael
>
> On 30 Jan 2018 5:46 pm, "Kyle Tinker" <ktin...@workforcesoftware.com>
> wrote:
>
> > I'm using Kafka 1.0.0 and the Java producer.
> >
> > I've noticed high memory usage in the producer when enabling
> > compression (gzip or lz4). I don't observe the behavior with
> > compression off, but with it on I'll run out of heap (2GB). Using a
> > Java profiler, I see the data is in the KafkaLZ4BlockOutputStream (or
> > the related class for gzip). I see that
> > MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with
> > this, but it is not successful. I'm most likely network-bottlenecked,
> > so I expect the producer buffers to be full while the job is running
> > and potentially a lot of unacknowledged records.
> >
> > I've tried using the default buffer.memory with 20 producers (across
> > 20 threads) and sending data as quickly as I can. I've also tried 1MB
> > of buffer.memory, which seemed to reduce memory consumption, but I
> > could still run OOM in certain cases. I have
> > max.in.flight.requests.per.connection set to 1. In short, I should
> > only have ~20 MB (20 * 1 MB) of data in buffers, but Kafka can easily
> > exhaust the 2000 MB heap.
> >
> > In looking at the code more, it looks like KafkaLZ4BlockOutputStream
> > doesn't clear the compressedBuffer or buffer when close() is called.
> > In my heap dump, both of those are ~65k each, meaning that each batch
> > is taking up ~148k of space, of which 131k is buffers
> > (buffer.memory=1,000,000 and messages are 1k each until the batch
> > fills).
> >
> > Kafka tries to manage memory usage by calling
> > MemoryRecordsBuilder:closeForRecordAppends(), which is documented as
> > "Release resources required for record appends (e.g. compression
> > buffers)". However, this method doesn't actually clear those buffers,
> > because KafkaLZ4BlockOutputStream.close() only writes the block and
> > end mark and closes the output stream; it doesn't clear the buffer
> > and compressedBuffer in KafkaLZ4BlockOutputStream. Those stay
> > allocated in RAM until the block is acknowledged by the broker,
> > processed in Sender:handleProduceResponse(), and the batch is
> > deallocated. This memory usage therefore increases, possibly without
> > bound. In my test program, the process died with approximately 345
> > unprocessed batches per producer (20 producers), despite having
> > max.in.flight.requests.per.connection=1.
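For reference, the setup described above corresponds roughly to the
following producer (a minimal sketch: the broker address, topic name, and
class name are illustrative, and the report ran 20 such producers on
separate threads):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    // Repro sketch: one producer sending ~1k messages as fast as it can,
    // with compression on and a small, bounded buffer.memory.
    public class CompressionMemoryRepro {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative
            props.put("compression.type", "lz4");             // or "gzip"
            props.put("buffer.memory", 1_000_000);            // 1 MB, as in the report
            props.put("max.in.flight.requests.per.connection", 1);
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());

            byte[] payload = new byte[1024];                  // ~1k per message
            try (KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(props)) {
                while (true) {  // network-bottlenecked: buffers stay full
                    producer.send(new ProducerRecord<>("test-topic", payload));
                }
            }
        }
    }

With a 2 GB heap, the expectation is ~20 MB of record buffers across the 20
producers; instead the heap fills with compression buffers retained by
batches that are closed but not yet acknowledged.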
> >
> > There are a few possible optimizations I can think of:
> > 1) We could declare KafkaLZ4BlockOutputStream.buffer and
> > compressedBuffer as non-final and null them in the close() method.
> > 2) We could declare MemoryRecordsBuilder.appendStream as non-final
> > and null it in the closeForRecordAppends() method.
> > 3) We could have the ProducerBatch discard the recordsBuilder in
> > closeForRecordAppends(); however, this is likely a bad idea because
> > the recordsBuilder contains significant metadata that is likely
> > needed after the stream is closed, and it is also final.
> > 4) We could try to limit the number of unacknowledged batches in
> > flight. This would bound the maximum memory usage but may negatively
> > impact performance.
> >
> > Fix #1 would only help LZ4, not the other algorithms. Fix #2 would
> > improve all algorithms, compression and otherwise. Of the first
> > three proposed here, it seems the best. It would also involve
> > checking appendStreamIsClosed in every usage of appendStream within
> > MemoryRecordsBuilder to avoid NPEs.
> >
> > Are there any thoughts or suggestions on how to proceed?
> >
> > If requested, I can provide standalone test-case code demonstrating
> > this problem.
> >
> > Thanks,
> > -Kyle
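For concreteness, fixes #1 and #2 above amount to something like the
following sketches. The field and method names (buffer, compressedBuffer,
appendStream, and the finished/out bookkeeping in close()) are paraphrased
from the description in the thread; this is not taken from the actual Kafka
source or the patch that eventually resolved KAFKA-6512.

    // Fix #1 sketch: KafkaLZ4BlockOutputStream releases its compression
    // buffers on close(), instead of holding them until the batch is
    // deallocated. buffer and compressedBuffer must become non-final.
    @Override
    public void close() throws IOException {
        try {
            if (!finished) {
                writeBlock();    // flush any partial block
                writeEndMark();  // terminate the LZ4 frame
                finished = true;
            }
            if (out != null) {
                out.close();
                out = null;
            }
        } finally {
            buffer = null;           // ~65k each, per the heap dump above
            compressedBuffer = null;
        }
    }

    // Fix #2 sketch: MemoryRecordsBuilder drops its append stream once
    // appends are finished, making the codec's buffers collectible for
    // every compression type. appendStream must become non-final, and
    // each later use needs the closed check mentioned above.
    public void closeForRecordAppends() throws IOException {
        if (appendStream != null) {
            appendStream.close();  // writes the codec's end mark
            appendStream = null;   // lets compression buffers be GC'd
        }
    }

    // Hypothetical helper for the NPE checks the message mentions.
    private void ensureOpenForRecordAppend() {
        if (appendStream == null)
            throw new IllegalStateException(
                "MemoryRecordsBuilder is closed for record appends");
    }

Either way, the compression buffers stop being reachable from closed but
unacknowledged batches, which is what bounds the heap growth described
above.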