Sonke,
  I am not blaming apache-kafka for the tickets raised by our customers.  I
am saying there could be an issue in the kafka-clients library causing a
resource/memory leak.  If that issue is resolved, my tickets will be
resolved automatically as well.  I don't find any issue with the SnapLogic
code.
  Since I am in touch with the developers of kafka-clients through this
email, I am looking forward to contributing as much as I can to improve the
kafka-clients library.
  What are the next steps to confirm it's a bug in kafka-clients?  And if it
is a bug, what's the process to get it resolved?
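
  One way to narrow this down is to check, from inside the consumer process,
whether the live heap keeps growing or whether only the cumulative amount
allocated does.  Below is a minimal sketch of that check.  It is not part of
kafka-clients or SnapLogic; the class name HeapCheck, the method names and
the 1 MiB tolerance are assumptions made purely for illustration.  It only
uses the standard java.lang.management API to sample used heap after
requesting a garbage collection.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

// Hypothetical helper (not from kafka-clients): samples live heap usage so
// that retained memory can be told apart from cumulative allocation.
class HeapCheck {

    /** Heap bytes currently in use, as reported by the JVM. */
    static long usedHeapBytes() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        return memory.getHeapMemoryUsage().getUsed();
    }

    /** Requests a GC first, so the reading is closer to live objects only. */
    static long usedHeapAfterGc() {
        System.gc(); // only a hint; the JVM may ignore it
        return usedHeapBytes();
    }

    /**
     * Returns true if every sample exceeds the previous one by more than
     * toleranceBytes. A sawtooth or flat series returns false.
     */
    static boolean growsSteadily(long[] samples, long toleranceBytes) {
        if (samples.length < 2) {
            return false;
        }
        for (int i = 1; i < samples.length; i++) {
            if (samples[i] - samples[i - 1] <= toleranceBytes) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        int sampleCount = 5;       // assumption: use far more samples in practice
        long intervalMillis = 200; // assumption: use minutes, not milliseconds
        long[] samples = new long[sampleCount];
        for (int i = 0; i < sampleCount; i++) {
            samples[i] = usedHeapAfterGc();
            System.out.printf("sample %d: %d KiB used after GC%n",
                    i, samples[i] / 1024);
            Thread.sleep(intervalMillis);
        }
        long toleranceBytes = 1024 * 1024; // arbitrary threshold for this sketch
        System.out.println("steady growth: "
                + growsSteadily(samples, toleranceBytes));
    }
}
```

  If samples taken this way next to the poll loop keep climbing over a
couple of hours, a heap dump would show which objects are being retained.
If they stay flat or follow a sawtooth, the metric is probably counting
allocations rather than memory that is still held.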

Thanks,



On Tue, Mar 5, 2019 at 2:43 PM Sönke Liebau
<soenke.lie...@opencore.com.invalid> wrote:

> Hi Syed,
>
> Apache Kafka is open source software that comes as is, without any
> support attached to it. It may well be that this is a bug in the Kafka
> client library, though tbh I doubt that, given what my tests have shown
> and since I think someone else would have noticed this as well.
> Even if this is a bug though, there is no obligation on anyone to fix
> this. Any bugs your customer raised with you are between you and them and
> nothing to do with Apache Kafka.
>
> While I am happy to assist you with this, I, like most people on this
> list, do this in my spare time, which means that the time I can spend on
> this is limited.
>
> That being said, could you please host the image externally somewhere
> (imgur or something similar)? It doesn't appear to have gone through the
> list.
>
> What input parameters are you using for isSuggest, messageCount and
> synccommit when you run the code?
>
> Best regards,
> Sönke
>
>
>
>
> On Tue, Mar 5, 2019 at 9:14 AM Syed Mudassir Ahmed <
> syed.mudas...@gaianconsultants.com> wrote:
>
>> Sonke,
>>   This issue seems serious.  Customers raised a bug against our product,
>> and I suspect the bug is in the apache-kafka clients library.
>>   I executed the kafka reader without any SnapLogic-specific code.  There
>> were only about twenty messages in the topics.  The code consumed about
>> 300MB of memory in about 2 hours.
>>   Please find the screenshot attached.
>>   Can we please get on a call and arrive at a conclusion?  I still argue
>> it's a bug in the kafka-clients library.
>>
>> Thanks,
>>
>>
>>
>> On Mon, Mar 4, 2019 at 8:33 PM Sönke Liebau
>> <soenke.lie...@opencore.com.invalid> wrote:
>>
>>> Hi Syed,
>>>
>>> and you are sure that this memory is actually allocated? I still have my
>>> reservations about that metric to be honest. Is there any way to connect
>>> to the process with, for example, jconsole and have a look at memory
>>> consumption in there?
>>> Or alternatively, since the code you have sent is not relying on
>>> SnapLogic anymore, can you just run it as a standalone application and
>>> check memory consumption?
>>>
>>> That code looks very similar to what I ran (without knowing your input
>>> parameters for isSuggest et al., of course) and for me memory consumption
>>> stayed between 120MB and 200MB.
>>>
>>> Best regards,
>>> Sönke
>>>
>>>
>>> On Mon, Mar 4, 2019 at 1:44 PM Syed Mudassir Ahmed <
>>> syed.mudas...@gaianconsultants.com> wrote:
>>>
>>>> Sonke,
>>>>   thanks again.
>>>>   Yes, I replaced the non-kafka code from our end with a simple Sysout
>>>> statement as follows:
>>>>
>>>> do {
>>>>     ConsumerRecords<byte[], byte[]> records =
>>>>             consumer.poll(Duration.of(timeout, ChronoUnit.MILLIS));
>>>>     for (final ConsumerRecord<byte[], byte[]> record : records) {
>>>>         if (!infiniteLoop && !oneTimeMode) {
>>>>             --msgCount;
>>>>             if (msgCount < 0) {
>>>>                 break;
>>>>             }
>>>>         }
>>>>         Debugger.doPrint("value read:<" + record.value() + ">");
>>>>         /*outputViews.write(new BinaryOutput() {
>>>>             @Override
>>>>             public Document getHeader() {
>>>>                 return generateHeader(record, oldHeader);
>>>>             }
>>>>
>>>>             @Override
>>>>             public void write(WritableByteChannel writeChannel)
>>>>                     throws IOException {
>>>>                 try (OutputStream os =
>>>>                         Channels.newOutputStream(writeChannel)) {
>>>>                     os.write(record.value());
>>>>                 }
>>>>             }
>>>>         });*/
>>>>         // The offset to commit should be the next offset of the
>>>>         // current one, according to the API
>>>>         offsets.put(new TopicPartition(record.topic(), record.partition()),
>>>>                 new OffsetAndMetadata(record.offset() + 1));
>>>>         //In suggest mode, we should not change the current offset
>>>>         if (isSyncCommit && isSuggest) {
>>>>             commitOffset(offsets);
>>>>             offsets.clear();
>>>>         }
>>>>     }
>>>> } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>>>
>>>>
>>>> *Note:* Debugger is a wrapper class that just writes the given string
>>>> to a local file using PrintStream's println() method.
>>>>
>>>> And I don't see any difference in the metrics.  I still see the huge
>>>> amount of memory allocated.
>>>>
>>>> See the image attached.
>>>>
>>>>
>>>> Thanks,
>>>>
>>>>
>>>>
>>>> On Mon, Mar 4, 2019 at 5:17 PM Sönke Liebau
>>>> <soenke.lie...@opencore.com.invalid> wrote:
>>>>
>>>>> Hi Syed,
>>>>>
>>>>> let's keep it on the list for now so that everybody can participate :)
>>>>>
>>>>> The different .poll() method was just an unrelated observation, the
>>>>> main points of my mail were the question about whether this is the
>>>>> correct metric you are looking at and replacing the payload of your
>>>>> code with a println statement to remove non-Kafka code from your
>>>>> program and make sure that the leak is not in there. Have you tried
>>>>> that?
>>>>>
>>>>> Best regards,
>>>>> Sönke
>>>>>
>>>>> On Mon, Mar 4, 2019 at 7:21 AM Syed Mudassir Ahmed
>>>>> <syed.mudas...@gaianconsultants.com> wrote:
>>>>> >
>>>>> > Sonke,
>>>>> >   Thanks so much for the reply.  I used the new poll(Duration)
>>>>> > version of the method.  Still, I see the memory issue.
>>>>> >   Is there a way we can get on a one-on-one call and discuss this,
>>>>> > please?  Let me know your availability.  I can share a Zoom meeting
>>>>> > link.
>>>>> >
>>>>> > Thanks,
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau <
>>>>> soenke.lie...@opencore.com.invalid> wrote:
>>>>> >>
>>>>> >> Hi Syed,
>>>>> >>
>>>>> >> from your screenshot I assume that you are using SnapLogic to run your
>>>>> >> code (full disclosure: I do not have the faintest idea of this
>>>>> >> product!). I've just had a look at the docs and am a bit confused by
>>>>> >> their explanation of the metric that you point out in your image
>>>>> >> "Memory Allocated". The docs say: "The Memory Allocated reflects the
>>>>> >> number of bytes that were allocated by the Snap.  Note that this
>>>>> >> number does not reflect the amount of memory that was freed and it is
>>>>> >> not the peak memory usage of the Snap.  So, it is not necessarily a
>>>>> >> metric that can be used to estimate the required size of a Snaplex
>>>>> >> node.  Rather, the number provides an insight into how much memory had
>>>>> >> to be allocated to process all of the documents.  For example, if the
>>>>> >> total allocated was 5MB and the Snap processed 32 documents, then the
>>>>> >> Snap allocated roughly 164KB per document.  When combined with the
>>>>> >> other statistics, this number can help to identify the potential
>>>>> >> causes of performance issues."
>>>>> >> The part about not reflecting memory that was freed makes me somewhat
>>>>> >> doubtful whether this actually reflects how much memory the process
>>>>> >> currently holds.  Can you give some more insight there?
>>>>> >>
>>>>> >> Apart from that, I just ran your code somewhat modified to make it
>>>>> >> work without dependencies for 2 hours and saw no unusual memory
>>>>> >> consumption, just a regular garbage collection sawtooth pattern. That
>>>>> >> being said, I had to replace your actual processing with a simple
>>>>> >> println, so if there is a memory leak in there I would of course not
>>>>> >> have noticed.
>>>>> >> I've uploaded the code I ran [1] for reference. For further analysis,
>>>>> >> maybe you could run something similar with just a println or noop and
>>>>> >> see if the symptoms persist, to localize the leak (if it exists).
>>>>> >>
>>>>> >> Also, two random observations on your code:
>>>>> >>
>>>>> >> KafkaConsumer.poll(Long timeout) is deprecated, you should consider
>>>>> >> using the overloaded version with a Duration parameter instead.
>>>>> >>
>>>>> >> The comment at [2] seems to contradict the following code, as the
>>>>> >> offsets are only changed when in suggest mode. But as I have no idea
>>>>> >> what suggest mode even is or what all this means, this observation
>>>>> >> may be miles off the point :)
>>>>> >>
>>>>> >> I hope that helps a little.
>>>>> >>
>>>>> >> Best regards,
>>>>> >> Sönke
>>>>> >>
>>>>> >> [1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
>>>>> >> [2] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
>>>>> >>
>>>>> >>
>>>>> >> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
>>>>> >> <syed.mudas...@gaianconsultants.com> wrote:
>>>>> >> >
>>>>> >> >
>>>>> >> > Thanks,
>>>>> >> >
>>>>> >> >
>>>>> >> >
>>>>> >> > ---------- Forwarded message ---------
>>>>> >> > From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>>>>> >> > Date: Tue, Feb 26, 2019 at 12:40 PM
>>>>> >> > Subject: Apache Kafka Memory Leakage???
>>>>> >> > To: <us...@kafka.apache.org>
>>>>> >> > Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>>>>> >> >
>>>>> >> >
>>>>> >> > Hi Team,
>>>>> >> >   I have a Java application based on the latest Apache Kafka
>>>>> >> > version, 2.1.1.
>>>>> >> >   I have a consumer application that runs indefinitely to consume
>>>>> >> > messages whenever they are produced.
>>>>> >> >   Sometimes there are no messages produced for hours.  Still, I
>>>>> >> > see that the memory allocated to the consumer program is
>>>>> >> > increasing drastically.
>>>>> >> >   My code is as follows:
>>>>> >> >
>>>>> >> > AtomicBoolean isRunning = new AtomicBoolean(true);
>>>>> >> >
>>>>> >> > Properties kafkaProperties = new Properties();
>>>>> >> > kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
>>>>> >> > kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
>>>>> >> > kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
>>>>> >> > kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>>>>> >> > kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
>>>>> >> > consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
>>>>> >> > if (topics != null) {
>>>>> >> >     subscribeTopics(topics);
>>>>> >> > }
>>>>> >> >
>>>>> >> >     boolean infiniteLoop = false;
>>>>> >> >     boolean oneTimeMode = false;
>>>>> >> >     int timeout = consumeTimeout;
>>>>> >> >     if (isSuggest) {
>>>>> >> >         //Configuration for suggest mode
>>>>> >> >         oneTimeMode = true;
>>>>> >> >         msgCount = 0;
>>>>> >> >         timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
>>>>> >> >     } else if (msgCount < 0) {
>>>>> >> >         infiniteLoop = true;
>>>>> >> >     } else if (msgCount == 0) {
>>>>> >> >         oneTimeMode = true;
>>>>> >> >     }
>>>>> >> >     Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
>>>>> >> >     do {
>>>>> >> >         ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
>>>>> >> >         for (final ConsumerRecord<byte[], byte[]> record : records) {
>>>>> >> >             if (!infiniteLoop && !oneTimeMode) {
>>>>> >> >                 --msgCount;
>>>>> >> >                 if (msgCount < 0) {
>>>>> >> >                     break;
>>>>> >> >                 }
>>>>> >> >             }
>>>>> >> >             outputViews.write(new BinaryOutput() {
>>>>> >> >                 @Override
>>>>> >> >                 public Document getHeader() {
>>>>> >> >                     return generateHeader(record, oldHeader);
>>>>> >> >                 }
>>>>> >> >
>>>>> >> >                 @Override
>>>>> >> >                 public void write(WritableByteChannel writeChannel) throws IOException {
>>>>> >> >                     try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>>>>> >> >                         os.write(record.value());
>>>>> >> >                     }
>>>>> >> >                 }
>>>>> >> >             });
>>>>> >> >             //The offset to commit should be the next offset of the current one,
>>>>> >> >             // according to the API
>>>>> >> >             offsets.put(new TopicPartition(record.topic(), record.partition()),
>>>>> >> >                     new OffsetAndMetadata(record.offset() + 1));
>>>>> >> >             //In suggest mode, we should not change the current offset
>>>>> >> >             if (isSyncCommit && isSuggest) {
>>>>> >> >                 commitOffset(offsets);
>>>>> >> >                 offsets.clear();
>>>>> >> >             }
>>>>> >> >         }
>>>>> >> >     } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>>>> >> >
>>>>> >> >
>>>>> >> > See the screenshot below.  In about nineteen hours, it consumed
>>>>> >> > just 5 messages, but the memory allocated is 1.6GB.
>>>>> >> >
>>>>> >> >
>>>>> >> > Any clues on how to get rid of the memory issue?  Is there
>>>>> >> > anything I need to do in the program, or is it a bug in the kafka
>>>>> >> > library?
>>>>> >> >
>>>>> >> > Please reply ASAP.
>>>>> >> >
>>>>> >> >
>>>>> >> > Thanks,
>>>>> >> >
>>>>> >>
>>>>> >>
>>>>> >> --
>>>>> >> Sönke Liebau
>>>>> >> Partner
>>>>> >> Tel. +49 179 7940878
>>>>> >> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>
