Sönke, I am not blaming Apache Kafka for the tickets raised by our customers. I am saying there could be an issue in the kafka-clients library causing a resource/memory leak. If that issue is resolved, my tickets will be resolved automatically as well. I don't find any issue with the SnapLogic code. Since I am in touch with the developers of kafka-clients through this email thread, I am looking forward to contributing as much as I can to improve the kafka-clients library. What are the next steps to confirm that it is a bug in kafka-clients? And if it is a bug, what is the process to get it resolved?
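As a concrete next step, a bare-bones standalone check along the following lines would take SnapLogic entirely out of the picture and show whether the used heap keeps growing even when almost no messages arrive, or whether it just follows the normal GC sawtooth. This is only a sketch: the broker address, topic, group id, class name and the one-minute logging interval are placeholders, not values from this thread.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerLeakCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "leak-check");              // placeholder group id
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
            long lastLog = System.currentTimeMillis();
            while (true) {
                // Same polling style as the code below, but with the non-deprecated Duration overload
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.println("value read: <" + new String(record.value()) + ">");
                }
                // Log used heap once a minute; steady growth here (rather than a sawtooth) would point at a leak
                if (System.currentTimeMillis() - lastLog >= 60_000) {
                    Runtime rt = Runtime.getRuntime();
                    long usedMb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
                    System.out.println("used heap: " + usedMb + " MB");
                    lastLog = System.currentTimeMillis();
                }
            }
        }
    }
}

Attaching jconsole to the same process would additionally show whether the heap stays allocated after a full GC, which is the quickest way to distinguish a real leak from a cumulative "memory allocated" counter.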
Thanks,

On Tue, Mar 5, 2019 at 2:43 PM Sönke Liebau <soenke.lie...@opencore.com.invalid> wrote:

> Hi Syed,
>
> Apache Kafka is open source software that comes as is, without any support attached to it. It may well be that this is a bug in the Kafka client library, though to be honest I doubt that, from what my tests have shown and since I think someone else would have noticed it as well.
> Even if this is a bug, there is no obligation on anyone to fix it. Any bugs your customer raised with you are between you and them and have nothing to do with Apache Kafka.
>
> While I am happy to assist you with this, I, like most people on this list, do so in my spare time, which means the time I can spend on it is limited.
>
> That being said, could you please host the image externally somewhere (imgur or something similar)? It doesn't appear to have made it through to the list.
>
> What input parameters are you using for isSuggest, messageCount and syncCommit when you run the code?
>
> Best regards,
> Sönke
>
> On Tue, Mar 5, 2019 at 9:14 AM Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com> wrote:
>
>> Sönke,
>> This issue seems serious. Customers raised bugs against our product, and I suspect the bug is in the apache-kafka clients library.
>> I executed the Kafka reader without any SnapLogic-specific code. There were hardly twenty messages in the topics, yet the code consumed about 300MB of memory in about 2 hours.
>> Please find the screenshot attached.
>> Can we please get on a call and arrive at a conclusion? I still argue it is a bug in the kafka-clients library.
>>
>> Thanks,
>>
>> On Mon, Mar 4, 2019 at 8:33 PM Sönke Liebau <soenke.lie...@opencore.com.invalid> wrote:
>>
>>> Hi Syed,
>>>
>>> Are you sure that this memory is actually allocated? I still have my reservations about that metric, to be honest. Is there any way to connect to the process with, for example, jconsole and have a look at memory consumption there?
>>> Alternatively, since the code you have sent no longer relies on SnapLogic, can you just run it as a standalone application and check memory consumption?
>>>
>>> That code looks very similar to what I ran (without knowing your input parameters for isSuggest et al., of course), and for me memory consumption stayed between 120MB and 200MB.
>>>
>>> Best regards,
>>> Sönke
>>>
>>> On Mon, Mar 4, 2019 at 1:44 PM Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com> wrote:
>>>
>>>> Sönke,
>>>> Thanks again.
>>>> Yes, I replaced the non-Kafka code on our end with a simple sysout statement, as follows:
>>>>
>>>> do {
>>>>     ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.of(timeout, ChronoUnit.MILLIS));
>>>>     for (final ConsumerRecord<byte[], byte[]> record : records) {
>>>>         if (!infiniteLoop && !oneTimeMode) {
>>>>             --msgCount;
>>>>             if (msgCount < 0) {
>>>>                 break;
>>>>             }
>>>>         }
>>>>         Debugger.doPrint("value read:<" + record.value() + ">");
>>>>         /*outputViews.write(new BinaryOutput() {
>>>>             @Override
>>>>             public Document getHeader() {
>>>>                 return generateHeader(record, oldHeader);
>>>>             }
>>>>
>>>>             @Override
>>>>             public void write(WritableByteChannel writeChannel) throws IOException {
>>>>                 try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>>>>                     os.write(record.value());
>>>>                 }
>>>>             }
>>>>         });*/
>>>>         // The offset to commit should be the next offset of the current one,
>>>>         // according to the API
>>>>         offsets.put(new TopicPartition(record.topic(), record.partition()),
>>>>                 new OffsetAndMetadata(record.offset() + 1));
>>>>         // In suggest mode, we should not change the current offset
>>>>         if (isSyncCommit && isSuggest) {
>>>>             commitOffset(offsets);
>>>>             offsets.clear();
>>>>         }
>>>>     }
>>>> } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>>>
>>>> Note: Debugger is a wrapper class that just writes the given string to a local file using PrintStream's println() method.
>>>>
>>>> And I don't see any difference in the metrics. I still see a huge amount of memory allocated.
>>>>
>>>> See the image attached.
>>>>
>>>> Thanks,
>>>>
>>>> On Mon, Mar 4, 2019 at 5:17 PM Sönke Liebau <soenke.lie...@opencore.com.invalid> wrote:
>>>>
>>>>> Hi Syed,
>>>>>
>>>>> Let's keep it on the list for now so that everybody can participate :)
>>>>>
>>>>> The different .poll() method was just an unrelated observation; the main points of my mail were the question about whether this is the correct metric you are looking at, and replacing the payload of your code with a println statement to remove non-Kafka code from your program and make sure that the leak is not in there. Have you tried that?
>>>>>
>>>>> Best regards,
>>>>> Sönke
>>>>>
>>>>> On Mon, Mar 4, 2019 at 7:21 AM Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com> wrote:
>>>>> >
>>>>> > Sönke,
>>>>> > Thanks so much for the reply. I used the new version of the poll(Duration) method. Still, I see the memory issue.
>>>>> > Is there a way we can get on a one-on-one call and discuss this, please? Let me know your availability. I can share a Zoom meeting link.
>>>>> >
>>>>> > Thanks,
>>>>> >
>>>>> > On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau <soenke.lie...@opencore.com.invalid> wrote:
>>>>> >>
>>>>> >> Hi Syed,
>>>>> >>
>>>>> >> From your screenshot I assume that you are using SnapLogic to run your code (full disclosure: I do not have the faintest idea of this product!). I've just had a look at the docs and am a bit confused by their explanation of the metric that you point out in your image, "Memory Allocated". The docs say: "The Memory Allocated reflects the number of bytes that were allocated by the Snap. Note that this number does not reflect the amount of memory that was freed and it is not the peak memory usage of the Snap. So, it is not necessarily a metric that can be used to estimate the required size of a Snaplex node.
>>>>> >> Rather, the number provides an insight into how much memory had to be allocated to process all of the documents. For example, if the total allocated was 5MB and the Snap processed 32 documents, then the Snap allocated roughly 164KB per document. When combined with the other statistics, this number can help to identify the potential causes of performance issues."
>>>>> >> The part about not reflecting memory that was freed makes me somewhat doubtful whether this actually reflects how much memory the process currently holds. Can you give some more insight there?
>>>>> >>
>>>>> >> Apart from that, I just ran your code, somewhat modified to make it work without dependencies, for 2 hours and saw no unusual memory consumption, just a regular garbage-collection sawtooth pattern. That being said, I had to replace your actual processing with a simple println, so if there is a memory leak in there I would of course not have noticed it.
>>>>> >> I've uploaded the code I ran [1] for reference. For further analysis, maybe you could run something similar with just a println or a noop and see if the symptoms persist, to localize the leak (if it exists).
>>>>> >>
>>>>> >> Also, two random observations on your code:
>>>>> >>
>>>>> >> KafkaConsumer.poll(long timeout) is deprecated; you should consider using the overloaded version with a Duration parameter instead.
>>>>> >>
>>>>> >> The comment at [2] seems to contradict the following code, as the offsets are only changed when in suggest mode. But as I have no idea what suggest mode even is or what all this means, this observation may be miles off the point :)
>>>>> >>
>>>>> >> I hope that helps a little.
>>>>> >>
>>>>> >> Best regards,
>>>>> >> Sönke
>>>>> >>
>>>>> >> [1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
>>>>> >> [2] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
>>>>> >>
>>>>> >> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com> wrote:
>>>>> >> >
>>>>> >> > Thanks,
>>>>> >> >
>>>>> >> > ---------- Forwarded message ---------
>>>>> >> > From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>>>>> >> > Date: Tue, Feb 26, 2019 at 12:40 PM
>>>>> >> > Subject: Apache Kafka Memory Leakage???
>>>>> >> > To: <us...@kafka.apache.org>
>>>>> >> > Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>>>>> >> >
>>>>> >> > Hi Team,
>>>>> >> > I have a Java application based on the latest Apache Kafka version, 2.1.1.
>>>>> >> > I have a consumer application that runs indefinitely to consume messages whenever they are produced.
>>>>> >> > Sometimes no messages are produced for hours. Still, I see that the memory allocated to the consumer program is drastically increasing.
>>>>> >> > My code is as follows:
>>>>> >> >
>>>>> >> > AtomicBoolean isRunning = new AtomicBoolean(true);
>>>>> >> >
>>>>> >> > Properties kafkaProperties = new Properties();
>>>>> >> > kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
>>>>> >> > kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
>>>>> >> > kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
>>>>> >> > kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>>>>> >> > kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
>>>>> >> > consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
>>>>> >> > if (topics != null) {
>>>>> >> >     subscribeTopics(topics);
>>>>> >> > }
>>>>> >> >
>>>>> >> > boolean infiniteLoop = false;
>>>>> >> > boolean oneTimeMode = false;
>>>>> >> > int timeout = consumeTimeout;
>>>>> >> > if (isSuggest) {
>>>>> >> >     //Configuration for suggest mode
>>>>> >> >     oneTimeMode = true;
>>>>> >> >     msgCount = 0;
>>>>> >> >     timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
>>>>> >> > } else if (msgCount < 0) {
>>>>> >> >     infiniteLoop = true;
>>>>> >> > } else if (msgCount == 0) {
>>>>> >> >     oneTimeMode = true;
>>>>> >> > }
>>>>> >> > Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
>>>>> >> > do {
>>>>> >> >     ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
>>>>> >> >     for (final ConsumerRecord<byte[], byte[]> record : records) {
>>>>> >> >         if (!infiniteLoop && !oneTimeMode) {
>>>>> >> >             --msgCount;
>>>>> >> >             if (msgCount < 0) {
>>>>> >> >                 break;
>>>>> >> >             }
>>>>> >> >         }
>>>>> >> >         outputViews.write(new BinaryOutput() {
>>>>> >> >             @Override
>>>>> >> >             public Document getHeader() {
>>>>> >> >                 return generateHeader(record, oldHeader);
>>>>> >> >             }
>>>>> >> >
>>>>> >> >             @Override
>>>>> >> >             public void write(WritableByteChannel writeChannel) throws IOException {
>>>>> >> >                 try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>>>>> >> >                     os.write(record.value());
>>>>> >> >                 }
>>>>> >> >             }
>>>>> >> >         });
>>>>> >> >         //The offset to commit should be the next offset of the current one,
>>>>> >> >         // according to the API
>>>>> >> >         offsets.put(new TopicPartition(record.topic(), record.partition()),
>>>>> >> >                 new OffsetAndMetadata(record.offset() + 1));
>>>>> >> >         //In suggest mode, we should not change the current offset
>>>>> >> >         if (isSyncCommit && isSuggest) {
>>>>> >> >             commitOffset(offsets);
>>>>> >> >             offsets.clear();
>>>>> >> >         }
>>>>> >> >     }
>>>>> >> > } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>>>> >> >
>>>>> >> > See the screenshot below. In about nineteen hours it consumed just 5 messages, but the memory allocated is 1.6GB.
>>>>> >> >
>>>>> >> > Any clues on how to get rid of this memory issue? Is there anything I need to do in the program, or is it a bug in the Kafka library?
>>>>> >> >
>>>>> >> > Please revert ASAP.
>>>>> >> >
>>>>> >> > Thanks,
>>>>> >>
>>>>> >>
>>>>> >> --
>>>>> >> Sönke Liebau
>>>>> >> Partner
>>>>> >> Tel. +49 179 7940878
>>>>> >> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>>>>
>>>>>
>>>>> --
>>>>> Sönke Liebau
>>>>> Partner
>>>>> Tel. +49 179 7940878
>>>>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>>
>>> --
>>> Sönke Liebau
>>> Partner
>>> Tel. +49 179 7940878
>>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
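The commitOffset(offsets) helper referenced in the code above is not shown anywhere in the thread; a minimal version would presumably just delegate to the consumer's synchronous commit API, roughly as sketched below (the consumer field and the method signature are assumptions, not code from the thread):

private void commitOffset(Map<TopicPartition, OffsetAndMetadata> offsets) {
    // Synchronously commit exactly the offsets collected for the records processed so far;
    // commitSync(Map) blocks until the commit succeeds or throws an exception.
    // Assumes the same KafkaConsumer<byte[], byte[]> field as in the code above.
    consumer.commitSync(offsets);
}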