Ismael, I have filed https://issues.apache.org/jira/browse/KAFKA-6512 for this issue. I could not find a target version field. Let me know if you need any additional information. I'm new to this project so hopefully the format is what you were looking for.
- Kyle

-----Original Message-----
From: Ismael Juma [mailto:isma...@gmail.com]
Sent: Tuesday, January 30, 2018 9:01 PM
To: dev <dev@kafka.apache.org>
Subject: Re: Excessive Memory Usage with Compression enabled and possible resolutions

Thanks for the report. I haven't looked at the code, but it seems like we would want to do both 1 and 2. Can you please file a JIRA with 1.1.0 as the target version?

Ismael

On 30 Jan 2018 5:46 pm, "Kyle Tinker" <ktin...@workforcesoftware.com> wrote:
> I'm using Kafka 1.0.0 and the Java producer.
>
> I've noticed high memory usage in the producer when enabling compression
> (gzip or lz4). I don't observe the behavior with compression off, but with
> it on I'll run out of heap (2 GB). Using a Java profiler, I see the data is
> in the KafkaLZ4BlockOutputStream (or the related class for gzip). I see
> that MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with
> this, but is not successful. I'm most likely network-bottlenecked, so I
> expect the producer buffers to be full while the job is running, and
> potentially a lot of unacknowledged records.
>
> I've tried using the default buffer.memory with 20 producers (across 20
> threads) and sending data as quickly as I can. I've also tried 1 MB of
> buffer.memory, which seemed to reduce memory consumption, but I could
> still run OOM in certain cases. I have
> max.in.flight.requests.per.connection set to 1. In short, I should only
> have ~20 MB (20 * 1 MB) of data in buffers, but I can easily exhaust the
> 2000 MB used by Kafka.
>
> In looking at the code more, it looks like the KafkaLZ4BlockOutputStream
> doesn't clear the compressedBuffer or buffer when close() is called. In my
> heap dump, both of those are ~65 KB each, meaning that each batch is
> taking up ~148 KB of space, of which 131 KB is buffers
> (buffer.memory=1,000,000 and messages are 1 KB each until the batch
> fills).
>
> Kafka tries to manage memory usage by calling
> MemoryRecordsBuilder:closeForRecordAppends(), which is documented as
> "Release resources required for record appends (e.g. compression
> buffers)". However, this method doesn't actually clear those buffers,
> because KafkaLZ4BlockOutputStream.close() only writes the final block and
> end mark and closes the output stream. It doesn't actually clear the
> buffer and compressedBuffer in KafkaLZ4BlockOutputStream. Those stay
> allocated in RAM until the block is acknowledged by the broker, processed
> in Sender:handleProduceResponse(), and the batch is deallocated. Memory
> usage therefore increases, possibly without bound. In my test program,
> the program died with approximately 345 unprocessed batches per producer
> (20 producers), despite having max.in.flight.requests.per.connection=1.
>
> There are a few possible optimizations I can think of:
> 1) We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer
> as non-final and null them in the close() method.
> 2) We could declare MemoryRecordsBuilder.appendStream non-final and null
> it in the closeForRecordAppends() method.
> 3) We could have ProducerBatch discard the recordsBuilder in
> closeForRecordAppends(). However, this is likely a bad idea, because the
> recordsBuilder contains significant metadata that is likely needed after
> the stream is closed. It is also final.
> 4) We could try to limit the number of unacknowledged batches in flight.
> This would bound the maximum memory usage but may negatively impact
> performance.
>
> Fix #1 would only improve the LZ4 path, and not any other algorithm.
> Fix #2 would improve all algorithms, compressed and otherwise. Of the
> options proposed here, it seems the best. It would also involve checking
> appendStreamIsClosed in every usage of appendStream within
> MemoryRecordsBuilder to avoid NPEs.
>
> Are there any thoughts or suggestions on how to proceed?
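[Editor's note: the memory accounting described above can be checked with a rough sketch. The figures are the ones quoted in the message (~148 KB retained per batch, of which ~131 KB is the two compression buffers; ~345 unacknowledged batches per producer; 20 producers); the class and method names below are illustrative, not Kafka source.]

```java
// Rough reconstruction of the reported memory accounting: each retained
// batch holds ~65 KB 'buffer' + ~65 KB 'compressedBuffer' plus ~18 KB of
// other batch state, i.e. ~148 KB total.
public class RetainedBufferEstimate {
    static final long PER_BATCH_BYTES = 148 * 1024;

    static long totalRetainedBytes(int producers, int unackedBatchesPerProducer) {
        return PER_BATCH_BYTES * producers * unackedBatchesPerProducer;
    }

    public static void main(String[] args) {
        // Figures observed at the time of the OOM: 20 producers with
        // ~345 unacknowledged batches each.
        long total = totalRetainedBytes(20, 345);
        System.out.println("Retained by closed batches: "
                + total / (1024 * 1024) + " MB"); // ~997 MB, consistent with a 2 GB heap dying
    }
}
```

This lines up with the report: batches whose streams are "closed" still pin their compression buffers until the broker acknowledges them, so retained memory scales with unacknowledged batches, not with buffer.memory.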
>
> If requested, I can provide standalone test-case code demonstrating this
> problem.
>
> Thanks,
> -Kyle
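[Editor's note: proposed fix #1 might look roughly like the following. This is a simplified stand-in class, not the actual Kafka source; the field names buffer and compressedBuffer come from the message above, and everything else is illustrative.]

```java
import java.io.IOException;
import java.io.OutputStream;

// Simplified stand-in for KafkaLZ4BlockOutputStream, sketching fix #1:
// declare the two scratch buffers non-final and null them in close(), so
// the ~131 KB they hold becomes collectible while the batch waits for the
// broker's acknowledgement. (Compression logic is elided.)
class Lz4LikeOutputStream extends OutputStream {
    private byte[] buffer;            // non-final so close() can release it
    private byte[] compressedBuffer;  // likewise

    Lz4LikeOutputStream(int blockSize) {
        this.buffer = new byte[blockSize];
        this.compressedBuffer = new byte[blockSize]; // real code sizes this via the LZ4 worst-case bound
    }

    @Override
    public void write(int b) throws IOException {
        if (buffer == null)
            throw new IOException("stream closed");
        // ... buffer the byte, emitting a compressed block when full ...
    }

    @Override
    public void close() {
        // The real close() first flushes the final block and writes the
        // end mark; the proposed change is just the two lines below.
        buffer = null;
        compressedBuffer = null;
    }

    boolean buffersReleased() {
        return buffer == null && compressedBuffer == null;
    }
}
```

Fix #2 would be the same idea one level up: null MemoryRecordsBuilder.appendStream in closeForRecordAppends(), and guard every later use of appendStream with the appendStreamIsClosed check mentioned above so no NPE escapes.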