Hi,

I'm running Kafka's new consumer with message handlers that can sometimes take 
a lot of time to return, and combining that with manual offset management (to 
get at-least-once semantics). Since poll() is the only way to heartbeat with 
the consumer, I have a thread that runs every 500 milliseconds that does the 
following:

1) Pause all partitions
2) Call poll(0)
3) Resume all partitions

This generally works, but I'm occasionally seeing messages like this:

java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:658)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at 
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
        at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
        at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
        at 
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)

Given this trace, I have the following questions:

1) Is there data loss when this happens?
2) How can I stop triggering this error?

Thanks,
Kanak                                     

Reply via email to