Thanks for the feedback Piotrek.

We have observed the issue again today. As we are using Flink 1.11.1, I
tried to check the backpressure of Kafka source tasks from the
Jobmanager UI.
The backpressure request was canceled due to Timeout and "No Data" was
displayed in UI. Here are the respective logs:

java.util.concurrent.TimeoutException: Invocation of public abstract
java.util.concurrent.CompletableFuture
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.requestTaskBackPressure(org.apache.flink.runtime.executiongraph.ExecutionAttemptID,int,org.apache.flink.api.common.time.Time)
timed out.
at
org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.requestTaskBackPressure(RpcTaskManagerGateway.java:67)
.............................
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@xX.X.X.X:X/user/rpc/taskmanager_0#-1457664622]]
after [15000 ms]. Message of type
[org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical
reason for `AskTimeoutException` is that the recipient actor didn't send a
reply.
at
akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
~[flink-dist_2.12-1.11.1.jar:1.11.1]
.............................

During this time, the heartbeat of one of the Taskmanager to the Jobmanager
timed out. Here are the respective logs:

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
bead57c15b447eac08531693ec91edc4 timed out. at
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193)
..................

Because of heartbeat timeout, there was an internal restart of Flink and
the Kafka consumption rate recovered after the restart.

Could the backpressure timeout and heartbeat timeout be because of Heap
Usage close to Max configured?

