Thanks, David and Piotr, for your replies. I managed to capture thread dumps from the JobManager UI for a few task managers. Below are the stack traces for the Kafka source tasks in one task manager; the same stack traces appear in the other task managers as well. It looks like the Kafka source tasks are waiting on memory. Any pointers?
{ "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)", "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventTransition (6/12)\" Id=581 WAITING on java.lang.Object@444c0edc\n\tat java.lang.Object.wait(Native Method)\n\t- waiting on java.lang.Object@444c0edc\n\tat java.lang.Object.wait(Object.java:502)\n\tat org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n" }, { "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)", "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventSignature (7/12)\" Id=580 WAITING on java.lang.Object@7d3843a9\n\tat java.lang.Object.wait(Native Method)\n\t- waiting on java.lang.Object@7d3843a9\n\tat java.lang.Object.wait(Object.java:502)\n\tat org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n" }, { "threadName": "Legacy Source Thread - Source: SourceEventSignature (7/12)", "stringifiedThreadInfo": "\"Legacy Source Thread - Source: SourceEventSignature (7/12)\" Id=408 WAITING on java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat sun.misc.Unsafe.park(Native Method)\n\t- waiting on java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n" }, { "threadName": "Legacy Source Thread - Source: SourceEventTransition (6/12)", "stringifiedThreadInfo": "\"Legacy Source Thread - Source: SourceEventTransition (6/12)\" Id=409 WAITING on java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat sun.misc.Unsafe.park(Native Method)\n\t- waiting on java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n" } On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org> wrote: > Hi, > > I'm not sure, maybe someone will be able to help you, but it sounds like > it would be better for you to: > - google search something like "Kafka Error sending fetch request > TimeoutException" (I see there are quite a lot of results, some of them > might be related) > - ask this question on the Kafka mailing list > - ask this question 
On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> I'm not sure, maybe someone will be able to help you, but it sounds like
> it would be better for you to:
> - google search something like "Kafka Error sending fetch request
> TimeoutException" (I see there are quite a lot of results, some of them
> might be related)
> - ask this question on the Kafka mailing list
> - ask this question on stackoverflow as a Kafka question
>
> In short, FlinkKafkaConsumer is a very thin wrapper around the
> KafkaConsumer class, so the thing you are observing most likely has very
> little to do with Flink itself. In other words, if you are observing
> such a problem, you would most likely be able to reproduce it without
> Flink.
>
> Best,
> Piotrek
>
> On Fri, Jul 9, 2021 at 12:30 PM Rahul Patwari <rahulpatwari8...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We have a Flink 1.11.1 streaming pipeline in production which reads from
>> Kafka.
>> Kafka server version is 2.5.0 (Confluent 5.5.0).
>> Kafka client version is 2.4.1:
>> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
>> version: 2.4.1","method":"<init>"}
>>
>> Occasionally (every 6 to 12 hours), we have observed that the Kafka
>> consumption rate went down (NOT to 0) and the following logs were
>> observed. Generally, the consumption rate across all consumers is 4k
>> records/sec; when this issue occurred, the consumption rate dropped to
>> < 50 records/sec.
>>
>> org.apache.kafka.common.errors.DisconnectException: null
>>
>> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
>> fetch request (sessionId=405798138, epoch=5808) to node 8:
>> {}.","method":"handleError"}
>>
>> org.apache.kafka.common.errors.TimeoutException: Failed
>>
>> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator
>> 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid,
>> will attempt rediscovery","method":"markCoordinatorUnknown"}
>>
>> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
>> coordinator 100.98.40.16:9092 (id: 2147483623 rack:
>> null)","method":"onSuccess"}
>>
>> The consumers retried for more than an hour, but the above logs were
>> observed again.
>> The consumers started pulling data only after a manual restart.
>>
>> No WARN or ERROR logs were observed in Kafka or ZooKeeper during this
>> period.
>>
>> Our observation from this incident is that Kafka consumer retries could
>> not resolve the issue, but a manual restart (or a Flink internal restart
>> via the failure-rate restart policy) did.
>>
>> Has anyone faced this issue before? Any pointers are appreciated.
>>
>> Regards,
>> Rahul
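One more note for anyone searching the archives later: since FlinkKafkaConsumer passes its Properties straight through to the underlying KafkaConsumer, the client-side timeout and reconnect behaviour from the logs above can be tuned where the source is constructed. Below is a minimal sketch; the broker address, topic name, and all values are placeholders rather than recommendations, and the keys are standard Kafka consumer configs:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class TunedKafkaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
            props.setProperty("group.id",
                    "MFTDataProcessorEventSignatureConsumerGroupV1R1");
            // Standard Kafka consumer configs, forwarded unchanged to the
            // underlying KafkaConsumer; values are illustrative only.
            props.setProperty("request.timeout.ms", "60000");       // per-request timeout
            props.setProperty("reconnect.backoff.ms", "1000");      // initial reconnect backoff
            props.setProperty("reconnect.backoff.max.ms", "10000"); // cap on reconnect backoff
            props.setProperty("session.timeout.ms", "30000");       // group-membership timeout

            // "events" is a hypothetical topic name.
            FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(
                    "events", new SimpleStringSchema(), props);

            env.addSource(source).name("SourceEventSignature").print();
            env.execute("kafka-timeout-tuning-sketch");
        }
    }

To be clear, this does not explain why only a restart recovered the consumers; it may just make the client's own retries more tolerant of slow broker responses.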