Emanuel Velzi created KAFKA-13623:
-------------------------------------

             Summary: Memory leak with multiple poll
                 Key: KAFKA-13623
                 URL: https://issues.apache.org/jira/browse/KAFKA-13623
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.8.1, 2.4.1
            Reporter: Emanuel Velzi
Hi, I'm experiencing a kind of memory leak with this simple consumer.

Some info before the code:
 - kafka-clients version: I tried with 2.4.1 and 2.8.1

I only set these properties:
 - bootstrap.servers: my-servers
 - group.id: my-group-id
 - auto.offset.reset: earliest
 - enable.auto.commit: false
 - heartbeat.interval.ms: 300

My topic has NUM_PARTITIONS=48 partitions:
{code:java}
public class Test {
    /* ... */

    public void start() {
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            startOne();
        }
    }

    public void startOne() {
        LOGGER.info("startOne");
        this.pool.submit(this::startConsumer);
    }

    public void startConsumer() {
        var consumer = new KafkaConsumer<>(this.kafkaConfiguration,
                this.stringDeserializer, this.stringDeserializer);
        try {
            consumer.subscribe(Collections.singletonList(this.topic));
            consumer.poll(Duration.ofSeconds(30));
            throw new RuntimeException("Some kind of error");
        } catch (Exception e) {
            LOGGER.error("Error in pool");
        } finally {
            consumer.close();
        }
        scheduleNewConsumer();
    }

    private void scheduleNewConsumer() {
        scheduledExecutorService.schedule(() -> startOne(),
                Duration.ofSeconds(2).toMillis(), TimeUnit.MILLISECONDS);
    }
}
{code}
In summary: when some error occurs while processing a record, I close the consumer and retry by starting a new one. At that moment the direct memory used by the Java process starts to grow indefinitely, until the process is killed.

I tested some other strategies. For example:
 - not closing the consumer, and reusing it with a seek(..)
 - not closing the consumer, and reusing it by calling consumer.unsubscribe(); and then consumer.subscribe(..);

In both cases the memory leak was slower, but it still happened. I also tried this:
{code:java}
public void startConsumer() {
    var consumer = new KafkaConsumer<>(this.kafkaConfiguration,
            this.stringDeserializer, this.stringDeserializer);
    try {
        consumer.subscribe(Collections.singletonList(this.topic));
        // NO POLL HERE: consumer.poll(Duration.ofSeconds(30));
        throw new RuntimeException("Some kind of error");
    } catch (Exception e) {
        LOGGER.error("Error in pool");
    } finally {
        consumer.unsubscribe();
        consumer.subscribe(Collections.singletonList(this.topic));
    }
    scheduleNewConsumer();
}
{code}
That is, I subscribe and unsubscribe the consumer multiple times, without polling anything. In that case I don't experience the memory leak. So I imagine that the problem is the poll itself. Can someone help me with this, please?

-- This message was sent by Atlassian Jira (v8.20.1#820001)
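For reference, the "reuse the consumer with a seek(..)" strategy mentioned above can be sketched without a broker. The stand-in interface and all names below are assumptions for illustration only, not KafkaConsumer's real API; in real code, poll/seek/position map to the corresponding KafkaConsumer methods on a TopicPartition. The point of the pattern is to rewind and retry with the same consumer instance instead of closing it and creating a new one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class SeekRetrySketch {

    // Minimal stand-in for the parts of a consumer this sketch needs,
    // so it runs without a broker (hypothetical interface, not Kafka's).
    interface SimpleConsumer {
        List<String> poll();
        void seek(long offset);
        long position();
    }

    // In-memory consumer over a fixed list of records (demo assumption:
    // one record per poll, a single implicit partition).
    static class InMemoryConsumer implements SimpleConsumer {
        private final List<String> records;
        private long offset = 0;

        InMemoryConsumer(List<String> records) { this.records = records; }

        @Override public List<String> poll() {
            if (offset >= records.size()) return List.of();
            return List.of(records.get((int) offset++));
        }
        @Override public void seek(long target) { this.offset = target; }
        @Override public long position() { return offset; }
    }

    // On a processing error, rewind to the offset where the failed batch
    // started and retry with the SAME consumer instead of recreating it.
    static int consumeWithSeekRetry(SimpleConsumer consumer,
                                    Predicate<String> process,
                                    int maxRetriesPerBatch) {
        int processed = 0;
        int retries = 0;
        while (true) {
            long before = consumer.position();
            List<String> batch = consumer.poll();
            if (batch.isEmpty()) return processed;
            try {
                for (String record : batch) {
                    if (!process.test(record)) {
                        throw new RuntimeException("Some kind of error");
                    }
                }
                processed += batch.size();
                retries = 0;
            } catch (RuntimeException e) {
                if (retries++ >= maxRetriesPerBatch) {
                    retries = 0;     // give up on this batch and move past it
                    continue;
                }
                consumer.seek(before); // rewind instead of new consumer
            }
        }
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "bad", "c");
        InMemoryConsumer consumer = new InMemoryConsumer(new ArrayList<>(data));
        // Simulated flakiness: "bad" fails twice, then succeeds on retry.
        int[] badSeen = {0};
        int processed = consumeWithSeekRetry(consumer,
                r -> !(r.equals("bad") && badSeen[0]++ < 2), 5);
        System.out.println(processed); // prints 3
    }
}
```

Whether this avoids the leak depends on the open question above, since the consumer still polls on every retry.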