Thanks, Piotrek.

We have two Kafka sources. We are facing this issue for both of them. The
downstream tasks with the sources form two independent directed acyclic
graphs, running within the same Streaming Job.

For Example:
source1 -> task1 -> sink1
source2 -> task2 -> sink2

There is backpressure in both sources. Verified using the "isBackPressured"
metric.
For one of the sources, "outPoolUsage" is high whereas "inPoolUsage" of
immediate downstream task is ~ 0. I think we are observing the rare case
mentioned at the end in [1].

I have a couple of questions:

   1. Can the backpressure Cause "DisconnectException", "Error Sending
   Fetch Request to node ..." and other Kafka Consumer logs mentioned above?
   2. What could be the next steps in resolving the backpressure issue -
   the rare case

[1] https://flink.apache.org/2021/07/07/backpressure.html

When the stream is running as expected, these are the thread dump of Kafka
Source tasks. Comparing the thread dumps - The "Kafka Fetcher" thread,
which polls records is blocked by "Legacy Source" Thread(main Thread) -
probably because of backpressure.

  {
"threadName": "Kafka Fetcher for Source: SourceEventSignature (8/12)",
"stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventSignature
(8/12)\" Id=521 RUNNABLE\n\tat
sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t-  locked
sun.nio.ch.Util$3@1aaa5678\n\t-  locked
java.util.Collections$UnmodifiableSet@6c629341\n\t-  locked
sun.nio.ch.EPollSelectorImpl@3e783067\n\tat
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat
org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat
org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber
of locked synchronizers = 1\n\t-
java.util.concurrent.locks.ReentrantLock$FairSync@462572c9\n\n"
}, {
"threadName": "Kafka Fetcher for Source: SourceEventTransition (4/12)",
"stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventTransition
(4/12)\" Id=520 RUNNABLE\n\tat
sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t-  locked
sun.nio.ch.Util$3@ef4e5e3\n\t-  locked
java.util.Collections$UnmodifiableSet@767487e7\n\t-  locked
sun.nio.ch.EPollSelectorImpl@9707a46\n\tat
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat
org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat
org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber
of locked synchronizers = 1\n\t-
java.util.concurrent.locks.ReentrantLock$FairSync@642ee29b\n\n"
}, {
"threadName": "Legacy Source Thread - Source: SourceEventSignature (8/12)",
"stringifiedThreadInfo": "\"Legacy Source Thread - Source:
SourceEventSignature (8/12)\" Id=515 WAITING on java.lang.Object@4d5cc800\n\tat
java.lang.Object.wait(Native Method)\n\t-  waiting on
java.lang.Object@4d5cc800\n\tat
java.lang.Object.wait(Object.java:502)\n\tat
org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
}, {
"threadName": "Legacy Source Thread - Source: SourceEventTransition (4/12)",
"stringifiedThreadInfo": "\"Legacy Source Thread - Source:
SourceEventTransition (4/12)\" Id=514 WAITING on
java.lang.Object@1fc525f3\n\tat
java.lang.Object.wait(Native Method)\n\t-  waiting on
java.lang.Object@1fc525f3\n\tat
java.lang.Object.wait(Object.java:502)\n\tat
org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n"
}

