Hi,

Waiting for memory from LocalBufferPool is a perfectly normal symptom of
backpressure [1][2].
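
If you want to confirm that it is backpressure, you can sample the
back-pressure status of the suspected vertex through the JobManager's REST
API. Here is a minimal sketch, assuming the REST endpoint is reachable at
http://localhost:8081; the job and vertex ids are placeholders taken from
the web UI:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class BackpressureProbe {
    public static void main(String[] args) throws Exception {
        String jobId = args[0];    // e.g. from the /jobs endpoint or the web UI
        String vertexId = args[1]; // id of the suspected task vertex
        // the back-pressure sampling endpoint of the JobManager REST API
        URL url = new URL("http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                // JSON response with the back-pressure level and per-subtask ratios
                System.out.println(line);
            }
        }
    }
}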

Best,
Piotrek

[1] https://flink.apache.org/2021/07/07/backpressure.html
[2] https://www.ververica.com/blog/how-flink-handles-backpressure

Wed, 14 Jul 2021 at 06:05 Rahul Patwari <rahulpatwari8...@gmail.com>
wrote:

> Thanks, David and Piotr, for your replies.
>
> I managed to capture thread dumps from the JobManager UI for a few task
> managers.
> Here is the thread dump for the Kafka source tasks in one task manager. I
> could see the same stack traces in the other task managers as well. It
> seems like the Kafka source tasks are waiting on memory. Any pointers?
>
>   {
> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
> SourceEventTransition (6/12)\" Id=581 WAITING on 
> java.lang.Object@444c0edc\n\tat
> java.lang.Object.wait(Native Method)\n\t-  waiting on
> java.lang.Object@444c0edc\n\tat
> java.lang.Object.wait(Object.java:502)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
> }, {
> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
> "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventSignature
> (7/12)\" Id=580 WAITING on java.lang.Object@7d3843a9\n\tat
> java.lang.Object.wait(Native Method)\n\t-  waiting on
> java.lang.Object@7d3843a9\n\tat
> java.lang.Object.wait(Object.java:502)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventSignature (7/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
> SourceEventSignature (7/12)\" Id=408 WAITING on
> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventTransition
> (6/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
> SourceEventTransition (6/12)\" Id=409 WAITING on
> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
> }
>
> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> I'm not sure; maybe someone else will be able to help you, but it sounds
>> like it would be better for you to:
>> - search for something like "Kafka Error sending fetch request
>> TimeoutException" (I see there are quite a lot of results, some of which
>> might be related)
>> - ask this question on the Kafka mailing list
>> - ask this question on Stack Overflow as a Kafka question
>>
>> In short, FlinkKafkaConsumer is a very thin wrapper around the
>> KafkaConsumer class, so what you are observing most likely has very
>> little to do with Flink itself. In other words, if you are observing
>> such a problem, you would most likely be able to reproduce it without
>> Flink.
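>>
>> For example, a bare-bones consumer loop along these lines should hit the
>> same fetch timeouts if the problem is on the Kafka side (the brokers,
>> group id and topic below are placeholders; use the same client settings
>> as your Flink job):
>>
>> import java.time.Duration;
>> import java.util.Collections;
>> import java.util.Properties;
>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>
>> public class StandaloneConsumerRepro {
>>     public static void main(String[] args) {
>>         Properties props = new Properties();
>>         props.put("bootstrap.servers", "broker1:9092"); // placeholder brokers
>>         props.put("group.id", "repro-test-group");      // placeholder group id
>>         props.put("key.deserializer",
>>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>>         props.put("value.deserializer",
>>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>>             consumer.subscribe(Collections.singletonList("your-topic"));
>>             while (true) {
>>                 // poll in a loop and watch the client logs for the same
>>                 // DisconnectException / TimeoutException
>>                 ConsumerRecords<byte[], byte[]> records =
>>                         consumer.poll(Duration.ofMillis(500));
>>                 System.out.println(records.count() + " records polled");
>>             }
>>         }
>>     }
>> }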
>>
>> Best,
>> Piotrek
>>
>> Fri, 9 Jul 2021 at 12:30 Rahul Patwari <rahulpatwari8...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We have a Flink 1.11.1 streaming pipeline in production which reads
>>> from Kafka.
>>> The Kafka server version is 2.5.0 (Confluent 5.5.0).
>>> The Kafka client version is 2.4.1:
>>> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka version: 2.4.1","method":"<init>"}
>>>
>>> Occasionally (every 6 to 12 hours), we have observed that the Kafka
>>> consumption rate went down (NOT to 0). Generally, the consumption rate
>>> across all consumers is 4k records/sec; when this issue occurred, the
>>> rate dropped to < 50 records/sec, and the following logs were observed:
>>>
>>> org.apache.kafka.common.errors.DisconnectException: null
>>>
>>> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending fetch request (sessionId=405798138, epoch=5808) to node 8: {}.","method":"handleError"}
>>>
>>> org.apache.kafka.common.errors.TimeoutException: Failed
>>>
>>> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}
>>>
>>> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group coordinator 100.98.40.16:9092 (id: 2147483623 rack: null)","method":"onSuccess"}
>>>
>>> The consumers retried for more than an hour, but the above logs were
>>> observed repeatedly. The consumers only started pulling data again after
>>> a manual restart.
>>>
>>> No WARN or ERROR logs were observed in Kafka or ZooKeeper during this
>>> period.
>>>
>>> Our observation from this incident is that Kafka consumer retries could
>>> not resolve the issue, but a manual restart (or a Flink internal restart
>>> with the failure-rate restart policy) does.
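>>>
>>> For reference, the failure-rate restart policy is configured roughly
>>> like this (a sketch; the thresholds below are placeholders, not our
>>> production values):
>>>
>>> import java.util.concurrent.TimeUnit;
>>> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>>> import org.apache.flink.api.common.time.Time;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>> // allow at most 3 failures within a 5-minute window, with a 10s delay
>>> // between restart attempts
>>> env.setRestartStrategy(RestartStrategies.failureRateRestart(
>>>         3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));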
>>>
>>> Has anyone faced this issue before? Any pointers are appreciated.
>>>
>>> Regards,
>>> Rahul
>>>
>>
