Thanks,
---------- Forwarded message ---------
From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
Date: Tue, Feb 26, 2019 at 12:40 PM
Subject: Apache Kafka Memory Leakage???
To: <us...@kafka.apache.org>
Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>

Hi Team,

I have a Java application built on the latest Apache Kafka release, 2.1.1. It is a consumer application that runs indefinitely and consumes messages whenever they are produced. Sometimes no messages are produced for hours. Even so, I see that the memory allocated to the consumer program keeps increasing drastically.

My code is as follows:

    AtomicBoolean isRunning = new AtomicBoolean(true);

    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
    kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
    kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);

    consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
    if (topics != null) {
        subscribeTopics(topics);
    }

    boolean infiniteLoop = false;
    boolean oneTimeMode = false;
    int timeout = consumeTimeout;
    if (isSuggest) {
        // Configuration for suggest mode
        oneTimeMode = true;
        msgCount = 0;
        timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
    } else if (msgCount < 0) {
        infiniteLoop = true;
    } else if (msgCount == 0) {
        oneTimeMode = true;
    }

    Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
    do {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            if (!infiniteLoop && !oneTimeMode) {
                --msgCount;
                if (msgCount < 0) {
                    break;
                }
            }
            outputViews.write(new BinaryOutput() {
                @Override
                public Document getHeader() {
                    return generateHeader(record, oldHeader);
                }

                @Override
                public void write(WritableByteChannel writeChannel) throws IOException {
                    try (OutputStream os = Channels.newOutputStream(writeChannel)) {
                        os.write(record.value());
                    }
                }
            });
            // The offset to commit should be the next offset of the current one,
            // according to the API
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            // In suggest mode, we should not change the current offset
            if (isSyncCommit && isSuggest) {
                commitOffset(offsets);
                offsets.clear();
            }
        }
    } while ((msgCount > 0 || infiniteLoop) && isRunning.get());

See the screenshot below. In about nineteen hours it consumed just 5 messages, yet the memory allocated is 1.6 GB.

[image: image.png]

Any clues on how to get rid of this memory issue? Is there anything I need to change in the program, or is it a bug in the Kafka library?

Please reply ASAP.

Thanks,
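
Regarding the 1.6 GB figure above: an external monitor may be reporting the heap the JVM has reserved rather than the memory held by live objects, so it can help to log both numbers from inside the consumer process before concluding there is a leak. Below is a minimal sketch, assuming only the standard java.lang.management API; HeapProbe and its method names are hypothetical and are not part of Kafka or of the program above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical helper for illustration; not part of Kafka or the original program. */
final class HeapProbe {

    /** Heap bytes currently occupied by objects, including garbage not yet collected. */
    static long usedHeapBytes() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getUsed();
    }

    /** Heap bytes the JVM has reserved for its heap, whether or not objects occupy them. */
    static long committedHeapBytes() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getCommitted();
    }

    /** One-line summary suitable for a log statement. */
    static String snapshot() {
        long mb = 1024L * 1024L;
        return String.format("heap used=%d MB, committed=%d MB",
                usedHeapBytes() / mb, committedHeapBytes() / mb);
    }

    public static void main(String[] args) {
        System.out.println("before gc: " + snapshot());
        // System.gc() is only a request; the JVM is free to ignore it.
        System.gc();
        System.out.println("after gc:  " + snapshot());
    }
}
```

Calling HeapProbe.snapshot() every few minutes from the consumer loop would show whether the used figure keeps climbing even after collections (which would point at retained objects) or whether only the committed figure is large.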