Ismael,

I have filed https://issues.apache.org/jira/browse/KAFKA-6512 for this issue.  
I could not find a target version field.  Let me know if you need any 
additional information.  I'm new to this project so hopefully the format is 
what you were looking for.

- Kyle

-----Original Message-----
From: Ismael Juma [mailto:isma...@gmail.com] 
Sent: Tuesday, January 30, 2018 9:01 PM
To: dev <dev@kafka.apache.org>
Subject: Re: Excessive Memory Usage with Compression enabled and possible 
resolutions

Thanks for the report. I haven't looked at the code, but it seems like we would 
want to do both 1 and 2. Can you please file a JIRA with 1.1.0 as the target 
version?

Ismael

On 30 Jan 2018 5:46 pm, "Kyle Tinker" <ktin...@workforcesoftware.com> wrote:

> I'm using Kafka 1.0.0 and the Java producer.
>
> I've noticed high memory usage in the producer when enabling
> compression (gzip or lz4).  I don't observe the behavior with
> compression off, but with it on I'll run out of heap (2 GB).  Using a
> Java profiler, I see the data is in KafkaLZ4BlockOutputStream (or the
> related class for gzip).  I see that
> MemoryRecordsBuilder:closeForRecordAppends() tries to deal with this,
> but is not successful.  I'm most likely network bottlenecked, so I
> expect the producer buffers to be full while the job is running, with
> potentially a lot of unacknowledged records.
>
> I've tried using the default buffer.memory with 20 producers (across
> 20 threads) and sending data as quickly as I can.  I've also tried
> 1 MB of buffer.memory, which seemed to reduce memory consumption, but
> I could still run out of memory in certain cases.  I have
> max.in.flight.requests.per.connection set to 1.  In short, I should
> only have ~20 MB (20 * 1 MB) of data in buffers, yet Kafka can easily
> exhaust the 2000 MB heap.
>
> Looking at the code more closely, it appears that
> KafkaLZ4BlockOutputStream doesn't clear the compressedBuffer or buffer
> when close() is called.  In my heap dump, both of those are ~65 KB
> each, meaning that each batch takes up ~148 KB of space, of which
> 131 KB is buffers (buffer.memory=1,000,000 and messages are 1 KB each
> until the batch fills).
>
> Kafka tries to manage memory usage by calling
> MemoryRecordsBuilder:closeForRecordAppends(), which is documented as
> "Release resources required for record appends (e.g. compression
> buffers)".  However, this method doesn't actually clear those buffers,
> because KafkaLZ4BlockOutputStream.close() only writes the final block
> and end mark and closes the output stream.  It doesn't clear the
> buffer and compressedBuffer in KafkaLZ4BlockOutputStream.  Those stay
> allocated in RAM until the batch is acknowledged by the broker,
> processed in Sender:handleProduceResponse(), and deallocated.  This
> memory usage therefore grows, possibly without bound.  In my test
> program, the process died with approximately 345 unprocessed batches
> per producer (20 producers), despite having
> max.in.flight.requests.per.connection=1.  At ~148 KB per batch, that
> works out to roughly 345 * 148 KB * 20 producers, or about 1 GB of
> retained buffers.
>
> There are a few possible optimizations I can think of:
> 1) We could declare KafkaLZ4BlockOutputStream.buffer and
> compressedBuffer as non-final and null them in the close() method (a
> sketch follows this list).
> 2) We could declare MemoryRecordsBuilder.appendStream non-final and
> null it in the closeForRecordAppends() method (sketched further
> below).
> 3) We could have the ProducerBatch discard the recordsBuilder in
> closeForRecordAppends(); however, this is likely a bad idea, because
> the recordsBuilder contains significant metadata that is likely needed
> after the stream is closed.  It is also final.
> 4) We could try to limit the number of unacknowledged batches in
> flight.  This would bound the maximum memory usage but may negatively
> impact performance.
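>
> To make #1 concrete, here's a rough sketch of the shape of the change.
> This is illustrative rather than the actual source; I'm assuming
> close() keeps its existing write-out of the final block and end mark:
>
>     // KafkaLZ4BlockOutputStream: make the two block buffers non-final
>     // so close() can release them.
>     private byte[] buffer;            // uncompressed block (~65 KB)
>     private byte[] compressedBuffer;  // compressed block (~65 KB)
>
>     @Override
>     public void close() throws IOException {
>         try {
>             if (!finished) {
>                 writeBlock();    // flush the last partial block
>                 writeEndMark();  // terminate the LZ4 frame
>             }
>             if (out != null) {
>                 out.close();
>                 out = null;
>             }
>         } finally {
>             // Null the references so ~131 KB per batch becomes
>             // collectible as soon as appends finish, instead of being
>             // held until the broker acknowledges the batch.
>             buffer = null;
>             compressedBuffer = null;
>         }
>     }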
>
> Fix #1 would only help LZ4, not any of the other algorithms.
> Fix #2 would help all algorithms, compressed and otherwise.  Of the
> first three fixes proposed here, it seems the best.  It would also
> involve checking an appendStreamIsClosed flag at every usage of
> appendStream within MemoryRecordsBuilder to avoid NPEs.
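>
> A sketch of what #2 might look like (again illustrative, not existing
> code; the guard helper at the end is a name I'm making up):
>
>     // MemoryRecordsBuilder: make the stream non-final, track closure.
>     private DataOutputStream appendStream;
>     private boolean appendStreamIsClosed = false;
>
>     public void closeForRecordAppends() {
>         if (!appendStreamIsClosed) {
>             try {
>                 appendStream.close();
>             } catch (IOException e) {
>                 throw new KafkaException(e);
>             }
>             appendStream = null;  // releases the compression buffers
>             appendStreamIsClosed = true;
>         }
>     }
>
>     // Called at the top of every method that touches appendStream,
>     // to fail cleanly instead of with an NPE.
>     private void ensureOpenForRecordAppend() {
>         if (appendStreamIsClosed)
>             throw new IllegalStateException(
>                 "Tried to append a record, but the builder is closed");
>     }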
>
> Are there any thoughts or suggestions on how to proceed?
>
> If requested I can provide standalone testcase code demonstrating this 
> problem.
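>
> For reference, the shape of that test is roughly the following (topic
> name and broker address are placeholders; each of the 20 threads runs
> this loop with its own producer):
>
>     import java.util.Properties;
>     import org.apache.kafka.clients.producer.KafkaProducer;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("key.serializer",
>         "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("value.serializer",
>         "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("compression.type", "lz4");  // or "gzip"; no OOM with "none"
>     props.put("buffer.memory", "1000000"); // 1 MB per producer
>     props.put("max.in.flight.requests.per.connection", "1");
>
>     byte[] value = new byte[1024];  // 1 KB messages
>     try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
>         while (true)  // send as fast as possible against a slow broker
>             producer.send(new ProducerRecord<>("test-topic", value));
>     }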
>
> Thanks,
> -Kyle
>
