I'm using Kafka 1.0.0 and the Java producer.

I've noticed high memory usage in the producer when enabling compression (gzip 
or lz4).  I don't observe the behavior with compression off, but with it on I'll 
run out of heap (2 GB).  Using a Java profiler, I see the data is held in 
KafkaLZ4BlockOutputStream (or the related class for gzip).  I see that 
MemoryRecordsBuilder:closeForRecordAppends() tries to deal with this, but is not 
successful.  I'm most likely network-bottlenecked, so I expect the producer 
buffers to be full while the job is running, and potentially a lot of 
unacknowledged records.

I've tried using the default buffer.memory with 20 producers (across 20 
threads), sending data as quickly as I can.  I've also tried 1 MB of 
buffer.memory, which seemed to reduce memory consumption, but I could still hit 
OOM in certain cases.  I have max.in.flight.requests.per.connection set to 1.  
In short, I should only have ~20 MB (20 x 1 MB) of data in buffers, yet Kafka 
can easily chew through the full 2000 MB heap.
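
For reference, a minimal sketch of the producer setup used in the test.  The 
config keys and values are the ones described above; the bootstrap server, 
serializers, class name, topic, and send loop are assumed boilerplate, not my 
exact code:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CompressionMemoryTest {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");  // placeholder
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("compression.type", "lz4");           // or "gzip"; either shows the problem
            props.put("buffer.memory", 1_000_000);          // 1 MB per producer (default is 32 MB)
            props.put("max.in.flight.requests.per.connection", 1);

            // one of 20 such producers, each on its own thread,
            // sending 1 KB records as fast as possible
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            while (true)
                producer.send(new ProducerRecord<>("test-topic", new byte[1024]));
        }
    }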

Looking at the code more closely, it looks like KafkaLZ4BlockOutputStream 
doesn't clear the compressedBuffer or buffer when close() is called.  In my 
heap dump, both of those are ~65 KB each, meaning each batch takes up ~148 KB 
of space, of which ~131 KB is compression buffers (buffer.memory=1,000,000 and 
messages are 1 KB each until the batch fills).

Kafka tries to manage memory usage by calling 
MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release 
resources required for record appends (e.g. compression buffers)".  However, 
this method doesn't actually clear those buffers, because 
KafkaLZ4BlockOutputStream.close() only writes the final block and end mark and 
closes the output stream.  It doesn't clear the buffer and compressedBuffer 
fields of KafkaLZ4BlockOutputStream.  Those stay allocated until the batch is 
acknowledged by the broker, processed in Sender:handleProduceResponse(), and 
the batch is deallocated.  This memory usage therefore grows, possibly without 
bound.  In my test program, the program died with approximately 345 unprocessed 
batches per producer (20 producers), despite having 
max.in.flight.requests.per.connection=1.
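
Back-of-the-envelope, using the numbers above: ~345 retained batches x ~148 KB 
per batch is roughly 50 MB per producer, and with 20 producers that's about 
1 GB of heap held just by closed-but-unacknowledged batches.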

There are a few possible optimizations I can think of:
1) We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as 
non-final and null them in the close() method (a rough sketch follows this list).
2) We could declare MemoryRecordsBuilder.appendStream non-final and null it in 
the closeForRecordAppends() method (sketched after the discussion below).
3) We could have the ProducerBatch discard the recordsBuilder in 
closeForRecordAppends().  However, this is likely a bad idea, because the 
recordsBuilder contains significant metadata that is needed after the stream is 
closed, and the field is also final.
4) We could limit the number of unacknowledged batches in flight.  This would 
bound the maximum memory usage but may negatively impact performance.
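
To make fix #1 concrete, here is a rough sketch.  The buffer and 
compressedBuffer field names are the ones from the heap dump; the rest of the 
close() body (including the helper name) is my assumption about what the class 
does today, not the actual Kafka source:

    // In KafkaLZ4BlockOutputStream -- sketch only, not a real patch.
    private byte[] buffer;            // was final
    private byte[] compressedBuffer;  // was final

    @Override
    public void close() throws IOException {
        try {
            // whatever close() already does: flush the last block, write the
            // LZ4 frame end mark, and close the underlying stream
            writeRemainingBlockAndEndMark();   // hypothetical helper standing in for the existing logic
            out.close();
        } finally {
            // the only new part: drop the ~64 KB arrays so they can be GC'd
            // while the batch waits in the accumulator for the broker ack
            buffer = null;
            compressedBuffer = null;
        }
    }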

Fix #1 would only help the LZ4 path, and not any other algorithms.
Fix #2 would help all algorithms, compressed or otherwise, and of the fixes 
proposed here it seems the best.  It would also involve checking 
appendStreamIsClosed at every usage of appendStream within MemoryRecordsBuilder 
to avoid NPEs.
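
A rough sketch of what fix #2 could look like in MemoryRecordsBuilder.  The 
appendStreamIsClosed flag is the new piece mentioned above; the field types, 
exception handling, and guard method are my assumed shape, not the existing 
code:

    // In MemoryRecordsBuilder -- sketch only.
    private DataOutputStream appendStream;          // was final
    private boolean appendStreamIsClosed = false;

    public void closeForRecordAppends() {
        if (appendStreamIsClosed)
            return;
        try {
            appendStream.close();   // flushes compression buffers / writes the end mark
        } catch (IOException e) {
            throw new KafkaException(e);
        } finally {
            appendStream = null;    // release the compressor and its internal buffers
            appendStreamIsClosed = true;
        }
    }

    // ...and every other use of appendStream gets a guard along these lines:
    private void ensureOpenForRecordAppend() {
        if (appendStreamIsClosed)
            throw new IllegalStateException(
                "Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
    }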

Are there any thoughts or suggestions on how to proceed?

If requested, I can provide a standalone test case demonstrating this problem.

Thanks,
-Kyle