Hi,

Waiting for memory from LocalBufferPool is a perfectly normal symptom of backpressure [1][2].
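A task thread has to grab a free network buffer from its LocalBufferPool before it can push a record downstream; when the downstream operators are slow, buffers are not recycled fast enough and that request simply blocks, which is exactly the WAITING state in your dump below. As a toy sketch of the pattern (this is not Flink's actual LocalBufferPool code, just a bounded pool plus a deliberately slow consumer):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model only: a fixed-size pool of "network buffers" and a slow consumer.
// The producing thread blocks in pool.take(), analogous to a source thread
// blocking in LocalBufferPool.requestMemorySegmentBlocking().
public class BackpressureSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<byte[]> pool = new ArrayBlockingQueue<>(2);
        pool.put(new byte[32 * 1024]);
        pool.put(new byte[32 * 1024]);

        BlockingQueue<byte[]> inFlight = new ArrayBlockingQueue<>(16);

        // "Downstream" recycles one buffer per second - the slow consumer.
        Thread downstream = new Thread(() -> {
            try {
                while (true) {
                    byte[] segment = inFlight.take();
                    TimeUnit.SECONDS.sleep(1);
                    pool.put(segment); // buffer becomes available again
                }
            } catch (InterruptedException ignored) {
            }
        });
        downstream.setDaemon(true);
        downstream.start();

        // "Source" requests buffers as fast as it can and ends up waiting.
        for (int record = 0; record < 5; record++) {
            long start = System.nanoTime();
            byte[] segment = pool.take(); // WAITING here under backpressure
            System.out.printf("record %d waited %d ms for a buffer%n",
                    record, (System.nanoTime() - start) / 1_000_000);
            inFlight.put(segment);
        }
    }
}

In practice you do not need to model this yourself - the backpressure monitoring and buffer pool usage metrics described in [1] and [2] should point you to the operator that is actually slow.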
Best,
Piotrek

[1] https://flink.apache.org/2021/07/07/backpressure.html
[2] https://www.ververica.com/blog/how-flink-handles-backpressure

On Wed, Jul 14, 2021 at 06:05 Rahul Patwari <rahulpatwari8...@gmail.com> wrote:

> Thanks, David, Piotr for your reply.
>
> I managed to capture the thread dump from the JobManager UI for a few task managers.
> Here is the thread dump for the Kafka Source tasks in one task manager. I could see the same stack trace in other task managers as well. It seems like the Kafka Source tasks are waiting on memory. Any pointers?
>
> {
> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
> "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventTransition (6/12)\" Id=581 WAITING on java.lang.Object@444c0edc\n\tat java.lang.Object.wait(Native Method)\n\t- waiting on java.lang.Object@444c0edc\n\tat java.lang.Object.wait(Object.java:502)\n\tat org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
> }, {
> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
> "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventSignature (7/12)\" Id=580 WAITING on java.lang.Object@7d3843a9\n\tat java.lang.Object.wait(Native Method)\n\t- waiting on java.lang.Object@7d3843a9\n\tat java.lang.Object.wait(Object.java:502)\n\tat org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventSignature (7/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source: SourceEventSignature (7/12)\" Id=408 WAITING on java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat sun.misc.Unsafe.park(Native Method)\n\t- waiting on java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventTransition (6/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source: SourceEventTransition (6/12)\" Id=409 WAITING on java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat sun.misc.Unsafe.park(Native Method)\n\t- waiting on java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
> }
>
> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi,
>>
>> I'm not sure, maybe someone will be able to help you, but it sounds like it would be better for you to:
>> - google search something like "Kafka Error sending fetch request TimeoutException" (I see there are quite a lot of results, some of them might be related)
>> - ask this question on the Kafka mailing list
>> - ask this question on stackoverflow as a Kafka question
>>
>> In short, FlinkKafkaConsumer is a very thin wrapper around the KafkaConsumer class, so what you are observing most likely has very little to do with Flink itself. In other words, if you are observing such a problem, you would most likely be able to reproduce it without Flink.
>>
>> Best,
>> Piotrek
>>
>> On Fri, Jul 9, 2021 at 12:30 Rahul Patwari <rahulpatwari8...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We have a Flink 1.11.1 streaming pipeline in production which reads from Kafka.
>>> Kafka server version is 2.5.0 - Confluent 5.5.0
>>> Kafka client version is 2.4.1 - {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka version: 2.4.1","method":"<init>"}
>>>
>>> Occasionally (every 6 to 12 hours), we have observed that the Kafka consumption rate went down (NOT to 0) and the following logs were observed.
>>> Generally, the consumption rate across all consumers is 4k records/sec. When this issue occurred, the consumption rate dropped to < 50 records/sec.
>>>
>>> org.apache.kafka.common.errors.DisconnectException: null
>>>
>>> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending fetch request (sessionId=405798138, epoch=5808) to node 8: {}.","method":"handleError"}
>>>
>>> org.apache.kafka.common.errors.TimeoutException: Failed
>>>
>>> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}
>>>
>>> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group coordinator 100.98.40.16:9092 (id: 2147483623 rack: null)","method":"onSuccess"}
>>>
>>> The consumers retried for more than an hour, but the above logs were observed again.
>>> The consumers started pulling data after a manual restart.
>>>
>>> No WARN or ERROR logs were observed in Kafka or Zookeeper during this period.
>>>
>>> Our observation from this incident is that Kafka consumer retries could not resolve the issue, but a manual restart (or a Flink internal restart via the failure-rate restart policy) does.
>>>
>>> Has anyone faced this issue before? Any pointers are appreciated.
>>>
>>> Regards,
>>> Rahul
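A short note on the failure-rate restart policy mentioned above, for the archives: it can be configured in flink-conf.yaml or set programmatically on the execution environment. A minimal sketch (the thresholds below are placeholders, not recommendations):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Tolerate at most 3 failures within a 5 minute window,
        // waiting 10 seconds between restart attempts.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.minutes(5), Time.seconds(10)));
        // ... define the Kafka source, operators and sinks here, then call env.execute(...) ...
    }
}

As observed in the thread, such a restart only recovers by recreating the Kafka consumers, much like the manual restart does.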