[ https://issues.apache.org/jira/browse/KAFKA-3552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Liquan Pei reassigned KAFKA-3552: --------------------------------- Assignee: Liquan Pei (was: Neha Narkhede) > New Consumer: java.lang.OutOfMemoryError: Direct buffer memory > -------------------------------------------------------------- > > Key: KAFKA-3552 > URL: https://issues.apache.org/jira/browse/KAFKA-3552 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.9.0.1 > Reporter: Kanak Biscuitwala > Assignee: Liquan Pei > > I'm running Kafka's new consumer with message handlers that can sometimes > take a lot of time to return, and combining that with manual offset > management (to get at-least-once semantics). Since poll() is the only way to > heartbeat with the consumer, I have a thread that runs every 500 milliseconds > that does the following: > 1) Pause all partitions > 2) Call poll(0) > 3) Resume all partitions > For the record, all accesses to KafkaConsumer are protected by synchronized > blocks. This generally works, but I'm occasionally seeing messages like this: > {code} > java.lang.OutOfMemoryError: Direct buffer memory > at java.nio.Bits.reserveMemory(Bits.java:658) > at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) > at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174) > at sun.nio.ch.IOUtil.read(IOUtil.java:195) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) > at > org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108) > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) > at > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153) > at > org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) > at org.apache.kafka.common.network.Selector.poll(Selector.java:286) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) > {code} > In addition, when I'm reporting offsets, I'm seeing: > {code} > java.lang.OutOfMemoryError: Direct buffer memory > at java.nio.Bits.reserveMemory(Bits.java:658) > at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) > at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174) > at sun.nio.ch.IOUtil.read(IOUtil.java:195) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) > at > org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108) > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) > at > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153) > at > org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) > at org.apache.kafka.common.network.Selector.poll(Selector.java:286) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968) > {code} > Given that I'm just calling the library, this behavior is unexpected. -- This message was sent by Atlassian JIRA (v6.3.4#6332)