[ https://issues.apache.org/jira/browse/KAFKA-3552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249065#comment-15249065 ]
Kanak Biscuitwala commented on KAFKA-3552:
------------------------------------------

In fact, I'm reasonably certain there is some kind of leak. The physical memory usage of my service grows from 1.5G to over 3G over time, even when the consumer is keeping up. When it is not keeping up, usage can balloon to 5G, even though the maximum number of bytes a fetch is supposed to return is only 1MB (and presumably, if I pause all partitions first, I should get back 0 bytes?).

> New Consumer: java.lang.OutOfMemoryError: Direct buffer memory
> --------------------------------------------------------------
>
> Key: KAFKA-3552
> URL: https://issues.apache.org/jira/browse/KAFKA-3552
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.9.0.1
> Reporter: Kanak Biscuitwala
> Assignee: Liquan Pei
> Attachments: Screen Shot 2016-04-13 at 11.56.05 AM.png, Screen Shot 2016-04-13 at 2.17.48 PM.png
>
> I'm running Kafka's new consumer with message handlers that can sometimes take a long time to return, combined with manual offset management (to get at-least-once semantics). Since poll() is the only way to heartbeat with the consumer, I have a thread that runs every 500 milliseconds and does the following:
>
> 1) Pause all partitions
> 2) Call poll(0)
> 3) Resume all partitions
>
> For the record, all accesses to KafkaConsumer are protected by synchronized blocks. This generally works, but I'm occasionally seeing messages like this:
> {code}
> java.lang.OutOfMemoryError: Direct buffer memory
>     at java.nio.Bits.reserveMemory(Bits.java:658)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>     at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> {code}
> In addition, when I'm reporting offsets, I'm seeing:
> {code}
> java.lang.OutOfMemoryError: Direct buffer memory
>     at java.nio.Bits.reserveMemory(Bits.java:658)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>     at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
> {code}
> Given that I'm just calling the library, this behavior is unexpected.
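To make the access pattern described above concrete, here is a minimal sketch of the pause/poll(0)/resume heartbeat plus manual commit usage, not the reporter's actual code and not a fix for the OOM. It assumes the 0.9.0.x consumer API (varargs pause()/resume()), and the class name, topic, deserializers, poll timeouts, and scheduler setup are all illustrative placeholders:

{code}
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustrative class name; config must carry bootstrap.servers, group.id and
// key/value deserializers.
public class PausedHeartbeatConsumer {

    private final KafkaConsumer<String, String> consumer;
    private final ScheduledExecutorService heartbeat =
            Executors.newSingleThreadScheduledExecutor();

    public PausedHeartbeatConsumer(Properties config) {
        this.consumer = new KafkaConsumer<>(config);
        this.consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic

        // Heartbeat thread: every 500 ms, pause everything, poll(0), resume.
        heartbeat.scheduleAtFixedRate(this::heartbeatPoll, 500, 500, TimeUnit.MILLISECONDS);
    }

    private void heartbeatPoll() {
        synchronized (consumer) {
            // 0.9.0.x pause()/resume() take TopicPartition varargs.
            TopicPartition[] assigned = consumer.assignment().toArray(new TopicPartition[0]);
            consumer.pause(assigned);   // 1) pause all partitions
            consumer.poll(0);           // 2) poll only to heartbeat; should return no records
            consumer.resume(assigned);  // 3) resume all partitions
        }
    }

    public void runProcessingLoop() {
        while (true) {
            ConsumerRecords<String, String> records;
            synchronized (consumer) {
                records = consumer.poll(1000);
            }
            for (ConsumerRecord<String, String> record : records) {
                handle(record); // message handler that may take a long time to return

                // Manual offset management for at-least-once semantics.
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
                synchronized (consumer) {
                    consumer.commitSync(offsets);
                }
            }
        }
    }

    private void handle(ConsumerRecord<String, String> record) {
        // slow message handler elided
    }
}
{code}

In this sketch the synchronized blocks serialize the heartbeat poll against the processing loop's poll() and commitSync(), matching the reporter's description that all KafkaConsumer access is protected; the reported OOMs occur inside these polls, in the temporary direct buffers allocated by sun.nio.ch during socket reads.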
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)