Hello,

I am running a stream processing job with Kafka and Flink.
Flink reads records from Kafka.

My software versions are:
- Kafka broker: 0.9.0.2.4 (HDP 2.4.0.0 version)
- Kafka client library: 0.9.0.1
- Flink: 1.0.3

The problem is that the Flink job sometimes gets blocked, and the consumer
lag keeps increasing.
I took a thread dump while this was happening.

This is the blocked thread. It looks like it is blocked in
FlinkKafkaConsumer09.commitOffsets, waiting to lock the KafkaConsumer instance.

---
"Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
        - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
        - locked <0x0000000659111cc8> (a java.lang.Object)
        at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
---

The lock 0x0000000659111b58 is held by the following thread, which is inside
KafkaConsumer.poll().

---
"Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
        - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.apache.kafka.common.network.Selector.select(Selector.java:425)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
        - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
---
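If I read the two dumps correctly, "Thread-9" (the connector's ConsumerThread) holds the monitor of the shared KafkaConsumer for the whole duration of poll(), while commitOffsets() needs that same monitor. The following minimal Java sketch reproduces only that lock pattern, assuming both sides synchronize on the KafkaConsumer object as the "- locked" and "- waiting to lock" lines suggest. It is not Flink's implementation: the class and thread names are hypothetical, a plain Object stands in for the KafkaConsumer, and a latch stands in for a long-running poll().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class MonitorContentionDemo {

    // Hypothetical stand-in for the shared KafkaConsumer whose monitor
    // both threads in the dump synchronize on.
    private static final Object consumerLock = new Object();

    /** Outcome of one run of the demo. */
    public static final class Result {
        public final Thread.State committerStateWhileContended;
        public final boolean commitEventuallyRan;

        Result(Thread.State state, boolean committed) {
            this.committerStateWhileContended = state;
            this.commitEventuallyRan = committed;
        }
    }

    public static Result run() throws InterruptedException {
        CountDownLatch pollerHasLock = new CountDownLatch(1);
        CountDownLatch releasePoller = new CountDownLatch(1);
        AtomicBoolean committed = new AtomicBoolean(false);

        // Plays the role of ConsumerThread: holds the monitor during the "poll".
        Thread poller = new Thread(() -> {
            synchronized (consumerLock) {
                pollerHasLock.countDown();
                try {
                    // Simulates a long poll(); awaiting a latch does NOT release the monitor.
                    releasePoller.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "poller");

        // Plays the role of the "Async calls" thread calling commitOffsets().
        Thread committer = new Thread(() -> {
            synchronized (consumerLock) {
                committed.set(true);
            }
        }, "committer");

        poller.start();
        pollerHasLock.await();
        committer.start();

        // Wait (bounded to 5 s) until the committer is parked on the monitor.
        Thread.State observed = committer.getState();
        long deadline = System.currentTimeMillis() + 5_000;
        while (observed != Thread.State.BLOCKED && System.currentTimeMillis() < deadline) {
            Thread.sleep(1);
            observed = committer.getState();
        }

        releasePoller.countDown(); // the simulated poll returns and frees the monitor
        poller.join();
        committer.join();
        return new Result(observed, committed.get());
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = run();
        System.out.println("committer state while poller held lock: " + r.committerStateWhileContended);
        System.out.println("commit ran after release: " + r.commitEventuallyRan);
    }
}
```

When run, the committer should be reported as BLOCKED until the simulated poll returns, which matches the state of the "Async calls" thread above. Java monitors make no fairness guarantee, so if a real poll loop re-acquires the lock right after each call, a waiting committer may have to wait across several polls.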

I am wondering why Flink's Kafka consumer is blocked. Any advice would be
appreciated.

Thanks,
Hironori Ogibayashi
