Ok, thanks for the update. Great that you managed to resolve this issue :)

Best,
Piotrek

On Mon, 19 Jul 2021 at 17:13, Rahul Patwari <rahulpatwari8...@gmail.com>
wrote:

> Hi Piotrek,
>
> I was just about to update.
> You are right. The issue was caused by a task manager that stalled due to
> high heap usage, and the high heap usage was caused by a memory leak in a
> library we are using.
>
> Thanks for your help.
>
> On Mon, Jul 19, 2021 at 8:31 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Thanks for the update.
>>
>> > Could the backpressure timeout and heartbeat timeout be because of
>> Heap Usage close to Max configured?
>>
>> Could be. This is one of the things I had in mind when I mentioned
>> something being overloaded in:
>>
>> > might be related to one another via some different deeper problem
>> (broken network environment, something being overloaded)
>>
>> You can easily diagnose it. Just attach a memory profiler or check the GC
>> logs, just as you would normally do when debugging a non-Flink standalone
>> Java application.
>>
>> It can also be a symptom of a failing network environment. I would first
>> check for GC pauses/stops/gaps in the logs that would indicate that a
>> stalled JVM caused those RPC timeouts. If that doesn't bring you closer to
>> a solution, I would then check the network environment in your
>> cluster/cloud. Either of those might be the reason behind your Kafka
>> issues. Hard to tell. You definitely shouldn't have heartbeat timeouts in
>> your cluster, so something IS wrong with your setup.
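
As a rough illustration of the "gaps in the logs" check described above, the following Java sketch scans a list of log timestamps and reports consecutive entries that are further apart than a threshold. The class name LogGapFinder, the sample timestamps, and the 15-second threshold are assumptions made for this example; the timestamp pattern mirrors the format of the Kafka client log lines quoted further down in this thread. A gap only hints at a stalled JVM (the process may simply have had nothing to log), so treat hits as places to look at in the GC logs, not as proof.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink or Kafka.
class LogGapFinder {
    // Assumed timestamp layout, e.g. 2021-07-07T22:13:37,385
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss,SSS");

    /**
     * Returns one "start -> end (N ms)" entry for every pair of consecutive
     * timestamps that are more than thresholdMillis apart.
     */
    static List<String> findGaps(List<String> timestamps, long thresholdMillis) {
        List<String> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String raw : timestamps) {
            LocalDateTime current = LocalDateTime.parse(raw, FMT);
            if (previous != null) {
                long deltaMillis = Duration.between(previous, current).toMillis();
                if (deltaMillis > thresholdMillis) {
                    gaps.add(previous.format(FMT) + " -> " + current.format(FMT)
                            + " (" + deltaMillis + " ms)");
                }
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Made-up sample timestamps, for illustration only.
        List<String> sample = Arrays.asList(
                "2021-07-07T22:13:37,385",
                "2021-07-07T22:13:38,000",
                "2021-07-07T22:14:10,500");
        // Assumed threshold of 15 seconds.
        findGaps(sample, 15_000).forEach(System.out::println);
    }
}
```

In practice the timestamps would be extracted from the TaskManager log first; lines without a parsable timestamp (stack trace lines, for example) would need to be skipped before calling findGaps.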
>>
>> Best,
>> Piotrek
>>
>> On Thu, 15 Jul 2021 at 17:17, Rahul Patwari <rahulpatwari8...@gmail.com>
>> wrote:
>>
>>> Thanks for the feedback Piotrek.
>>>
>>> We have observed the issue again today. As we are using Flink 1.11.1, I
>>> tried to check the backpressure of Kafka source tasks from the
>>> Jobmanager UI.
>>> The backpressure request was canceled due to Timeout and "No Data" was
>>> displayed in UI. Here are the respective logs:
>>>
>>> java.util.concurrent.TimeoutException: Invocation of public abstract
>>> java.util.concurrent.CompletableFuture
>>> org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
>>> timed out.
>>> at
>>> org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
>>> .............................
>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
>>> after [15000 ms]. Message of type
>>> [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
>>> reason for `AskTimeoutException` is that the recipient actor didn't send a
>>> reply.
>>> at
>>> akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
>>> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>> .............................
>>>
>>> During this time, the heartbeat of one of the TaskManagers to the
>>> JobManager timed out. Here are the respective logs:
>>>
>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>>> bead57c15b447eac08531693ec91edc4 timed out. at
>>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
>>> ..................
>>>
>>> Because of the heartbeat timeout, there was an internal restart of Flink
>>> and the Kafka consumption rate recovered after the restart.
>>>
>>> Could the backpressure timeout and heartbeat timeout be because of Heap
>>> Usage close to Max configured?
>>>
>>> On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi Rahul,
>>>>
>>>> I highly doubt that you are hitting the network bottleneck case.
>>>> It would require either a broken environment/network or throughput on
>>>> the order of GB/second. More likely you are seeing an empty input pool
>>>> and you haven't checked the documentation [1]:
>>>>
>>>> > inPoolUsage - An estimate of the input buffers usage. (ignores
>>>> LocalInputChannels)
>>>>
>>>> If local channels are backpressured, inPoolUsage will be 0. You can
>>>> check downstream task's inputQueueLength or isBackPressured metrics.
>>>> Besides that, I would highly recommend upgrading to Flink 1.13.x if you are
>>>> investigating backpressure problems as described in the blog post.
>>>>
>>>> > 1. Can the backpressure Cause "DisconnectException", "Error Sending
>>>> Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>>>>
>>>> No, I don't think it's possible. Those two might be related to one
>>>> another via some different, deeper problem (broken network environment,
>>>> something being overloaded), but I don't see how one could cause the
>>>> other.
>>>>
>>>> Piotrek
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#default-shuffle-service
>>>>
>>>> On Wed, 14 Jul 2021 at 14:18, Rahul Patwari <rahulpatwari8...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks, Piotrek.
>>>>>
>>>>> We have two Kafka sources. We are facing this issue for both of them.
>>>>> The downstream tasks with the sources form two independent directed
>>>>> acyclic graphs, running within the same Streaming Job.
>>>>>
>>>>> For Example:
>>>>> source1 -> task1 -> sink1
>>>>> source2 -> task2 -> sink2
>>>>>
>>>>> There is backpressure in both sources. Verified using the
>>>>> "isBackPressured" metric.
>>>>> For one of the sources, "outPoolUsage" is high whereas "inPoolUsage"
>>>>> of the immediate downstream task is ~ 0. I think we are observing the
>>>>> rare case mentioned at the end of [1].
>>>>>
>>>>> I have a couple of questions:
>>>>>
>>>>>    1. Can the backpressure Cause "DisconnectException", "Error
>>>>>    Sending Fetch Request to node ..." and other Kafka Consumer logs
>>>>>    mentioned above?
>>>>>    2. What could be the next steps in resolving the backpressure
>>>>>    issue (the rare case)?
>>>>>
>>>>> [1] https://flink.apache.org/2021/07/07/backpressure.html
>>>>>
>>>>> These are the thread dumps of the Kafka Source tasks when the stream is
>>>>> running as expected. Comparing them with the earlier thread dumps: when
>>>>> the issue occurs, the "Kafka Fetcher" thread, which polls records, is
>>>>> blocked by the "Legacy Source" thread (main thread), probably because of
>>>>> backpressure.
>>>>>
>>>>>   {
>>>>> "threadName": "Kafka Fetcher for Source: SourceEventSignature (8/12)",
>>>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>>>> SourceEventSignature (8/12)\" Id=521 RUNNABLE\n\tat
>>>>> sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat
>>>>> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat
>>>>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat
>>>>> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t-  locked
>>>>> sun.nio.ch.Util$3@1aaa5678\n\t-  locked
>>>>> java.util.Collections$UnmodifiableSet@6c629341\n\t-  locked
>>>>> sun.nio.ch.EPollSelectorImpl@3e783067\n\tat
>>>>> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat
>>>>> org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat
>>>>> org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat
>>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber
>>>>> of locked synchronizers = 1\n\t-
>>>>> java.util.concurrent.locks.ReentrantLock$FairSync@462572c9\n\n"
>>>>> }, {
>>>>> "threadName": "Kafka Fetcher for Source: SourceEventTransition (4/12)",
>>>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>>>> SourceEventTransition (4/12)\" Id=520 RUNNABLE\n\tat
>>>>> sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat
>>>>> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat
>>>>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat
>>>>> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t-  locked
>>>>> sun.nio.ch.Util$3@ef4e5e3\n\t-  locked
>>>>> java.util.Collections$UnmodifiableSet@767487e7\n\t-  locked
>>>>> sun.nio.ch.EPollSelectorImpl@9707a46\n\tat
>>>>> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat
>>>>> org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat
>>>>> org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat
>>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber
>>>>> of locked synchronizers = 1\n\t-
>>>>> java.util.concurrent.locks.ReentrantLock$FairSync@642ee29b\n\n"
>>>>> }, {
>>>>> "threadName": "Legacy Source Thread - Source: SourceEventSignature
>>>>> (8/12)",
>>>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>>>> SourceEventSignature (8/12)\" Id=515 WAITING on 
>>>>> java.lang.Object@4d5cc800\n\tat
>>>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>>>> java.lang.Object@4d5cc800\n\tat
>>>>> java.lang.Object.wait(Object.java:502)\n\tat
>>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
>>>>> }, {
>>>>> "threadName": "Legacy Source Thread - Source: SourceEventTransition
>>>>> (4/12)",
>>>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>>>> SourceEventTransition (4/12)\" Id=514 WAITING on 
>>>>> java.lang.Object@1fc525f3\n\tat
>>>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>>>> java.lang.Object@1fc525f3\n\tat
>>>>> java.lang.Object.wait(Object.java:502)\n\tat
>>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
>>>>> }
>>>>>
>>>>> On Wed, Jul 14, 2021 at 2:39 PM Piotr Nowojski <pnowoj...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Waiting for memory from LocalBufferPool is a perfectly normal symptom
>>>>>> of backpressure [1][2].
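
To illustrate why a thread parked in a buffer request is expected under backpressure, here is a minimal Java sketch of a fixed-size buffer pool. It is not Flink's LocalBufferPool; the class TinyBufferPool and its methods are hypothetical and only mimic the general idea: a producer has to wait for a free buffer until a consumer hands one back.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified pool; names and behavior are assumptions for illustration.
class TinyBufferPool {
    private final BlockingQueue<byte[]> free;

    TinyBufferPool(int buffers, int bufferSize) {
        free = new ArrayBlockingQueue<>(buffers);
        for (int i = 0; i < buffers; i++) {
            free.add(new byte[bufferSize]);
        }
    }

    /** Waits up to timeoutMillis for a free buffer; returns null on timeout or interrupt. */
    byte[] request(long timeoutMillis) {
        try {
            return free.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    /** Hands a buffer back to the pool, which lets one waiting requester proceed. */
    void recycle(byte[] buffer) {
        free.offer(buffer);
    }

    public static void main(String[] args) {
        TinyBufferPool pool = new TinyBufferPool(2, 1024); // arbitrary sizes
        byte[] a = pool.request(10);
        byte[] b = pool.request(10);
        // Both buffers are in use, so the next request has to wait and times out.
        System.out.println("exhausted pool: " + (pool.request(50) == null ? "timed out" : "got buffer"));
        // Once the "downstream" side recycles a buffer, the producer can continue.
        pool.recycle(a);
        System.out.println("after recycle: " + (pool.request(50) != null ? "got buffer" : "timed out"));
    }
}
```

With a slow consumer, the requesting thread spends most of its time waiting inside request, which is the same kind of picture a thread dump shows for a backpressured source.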
>>>>>>
>>>>>> Best,
>>>>>> Piotrek
>>>>>>
>>>>>> [1] https://flink.apache.org/2021/07/07/backpressure.html
>>>>>> [2] https://www.ververica.com/blog/how-flink-handles-backpressure
>>>>>>
>>>>>> On Wed, 14 Jul 2021 at 06:05, Rahul Patwari <rahulpatwari8...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks, David and Piotr, for your replies.
>>>>>>>
>>>>>>> I managed to capture the thread dump from the JobManager UI for a few
>>>>>>> task managers.
>>>>>>> Here is the thread dump for the Kafka Source tasks in one task manager.
>>>>>>> I could see the same stack trace in other task managers as well. It
>>>>>>> seems like the Kafka Source tasks are waiting on memory. Any pointers?
>>>>>>>
>>>>>>>   {
>>>>>>> "threadName": "Kafka Fetcher for Source: SourceEventTransition
>>>>>>> (6/12)",
>>>>>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>>>>>> SourceEventTransition (6/12)\" Id=581 WAITING on 
>>>>>>> java.lang.Object@444c0edc\n\tat
>>>>>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>>>>>> java.lang.Object@444c0edc\n\tat
>>>>>>> java.lang.Object.wait(Object.java:502)\n\tat
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>>>>>>> }, {
>>>>>>> "threadName": "Kafka Fetcher for Source: SourceEventSignature
>>>>>>> (7/12)",
>>>>>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>>>>>> SourceEventSignature (7/12)\" Id=580 WAITING on 
>>>>>>> java.lang.Object@7d3843a9\n\tat
>>>>>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>>>>>> java.lang.Object@7d3843a9\n\tat
>>>>>>> java.lang.Object.wait(Object.java:502)\n\tat
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>>>>>>> }, {
>>>>>>> "threadName": "Legacy Source Thread - Source: SourceEventSignature
>>>>>>> (7/12)",
>>>>>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>>>>>> SourceEventSignature (7/12)\" Id=408 WAITING on
>>>>>>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>>>>>>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>>>>>>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>>>>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>>>>>>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>>>>>>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>>>>>>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>>>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>>>>>>> }, {
>>>>>>> "threadName": "Legacy Source Thread - Source: SourceEventTransition
>>>>>>> (6/12)",
>>>>>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>>>>>> SourceEventTransition (6/12)\" Id=409 WAITING on
>>>>>>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>>>>>>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>>>>>>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>>>>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>>>>>>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>>>>>>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>>>>>>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>>>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>>>>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>>>>>>> }
>>>>>>>
>>>>>>> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm not sure, maybe someone will be able to help you, but it sounds
>>>>>>>> like it would be better for you to:
>>>>>>>> - google search something like "Kafka Error sending fetch request
>>>>>>>> TimeoutException" (I see there are quite a lot of results, some of them
>>>>>>>> might be related)
>>>>>>>> - ask this question on the Kafka mailing list
>>>>>>>> - ask this question on stackoverflow as a Kafka question
>>>>>>>>
>>>>>>>> In short, FlinkKafkaConsumer is a very thin wrapper around the
>>>>>>>> KafkaConsumer class, so the thing you are observing most likely has
>>>>>>>> very little to do with Flink itself. In other words, if you are
>>>>>>>> observing such a problem, you would most likely be able to reproduce
>>>>>>>> it without Flink.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>> On Fri, 9 Jul 2021 at 12:30, Rahul Patwari <rahulpatwari8...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have a Flink 1.11.1 streaming pipeline in production
>>>>>>>>> which reads from Kafka.
>>>>>>>>> Kafka Server version is 2.5.0 - confluent 5.5.0
>>>>>>>>> Kafka Client Version is 2.4.1 - 
>>>>>>>>> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
>>>>>>>>> version: 2.4.1","method":"<init>"}
>>>>>>>>>
>>>>>>>>> Occasionally (every 6 to 12 hours), we have observed that the Kafka
>>>>>>>>> consumption rate went down (NOT to 0). Generally, the consumption
>>>>>>>>> rate across all consumers is 4k records/sec. When this issue
>>>>>>>>> occurred, the consumption rate dropped to < 50 records/sec and the
>>>>>>>>> following logs were observed:
>>>>>>>>>
>>>>>>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>>>>>>
>>>>>>>>> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>>>>>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
>>>>>>>>> fetch request (sessionId=405798138, epoch=5808) to node 8:
>>>>>>>>> {}.","method":"handleError"}
>>>>>>>>>
>>>>>>>>> org.apache.kafka.common.errors.TimeoutException: Failed
>>>>>>>>>
>>>>>>>>> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
>>>>>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator
>>>>>>>>> 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or
>>>>>>>>> invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}
>>>>>>>>>
>>>>>>>>> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
>>>>>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
>>>>>>>>> coordinator 100.98.40.16:9092 (id: 2147483623 rack:
>>>>>>>>> null)","method":"onSuccess"}
>>>>>>>>>
>>>>>>>>> The consumers retried for more than an hour, but the above logs kept
>>>>>>>>> recurring.
>>>>>>>>> The consumers started pulling data after a manual restart.
>>>>>>>>>
>>>>>>>>> No WARN or ERROR logs were observed in Kafka or Zookeeper during
>>>>>>>>> this period.
>>>>>>>>>
>>>>>>>>> Our observation from this incident is that Kafka Consumer retries
>>>>>>>>> could not resolve the issue, but a manual restart or a Flink internal
>>>>>>>>> restart (failure-rate restart policy) does.
>>>>>>>>>
>>>>>>>>> Has anyone faced this issue before? Any pointers are appreciated.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Rahul
>>>>>>>>>
>>>>>>>>