On Wed, Jul 14, 2021 at 2:39 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> Waiting for memory from LocalBufferPool is a perfectly normal symptom of a
> backpressure [1][2].
>
> Best,
> Piotrek
>
> [1] https://flink.apache.org/2021/07/07/backpressure.html
> [2] https://www.ververica.com/blog/how-flink-handles-backpressure
>
> śr., 14 lip 2021 o 06:05 Rahul Patwari <rahulpatwari8...@gmail.com>
> napisał(a):
>
>> Thanks, David, Piotr for your reply.
>>
>> I managed to capture the Thread dump from Jobmanaager UI for few task
>> managers.
>> Here is the thread dump for Kafka Source tasks in one task manager. I
>> could see the same stack trace in other task managers as well. It seems
>> like Kafka Source tasks are waiting on Memory. Any Pointers?
>>
>>   {
>> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)",
>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>> SourceEventTransition (6/12)\" Id=581 WAITING on 
>> java.lang.Object@444c0edc\n\tat
>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>> java.lang.Object@444c0edc\n\tat
>> java.lang.Object.wait(Object.java:502)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>> }, {
>> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)",
>> "stringifiedThreadInfo": "\"Kafka Fetcher for Source:
>> SourceEventSignature (7/12)\" Id=580 WAITING on 
>> java.lang.Object@7d3843a9\n\tat
>> java.lang.Object.wait(Native Method)\n\t-  waiting on
>> java.lang.Object@7d3843a9\n\tat
>> java.lang.Object.wait(Object.java:502)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n"
>> }, {
>> "threadName": "Legacy Source Thread - Source: SourceEventSignature
>> (7/12)",
>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>> SourceEventSignature (7/12)\" Id=408 WAITING on
>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>> }, {
>> "threadName": "Legacy Source Thread - Source: SourceEventTransition
>> (6/12)",
>> "stringifiedThreadInfo": "\"Legacy Source Thread - Source:
>> SourceEventTransition (6/12)\" Id=409 WAITING on
>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>> sun.misc.Unsafe.park(Native Method)\n\t-  waiting on
>> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat
>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat
>> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat
>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n"
>> }
>>
>> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm not sure, maybe someone will be able to help you, but it sounds like
>>> it would be better for you to:
>>> - google search something like "Kafka Error sending fetch request
>>> TimeoutException" (I see there are quite a lot of results, some of them
>>> might be related)
>>> - ask this question on the Kafka mailing list
>>> - ask this question on stackoverflow as a Kafka question
>>>
>>> In short, FlinkKafkaConsumer is a very thin wrapper around the
>>> KafkaConsumer class, so the thing you are observing has most likely very
>>> little to do with the Flink itself. In other words, if you are observing
>>> such a problem you most likely would be possible to reproduce it without
>>> Flink.
>>>
>>> Best,
>>> Piotrek
>>>
>>> pt., 9 lip 2021 o 12:30 Rahul Patwari <rahulpatwari8...@gmail.com>
>>> napisał(a):
>>>
>>>> Hi,
>>>>
>>>> We have a Flink 1.11.1 Version streaming pipeline in production which
>>>> reads from Kafka.
>>>> Kafka Server version is 2.5.0 - confluent 5.5.0
>>>> Kafka Client Version is 2.4.1 - 
>>>> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
>>>> version: 2.4.1","method":"<init>"}
>>>>
>>>> Occasionally(every 6 to 12 hours), we have observed that the Kafka
>>>> consumption rate went down(NOT 0) and the following logs were observed:
>>>> Generally, the consumption rate across all consumers is 4k records/sec.
>>>> When this issue occurred, the consumption rate dropped to < 50 records/sec
>>>>
>>>> org.apache.kafka.common.errors.DisconnectException: null
>>>>
>>>> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending
>>>> fetch request (sessionId=405798138, epoch=5808) to node 8:
>>>> {}.","method":"handleError"}
>>>>
>>>> org.apache.kafka.common.errors.TimeoutException: Failed
>>>>
>>>> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator
>>>> 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or
>>>> invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}
>>>>
>>>> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
>>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3,
>>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group
>>>> coordinator 100.98.40.16:9092 (id: 2147483623 rack:
>>>> null)","method":"onSuccess"}
>>>>
>>>> The consumers retried for more than an hour but the above logs are
>>>> observed again.
>>>> The consumers started pulling data after a manual restart.
>>>>
>>>> No WARN or ERROR logs were observed in Kafka or Zookeeper during this
>>>> period.
>>>>
>>>> Our observation from this incident is that Kafka Consumer retries could
>>>> not resolve the issue but a manual restart (or) Flink internal
>>>> restart(Failure rate restart policy) does.
>>>>
>>>> Has anyone faced this issue before? Any pointers are appreciated.
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>

Reply via email to