Hi Syed,

From your screenshot I assume that you are using SnapLogic to run your
code (full disclosure: I do not have the faintest idea about this
product!). I've just had a look at the docs and am a bit confused by
their explanation of the metric that you point out in your image,
"Memory Allocated". The docs say: "The Memory Allocated reflects the
number of bytes that were allocated by the Snap.  Note that this
number does not reflect the amount of memory that was freed and it is
not the peak memory usage of the Snap.  So, it is not necessarily a
metric that can be used to estimate the required size of a Snaplex
node.  Rather, the number provides an insight into how much memory had
to be allocated to process all of the documents.  For example, if the
total allocated was 5MB and the Snap processed 32 documents, then the
Snap allocated roughly 164KB per document.  When combined with the
other statistics, this number can help to identify the potential
causes of performance issues."
The part about not reflecting memory that was freed makes me doubt
that this number actually reflects how much memory the process
currently holds; it reads more like a cumulative allocation counter.
Can you give some more insight there?
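
To illustrate why a cumulative "allocated" number can grow without the
process holding more memory, here is a minimal sketch. It is not how
SnapLogic computes its metric (that is unknown here); it assumes a
HotSpot-based JVM, where the platform thread bean can be cast to
com.sun.management.ThreadMXBean. The class and method names are made up
for the example.

```java
import java.lang.management.ManagementFactory;

class AllocatedVsHeld {

    // Cumulative bytes allocated by the current thread since it started.
    // Assumption: HotSpot-based JVM, where the platform ThreadMXBean
    // implements com.sun.management.ThreadMXBean.
    static long allocatedBytes() {
        com.sun.management.ThreadMXBean bean =
                (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean();
        return bean.getThreadAllocatedBytes(Thread.currentThread().getId());
    }

    // Heap currently in use: live objects plus garbage not yet collected.
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    // Allocates `count` short-lived buffers of `size` bytes; none is retained.
    static long churn(int count, int size) {
        long checksum = 0;
        for (int i = 0; i < count; i++) {
            byte[] buffer = new byte[size];
            buffer[0] = (byte) i;
            checksum += buffer[0];
        }
        return checksum;
    }

    public static void main(String[] args) {
        long allocatedBefore = allocatedBytes();
        long usedBefore = usedHeapBytes();

        churn(10_000, 64 * 1024); // allocates several hundred MB in total
        System.gc();              // only a hint to the JVM

        System.out.println("allocated delta (bytes): " + (allocatedBytes() - allocatedBefore));
        System.out.println("used heap delta (bytes): " + (usedHeapBytes() - usedBefore));
    }
}
```

The allocation counter only ever goes up, while the used heap can fall
back after a collection, so the two numbers answer different questions.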

Apart from that, I ran your code for 2 hours, slightly modified so
that it works without dependencies, and saw no unusual memory
consumption, just a regular garbage collection sawtooth pattern. That
said, I had to replace your actual processing with a simple println,
so if there is a memory leak in that part I would of course not have
noticed.
I've uploaded the code I ran [1] for reference. For further analysis,
maybe you could run something similar with just a println or noop and
see if the symptoms persist, to localize the leak (if it exists).
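
When repeating such a test, one simple way to watch heap usage from
inside the JVM is to sample it at a fixed interval. The following is a
minimal sketch using only the standard java.lang.management API; the
class name, method name and interval are arbitrary choices for the
example, and this is not the code from [1].

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
import java.util.List;

class HeapSampler {

    // Records the used heap size (in bytes) `samples` times,
    // waiting `intervalMs` milliseconds after each sample.
    static List<Long> sample(int samples, long intervalMs) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        List<Long> used = new ArrayList<>();
        for (int i = 0; i < samples; i++) {
            used.add(memory.getHeapMemoryUsage().getUsed());
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop early
                break;
            }
        }
        return used;
    }

    public static void main(String[] args) {
        // Ten samples, one second apart; run this in a background thread
        // next to the consumer loop for a longer observation.
        for (long bytes : sample(10, 1000)) {
            System.out.println("used heap (bytes): " + bytes);
        }
    }
}
```

With a healthy sawtooth the low points after each collection stay
roughly level; low points that keep rising over hours would be a hint
(not proof) that objects are being retained.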

Also, two random observations on your code:

KafkaConsumer.poll(long timeout) has been deprecated since Kafka 2.0;
you should consider using the overloaded version with a Duration
parameter instead.
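
A minimal sketch of the change, assuming the timeout stays a
millisecond value as in your code. The helper and class names are made
up for the example, and the actual consumer call is shown only as a
comment so that the snippet runs without kafka-clients on the
classpath.

```java
import java.time.Duration;

class PollTimeout {

    // Converts the millisecond timeout passed to the deprecated poll(long)
    // into the Duration expected by poll(Duration).
    static Duration toPollTimeout(int timeoutMs) {
        return Duration.ofMillis(timeoutMs);
    }

    public static void main(String[] args) {
        int timeout = 5000; // stands in for consumeTimeout from the original code
        Duration pollTimeout = toPollTimeout(timeout);

        // With kafka-clients available, the call in the loop would become:
        // ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
        System.out.println(pollTimeout); // prints PT5S
    }
}
```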

The comment at [2] seems to contradict the code that follows it: the
comment says the offset should not be changed in suggest mode, but the
code commits offsets only when in suggest mode. As I have no idea what
suggest mode is or what all this means, this observation may be miles
off the mark :)
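
If the comment describes the intended behaviour (an assumption; it
could just as well be the comment that is wrong), the condition would
need to negate isSuggest. A minimal sketch, with a hypothetical helper
shouldCommit standing in for the inline condition:

```java
class CommitGuard {

    // Assumed intent, taken from the comment in the original code:
    // commit synchronously, but never while in suggest mode.
    static boolean shouldCommit(boolean isSyncCommit, boolean isSuggest) {
        return isSyncCommit && !isSuggest;
    }

    public static void main(String[] args) {
        System.out.println(shouldCommit(true, false)); // prints true
        System.out.println(shouldCommit(true, true));  // prints false
    }
}
```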

I hope that helps a little.

Best regards,
Sönke

[1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
[2] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86


On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
<syed.mudas...@gaianconsultants.com> wrote:
>
>
> Thanks,
>
>
>
> ---------- Forwarded message ---------
> From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
> Date: Tue, Feb 26, 2019 at 12:40 PM
> Subject: Apache Kafka Memory Leakage???
> To: <us...@kafka.apache.org>
> Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>
>
> Hi Team,
>   I have a java application based out of latest Apache Kafka version 2.1.1.
>   I have a consumer application that runs infinitely to consume messages 
> whenever produced.
>   Sometimes there are no messages produced for hours.  Still, I see that the 
> memory allocated to consumer program is drastically increasing.
>   My code is as follows:
>
> AtomicBoolean isRunning = new AtomicBoolean(true);
>
> Properties kafkaProperties = new Properties();
>
> kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
>
> kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
>
> kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
> kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
> kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
> consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
> if (topics != null) {
>     subscribeTopics(topics);
> }
>
>
>     boolean infiniteLoop = false;
>     boolean oneTimeMode = false;
>     int timeout = consumeTimeout;
>     if (isSuggest) {
>         //Configuration for suggest mode
>         oneTimeMode = true;
>         msgCount = 0;
>         timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
>     } else if (msgCount < 0) {
>         infiniteLoop = true;
>     } else if (msgCount == 0) {
>         oneTimeMode = true;
>     }
>     Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
>     do {
>             ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
>             for (final ConsumerRecord<byte[], byte[]> record : records) {
>                 if (!infiniteLoop && !oneTimeMode) {
>                     --msgCount;
>                     if (msgCount < 0) {
>                         break;
>                     }
>                 }
>                 outputViews.write(new BinaryOutput() {
>                     @Override
>                     public Document getHeader() {
>                         return generateHeader(record, oldHeader);
>                     }
>
>                     @Override
>                     public void write(WritableByteChannel writeChannel) throws IOException {
>                         try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>                             os.write(record.value());
>                         }
>                     }
>                 });
>                 //The offset to commit should be the next offset of the current one,
>                 // according to the API
>                 offsets.put(new TopicPartition(record.topic(), record.partition()),
>                         new OffsetAndMetadata(record.offset() + 1));
>                 //In suggest mode, we should not change the current offset
>                 if (isSyncCommit && isSuggest) {
>                     commitOffset(offsets);
>                     offsets.clear();
>                 }
>             }
>      } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>
>
> See the screenshot below.  In about nineteen hours, it just consumed 5 
> messages but the memory allocated is 1.6GB.
>
>
> Any clues on how to get rid of memory issue?  Anything I need to do in the 
> program or is it a bug in the kafka library?
>
> Please revert ASAP.
>
>
> Thanks,
>


-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
