Hi Rahul,

I would highly doubt that you are hitting the network bottleneck case. It
would require either a broken environment/network or throughputs in orders
of GB/second. More likely you are seeing empty input pool and you haven't
checked the documentation [1]:

> inPoolUsage - An estimate of the input buffers usage. (ignores
LocalInputChannels)

If local channels are backpressured, inPoolUsage will be 0. You can check
downstream task's inputQueueLength or isBackPressured metrics. Besides
that, I would highly recommend upgrading to Flink 1.13.x if you are
investigating backpressure problems as described in the blog post.

> 1. Can the backpressure Cause "DisconnectException", "Error Sending Fetch
Request to node ..." and other Kafka Consumer logs mentioned above?

No, I don't think it's possible. Those two might be related to one another
via some different deeper problem (broken network environment, something
being overloaded), but I don't see a way how one could cause the other.

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#default-shuffle-service

śr., 14 lip 2021 o 14:18 Rahul Patwari <rahulpatwari8...@gmail.com>
napisał(a):

> Thanks, Piotrek.
>
> We have two Kafka sources. We are facing this issue for both of them. The
> downstream tasks with the sources form two independent directed acyclic
> graphs, running within the same Streaming Job.
>
> For Example:
> source1 -> task1 -> sink1
> source2 -> task2 -> sink2
>
> There is backpressure in both sources. Verified using the
> "isBackPressured" metric.
> For one of the sources, "outPoolUsage" is high whereas "inPoolUsage" of
> immediate downstream task is ~ 0. I think we are observing the rare case
> mentioned at the end in [1].
>
> I have a couple of questions:
>
>    1. Can the backpressure Cause "DisconnectException", "Error Sending
>    Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
>    2. What could be the next steps in resolving the backpressure issue -
>    the rare case
>
> [1] https://flink.apache.org/2021/07/07/backpressure.html
>
> When the stream is running as expected, these are the thread dump of Kafka
> Source tasks. Comparing the thread dumps - The "Kafka Fetcher" thread,
> which polls records is blocked by "Legacy Source" Thread(main Thread) -
> probably because of backpressure.
>
>   {
> "threadName": "Kafka Fetcher for Source: SourceEventSignature (8/12)",
> "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventSignature
> (8/12)\" Id=521 RUNNABLE\n\tat
> sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat
> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat
> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t-  locked
> sun.nio.ch.Util$3@1aaa5678\n\t-  locked
> java.util.Collections$UnmodifiableSet@6c629341\n\t-  locked
> sun.nio.ch.EPollSelectorImpl@3e783067\n\tat
> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat
> org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat
> org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber
> of locked synchronizers = 1\n\t-
> java.util.concurrent.locks.ReentrantLock$FairSync@462572c9\n\n"
> }, {
> "threadName": "Kafka Fetcher for Source: SourceEventTransition (4/12)",
> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
> SourceEventTransition (4/12)\" Id=520 RUNNABLE\n\tat
> sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat
> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat
> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t-  locked
> sun.nio.ch.Util$3@ef4e5e3\n\t-  locked
> java.util.Collections$UnmodifiableSet@767487e7\n\t-  locked
> sun.nio.ch.EPollSelectorImpl@9707a46\n\tat
> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat
> org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat
> org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber
> of locked synchronizers = 1\n\t-
> java.util.concurrent.locks.ReentrantLock$FairSync@642ee29b\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventSignature (8/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
> SourceEventSignature (8/12)\" Id=515 WAITING on 
> java.lang.Object@4d5cc800\n\tat
> java.lang.Object.wait(Native Method)\n\t-  waiting on
> java.lang.Object@4d5cc800\n\tat
> java.lang.Object.wait(Object.java:502)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
> }, {
> "threadName": "Legacy Source Thread - Source: SourceEventTransition
> (4/12)",
> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
> SourceEventTransition (4/12)\" Id=514 WAITING on 
> java.lang.Object@1fc525f3\n\tat
> java.lang.Object.wait(Native Method)\n\t-  waiting on
> java.lang.Object@1fc525f3\n\tat
> java.lang.Object.wait(Object.java:502)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
> }
>
> On Wed, Jul 14, 2021 at 2:39 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> Waiting for memory from LocalBufferPool is a perfectly normal symptom of
>> a backpressure [1][2].
>>
>> Best,
>> Piotrek
>>
>> [1] https://flink.apache.org/2021/07/07/backpressure.html
>> [2] https://www.ververica.com/blog/how-flink-handles-backpressure
>>
>> śr., 14 lip 2021 o 06:05 Rahul Patwari <rahulpatwari8...@gmail.com>
>> napisał(a):
>>
>>> Thanks, David, Piotr for your reply.
>>>
>>> I managed to capture the Thread dump from Jobmanaager UI for few task
>>> managers.
>>> Here is the thread dump for Kafka Source tasks in one task manager. I
>>> could see the same stack trace in other task managers as well. It seems
>>> like Kafka Source tasks are waiting on Memory. Any Pointers?
>>>
>>>   {
>>> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>> SourceEventTransition (6/12)\" Id=581 WAITING on 
>>> java.lang.Object@444c0edc\n\tat
>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>> java.lang.Object@444c0edc\n\tat
>>> java.lang.Object.wait(Object.java:502)\n\tat
>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>>> }, {
>>> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
>>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>>> SourceEventSignature (7/12)\" Id=580 WAITING on 
>>> java.lang.Object@7d3843a9\n\tat
>>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>>> java.lang.Object@7d3843a9\n\tat
>>> java.lang.Object.wait(Object.java:502)\n\tat
>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>>> }, {
>>> "threadName": "Legacy Source Thread - Source: SourceEventSignature
>>> (7/12)",
>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>> SourceEventSignature (7/12)\" Id=408 WAITING on
>>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>>> }, {
>>> "threadName": "Legacy Source Thread - Source: SourceEventTransition
>>> (6/12)",
>>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>>> SourceEventTransition (6/12)\" Id=409 WAITING on
>>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>>> }
>>>
>>> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm not sure, maybe someone will be able to help you, but it sounds
>>>> like it would be better for you to:
>>>> - google search something like "Kafka Error sending fetch request
>>>> TimeoutException" (I see there are quite a lot of results, some of them
>>>> might be related)
>>>> - ask this question on the Kafka mailing list
>>>> - ask this question on stackoverflow as a Kafka question
>>>>
>>>> In short, FlinkKafkaConsumer is a very thin wrapper around the
>>>> KafkaConsumer class, so the thing you are observing has most likely very
>>>> little to do with the Flink itself. In other words, if you are observing
>>>> such a problem you most likely would be possible to reproduce it without
>>>> Flink.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> pt., 9 lip 2021 o 12:30 Rahul Patwari <rahulpatwari8...@gmail.com>
>>>> napisał(a):
>>>>
>>>>> Hi,
>>>>>
>>>>> We have a Flink 1.11.1 Version streaming pipeline in production which
>>>>> reads from Kafka.
>>>>> Kafka Server version is 2.5.0 - confluent 5.5.0
>>>>> Kafka Client Version is 2.4.1 - 
>>>>> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
>>>>> version: 2.4.1","method":"<init>"}
>>>>>
>>>>> Occasionally(every 6 to 12 hours), we have observed that the Kafka
>>>>> consumption rate went down(NOT 0) and the following logs were observed:
>>>>> Generally, the consumption rate across all consumers is 4k records/sec.
>>>>> When this issue occurred, the consumption rate dropped to < 50 records/sec
>>>>>
>>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>>
>>>>> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
>>>>> fetch request (sessionId=405798138, epoch=5808) to node 8:
>>>>> {}.","method":"handleError"}
>>>>>
>>>>> org.apache.kafka.common.errors.TimeoutException: Failed
>>>>>
>>>>> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator
>>>>> 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or
>>>>> invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}
>>>>>
>>>>> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
>>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
>>>>> coordinator 100.98.40.16:9092 (id: 2147483623 rack:
>>>>> null)","method":"onSuccess"}
>>>>>
>>>>> The consumers retried for more than an hour but the above logs are
>>>>> observed again.
>>>>> The consumers started pulling data after a manual restart.
>>>>>
>>>>> No WARN or ERROR logs were observed in Kafka or Zookeeper during this
>>>>> period.
>>>>>
>>>>> Our observation from this incident is that Kafka Consumer retries
>>>>> could not resolve the issue but a manual restart (or) Flink internal
>>>>> restart(Failure rate restart policy) does.
>>>>>
>>>>> Has anyone faced this issue before? Any pointers are appreciated.
>>>>>
>>>>> Regards,
>>>>> Rahul
>>>>>
>>>>

Reply via email to