On Wed, Jul 14, 2021 at 6:29 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Rahul,
>
> I would highly doubt that you are hitting the network bottleneck case. It
> would require either a broken environment/network or throughputs in orders
> of GB/second. More likely you are seeing empty input pool and you haven't
> checked the documentation [1]:
>
> > inPoolUsage - An estimate of the input buffers usage. (ignores
> LocalInputChannels)
>
> If local channels are backpressured, inPoolUsage will be 0. You can check
> downstream task's inputQueueLength or isBackPressured metrics. Besides
> that, I would highly recommend upgrading to Flink 1.13.x if you are
> investigating backpressure problems as described in the blog post.
>
> > 1. Can the backpressure Cause "DisconnectException", "Error Sending
> Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>
> No, I don't think it's possible. Those two might be related to one another
> via some different deeper problem (broken network environment, something
> being overloaded), but I don't see a way how one could cause the other.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#default-shuffle-service
>
> śr., 14 lip 2021 o 14:18 Rahul Patwari <rahulpatwari8...@gmail.com>
> napisał(a):
>
>> Thanks, Piotrek.
>>
>> We have two Kafka sources. We are facing this issue for both of them. The
>> downstream tasks with the sources form two independent directed acyclic
>> graphs, running within the same Streaming Job.
>>
>> For Example:
>> source1 -> task1 -> sink1
>> source2 -> task2 -> sink2
>>
>> There is backpressure in both sources. Verified using the
>> "isBackPressured" metric.
>> For one of the sources, "outPoolUsage" is high whereas "inPoolUsage" of
>> immediate downstream task is ~ 0. I think we are observing the rare case
>> mentioned at the end in [1].
>>
>> I have a couple of questions:
>>
>>    1. Can the backpressure Cause "DisconnectException", "Error Sending
>>    Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>>    2. What could be the next steps in resolving the backpressure issue -
>>    the rare case
>>
>> [1] https://flink.apache.org/2021/07/07/backpressure.html
>>
>> When the stream is running as expected, these are the thread dump of
>> Kafka Source tasks. Comparing the thread dumps - The "Kafka Fetcher"
>> thread, which polls records is blocked by "Legacy Source" Thread(main
>> Thread) - probably because of backpressure.
>>
>>   {
>> "threadName": "Kafka Fetcher for Source: SourceEventSignature (8/12)",
>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>> SourceEventSignature (8/12)\" Id=521 RUNNABLE\n\tat
>> sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat
>> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat
>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat
>> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t-  locked
>> sun.nio.ch.Util$3@1aaa5678\n\t-  locked
>> java.util.Collections$UnmodifiableSet@6c629341\n\t-  locked
>> sun.nio.ch.EPollSelectorImpl@3e783067\n\tat
>> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat
>> org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat
>> org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber
>> of locked synchronizers = 1\n\t-
>> java.util.concurrent.locks.ReentrantLock$FairSync@462572c9\n\n"
>> }, {
>> "threadName": "Kafka Fetcher for Source: SourceEventTransition (4/12)",
>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>> SourceEventTransition (4/12)\" Id=520 RUNNABLE\n\tat
>> sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat
>> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat
>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat
>> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t-  locked
>> sun.nio.ch.Util$3@ef4e5e3\n\t-  locked
>> java.util.Collections$UnmodifiableSet@767487e7\n\t-  locked
>> sun.nio.ch.EPollSelectorImpl@9707a46\n\tat
>> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat
>> org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat
>> org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber
>> of locked synchronizers = 1\n\t-
>> java.util.concurrent.locks.ReentrantLock$FairSync@642ee29b\n\n"
>> }, {
>> "threadName": "Legacy Source Thread - Source: SourceEventSignature
>> (8/12)",
>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>> SourceEventSignature (8/12)\" Id=515 WAITING on 
>> java.lang.Object@4d5cc800\n\tat
>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>> java.lang.Object@4d5cc800\n\tat
>> java.lang.Object.wait(Object.java:502)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
>> }, {
>> "threadName": "Legacy Source Thread - Source: SourceEventTransition
>> (4/12)",
>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>> SourceEventTransition (4/12)\" Id=514 WAITING on 
>> java.lang.Object@1fc525f3\n\tat
>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>> java.lang.Object@1fc525f3\n\tat
>> java.lang.Object.wait(Object.java:502)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
>> }
>>
>> On Wed, Jul 14, 2021 at 2:39 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Waiting for memory from LocalBufferPool is a perfectly normal symptom of
>>> a backpressure [1][2].
>>>
>>> Best,
>>> Piotrek
>>>
>>> [1] https://flink.apache.org/2021/07/07/backpressure.html
>>> [2] https://www.ververica.com/blog/how-flink-handles-backpressure
>>>
>>> śr., 14 lip 2021 o 06:05 Rahul Patwari <rahulpatwari8...@gmail.com>
>>> napisał(a):
>>>
>>>> Thanks, David, Piotr for your reply.
>>>>
>>>> I managed to capture the Thread dump from Jobmanaager UI for few task
>>>> managers.
>>>> Here is the thread dump for Kafka Source tasks in one task manager. I
>>>> could see the same stack trace in other task managers as well. It seems
>>>> like Kafka Source tasks are waiting on Memory. Any Pointers?
>>>>
>>>>   {
>>>> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
>>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>>> SourceEventTransition (6/12)\" Id=581 WAITING on 
>>>> java.lang.Object@444c0edc\n\tat
>>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>>> java.lang.Object@444c0edc\n\tat
>>>> java.lang.Object.wait(Object.java:502)\n\tat
>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>>>> }, {
>>>> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
>>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>>> SourceEventSignature (7/12)\" Id=580 WAITING on 
>>>> java.lang.Object@7d3843a9\n\tat
>>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>>> java.lang.Object@7d3843a9\n\tat
>>>> java.lang.Object.wait(Object.java:502)\n\tat
>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>>>> }, {
>>>> "threadName": "Legacy Source Thread - Source: SourceEventSignature
>>>> (7/12)",
>>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>>> SourceEventSignature (7/12)\" Id=408 WAITING on
>>>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>>>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>>>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>>>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>>>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>>>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>>>> }, {
>>>> "threadName": "Legacy Source Thread - Source: SourceEventTransition
>>>> (6/12)",
>>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>>> SourceEventTransition (6/12)\" Id=409 WAITING on
>>>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>>>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>>>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>>>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>>>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>>>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>>>> }
>>>>
>>>> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm not sure, maybe someone will be able to help you, but it sounds
>>>>> like it would be better for you to:
>>>>> - google search something like "Kafka Error sending fetch request
>>>>> TimeoutException" (I see there are quite a lot of results, some of them
>>>>> might be related)
>>>>> - ask this question on the Kafka mailing list
>>>>> - ask this question on stackoverflow as a Kafka question
>>>>>
>>>>> In short, FlinkKafkaConsumer is a very thin wrapper around the
>>>>> KafkaConsumer class, so the thing you are observing has most likely very
>>>>> little to do with the Flink itself. In other words, if you are observing
>>>>> such a problem you most likely would be possible to reproduce it without
>>>>> Flink.
>>>>>
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> pt., 9 lip 2021 o 12:30 Rahul Patwari <rahulpatwari8...@gmail.com>
>>>>> napisał(a):
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a Flink 1.11.1 Version streaming pipeline in production which
>>>>>> reads from Kafka.
>>>>>> Kafka Server version is 2.5.0 - confluent 5.5.0
>>>>>> Kafka Client Version is 2.4.1 - 
>>>>>> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
>>>>>> version: 2.4.1","method":"<init>"}
>>>>>>
>>>>>> Occasionally(every 6 to 12 hours), we have observed that the Kafka
>>>>>> consumption rate went down(NOT 0) and the following logs were observed:
>>>>>> Generally, the consumption rate across all consumers is 4k records/sec.
>>>>>> When this issue occurred, the consumption rate dropped to < 50 
>>>>>> records/sec
>>>>>>
>>>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>>>
>>>>>> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
>>>>>> fetch request (sessionId=405798138, epoch=5808) to node 8:
>>>>>> {}.","method":"handleError"}
>>>>>>
>>>>>> org.apache.kafka.common.errors.TimeoutException: Failed
>>>>>>
>>>>>> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
>>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group 
>>>>>> coordinator
>>>>>> 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or
>>>>>> invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}
>>>>>>
>>>>>> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
>>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
>>>>>> coordinator 100.98.40.16:9092 (id: 2147483623 rack:
>>>>>> null)","method":"onSuccess"}
>>>>>>
>>>>>> The consumers retried for more than an hour but the above logs are
>>>>>> observed again.
>>>>>> The consumers started pulling data after a manual restart.
>>>>>>
>>>>>> No WARN or ERROR logs were observed in Kafka or Zookeeper during this
>>>>>> period.
>>>>>>
>>>>>> Our observation from this incident is that Kafka Consumer retries
>>>>>> could not resolve the issue but a manual restart (or) Flink internal
>>>>>> restart(Failure rate restart policy) does.
>>>>>>
>>>>>> Has anyone faced this issue before? Any pointers are appreciated.
>>>>>>
>>>>>> Regards,
>>>>>> Rahul
>>>>>>
>>>>>

Reply via email to