Thanks for the report. I haven't looked at the code, but it seems like we
would want to do both 1 and 2. Can you please file a JIRA with 1.1.0 as the
target version?

Ismael

On 30 Jan 2018 5:46 pm, "Kyle Tinker" <ktin...@workforcesoftware.com> wrote:

> I'm using Kafka 1.0.0 and the Java producer.
>
> I've noticed high memory usage in the producer when enabling compression
> (gzip or lz4).  I don't observe the behavior with compression off, but with
> it on I'll run out of heap (2GB).  Using a Java profiler, I see the data is
> in the KafkaLZ4BlockOutputStream (or related class for gzip).  I see that
> MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this,
> but is not successful.  I'm most likely network bottlenecked, so I expect
> the producer buffers to be full while the job is running and potentially a
> lot of unacknowledged records.
>
> I've tried using the default buffer.memory with 20 producers (across 20
> threads) and sending data as quickly as I can.  I've also tried 1MB of
> buffer.memory, which seemed to reduce memory consumption, but I could
> still run out of memory in certain cases.  I have
> max.in.flight.requests.per.connection set to 1.  In short, I should
> only have ~20 MB (20 * 1 MB) of data in buffers, but Kafka can easily
> exhaust the 2000 MB heap.
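>
> For reference, here is roughly how each of my test producers is
> configured (a minimal sketch; the broker address and serializers are
> placeholders, not my real values):
>
>     import java.util.Properties;
>     import org.apache.kafka.clients.producer.KafkaProducer;
>     import org.apache.kafka.clients.producer.Producer;
>     import org.apache.kafka.clients.producer.ProducerConfig;
>
>     Properties props = new Properties();
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>             "org.apache.kafka.common.serialization.StringSerializer");
>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>             "org.apache.kafka.common.serialization.StringSerializer");
>     // compression on; gzip shows the same behavior
>     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>     // 1 MB of buffer.memory per producer; 20 producers => ~20 MB expected
>     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1_000_000L);
>     props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
>     Producer<String, String> producer = new KafkaProducer<>(props);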
>
> In looking at the code more, it looks like the KafkaLZ4BlockOutputStream
> doesn't clear the compressedBuffer or buffer when close() is called.  In my
> heap dump, both of those are ~65k each (2 * 65,536 bytes = 131,072
> bytes), meaning that each batch takes up ~148k of space, of which 131k
> is buffers (buffer.memory=1,000,000 and messages are 1k each until the
> batch fills).
>
> Kafka tries to manage memory usage by calling 
> MemoryRecordsBuilder:closeForRecordAppends(),
> which is documented as "Release resources required for record appends (e.g.
> compression buffers)".  However, this method doesn't actually clear those
> buffers because KafkaLZ4BlockOutputStream.close() only writes the block
> and end mark and closes the output stream.  It doesn't actually clear the
> buffer and compressedBuffer in KafkaLZ4BlockOutputStream.  Those stay
> allocated in RAM until the block is acknowledged by the broker, processed
> in Sender:handleProduceResponse(), and the batch is deallocated.  This
> memory usage therefore increases, possibly without bound.  In my test,
> the program died with approximately 345 unprocessed batches per
> producer (20 producers), despite having
> max.in.flight.requests.per.connection=1.
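>
> To illustrate, here is roughly what I believe close() does today,
> paraphrased from my reading rather than copied from the actual source:
>
>     // KafkaLZ4BlockOutputStream.close(), simplified sketch (not the
>     // real code; paraphrased from my reading of it)
>     public void close() throws IOException {
>         writeBlock();    // compress and flush any remaining buffered data
>         writeEndMark();  // write the LZ4 frame end mark
>         flush();
>         out.close();
>         // buffer and compressedBuffer (~64k each) are final fields, so
>         // they stay reachable until the whole batch is deallocated
>     }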
>
> There are a few possible optimizations I can think of:
> 1) We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer
> as non-final and null them in the close() method
> 2) We could declare the MemoryRecordsBuilder.appendStream non-final and
> null it in the closeForRecordAppends() method (see the sketch after
> this list)
> 3) We could have the ProducerBatch discard the recordsBuilder in
> closeForRecordAppends(); however, this is likely a bad idea because the
> recordsBuilder contains significant metadata that is needed after the
> stream is closed.  It is also final.
> 4) We could try to limit the number of non-acknowledged batches in
> flight.  This would bound the maximum memory usage but may negatively
> impact performance.
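>
> To make option 2 concrete, the change I have in mind would look roughly
> like this (a sketch only; appendStreamIsClosed is a flag I'm proposing,
> not an existing field):
>
>     // In MemoryRecordsBuilder (sketch of proposed fix #2)
>     private DataOutputStream appendStream;         // currently final
>     private boolean appendStreamIsClosed = false;
>
>     public void closeForRecordAppends() {
>         if (!appendStreamIsClosed) {
>             try {
>                 appendStream.close();  // writes end mark, flushes
>             } catch (IOException e) {
>                 throw new KafkaException(e);
>             }
>             appendStream = null;       // let the ~131k of buffers be GC'd
>             appendStreamIsClosed = true;
>         }
>     }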
>
> Fix #1 would only improve LZ4, and not any of the other algorithms.
> Fix #2 would improve all algorithms, compressed and otherwise.  Of the
> options proposed here, it seems the best.  It would also involve
> checking appendStreamIsClosed in every usage of appendStream within
> MemoryRecordsBuilder to avoid NPEs.
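>
> The check I'm imagining at each usage site would be something like this
> (hypothetical helper; the name is mine):
>
>     private void ensureOpenForRecordAppend() {
>         if (appendStreamIsClosed)
>             throw new IllegalStateException(
>                     "MemoryRecordsBuilder is closed for record appends");
>     }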
>
> Are there any thoughts or suggestions on how to proceed?
>
> If requested, I can provide a standalone test case demonstrating this
> problem.
>
> Thanks,
> -Kyle
