Thanks, Piotrek. We have two Kafka sources, and we are facing this issue for both of them. The sources and their downstream tasks form two independent directed acyclic graphs running within the same Streaming Job, for example:

source1 -> task1 -> sink1
source2 -> task2 -> sink2
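For reference, here is a minimal sketch of how two such independent chains could be wired up in one job. This is not our production code: the topic names, group id, bootstrap servers, and the placeholder map/print operators are made up for illustration; it uses the legacy FlinkKafkaConsumer that also shows up in the stack traces below.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TwoIndependentChainsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Illustrative consumer properties only; the real job has its own brokers and group id.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "example-consumer-group");

        // Chain 1: source1 -> task1 -> sink1
        env.addSource(new FlinkKafkaConsumer<>("event-signature-topic", new SimpleStringSchema(), props))
                .name("SourceEventSignature")
                .map(value -> value)              // placeholder for task1
                .returns(Types.STRING)
                .print();                         // placeholder for sink1

        // Chain 2: source2 -> task2 -> sink2
        env.addSource(new FlinkKafkaConsumer<>("event-transition-topic", new SimpleStringSchema(), props))
                .name("SourceEventTransition")
                .map(value -> value)              // placeholder for task2
                .returns(Types.STRING)
                .print();                         // placeholder for sink2

        env.execute("TwoIndependentChainsJob");
    }
}

The two chains exchange no records with each other; to the extent their tasks end up on the same TaskManagers, they only share the JVM and the TaskManager's network buffer pool.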
There is backpressure in both sources, verified using the "isBackPressured" metric. For one of the sources, "outPoolUsage" is high whereas the "inPoolUsage" of the immediate downstream task is ~0. I think we are observing the rare case mentioned at the end of [1].

I have a couple of questions:
1. Can the backpressure cause the "DisconnectException", "Error sending fetch request to node ..." and other Kafka consumer logs mentioned above?
2. What could be the next steps in resolving the backpressure issue (the rare case)?

[1] https://flink.apache.org/2021/07/07/backpressure.html

These are the thread dumps of the Kafka Source tasks when the stream is running as expected. Comparing them with the earlier thread dumps (quoted below): in the backpressured state, the "Kafka Fetcher" thread, which polls records, is blocked by the "Legacy Source" thread (the main thread) - probably because of backpressure.

{ "threadName": "Kafka Fetcher for Source: SourceEventSignature (8/12)", "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventSignature (8/12)\" Id=521 RUNNABLE\n\tat sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t- locked sun.nio.ch.Util$3@1aaa5678\n\t- locked java.util.Collections$UnmodifiableSet@6c629341\n\t- locked sun.nio.ch.EPollSelectorImpl@3e783067\n\tat sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber of locked synchronizers = 1\n\t- java.util.concurrent.locks.ReentrantLock$FairSync@462572c9\n\n" },
{ "threadName": "Kafka Fetcher for Source: SourceEventTransition (4/12)", "stringifiedThreadInfo": "\"Kafka Fetcher for Source: SourceEventTransition (4/12)\" Id=520 RUNNABLE\n\tat sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)\n\tat sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)\n\tat sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)\n\tat sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)\n\t- locked sun.nio.ch.Util$3@ef4e5e3\n\t- locked java.util.Collections$UnmodifiableSet@767487e7\n\t- locked sun.nio.ch.EPollSelectorImpl@9707a46\n\tat sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)\n\tat org.apache.kafka.common.network.Selector.select(Selector.java:794)\n\tat org.apache.kafka.common.network.Selector.poll(Selector.java:467)\n\tat org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)\n\t...\n\n\tNumber of locked synchronizers = 1\n\t- java.util.concurrent.locks.ReentrantLock$FairSync@642ee29b\n\n" },
{ "threadName": "Legacy Source Thread - Source: SourceEventSignature (8/12)", "stringifiedThreadInfo": "\"Legacy Source Thread - Source: SourceEventSignature (8/12)\" Id=515 WAITING on java.lang.Object@4d5cc800\n\tat java.lang.Object.wait(Native Method)\n\t- waiting on java.lang.Object@4d5cc800\n\tat java.lang.Object.wait(Object.java:502)\n\tat org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n" }, { "threadName": "Legacy Source Thread - Source: SourceEventTransition (4/12)", "stringifiedThreadInfo": "\"Legacy Source Thread - Source: SourceEventTransition (4/12)\" Id=514 WAITING on java.lang.Object@1fc525f3\n\tat java.lang.Object.wait(Native Method)\n\t- waiting on java.lang.Object@1fc525f3\n\tat java.lang.Object.wait(Object.java:502)\n\tat org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74)\n\tat org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:133)\n\tat org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)\n\tat org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)\n\tat org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)\n\tat org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)\n\n" } On Wed, Jul 14, 2021 at 2:39 PM Piotr Nowojski <pnowoj...@apache.org> wrote: > Hi, > > Waiting for memory from LocalBufferPool is a perfectly normal symptom of a > backpressure [1][2]. > > Best, > Piotrek > > [1] https://flink.apache.org/2021/07/07/backpressure.html > [2] https://www.ververica.com/blog/how-flink-handles-backpressure > > śr., 14 lip 2021 o 06:05 Rahul Patwari <rahulpatwari8...@gmail.com> > napisał(a): > >> Thanks, David, Piotr for your reply. >> >> I managed to capture the Thread dump from Jobmanaager UI for few task >> managers. >> Here is the thread dump for Kafka Source tasks in one task manager. I >> could see the same stack trace in other task managers as well. It seems >> like Kafka Source tasks are waiting on Memory. Any Pointers? 
>> >> { >> "threadName": "Kafka Fetcher for Source: SourceEventTransition (6/12)", >> "stringifiedThreadInfo": "\"Kafka Fetcher for Source: >> SourceEventTransition (6/12)\" Id=581 WAITING on >> java.lang.Object@444c0edc\n\tat >> java.lang.Object.wait(Native Method)\n\t- waiting on >> java.lang.Object@444c0edc\n\tat >> java.lang.Object.wait(Object.java:502)\n\tat >> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat >> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n" >> }, { >> "threadName": "Kafka Fetcher for Source: SourceEventSignature (7/12)", >> "stringifiedThreadInfo": "\"Kafka Fetcher for Source: >> SourceEventSignature (7/12)\" Id=580 WAITING on >> java.lang.Object@7d3843a9\n\tat >> java.lang.Object.wait(Native Method)\n\t- waiting on >> java.lang.Object@7d3843a9\n\tat >> java.lang.Object.wait(Object.java:502)\n\tat >> org.apache.flink.streaming.connectors.kafka.internal.Handover.produce(Handover.java:117)\n\tat >> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:261)\n\n" >> }, { >> "threadName": "Legacy Source Thread - Source: SourceEventSignature >> (7/12)", >> "stringifiedThreadInfo": "\"Legacy Source Thread - Source: >> SourceEventSignature (7/12)\" Id=408 WAITING on >> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat >> sun.misc.Unsafe.park(Native Method)\n\t- waiting on >> java.util.concurrent.CompletableFuture$Signaller@4c613ed7\n\tat >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat >> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat >> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat >> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat >> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat >> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat >> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n" >> }, { >> "threadName": "Legacy Source Thread - Source: SourceEventTransition >> (6/12)", >> "stringifiedThreadInfo": "\"Legacy Source Thread - Source: >> SourceEventTransition (6/12)\" Id=409 WAITING on >> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat >> sun.misc.Unsafe.park(Native Method)\n\t- waiting on >> java.util.concurrent.CompletableFuture$Signaller@5765d0d4\n\tat >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)\n\tat >> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)\n\tat >> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)\n\tat >> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)\n\tat >> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)\n\tat >> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:293)\n\tat >> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:266)\n\t...\n\n" >> } >> >> On Tue, Jul 13, 2021 at 7:07 PM Piotr Nowojski <pnowoj...@apache.org> >> wrote: >> >>> Hi, >>> >>> I'm not sure, maybe someone will be able to help you, but it sounds like >>> it would be better for you to: >>> - google search something like "Kafka Error sending 
fetch request >>> TimeoutException" (I see there are quite a lot of results, some of them >>> might be related) >>> - ask this question on the Kafka mailing list >>> - ask this question on stackoverflow as a Kafka question >>> >>> In short, FlinkKafkaConsumer is a very thin wrapper around the >>> KafkaConsumer class, so the thing you are observing has most likely very >>> little to do with the Flink itself. In other words, if you are observing >>> such a problem you most likely would be possible to reproduce it without >>> Flink. >>> >>> Best, >>> Piotrek >>> >>> pt., 9 lip 2021 o 12:30 Rahul Patwari <rahulpatwari8...@gmail.com> >>> napisał(a): >>> >>>> Hi, >>>> >>>> We have a Flink 1.11.1 Version streaming pipeline in production which >>>> reads from Kafka. >>>> Kafka Server version is 2.5.0 - confluent 5.5.0 >>>> Kafka Client Version is 2.4.1 - >>>> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka >>>> version: 2.4.1","method":"<init>"} >>>> >>>> Occasionally(every 6 to 12 hours), we have observed that the Kafka >>>> consumption rate went down(NOT 0) and the following logs were observed: >>>> Generally, the consumption rate across all consumers is 4k records/sec. >>>> When this issue occurred, the consumption rate dropped to < 50 records/sec >>>> >>>> org.apache.kafka.common.errors.DisconnectException: null >>>> >>>> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer >>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, >>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending >>>> fetch request (sessionId=405798138, epoch=5808) to node 8: >>>> {}.","method":"handleError"} >>>> >>>> org.apache.kafka.common.errors.TimeoutException: Failed >>>> >>>> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer >>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, >>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator >>>> 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or >>>> invalid, will attempt rediscovery","method":"markCoordinatorUnknown"} >>>> >>>> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer >>>> clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, >>>> groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group >>>> coordinator 100.98.40.16:9092 (id: 2147483623 rack: >>>> null)","method":"onSuccess"} >>>> >>>> The consumers retried for more than an hour but the above logs are >>>> observed again. >>>> The consumers started pulling data after a manual restart. >>>> >>>> No WARN or ERROR logs were observed in Kafka or Zookeeper during this >>>> period. >>>> >>>> Our observation from this incident is that Kafka Consumer retries could >>>> not resolve the issue but a manual restart (or) Flink internal >>>> restart(Failure rate restart policy) does. >>>> >>>> Has anyone faced this issue before? Any pointers are appreciated. >>>> >>>> Regards, >>>> Rahul >>>> >>>