Kanak Biscuitwala created KAFKA-3552:
----------------------------------------

             Summary: New Consumer: java.lang.OutOfMemoryError: Direct buffer memory
                 Key: KAFKA-3552
                 URL: https://issues.apache.org/jira/browse/KAFKA-3552
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.9.0.1
            Reporter: Kanak Biscuitwala
            Assignee: Neha Narkhede


I'm running Kafka's new consumer with message handlers that can sometimes take 
a long time to return, combined with manual offset management (to get 
at-least-once semantics). Since calling poll() is the only way for the consumer 
to send heartbeats, I have a thread that runs every 500 milliseconds and does 
the following:

1) Pause all partitions
2) Call poll(0)
3) Resume all partitions
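
The three steps above can be sketched roughly as follows. This is only an illustration, not the reporter's actual code: PausableConsumer, KeepAlive, pauseAll() and resumeAll() are hypothetical names standing in for the real KafkaConsumer, so that the sketch compiles without the Kafka client on the classpath. With the 0.9 client, the equivalent calls would presumably be pause(...) and resume(...) over the partitions returned by assignment(), plus poll(0).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the subset of KafkaConsumer used by the keep-alive.
interface PausableConsumer {
    void pauseAll();          // assumed wrapper: pause every assigned partition
    int poll(long timeoutMs); // assumed wrapper: returns the number of records fetched
    void resumeAll();         // assumed wrapper: resume every assigned partition
}

final class KeepAlive {
    private final PausableConsumer consumer;
    private final Object lock; // the same lock must guard every other consumer access

    KeepAlive(PausableConsumer consumer, Object lock) {
        this.consumer = consumer;
        this.lock = lock;
    }

    // One keep-alive tick: pause, poll(0), resume, all under the shared lock.
    int tick() {
        synchronized (lock) {
            consumer.pauseAll();
            try {
                // With everything paused, poll should return no records
                // while still driving network I/O.
                return consumer.poll(0);
            } finally {
                // Resume even if poll throws, so processing is not left stalled.
                consumer.resumeAll();
            }
        }
    }

    // Runs tick() on a single background thread at a fixed period.
    // Note: if tick() throws, the executor suppresses all later runs.
    ScheduledExecutorService start(long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::tick, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

In this sketch, calling start(500) would match the 500 millisecond interval described above, and the returned scheduler should be shut down before the consumer is closed.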

For the record, all accesses to KafkaConsumer are protected by synchronized 
blocks. This generally works, but I'm occasionally seeing messages like this:

{code}
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:658)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
{code}

In addition, when I'm committing offsets, I'm seeing:
{code}
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:658)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
{code}

Given that I'm just calling the library, this behavior is unexpected.
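
Both traces fail inside ByteBuffer.allocateDirect, reached through sun.nio.ch.Util.getTemporaryDirectBuffer during a socket read. One way to narrow this down might be to log how much direct buffer memory the JVM reports as in use around each poll() or commitSync() call. The sketch below is a diagnostic suggestion rather than part of the original setup; DirectBufferStats is a hypothetical helper name, and it relies only on the standard BufferPoolMXBean from java.lang.management.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

final class DirectBufferStats {
    private DirectBufferStats() {}

    // Bytes currently held by the JVM's "direct" buffer pool, or -1 if the pool is not exposed.
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    // Number of live direct buffers in the pool, or -1 if the pool is not exposed.
    static long directBufferCount() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getCount();
            }
        }
        return -1L;
    }
}
```

Comparing these numbers against the limit set by -XX:MaxDirectMemorySize (when that flag is set) could help show whether usage grows steadily over time or spikes on a single large read.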



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
