Thanks for the report. I haven't looked at the code, but it seems like we would want to do both 1 and 2. Can you please file a JIRA with 1.1.0 as the target version?
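For the JIRA, a rough standalone sketch of what 1 and 2 amount to (class and field names simplified here — these are not the actual Kafka classes, just a model of the proposed change):

```java
import java.io.IOException;
import java.io.OutputStream;

// Simplified stand-in for KafkaLZ4BlockOutputStream illustrating fix #1:
// the block buffers are non-final so close() can drop them for GC.
class Lz4LikeOutputStream extends OutputStream {
    private byte[] buffer = new byte[64 * 1024];           // ~65k block buffer
    private byte[] compressedBuffer = new byte[64 * 1024]; // ~65k compressed buffer

    @Override public void write(int b) throws IOException {
        if (buffer == null) throw new IOException("stream closed");
        // real code compresses into the buffers here
    }

    @Override public void close() {
        // real code would flush the last block and write the end mark first
        buffer = null;            // fix #1: release ~131k per batch for GC
        compressedBuffer = null;
    }
}

// Simplified stand-in for MemoryRecordsBuilder illustrating fix #2:
// appendStream is non-final and nulled in closeForRecordAppends(),
// with a closed flag guarding later appends to avoid NPEs.
class RecordsBuilderLike {
    private OutputStream appendStream = new Lz4LikeOutputStream();
    private boolean appendStreamIsClosed = false;

    void append(byte[] record) throws IOException {
        if (appendStreamIsClosed) throw new IllegalStateException("append after close");
        appendStream.write(record.length);
    }

    void closeForRecordAppends() throws IOException {
        appendStream.close();   // fix #1 releases the compression buffers here
        appendStream = null;    // fix #2: drop the stream and anything it retains
        appendStreamIsClosed = true;
    }

    boolean streamReleased() { return appendStream == null; }
}

public class BufferReleaseDemo {
    public static void main(String[] args) throws IOException {
        RecordsBuilderLike builder = new RecordsBuilderLike();
        builder.append(new byte[]{1, 2, 3});
        builder.closeForRecordAppends();
        System.out.println("stream released: " + builder.streamReleased());
    }
}
```

The batch metadata object stays alive until the broker acks, but after closeForRecordAppends() it no longer pins the compression buffers.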
Ismael

On 30 Jan 2018 5:46 pm, "Kyle Tinker" <ktin...@workforcesoftware.com> wrote:

> I'm using Kafka 1.0.0 and the Java producer.
>
> I've noticed high memory usage in the producer when enabling compression (gzip or lz4). I don't observe the behavior with compression off, but with it on I'll run out of heap (2GB). Using a Java profiler, I see the data is in the KafkaLZ4BlockOutputStream (or the related class for gzip). I see that MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but is not successful. I'm most likely network-bottlenecked, so I expect the producer buffers to be full while the job is running, and potentially a lot of unacknowledged records.
>
> I've tried using the default buffer.memory with 20 producers (across 20 threads) and sending data as quickly as I can. I've also tried 1MB of buffer.memory, which seemed to reduce memory consumption, but I could still run OOM in certain cases. I have max.in.flight.requests.per.connection set to 1. In short, I should only have ~20 MB (20 * 1MB) of data in buffers, but I can easily exhaust the 2000 MB available to Kafka.
>
> Looking at the code more closely, it appears that KafkaLZ4BlockOutputStream doesn't clear the compressedBuffer or buffer when close() is called. In my heap dump, both of those are ~65k each, meaning that each batch is taking up ~148k of space, of which 131k is buffers (buffer.memory=1,000,000 and messages are 1k each until the batch fills).
>
> Kafka tries to manage memory usage by calling MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release resources required for record appends (e.g. compression buffers)". However, this method doesn't actually clear those buffers, because KafkaLZ4BlockOutputStream.close() only writes the block and end mark and closes the output stream. It doesn't actually clear the buffer and compressedBuffer in KafkaLZ4BlockOutputStream.
> Those stay allocated in RAM until the block is acknowledged by the broker, processed in Sender:handleProduceResponse(), and the batch is deallocated. This memory usage therefore grows, possibly without bound. In my test program, the program died with approximately 345 unprocessed batches per producer (20 producers), despite having max.in.flight.requests.per.connection=1.
>
> There are a few possible optimizations I can think of:
>
> 1) We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as non-final and null them in the close() method.
> 2) We could declare MemoryRecordsBuilder.appendStream as non-final and null it in the closeForRecordAppends() method.
> 3) We could have the ProducerBatch discard the recordsBuilder in closeForRecordAppends(). However, this is likely a bad idea, because the recordsBuilder contains significant metadata that is likely needed after the stream is closed. It is also final.
> 4) We could try to limit the number of unacknowledged batches in flight. This would bound the maximum memory usage, but may negatively impact performance.
>
> Fix #1 would only improve the LZ4 path, and not the other algorithms. Fix #2 would improve all algorithms, compressed and otherwise; of the options proposed here, it seems the best. It would also involve checking appendStreamIsClosed at every usage of appendStream within MemoryRecordsBuilder to avoid NPEs.
>
> Are there any thoughts or suggestions on how to proceed?
>
> If requested, I can provide standalone test-case code demonstrating this problem.
>
> Thanks,
> -Kyle
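As a back-of-the-envelope check on the figures in the report (345 retained batches per producer, ~148k per batch, 20 producers — all numbers taken from the message above), the retained batch data alone approaches 1 GB, which together with other heap usage plausibly exhausts a 2 GB heap:

```java
// Rough arithmetic check using the figures quoted in the report.
public class MemoryEstimate {
    public static void main(String[] args) {
        int producers = 20;
        int batchesPerProducer = 345;     // unacknowledged batches observed per producer
        int bytesPerBatch = 148 * 1024;   // ~148k per batch, ~131k of it compression buffers
        long totalBytes = (long) producers * batchesPerProducer * bytesPerBatch;
        System.out.printf("retained by batches: ~%d MB%n", totalBytes / (1024 * 1024));
    }
}
```

This prints roughly ~997 MB, consistent with the OOM at a 2 GB heap once the rest of the producer's allocations are added on top.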